// STXXL 1.2.1 -- include/stxxl/bits/algo/stable_ksort.h
1 /***************************************************************************
2  * include/stxxl/bits/algo/stable_ksort.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7  *
8  * Distributed under the Boost Software License, Version 1.0.
9  * (See accompanying file LICENSE_1_0.txt or copy at
10  * http://www.boost.org/LICENSE_1_0.txt)
11  **************************************************************************/
12 
13 #ifndef STXXL_STABLE_KSORT_HEADER
14 #define STXXL_STABLE_KSORT_HEADER
15 
16 // it is a first try: distribution sort without sampling
17 // I rework the stable_ksort when I would have a time
18 
19 
20 #include <cmath>
21 
22 #include <stxxl/bits/mng/mng.h>
23 #include <stxxl/bits/mng/buf_istream.h>
24 #include <stxxl/bits/mng/buf_ostream.h>
25 #include <stxxl/bits/common/simple_vector.h>
26 #include <stxxl/bits/algo/intksort.h>
27 
28 #ifndef STXXL_VERBOSE_STABLE_KSORT
29 #define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
30 #endif
31 
32 
33 __STXXL_BEGIN_NAMESPACE
34 
37 
40 namespace stable_ksort_local
41 {
42  template <class type_, class type_key>
43  void classify_block(type_ * begin, type_ * end, type_key * & out, int_type * bucket, unsigned_type offset, unsigned shift)
44  {
45  for (type_ * p = begin; p < end; p++, out++) // count & create references
46  {
47  out->ptr = p;
48  typename type_::key_type key = p->key();
49  int_type ibucket = (key - offset) >> shift;
50  out->key = key;
51  bucket[ibucket]++;
52  }
53  }
54 
55  template <typename type>
56  struct type_key
57  {
58  typedef typename type::key_type key_type;
59  key_type key;
60  type * ptr;
61 
62  type_key() { }
63  type_key(key_type k, type * p) : key(k), ptr(p)
64  { }
65  };
66 
67  template <typename type>
68  bool operator < (const type_key<type> & a, const type_key<type> & b)
69  {
70  return a.key < b.key;
71  }
72 
73  template <typename type>
74  bool operator > (const type_key<type> & a, const type_key<type> & b)
75  {
76  return a.key > b.key;
77  }
78 
79 
80  template <typename BIDType_, typename AllocStrategy_>
81  class bid_sequence
82  {
83  public:
84  typedef BIDType_ bid_type;
85  typedef bid_type & reference;
86  typedef AllocStrategy_ alloc_strategy;
87  typedef typename simple_vector<bid_type>::size_type size_type;
88  typedef typename simple_vector<bid_type>::iterator iterator;
89 
90  protected:
91  simple_vector<bid_type> * bids;
92  alloc_strategy alloc_strategy_;
93 
94  public:
95  bid_sequence() { }
96  bid_sequence(size_type size_)
97  {
98  bids = new simple_vector<bid_type>(size_);
99  block_manager * mng = block_manager::get_instance();
100  mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
101  }
102  void init(size_type size_)
103  {
104  bids = new simple_vector<bid_type>(size_);
105  block_manager * mng = block_manager::get_instance();
106  mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
107  }
108  reference operator [] (size_type i)
109  {
110  size_type size_ = size(); // cache size in a register
111  if (i < size_)
112  return *(bids->begin() + i);
113 
114  block_manager * mng = block_manager::get_instance();
115  simple_vector<bid_type> * larger_bids = new simple_vector<bid_type>((i + 1) * 2);
116  std::copy(bids->begin(), bids->end(), larger_bids->begin());
117  mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end());
118  delete bids;
119  bids = larger_bids;
120  return *(larger_bids->begin() + i);
121  }
122  size_type size() { return bids->size(); }
123  iterator begin() { return bids->begin(); }
124  ~bid_sequence()
125  {
126  block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
127  delete bids;
128  }
129  };
130 
    //! \brief Distribution pass of stable_ksort: streams the records of
    //!        [first, last) into nbuckets external buckets selected by the
    //!        top lognbuckets bits of each record's key.
    //! \param bucket_bids   per-bucket BID sequences receiving the written blocks
    //! \param bucket_sizes  output: number of records placed in each bucket
    //! \param nbuckets      number of buckets (== 1 << lognbuckets)
    //! \param lognbuckets   number of high-order key bits used for bucketing
    //! \param first, last   input range (external vector iterators)
    //! \param nread_buffers / nwrite_buffers  buffer counts for the streams
    template <typename ExtIterator_>
    void distribute(
        bid_sequence<typename ExtIterator_::vector_type::block_type::bid_type,
                     typename ExtIterator_::vector_type::alloc_strategy> * bucket_bids,
        int64 * bucket_sizes,
        const int_type nbuckets,
        const int_type lognbuckets,
        ExtIterator_ first,
        ExtIterator_ last,
        const int_type nread_buffers,
        const int_type nwrite_buffers)
    {
        typedef typename ExtIterator_::vector_type::value_type value_type;
        typedef typename value_type::key_type key_type;
        typedef typename ExtIterator_::block_type block_type;
        typedef typename block_type::bid_type bid_type;
        typedef buf_istream<typename ExtIterator_::block_type,
                            typename ExtIterator_::bids_container_iterator> buf_istream_type;

        int_type i = 0;

        // buffered sequential input over every block touched by [first, last)
        buf_istream_type in(first.bid(), last.bid() + ((first.block_offset()) ? 1 : 0),
                            nread_buffers);

        // NOTE(review): the declaration of the buffered output writer 'out'
        // appears to have been lost from this copy of the file (the original
        // numbering jumps over a line here) -- the two lines below are its
        // remaining constructor arguments. Restore it before compiling.
            nbuckets + nwrite_buffers,
            nwrite_buffers);

        // per-bucket fill state: offset inside the block currently being
        // filled, count of completed blocks, and the block being filled
        unsigned_type * bucket_block_offsets = new unsigned_type[nbuckets];
        unsigned_type * bucket_iblock = new unsigned_type[nbuckets];
        block_type ** bucket_blocks = new block_type *[nbuckets];

        std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
        std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
        std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);

        for (i = 0; i < nbuckets; i++)
            bucket_blocks[i] = out.get_free_block();


        ExtIterator_ cur = first - first.block_offset();

        // skip part of the block before first untouched
        for ( ; cur != first; cur++)
            ++in;


        // right-shift that maps a key onto its bucket index
        const int_type shift = sizeof(key_type) * 8 - lognbuckets;
        // distribute the range [first, last)
        STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets);
        for ( ; cur != last; cur++)
        {
            key_type cur_key = in.current().key();
            int_type ibucket = cur_key >> shift;  // top lognbuckets key bits

            int_type block_offset = bucket_block_offsets[ibucket];
            in >> (bucket_blocks[ibucket]->elem[block_offset++]);
            if (block_offset == block_type::size)
            {
                // bucket's block is full: submit it for writing under the next
                // BID of the bucket and continue filling a fresh block
                block_offset = 0;
                int_type iblock = bucket_iblock[ibucket]++;
                bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
            }
            bucket_block_offsets[ibucket] = block_offset;
        }
        // flush the partially filled last block of every bucket and record
        // the final bucket sizes (full blocks plus the partial remainder)
        for (i = 0; i < nbuckets; i++)
        {
            if (bucket_block_offsets[i])
            {
                out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
            }
            bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] +
                              bucket_block_offsets[i];
            STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] <<
                                       ", estimated size: " << ((last - first) / int64(nbuckets)));
        }

        delete[] bucket_blocks;
        delete[] bucket_block_offsets;
        delete[] bucket_iblock;
    }
