SeqAn3  3.1.0-rc.1
The Modern C++ library for sequence analysis.
bgzf_istream.hpp
1 // zipstream Library License:
2 // --------------------------
3 //
4 // The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5 //
6 // This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7 //
8 // Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9 //
10 // 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11 //
12 // 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13 //
14 // 3. This notice may not be removed or altered from any source distribution
15 //
16 //
17 // Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
18 // Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
19 // Author: Rene Rahn, rene.rahn AT fu-berlin.de, 2019 (adaption to SeqAn library version 3)
20 
21 #pragma once
22 
23 #include <cstring>
24 #include <condition_variable>
25 #include <mutex>
26 
30 
31 #if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
32 # error "This file cannot be used when building without GZip-support."
33 #endif // !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
34 
35 #if defined(SEQAN3_HAS_ZLIB)
36 namespace seqan3::contrib
37 {
38 
39 // ===========================================================================
40 // Classes
41 // ===========================================================================
42 
43 // --------------------------------------------------------------------------
44 // Class basic_bgzf_istreambuf
45 // --------------------------------------------------------------------------
46 
47 template<
48  typename Elem,
49  typename Tr = std::char_traits<Elem>,
50  typename ElemA = std::allocator<Elem>,
51  typename ByteT = char,
52  typename ByteAT = std::allocator<ByteT>
53 >
54 class basic_bgzf_istreambuf : public std::basic_streambuf<Elem, Tr>
55 {
56 public:
57 
58  typedef Tr traits_type;
59  typedef typename traits_type::char_type char_type;
60  typedef typename traits_type::int_type int_type;
61  typedef typename traits_type::off_type off_type;
62  typedef typename traits_type::pos_type pos_type;
63 
64 private:
65 
66  typedef std::basic_istream<Elem, Tr>& istream_reference;
67 
68  typedef ElemA char_allocator_type;
69  typedef ByteT byte_type;
70  typedef ByteAT byte_allocator_type;
71  typedef byte_type* byte_buffer_type;
72 
73  typedef std::vector<char_type, char_allocator_type> TBuffer;
74  typedef fixed_buffer_queue<int32_t> TJobQueue;
75 
76  static const size_t MAX_PUTBACK = 4;
77 
78  // Allows serialized access to the underlying buffer.
79  struct Serializer
80  {
81  istream_reference istream;
82  std::mutex lock;
83  io_error *error;
84  off_type fileOfs;
85 
86  Serializer(istream_reference istream) :
87  istream(istream),
88  error(NULL),
89  fileOfs(0u)
90  {}
91 
92  ~Serializer()
93  {
94  delete error;
95  }
96  };
97 
98  Serializer serializer;
99 
100  struct DecompressionJob
101  {
102  typedef std::vector<byte_type, byte_allocator_type> TInputBuffer;
103 
104  TInputBuffer inputBuffer;
105  TBuffer buffer;
106  off_type fileOfs;
107  int32_t size;
108  uint32_t compressedSize;
109 
110  std::mutex cs;
111  std::condition_variable readyEvent;
112  bool ready;
113  bool bgzfEofMarker;
114 
115  DecompressionJob() :
116  inputBuffer(DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE, 0),
117  buffer(MAX_PUTBACK + DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE / sizeof(char_type), 0),
118  fileOfs(),
119  size(0),
120  cs(),
121  readyEvent(),
122  ready(true),
123  bgzfEofMarker(false)
124  {}
125 
126  DecompressionJob(DecompressionJob const &other) :
127  inputBuffer(other.inputBuffer),
128  buffer(other.buffer),
129  fileOfs(other.fileOfs),
130  size(other.size),
131  cs(),
132  readyEvent(),
133  ready(other.ready),
134  bgzfEofMarker(other.bgzfEofMarker)
135  {}
136  };
137 
138  // string of recyclable jobs
139  size_t numThreads;
140  size_t numJobs;
141  std::vector<DecompressionJob> jobs;
142  TJobQueue runningQueue;
143  TJobQueue todoQueue;
144  detail::reader_writer_manager runningQueueManager; // synchronises reader, writer with running queue.
145  detail::reader_writer_manager todoQueueManager; // synchronises reader, writer with todo queue.
146  int currentJobId;
147 
148  struct DecompressionThread
149  {
150  basic_bgzf_istreambuf *streamBuf;
151  CompressionContext<detail::bgzf_compression> compressionCtx;
152 
153  void operator()()
154  {
155  // Active reader to consume from todo queue.
156  auto reader_raii = streamBuf->todoQueueManager.register_reader();
157  // Active writer to produce work for the decompression queue.
158  auto writer_raii = streamBuf->runningQueueManager.register_writer();
159 
160  // wait for a new job to become available
161  while (true)
162  {
163 
164  int jobId = -1;
165  if (streamBuf->todoQueue.wait_pop(jobId) == queue_op_status::closed)
166  return;
167 
168  DecompressionJob &job = streamBuf->jobs[jobId];
169  size_t tailLen = 0;
170 
171  // typically the idle queue contains only ready jobs
172  // however, if seek() fast forwards running jobs into the todoQueue
173  // the caller defers the task of waiting to the decompression threads
174  if (!job.ready)
175  {
176  std::unique_lock<std::mutex> lock(job.cs);
177  job.readyEvent.wait(lock, [&job]{return job.ready;});
178  assert(job.ready == true);
179  }
180 
181  {
182  std::lock_guard<std::mutex> scopedLock(streamBuf->serializer.lock);
183 
184  job.bgzfEofMarker = false;
185  if (streamBuf->serializer.error != NULL)
186  return;
187 
188  // remember start offset (for tellg later)
189  job.fileOfs = streamBuf->serializer.fileOfs;
190  job.size = -1;
191  job.compressedSize = 0;
192 
193  // only load if not at EOF
194  if (job.fileOfs != -1)
195  {
196  // read header
197  streamBuf->serializer.istream.read(
198  (char_type*)&job.inputBuffer[0],
199  DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH);
200 
201  if (!streamBuf->serializer.istream.good())
202  {
203  streamBuf->serializer.fileOfs = -1;
204  if (streamBuf->serializer.istream.eof())
205  goto eofSkip;
206  streamBuf->serializer.error = new io_error("Stream read error.");
207  return;
208  }
209 
210  // check header
211  if (!detail::bgzf_compression::validate_header(std::span{job.inputBuffer}))
212  {
213  streamBuf->serializer.fileOfs = -1;
214  streamBuf->serializer.error = new io_error("Invalid BGZF block header.");
215  return;
216  }
217 
218  // extract length of compressed data
219  tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) +
220  1u - DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
221 
222  // read compressed data and tail
223  streamBuf->serializer.istream.read(
224  (char_type*)&job.inputBuffer[0] + DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH,
225  tailLen);
226 
227  // Check if end-of-file marker is set
228  if (memcmp(reinterpret_cast<uint8_t const *>(&job.inputBuffer[0]),
229  reinterpret_cast<uint8_t const *>(&BGZF_END_OF_FILE_MARKER[0]),
230  28) == 0)
231  {
232  job.bgzfEofMarker = true;
233  }
234 
235  if (!streamBuf->serializer.istream.good())
236  {
237  streamBuf->serializer.fileOfs = -1;
238  if (streamBuf->serializer.istream.eof())
239  goto eofSkip;
240  streamBuf->serializer.error = new io_error("Stream read error.");
241  return;
242  }
243 
244  job.compressedSize = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH + tailLen;
245  streamBuf->serializer.fileOfs += job.compressedSize;
246  job.ready = false;
247 
248  eofSkip:
249  streamBuf->serializer.istream.clear(
250  streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
251  }
252 
253  if (streamBuf->runningQueue.try_push(jobId) != queue_op_status::success)
254  {
255  // signal that job is ready
256  {
257  std::unique_lock<std::mutex> lock(job.cs);
258  job.ready = true;
259  }
260  job.readyEvent.notify_all();
261  return; // Terminate this thread.
262  }
263  }
264 
265  if (!job.ready)
266  {
267  // decompress block
268  job.size = _decompressBlock(
269  &job.buffer[0] + MAX_PUTBACK, job.buffer.capacity(),
270  &job.inputBuffer[0], job.compressedSize, compressionCtx);
271 
272  // signal that job is ready
273  {
274  std::unique_lock<std::mutex> lock(job.cs);
275  job.ready = true;
276  }
277  job.readyEvent.notify_all();
278  }
279  }
280  }
281  };
282 
283  std::vector<std::thread> pool; // pool of worker threads
284  TBuffer putbackBuffer;
285 
286 public:
287 
288  basic_bgzf_istreambuf(istream_reference istream_,
289  size_t numThreads = bgzf_thread_count,
290  size_t jobsPerThread = 8) :
291  serializer(istream_),
292  numThreads(numThreads),
293  numJobs(numThreads * jobsPerThread),
294  runningQueue(numJobs),
295  todoQueue(numJobs),
296  runningQueueManager(detail::reader_count{1}, detail::writer_count{numThreads}, runningQueue),
297  todoQueueManager(detail::reader_count{numThreads}, detail::writer_count{1}, todoQueue),
298  putbackBuffer(MAX_PUTBACK)
299  {
300  jobs.resize(numJobs);
301  currentJobId = -1;
302 
303  // Prepare todo queue.
304  for (size_t i = 0; i < numJobs; ++i)
305  {
306  [[maybe_unused]] queue_op_status status = todoQueue.try_push(i);
307  assert(status == queue_op_status::success);
308  }
309 
310  // Start off the threads.
311  for (size_t i = 0; i < numThreads; ++i)
312  pool.emplace_back(DecompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
313  }
314 
315  ~basic_bgzf_istreambuf()
316  {
317  // Signal todoQueue that no more work is coming and close todo queue.
318  todoQueueManager.writer_arrive();
319 
320  // Wait for threads to finish there active work.
321  for (auto & t : pool)
322  {
323  if (t.joinable())
324  t.join();
325  }
326 
327  // Signal running queue that reader is done.
328  runningQueueManager.reader_arrive();
329  }
330 
331  int_type underflow()
332  {
333  // no need to use the next buffer?
334  if (this->gptr() && this->gptr() < this->egptr())
335  return traits_type::to_int_type(*this->gptr());
336 
337  size_t putback = this->gptr() - this->eback();
338  if (putback > MAX_PUTBACK)
339  putback = MAX_PUTBACK;
340 
341  // save at most MAX_PUTBACK characters from previous page to putback buffer
342  if (putback != 0)
343  std::copy(
344  this->gptr() - putback,
345  this->gptr(),
346  &putbackBuffer[0]);
347 
348  if (currentJobId >= 0)
349  todoQueue.wait_push(currentJobId);
350  // appendValue(todoQueue, currentJobId);
351 
352  while (true)
353  {
354  if (runningQueue.wait_pop(currentJobId) == queue_op_status::closed)
355  {
356  currentJobId = -1;
357  assert(serializer.error != NULL);
358  if (serializer.error != NULL)
359  throw *serializer.error;
360  return EOF;
361  }
362 
363  DecompressionJob &job = jobs[currentJobId];
364 
365  // restore putback buffer
366  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
367  if (putback != 0)
368  std::copy(
369  &putbackBuffer[0],
370  &putbackBuffer[0] + putback,
371  &job.buffer[0] + (MAX_PUTBACK - putback));
372 
373  // wait for the end of decompression
374  {
375  std::unique_lock<std::mutex> lock(job.cs);
376  job.readyEvent.wait(lock, [&job]{return job.ready;});
377  }
378 
379  size_t size = (job.size != -1)? job.size : 0;
380 
381  // reset buffer pointers
382  this->setg(
383  &job.buffer[0] + (MAX_PUTBACK - putback), // beginning of putback area
384  &job.buffer[0] + MAX_PUTBACK, // read position
385  &job.buffer[0] + (MAX_PUTBACK + size)); // end of buffer
386 
387  // The end of the bgzf file is reached, either if there was an error, or if the
388  // end-of-file marker was reached, while the uncompressed block had zero size.
389  if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
390  return EOF;
391  else if (job.size > 0)
392  return traits_type::to_int_type(*this->gptr()); // return next character
393 
394  throw io_error("BGZF: Invalid end condition in decompression. "
395  "Most likely due to an empty bgzf block without end-of-file marker.");
396  }
397  }
398 
399  pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
400  {
401  if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
402  {
403  if (dir == std::ios_base::cur && ofs >= 0)
404  {
405  // forward delta seek
406  while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
407  {
408  ofs -= this->egptr() - this->gptr();
409  if (this->underflow() == static_cast<int_type>(EOF))
410  break;
411  }
412 
413  if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
414  {
415  DecompressionJob &job = jobs[currentJobId];
416 
417  // reset buffer pointers
418  this->setg(
419  this->eback(), // beginning of putback area
420  this->gptr() + ofs, // read position
421  this->egptr()); // end of buffer
422 
423  if (this->gptr() != this->egptr())
424  return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
425  else
426  return pos_type((job.fileOfs + job.compressedSize) << 16);
427  }
428 
429  }
430  else if (dir == std::ios_base::beg)
431  {
432  // random seek
433  std::streampos destFileOfs = ofs >> 16;
434 
435  // are we in the same block?
436  if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
437  {
438  DecompressionJob &job = jobs[currentJobId];
439 
440  // reset buffer pointers
441  this->setg(
442  this->eback(), // beginning of putback area
443  &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position
444  this->egptr()); // end of buffer
445  return ofs;
446  }
447 
448  // ok, different block
449  {
450  std::lock_guard<std::mutex> scopedLock(serializer.lock);
451 
452  // remove all running jobs and put them in the idle queue unless we
453  // find our seek target
454 
455  if (currentJobId >= 0)
456  todoQueue.wait_push(currentJobId);
457 
458  // Note that if we are here the current job does not represent the sought block.
459  // Hence if the running queue is empty we need to explicitly unset the jobId,
460  // otherwise we would not update the serializers istream pointer to the correct position.
461  if (runningQueue.is_empty())
462  currentJobId = -1;
463 
464  // empty is thread-safe in serializer.lock
465  while (!runningQueue.is_empty())
466  {
467  runningQueue.wait_pop(currentJobId);
468 
469  if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
470  break;
471 
472  // push back useless job
473  todoQueue.wait_push(currentJobId);
474  currentJobId = -1;
475  }
476 
477  if (currentJobId == -1)
478  {
479  assert(runningQueue.is_empty());
480  serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
481  if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
482  serializer.fileOfs = destFileOfs;
483  else
484  currentJobId = -2; // temporarily signals a seek error
485  }
486  }
487 
488  // if our block wasn't in the running queue yet, it should now
489  // be the first that falls out after modifying serializer.fileOfs
490  if (currentJobId == -1)
491  runningQueue.wait_pop(currentJobId);
492  else if (currentJobId == -2)
493  currentJobId = -1;
494 
495  if (currentJobId >= 0)
496  {
497  // wait for the end of decompression
498  DecompressionJob &job = jobs[currentJobId];
499 
500  {
501  std::unique_lock<std::mutex> lock(job.cs);
502  job.readyEvent.wait(lock, [&job]{return job.ready;});
503  }
504 
505  assert(job.fileOfs == (off_type)destFileOfs);
506 
507  // reset buffer pointers
508  this->setg(
509  &job.buffer[0] + MAX_PUTBACK, // no putback area
510  &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position
511  &job.buffer[0] + (MAX_PUTBACK + job.size)); // end of buffer
512  return ofs;
513  }
514  }
515  }
516  return pos_type(off_type(-1));
517  }
518 
519  pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
520  {
521  return seekoff(off_type(pos), std::ios_base::beg, openMode);
522  }
523 
524  // returns the compressed input istream
525  istream_reference get_istream() { return serializer.istream; };
526 };
527 
528 // --------------------------------------------------------------------------
529 // Class basic_bgzf_istreambase
530 // --------------------------------------------------------------------------
531 
532 template<
533  typename Elem,
534  typename Tr = std::char_traits<Elem>,
535  typename ElemA = std::allocator<Elem>,
536  typename ByteT = char,
537  typename ByteAT = std::allocator<ByteT>
538 >
539 class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr>
540 {
541 public:
542  typedef std::basic_istream<Elem, Tr>& istream_reference;
543  typedef basic_bgzf_istreambuf<Elem, Tr, ElemA, ByteT, ByteAT> decompression_bgzf_streambuf_type;
544 
545  basic_bgzf_istreambase(istream_reference istream_)
546  : m_buf(istream_)
547  {
548  this->init(&m_buf);
549  };
550 
551  // returns the underlying decompression bgzf istream object
552  decompression_bgzf_streambuf_type* rdbuf() { return &m_buf; };
553 
554  // returns the bgzf error state
555  int get_zerr() const { return m_buf.get_zerr(); };
556  // returns the uncompressed data crc
557  long get_crc() const { return m_buf.get_crc(); };
558  // returns the uncompressed data size
559  long get_out_size() const { return m_buf.get_out_size(); };
560  // returns the compressed data size
561  long get_in_size() const { return m_buf.get_in_size(); };
562 
563 private:
564  decompression_bgzf_streambuf_type m_buf;
565 };
566 
567 // --------------------------------------------------------------------------
568 // Class basic_bgzf_istream
569 // --------------------------------------------------------------------------
570 
571 template<
572  typename Elem,
573  typename Tr = std::char_traits<Elem>,
574  typename ElemA = std::allocator<Elem>,
575  typename ByteT = char,
576  typename ByteAT = std::allocator<ByteT>
577 >
578 class basic_bgzf_istream :
579  public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
580  public std::basic_istream<Elem,Tr>
581 {
582 public:
583  typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
584  typedef std::basic_istream<Elem,Tr> istream_type;
585  typedef istream_type & istream_reference;
586  typedef char byte_type;
587 
588  basic_bgzf_istream(istream_reference istream_) :
589  bgzf_istreambase_type(istream_),
590  istream_type(bgzf_istreambase_type::rdbuf()),
591  m_is_gzip(false),
592  m_gbgzf_data_size(0)
593  {};
594 
595  // returns true if it is a gzip file
596  bool is_gzip() const { return m_is_gzip; };
597  // return data size check
598  bool check_data_size() const { return this->get_out_size() == m_gbgzf_data_size; };
599 
600  // return the data size in the file
601  long get_gbgzf_data_size() const { return m_gbgzf_data_size; };
602 
603 protected:
604  static void read_long(istream_reference in_, unsigned long& x_);
605 
606  int check_header();
607  bool m_is_gzip;
608  unsigned long m_gbgzf_data_size;
609 
610 #ifdef _WIN32
611 private:
612  void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
613  void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
614 #endif
615 };
616 
617 // ===========================================================================
618 // Typedefs
619 // ===========================================================================
620 
621 // A typedef for basic_bgzf_istream<char>
622 typedef basic_bgzf_istream<char> bgzf_istream;
623 // A typedef for basic_bgzf_istream<wchart>
624 typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
625 
626 } // namespace seqan3::conrib
627 
628 #endif // defined(SEQAN3_HAS_ZLIB)
Provides stream compression utilities.
Provides seqan3::buffer_queue.
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:151
Provides seqan3::detail::reader_writer_manager.