Stxxl  1.2.1
buf_writer.h
1 /***************************************************************************
2  * include/stxxl/bits/mng/buf_writer.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2004 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7  *
8  * Distributed under the Boost Software License, Version 1.0.
9  * (See accompanying file LICENSE_1_0.txt or copy at
10  * http://www.boost.org/LICENSE_1_0.txt)
11  **************************************************************************/
12 
13 #ifndef STXXL_BUFFERED_WRITER_HEADER
14 #define STXXL_BUFFERED_WRITER_HEADER
15 
#include <cassert>
#include <queue>
#include <vector>

#include <stxxl/bits/mng/mng.h>
19 
20 
21 __STXXL_BEGIN_NAMESPACE
22 
29 
30 
34 template <typename block_type>
36 {
37  buffered_writer() { }
38 
39 protected:
40  typedef typename block_type::bid_type bid_type;
41 
42  const unsigned_type nwriteblocks;
43  block_type * write_buffers;
44  bid_type * write_bids;
45  request_ptr * write_reqs;
46  const unsigned_type writebatchsize;
47 
48  std::vector<int_type> free_write_blocks; // contains free write blocks
49  std::vector<int_type> busy_write_blocks; // blocks that are in writing, notice that if block is not in free_
50  // an not in busy then block is not yet filled
51 
52  struct batch_entry
53  {
54  stxxl::int64 offset;
55  int_type ibuffer;
56  batch_entry(stxxl::int64 o, int b) : offset(o), ibuffer(b) { }
57  };
58  struct batch_entry_cmp
59  {
60  bool operator () (const batch_entry & a, const batch_entry & b) const
61  {
62  return (a.offset > b.offset);
63  }
64  };
65 
66  typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type;
67  batch_type batch_write_blocks; // sorted sequence of blocks to write
68 
69 public:
74  buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size) :
75  nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
76  writebatchsize(write_batch_size ? write_batch_size : 1)
77  {
78  write_buffers = new block_type[nwriteblocks];
79  write_reqs = new request_ptr[nwriteblocks];
80  write_bids = new bid_type[nwriteblocks];
81 
82  for (unsigned_type i = 0; i < nwriteblocks; i++)
83  free_write_blocks.push_back(i);
84 
85 
86  disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
87  }
90  block_type * get_free_block()
91  {
92  int_type ibuffer;
93  for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
94  it != busy_write_blocks.end(); ++it)
95  {
96  if (write_reqs[ibuffer = (*it)]->poll())
97  {
98  busy_write_blocks.erase(it);
99  free_write_blocks.push_back(ibuffer);
100 
101  break;
102  }
103  }
104  if (UNLIKELY(free_write_blocks.empty()))
105  {
106  int_type size = busy_write_blocks.size();
107  request_ptr * reqs = new request_ptr[size];
108  int_type i = 0;
109  for ( ; i < size; ++i)
110  {
111  reqs[i] = write_reqs[busy_write_blocks[i]];
112  }
113  int_type completed = wait_any(reqs, size);
114  int_type completed_global = busy_write_blocks[completed];
115  delete[] reqs;
116  busy_write_blocks.erase(busy_write_blocks.begin() + completed);
117 
118  return (write_buffers + completed_global);
119  }
120  ibuffer = free_write_blocks.back();
121  free_write_blocks.pop_back();
122 
123  return (write_buffers + ibuffer);
124  }
130  block_type * write(block_type * filled_block, const bid_type & bid) // writes filled_block and returns a new block
131  {
132  if (batch_write_blocks.size() >= writebatchsize)
133  {
134  // flush batch
135  while (!batch_write_blocks.empty())
136  {
137  int_type ibuffer = batch_write_blocks.top().ibuffer;
138  batch_write_blocks.pop();
139 
140  if (write_reqs[ibuffer].valid())
141  write_reqs[ibuffer]->wait();
142 
143  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
144 
145  busy_write_blocks.push_back(ibuffer);
146  }
147  }
148  // STXXL_MSG("Adding write request to batch");
149 
150  int_type ibuffer = filled_block - write_buffers;
151  write_bids[ibuffer] = bid;
152  batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
153 
154  return get_free_block();
155  }
157  void flush()
158  {
159  int_type ibuffer;
160  while (!batch_write_blocks.empty())
161  {
162  ibuffer = batch_write_blocks.top().ibuffer;
163  batch_write_blocks.pop();
164 
165  if (write_reqs[ibuffer].valid())
166  write_reqs[ibuffer]->wait();
167 
168  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
169 
170  busy_write_blocks.push_back(ibuffer);
171  }
172  for (std::vector<int_type>::const_iterator it =
173  busy_write_blocks.begin();
174  it != busy_write_blocks.end(); it++)
175  {
176  ibuffer = *it;
177  write_reqs[ibuffer]->wait();
178  }
179 
180  assert(batch_write_blocks.empty());
181  free_write_blocks.clear();
182  busy_write_blocks.clear();
183 
184  for (unsigned_type i = 0; i < nwriteblocks; i++)
185  free_write_blocks.push_back(i);
186  }
187 
190  {
191  int_type ibuffer;
192  while (!batch_write_blocks.empty())
193  {
194  ibuffer = batch_write_blocks.top().ibuffer;
195  batch_write_blocks.pop();
196 
197  if (write_reqs[ibuffer].valid())
198  write_reqs[ibuffer]->wait();
199 
200  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
201 
202  busy_write_blocks.push_back(ibuffer);
203  }
204  for (std::vector<int_type>::const_iterator it =
205  busy_write_blocks.begin();
206  it != busy_write_blocks.end(); it++)
207  {
208  ibuffer = *it;
209  write_reqs[ibuffer]->wait();
210  }
211 
212  delete[] write_reqs;
213  delete[] write_buffers;
214  delete[] write_bids;
215  }
216 };
217 
219 
220 __STXXL_END_NAMESPACE
221 
222 #endif // !STXXL_BUFFERED_WRITER_HEADER