212 }
213 
//! \brief External stable distribution sort by key.
//!
//! Phase 1 distributes the input into external buckets by the most
//! significant key bits; phase 2 reads each bucket back (double-buffered),
//! sorts it internally by radix classification, and streams the result out.
//! Assumes keys are uniformly distributed over the full key_type range.
//! \param first begin of the range to sort
//! \param last  end of the range to sort
//! \param M     amount of internal memory, in bytes, the sort may use
template <typename ExtIterator_>
void stable_ksort(ExtIterator_ first, ExtIterator_ last, unsigned_type M)
{
    STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,(std::numeric_limits<key_type>::max)()]");
    typedef typename ExtIterator_::vector_type::value_type value_type;
    typedef typename value_type::key_type key_type;
    typedef typename ExtIterator_::block_type block_type;
    typedef typename block_type::bid_type bid_type;
    typedef typename ExtIterator_::vector_type::alloc_strategy alloc_strategy;
    typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type;
    typedef stable_ksort_local::type_key<value_type> type_key_;

    first.flush(); // flush container

    double begin = timestamp();

    unsigned_type i = 0;
    config * cfg = config::get_instance();
    const unsigned_type m = M / block_type::raw_size;  // memory budget in blocks
    assert(2 * block_type::raw_size <= M);
    const unsigned_type write_buffers_multiple = 2;
    const unsigned_type read_buffers_multiple = 2;
    const unsigned_type ndisks = cfg->disks_number();
    const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
    // number of buckets: largest power of two that still leaves room for the
    // read/write buffers within the m available blocks
    const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
    const unsigned_type lognbuckets = static_cast<unsigned_type>(log2(double(nmaxbuckets)));
    const unsigned_type nbuckets = 1 << lognbuckets;
    const unsigned_type est_bucket_size = div_and_round_up((last - first) / int64(nbuckets),
                                                           int64(block_type::size)); //in blocks

    if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
        STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
                     ", required for r/w buffers: " << min_num_read_write_buffers <<
                     ", required for buckets: 2, nbuckets: " << nbuckets);
        abort();
    }
    STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first));
    STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets);
    // split the remaining blocks between read and write buffers 1:1
    const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
    const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);

    STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers);
    STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers);

    // one growable BID sequence per bucket, pre-sized with the estimate
    bucket_bids_type * bucket_bids = new bucket_bids_type[nbuckets];
    for (i = 0; i < nbuckets; ++i)
        bucket_bids[i].init(est_bucket_size);

    int64 * bucket_sizes = new int64[nbuckets];

    disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);

    // phase 1: distribute the input into nbuckets external buckets keyed by
    // the top lognbuckets bits
    stable_ksort_local::distribute(
        bucket_bids,
        bucket_sizes,
        nbuckets,
        lognbuckets,
        first,
        last,
        nread_buffers,
        nwrite_buffers);

    double dist_end = timestamp(), end;
    double io_wait_after_d = stats::get_instance()->get_io_wait_time();

    {
        // phase 2: sort buckets internally and stream them back in order
        unsigned_type write_buffers_multiple_bs = 2;
        unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2; // in number of blocks
        int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size; // in number of records
        int64 max_bucket_size_act = 0; // actual max bucket size
        // establish output stream

        for (i = 0; i < nbuckets; i++)
        {
            max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
            if (bucket_sizes[i] > max_bucket_size_rec)
            {
                // an over-full bucket cannot be sorted in memory; recursive
                // bucketing is not implemented, so give up
                STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] <<
                             " records, maximum: " << max_bucket_size_rec);
                STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting.");
                abort();
            }
        }
        // here we can increase write_buffers_multiple_b knowing max(bucket_sizes[i])
        // ... and decrease max_bucket_size_bl
        const int_type max_bucket_size_act_bl = div_and_round_up(max_bucket_size_act, block_type::size);
        STXXL_VERBOSE_STABLE_KSORT("Reducing required number of required blocks per bucket from " <<
                                   max_bucket_size_bl << " to " << max_bucket_size_act_bl);
        max_bucket_size_rec = max_bucket_size_act;
        max_bucket_size_bl = max_bucket_size_act_bl;
        const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
        STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs);

        // NOTE(review): the typedef of buf_ostream_type appears to have been
        // lost from this copy of the file (the original numbering jumps over
        // a line here) -- presumably a buf_ostream over the vector's blocks.
        // Restore it before compiling.
        buf_ostream_type out(first.bid(), nwrite_buffers_bs);

        disk_queues::get_instance()->set_priority_op(disk_queue::READ);

        if (first.block_offset())
        {
            // has to skip part of the first block
            block_type * block = new block_type;
            request_ptr req;
            req = block->read(*first.bid());
            req->wait();

            // copy the head of the first block (before 'first') unchanged
            for (i = 0; i < first.block_offset(); i++)
            {
                out << block->elem[i];
            }
            delete block;
        }
        // double buffering: while bucket k is classified and sorted from
        // blocks1/refs1, bucket k+1 is already being read into blocks2
        block_type * blocks1 = new block_type[max_bucket_size_bl];
        block_type * blocks2 = new block_type[max_bucket_size_bl];
        request_ptr * reqs1 = new request_ptr[max_bucket_size_bl];
        request_ptr * reqs2 = new request_ptr[max_bucket_size_bl];
        type_key_ * refs1 = new type_key_[max_bucket_size_rec];
        type_key_ * refs2 = new type_key_[max_bucket_size_rec];

        // submit reading first 2 buckets (Peter's scheme)
        unsigned_type nbucket_blocks = div_and_round_up(bucket_sizes[0], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs1[i] = blocks1[i].read(bucket_bids[0][i]);


        nbucket_blocks = div_and_round_up(bucket_sizes[1], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs2[i] = blocks2[i].read(bucket_bids[1][i]);


        key_type offset = 0;
        // number of sub-buckets for the in-memory classification, chosen so
        // one sub-bucket's references fit into the L2 cache (at least 2)
        const unsigned log_k1 =
            (std::max)(static_cast<unsigned>(ceil(log2(double(
                max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE)))), 1U);
        unsigned_type k1 = 1 << log_k1;
        int_type * bucket1 = new int_type[k1];

        // shift selects the external bucket; shift1 the in-memory sub-bucket
        const unsigned shift = sizeof(key_type) * 8 - lognbuckets;
        const unsigned shift1 = shift - log_k1;

        STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec <<
                                   " block size:" << block_type::size << " log_k1:" << log_k1);

        for (unsigned_type k = 0; k < nbuckets; k++)
        {
            nbucket_blocks = div_and_round_up(bucket_sizes[k], block_type::size);
            // per-bucket sub-bucket count, adapted to the actual bucket size
            const unsigned log_k1_k =
                (std::max)(static_cast<unsigned>(ceil(log2(
                    double(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE)))), 1U);
            assert(log_k1_k <= log_k1);
            k1 = 1 << log_k1_k;
            std::fill(bucket1, bucket1 + k1, 0);

            STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] <<
                                       " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k);
            // classify first nbucket_blocks-1 blocks, they are full
            type_key_ * ref_ptr = refs1;
            // offset1 = smallest key value that maps to bucket k
            key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
            for (i = 0; i < nbucket_blocks - 1; i++)
            {
                reqs1[i]->wait();
                stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(), ref_ptr, bucket1, offset1, shift1 /*,k1*/);
            }
            // last block might be non-full
            const unsigned_type last_block_size = bucket_sizes[k] - int64(nbucket_blocks - 1) * block_type::size;
            reqs1[i]->wait();

            //STXXL_MSG("block_type::size: "<<block_type::size<<" last_block_size:"<<last_block_size);

            classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size, ref_ptr, bucket1, offset1, shift1);

            // turn counts into start offsets, then scatter the references of
            // refs1 into refs2 ordered by sub-bucket
            exclusive_prefix_sum(bucket1, k1);
            classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);

            type_key_ * c = refs2;
            type_key_ * d = refs1;
            // sort each sub-bucket with l1sort and write records out in order
            for (i = 0; i < k1; i++)
            {
                type_key_ * cEnd = refs2 + bucket1[i];
                type_key_ * dEnd = refs1 + bucket1[i];

                const unsigned log_k2 = static_cast<unsigned>(log2(double(bucket1[i]))) - 1; // adaptive bucket size
                const unsigned_type k2 = 1 << log_k2;
                int_type * bucket2 = new int_type[k2];
                const unsigned shift2 = shift1 - log_k2;

                // STXXL_MSG("Sorting bucket "<<k<<":"<<i);
                l1sort(c, cEnd, d, bucket2, k2,
                       offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
                       shift2);

                // write out all
                for (type_key_ * p = d; p < dEnd; p++)
                    out << (*(p->ptr));


                delete[] bucket2;
                c = cEnd;
                d = dEnd;
            }
            // submit next read
            const unsigned_type bucket2submit = k + 2;
            if (bucket2submit < nbuckets)
            {
                nbucket_blocks = div_and_round_up(bucket_sizes[bucket2submit], block_type::size);
                for (i = 0; i < nbucket_blocks; i++)
                    reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
            }

            // swap the double buffers: just-read bucket becomes the next work
            std::swap(blocks1, blocks2);
            std::swap(reqs1, reqs2);
        }

        delete[] bucket1;
        delete[] refs1;
        delete[] refs2;
        delete[] blocks1;
        delete[] blocks2;
        delete[] reqs1;
        delete[] reqs2;
        delete[] bucket_bids;
        delete[] bucket_sizes;

        if (last.block_offset())
        {
            // copy the tail of the last block (at and after 'last') unchanged
            block_type * block = new block_type;
            request_ptr req = block->read(*last.bid());
            req->wait();

            for (i = last.block_offset(); i < block_type::size; i++)
            {
                out << block->elem[i];
            }
            delete block;
        }

        end = timestamp();
    }

    STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Distribution time: " <<
                  dist_end - begin << " s");
    STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s");
    STXXL_VERBOSE(*stats::get_instance());
    UNUSED(begin + dist_end + io_wait_after_d);
}
461 
463 
464 __STXXL_END_NAMESPACE
465 
466 #endif // !STXXL_STABLE_KSORT_HEADER