SeqAn3  3.1.0-rc.1
The Modern C++ library for sequence analysis.
bgzf_ostream.hpp
1 // zipstream Library License:
2 // --------------------------
3 //
4 // The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5 //
6 // This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7 //
8 // Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9 //
10 // 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11 //
12 // 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13 //
14 // 3. This notice may not be removed or altered from any source distribution
15 //
16 //
17 // Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
18 // Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
19 // Author: René Rahn, rene.rahn [at] fu-berlin.de, 2019 (adaptations to SeqAn library version 3)
20 
21 #pragma once
22 
26 
27 #if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
28 # error "This file cannot be used when building without GZip-support."
29 #endif // !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
30 
31 #if defined(SEQAN3_HAS_ZLIB)
32 
33 namespace seqan3::contrib
34 {
35 
36 // --------------------------------------------------------------------------
37 // Class basic_bgzf_ostreambuf
38 // --------------------------------------------------------------------------
39 
40 template<
41  typename Elem,
42  typename Tr = std::char_traits<Elem>,
43  typename ElemA = std::allocator<Elem>,
44  typename ByteT = char,
45  typename ByteAT = std::allocator<ByteT>
46 >
47 class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
48 {
49 private:
50 
51  typedef std::basic_ostream<Elem, Tr>& ostream_reference;
52  typedef ElemA char_allocator_type;
53  typedef ByteT byte_type;
54  typedef ByteAT byte_allocator_type;
55  typedef byte_type* byte_buffer_type;
56  typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
57 
58 public:
59 
60  typedef Tr traits_type;
61  typedef typename traits_type::char_type char_type;
62  typedef typename traits_type::int_type int_type;
63  typedef typename traits_type::pos_type pos_type;
64  typedef typename traits_type::off_type off_type;
65 
66  struct ScopedLock
67  {
68  ScopedLock(std::function<void()> complete_fn) : completion(std::move(complete_fn))
69  {}
70 
71  ~ScopedLock()
72  {
73  completion();
74  }
75 
76  std::function<void()> completion;
77  };
78 
79  // One compressed block.
80  struct OutputBuffer
81  {
82  char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
83  size_t size;
84  };
85 
86  // Writes the output to the underlying stream when invoked.
87  struct BufferWriter
88  {
89  ostream_reference ostream;
90 
91  BufferWriter(ostream_reference ostream) :
92  ostream(ostream)
93  {}
94 
95  bool operator() (OutputBuffer const & outputBuffer)
96  {
97  ostream.write(outputBuffer.buffer, outputBuffer.size);
98  return ostream.good();
99  }
100  };
101 
102  struct CompressionJob
103  {
104  typedef std::vector<char_type, char_allocator_type> TBuffer;
105 
106  TBuffer buffer;
107  size_t size;
108  OutputBuffer *outputBuffer;
109 
110  CompressionJob() :
111  buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
112  size(0),
113  outputBuffer(NULL)
114  {}
115  };
116 
117  // string of recycable jobs
118  size_t numThreads;
119  size_t numJobs;
120  std::vector<CompressionJob> jobs;
121  job_queue_type jobQueue;
122  job_queue_type idleQueue;
123  Serializer<OutputBuffer, BufferWriter> serializer;
124  size_t currentJobId;
125  bool currentJobAvail;
126 
127  struct CompressionThread
128  {
129  basic_bgzf_ostreambuf *streamBuf;
130  CompressionContext<detail::bgzf_compression> compressionCtx;
131 
132  void operator()()
133  {
134  ScopedLock readLock{[this] () mutable { unlockReading(this->streamBuf->jobQueue); }};
135  // ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
136  ScopedLock writeLock{[this] () mutable { unlockWriting(this->streamBuf->idleQueue); }};
137  // ScopedWriteLock{obQueue> writeLock{str}amBuf->idleQueue);
138 
139  // wait for a new job to become available
140  bool success = true;
141  while (success)
142  {
143  size_t jobId = -1;
144  if (!popFront(jobId, streamBuf->jobQueue))
145  return;
146 
147  CompressionJob &job = streamBuf->jobs[jobId];
148 
149  // compress block with zlib
150  job.outputBuffer->size = _compressBlock(
151  job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
152  &job.buffer[0], job.size, compressionCtx);
153 
154  success = releaseValue(streamBuf->serializer, job.outputBuffer);
155  appendValue(streamBuf->idleQueue, jobId);
156  }
157  }
158  };
159 
160  // array of worker threads
161  // using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)}));
162  std::vector<std::thread> pool;
163 
164  basic_bgzf_ostreambuf(ostream_reference ostream_,
165  size_t numThreads = bgzf_thread_count,
166  size_t jobsPerThread = 8) :
167  numThreads(numThreads),
168  numJobs(numThreads * jobsPerThread),
169  jobQueue(numJobs),
170  idleQueue(numJobs),
171  serializer(ostream_, numThreads * jobsPerThread)
172  {
173  jobs.resize(numJobs);
174  currentJobId = 0;
175 
176  lockWriting(jobQueue);
177  lockReading(idleQueue);
178  setReaderWriterCount(jobQueue, numThreads, 1);
179  setReaderWriterCount(idleQueue, 1, numThreads);
180 
181  // Prepare idle queue.
182  for (size_t i = 0; i < numJobs; ++i)
183  {
184  [[maybe_unused]] bool success = appendValue(idleQueue, i);
185  assert(success);
186  }
187 
188  // Start off threads.
189  for (size_t i = 0; i < numThreads; ++i)
190  pool.emplace_back(CompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
191 
192  currentJobAvail = popFront(currentJobId, idleQueue);
193  assert(currentJobAvail);
194 
195  CompressionJob &job = jobs[currentJobId];
196  job.outputBuffer = aquireValue(serializer);
197  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
198  }
199 
200  ~basic_bgzf_ostreambuf()
201  {
202  // the buffer is now (after addFooter()) and flush will append the empty EOF marker
203  flush(true);
204 
205  unlockWriting(jobQueue);
206 
207  // Wait for threads to finish there active work.
208  for (auto & t : pool)
209  {
210  if (t.joinable())
211  t.join();
212  }
213 
214  unlockReading(idleQueue);
215  }
216 
217  bool compressBuffer(size_t size)
218  {
219  // submit current job
220  if (currentJobAvail)
221  {
222  jobs[currentJobId].size = size;
223  appendValue(jobQueue, currentJobId);
224  }
225 
226  // recycle existing idle job
227  if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
228  return false;
229 
230  jobs[currentJobId].outputBuffer = aquireValue(serializer);
231 
232  return serializer;
233  }
234 
235  int_type overflow(int_type c)
236  {
237  int w = static_cast<int>(this->pptr() - this->pbase());
238  if (c != static_cast<int_type>(EOF))
239  {
240  *this->pptr() = c;
241  ++w;
242  }
243  if (compressBuffer(w))
244  {
245  CompressionJob &job = jobs[currentJobId];
246  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
247  return c;
248  }
249  else
250  {
251  return EOF;
252  }
253  }
254 
255  std::streamsize flush(bool flushEmptyBuffer = false)
256  {
257  int w = static_cast<int>(this->pptr() - this->pbase());
258  if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
259  {
260  CompressionJob &job = jobs[currentJobId];
261  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
262  }
263  else
264  {
265  w = 0;
266  }
267 
268  // wait for running compressor threads
269  waitForMinSize(idleQueue, numJobs - 1);
270 
271  serializer.worker.ostream.flush();
272  return w;
273  }
274 
275  int sync()
276  {
277  if (this->pptr() != this->pbase())
278  {
279  int_type c = overflow(EOF);
280  if (c == static_cast<int_type>(EOF))
281  return -1;
282  }
283  return 0;
284  }
285 
286  void addFooter()
287  {
288  // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor
289  if (this->pptr() != this->pbase())
290  overflow(EOF);
291  }
292 
293  // returns a reference to the output stream
294  ostream_reference get_ostream() const { return serializer.worker.ostream; };
295 };
296 
297 // --------------------------------------------------------------------------
298 // Class basic_bgzf_ostreambase
299 // --------------------------------------------------------------------------
300 
301 template<
302  typename Elem,
303  typename Tr = std::char_traits<Elem>,
304  typename ElemA = std::allocator<Elem>,
305  typename ByteT = char,
306  typename ByteAT = std::allocator<ByteT>
307 >
308 class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
309 {
310 public:
311  typedef std::basic_ostream<Elem, Tr>& ostream_reference;
312  typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
313 
314  basic_bgzf_ostreambase(ostream_reference ostream_)
315  : m_buf(ostream_)
316  {
317  this->init(&m_buf );
318  };
319 
320  // returns the underlying zip ostream object
321  bgzf_streambuf_type* rdbuf() { return &m_buf; };
322  // returns the bgzf error state
323  int get_zerr() const { return m_buf.get_err(); };
324  // returns the uncompressed data crc
325  long get_crc() const { return m_buf.get_crc(); };
326  // returns the compressed data size
327  long get_out_size() const { return m_buf.get_out_size(); };
328  // returns the uncompressed data size
329  long get_in_size() const { return m_buf.get_in_size(); };
330 
331 private:
332  bgzf_streambuf_type m_buf;
333 };
334 
335 // --------------------------------------------------------------------------
336 // Class basic_bgzf_ostream
337 // --------------------------------------------------------------------------
338 
339 template<
340  typename Elem,
341  typename Tr = std::char_traits<Elem>,
342  typename ElemA = std::allocator<Elem>,
343  typename ByteT = char,
344  typename ByteAT = std::allocator<ByteT>
345 >
346 class basic_bgzf_ostream :
347  public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
348  public std::basic_ostream<Elem,Tr>
349 {
350 public:
351  typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
352  typedef std::basic_ostream<Elem,Tr> ostream_type;
353  typedef ostream_type& ostream_reference;
354 
355  basic_bgzf_ostream(ostream_reference ostream_) :
356  bgzf_ostreambase_type(ostream_),
357  ostream_type(bgzf_ostreambase_type::rdbuf())
358  {}
359 
360  // flush inner buffer and zipper buffer
361  basic_bgzf_ostream<Elem,Tr>& flush()
362  {
363  ostream_type::flush(); this->rdbuf()->flush(); return *this;
364  };
365 
366  ~basic_bgzf_ostream()
367  {
368  this->rdbuf()->addFooter();
369  }
370 
371 private:
372  static void put_long(ostream_reference out_, unsigned long x_);
373 #ifdef _WIN32
374  void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
375  void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
376 #endif
377 };
378 
379 // ===========================================================================
380 // Typedefs
381 // ===========================================================================
382 
383 // A typedef for basic_bgzf_ostream<char>
384 typedef basic_bgzf_ostream<char> bgzf_ostream;
385 // A typedef for basic_bgzf_ostream<wchar_t>
386 typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
387 
388 } // namespace seqan3::contrib
389 
390 #endif // defined(SEQAN3_HAS_ZLIB)
Provides stream compression utilities.
constexpr size_t size
The size of a type pack.
Definition: traits.hpp:151
SeqAn specific customisations in the standard namespace.
Definition: affine_cell_proxy.hpp:438
Provides helper structs from SeqAn2 for the bgzf_ostream.
Provides seqan suspendable queue.