#ifndef STXXL_STABLE_KSORT_HEADER
#define STXXL_STABLE_KSORT_HEADER
#include <stxxl/bits/mng/mng.h>
#include <stxxl/bits/mng/buf_istream.h>
#include <stxxl/bits/mng/buf_ostream.h>
#include <stxxl/bits/common/simple_vector.h>
#include <stxxl/bits/algo/intksort.h>
#ifndef STXXL_VERBOSE_STABLE_KSORT
#define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
#endif // STXXL_VERBOSE_STABLE_KSORT
__STXXL_BEGIN_NAMESPACE
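
// Internal helpers for stable_ksort: in-memory classification of records,
// key/pointer pairs, and a growable sequence of block identifiers (BIDs).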
namespace stable_ksort_local
{
    template <class type_, class type_key>
    void classify_block(type_ * begin, type_ * end, type_key * & out, int_type * bucket,
                        unsigned_type offset, unsigned shift)
    {
        for (type_ * p = begin; p < end; p++, out++)
        {
            out->ptr = p;
            typename type_::key_type key = p->key();
            int_type ibucket = (key - offset) >> shift;
            out->key = key;
            bucket[ibucket]++;
        }
    }
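
    // Key/pointer pair: classification and the in-memory sorting passes operate on
    // these small records instead of moving the full values around.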
    template <typename type>
    struct type_key
    {
        typedef typename type::key_type key_type;
        key_type key;
        type * ptr;

        type_key() { }
        type_key(key_type k, type * p) : key(k), ptr(p) { }
    };

    template <typename type>
    bool operator < (const type_key<type> & a, const type_key<type> & b)
    {
        return a.key < b.key;
    }
    template <typename type>
    bool operator > (const type_key<type> & a, const type_key<type> & b)
    {
        return a.key > b.key;
    }
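
    // Growable sequence of block identifiers (BIDs). Blocks are allocated through the
    // block_manager; the sequence is enlarged on demand when an out-of-range index is
    // accessed, which happens when a bucket grows beyond its estimated size.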
    template <typename BIDType_, typename AllocStrategy_>
    class bid_sequence
    {
    public:
        typedef BIDType_ bid_type;
        typedef bid_type & reference;
        typedef AllocStrategy_ alloc_strategy;
        typedef typename simple_vector<bid_type>::size_type size_type;
        typedef typename simple_vector<bid_type>::iterator iterator;

    protected:
        simple_vector<bid_type> * bids;
        alloc_strategy alloc_strategy_;

    public:
        bid_sequence() { }
        bid_sequence(size_type size_)
        {
            bids = new simple_vector<bid_type>(size_);
            block_manager * mng = block_manager::get_instance();
            mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
        }
        void init(size_type size_)
        {
            bids = new simple_vector<bid_type>(size_);
            block_manager * mng = block_manager::get_instance();
            mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
        }
        reference operator [] (size_type i)
        {
            size_type size_ = size();
            if (i < size_)
                return *(bids->begin() + i);

            // index is out of range: grow the sequence and allocate blocks for the new entries
            block_manager * mng = block_manager::get_instance();
            simple_vector<bid_type> * larger_bids = new simple_vector<bid_type>((i + 1) * 2);
            std::copy(bids->begin(), bids->end(), larger_bids->begin());
            mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end());
            delete bids;
            bids = larger_bids;
            return *(larger_bids->begin() + i);
        }
        size_type size() { return bids->size(); }
        iterator begin() { return bids->begin(); }
        ~bid_sequence()
        {
            block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
            delete bids;
        }
    };
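
    // Distribution phase: reads the input once through a buf_istream and appends every
    // record to the bucket selected by the topmost 'lognbuckets' bits of its key, writing
    // full bucket blocks back to disk and recording the resulting bucket sizes.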
    template <typename ExtIterator_>
    void distribute(
        bid_sequence<typename ExtIterator_::vector_type::block_type::bid_type,
                     typename ExtIterator_::vector_type::alloc_strategy> * bucket_bids,
        int64 * bucket_sizes,
        const int_type nbuckets,
        const int_type lognbuckets,
        ExtIterator_ first,
        ExtIterator_ last,
        const int_type nread_buffers,
        const int_type nwrite_buffers)
    {
        typedef typename ExtIterator_::vector_type::value_type value_type;
        typedef typename value_type::key_type key_type;
        typedef typename ExtIterator_::block_type block_type;
        typedef typename block_type::bid_type bid_type;
        typedef buf_istream<typename ExtIterator_::block_type,
                            typename ExtIterator_::bids_container_iterator> buf_istream_type;

        int_type i = 0;

        // include the block of 'last' if 'last' does not fall on a block boundary
        buf_istream_type in(first.bid(), last.bid() + ((last.block_offset()) ? 1 : 0),
                            nread_buffers);

        buffered_writer<block_type> out(
            nbuckets + nwrite_buffers,
            nwrite_buffers);

        unsigned_type * bucket_block_offsets = new unsigned_type[nbuckets];
        unsigned_type * bucket_iblock = new unsigned_type[nbuckets];
        block_type ** bucket_blocks = new block_type *[nbuckets];

        std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
        std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
        std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);

        for (i = 0; i < nbuckets; i++)
            bucket_blocks[i] = out.get_free_block();

        ExtIterator_ cur = first - first.block_offset();

        // skip the elements of the first block that lie before 'first'
        for ( ; cur != first; cur++)
            ++in;

        const int_type shift = sizeof(key_type) * 8 - lognbuckets;
        STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets);
        for ( ; cur != last; cur++)
        {
            key_type cur_key = in.current().key();
            int_type ibucket = cur_key >> shift;

            int_type block_offset = bucket_block_offsets[ibucket];
            in >> (bucket_blocks[ibucket]->elem[block_offset++]);
            if (block_offset == block_type::size)
            {
                block_offset = 0;
                int_type iblock = bucket_iblock[ibucket]++;
                bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
            }
            bucket_block_offsets[ibucket] = block_offset;
        }
        for (i = 0; i < nbuckets; i++)
        {
            if (bucket_block_offsets[i])
            {
                out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
            }
            bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] +
                              bucket_block_offsets[i];
            STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] <<
                                       ", estimated size: " << ((last - first) / int64(nbuckets)));
        }

        delete[] bucket_blocks;
        delete[] bucket_block_offsets;
        delete[] bucket_iblock;
    }
} // namespace stable_ksort_local
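
//! \brief External memory stable sort of records with integer keys.
//! \param first iterator to the beginning of the external vector range to sort
//! \param last iterator to the end of the external vector range to sort
//! \param M amount of internal memory to use, in bytes
//! \remark The value type must define \c key_type and provide a method \c key()
//! returning the integer key.
//! \remark Not yet fully implemented: it assumes that the keys are uniformly
//! distributed over [0, (std::numeric_limits<key_type>::max)()].
//
// A minimal usage sketch (illustrative only; the record type 'my_record' and the
// vector typedef 'my_vector' are hypothetical names, not part of this header):
//
//   struct my_record
//   {
//       typedef stxxl::uint64 key_type;
//       key_type k;
//       char payload[24];
//       key_type key() const { return k; }          // key() is required by stable_ksort
//   };
//   typedef stxxl::VECTOR_GENERATOR<my_record>::result my_vector;
//
//   my_vector v;
//   // ... fill v ...
//   stxxl::stable_ksort(v.begin(), v.end(), 512 * 1024 * 1024);  // 512 MiB internal memory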
template <typename ExtIterator_>
void stable_ksort(ExtIterator_ first, ExtIterator_ last, unsigned_type M)
{
    STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,(std::numeric_limits<key_type>::max)()]");
    typedef typename ExtIterator_::vector_type::value_type value_type;
    typedef typename value_type::key_type key_type;
    typedef typename ExtIterator_::block_type block_type;
    typedef typename block_type::bid_type bid_type;
    typedef typename ExtIterator_::vector_type::alloc_strategy alloc_strategy;
    typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type;
    typedef stable_ksort_local::type_key<value_type> type_key_;
    double begin = timestamp();

    unsigned_type i = 0;
    config * cfg = config::get_instance();
    const unsigned_type m = M / block_type::raw_size;
    assert(2 * block_type::raw_size <= M);
    const unsigned_type write_buffers_multiple = 2;
    const unsigned_type read_buffers_multiple = 2;
    const unsigned_type ndisks = cfg->disks_number();
    const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
    const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
    const unsigned_type lognbuckets = static_cast<unsigned_type>(log2(double(nmaxbuckets)));
    const unsigned_type nbuckets = 1 << lognbuckets;
    const unsigned_type est_bucket_size = div_and_round_up((last - first) / int64(nbuckets),
                                                           int64(block_type::size));   // in blocks
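
    // The key space is split into nbuckets = 2^lognbuckets equal ranges, so a record's
    // bucket is its key shifted right by (8 * sizeof(key_type) - lognbuckets) bits.
    // nbuckets is the largest power of two that fits into the memory left over after
    // reserving the read/write buffers.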
    if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
        STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
                     ", required for r/w buffers: " << min_num_read_write_buffers <<
                     ", required for buckets: 2, nbuckets: " << nbuckets);
        throw bad_parameter("stxxl::stable_ksort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
    }
    STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first));
    STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets);
    const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
    const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);

    STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers);
    STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers);
    bucket_bids_type * bucket_bids = new bucket_bids_type[nbuckets];
    for (i = 0; i < nbuckets; ++i)
        bucket_bids[i].init(est_bucket_size);

    int64 * bucket_sizes = new int64[nbuckets];

    disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
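
    // Phase 1: distribute the input into the buckets by the most significant key bits.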
    stable_ksort_local::distribute(bucket_bids, bucket_sizes, nbuckets, lognbuckets,
                                   first, last, nread_buffers, nwrite_buffers);
    double dist_end = timestamp(), end;
    double io_wait_after_d = stats::get_instance()->get_io_wait_time();
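
    // Phase 2: sort the buckets. Each bucket is loaded into internal memory, sorted by
    // the remaining key bits, and written back over the input range through a buf_ostream.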
    unsigned_type write_buffers_multiple_bs = 2;
    unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2;   // in blocks
    int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size;          // in records
    int64 max_bucket_size_act = 0;                                                     // actual maximum bucket size
    for (i = 0; i < nbuckets; i++)
    {
        max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
        if (bucket_sizes[i] > max_bucket_size_rec)
        {
            STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] <<
                         " records, maximum: " << max_bucket_size_rec);
            STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting.");
            abort();
        }
    }
    // knowing the actual maximum bucket size, the per-bucket block requirement can be reduced
    const int_type max_bucket_size_act_bl = div_and_round_up(max_bucket_size_act, block_type::size);
    STXXL_VERBOSE_STABLE_KSORT("Reducing required number of blocks per bucket from " <<
                               max_bucket_size_bl << " to " << max_bucket_size_act_bl);
    max_bucket_size_rec = max_bucket_size_act;
    max_bucket_size_bl = max_bucket_size_act_bl;
    const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
    STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs);
    typedef buf_ostream<block_type, typename ExtIterator_::bids_container_iterator> buf_ostream_type;
    buf_ostream_type out(first.bid(), nwrite_buffers_bs);

    disk_queues::get_instance()->set_priority_op(disk_queue::READ);
    if (first.block_offset())
    {
        // copy the head of the first (partial) block unchanged
        block_type * block = new block_type;
        request_ptr req = block->read(*first.bid());
        req->wait();
        for (i = 0; i < first.block_offset(); i++)
            out << block->elem[i];
        delete block;
    }
    block_type * blocks1 = new block_type[max_bucket_size_bl];
    block_type * blocks2 = new block_type[max_bucket_size_bl];
    request_ptr * reqs1 = new request_ptr[max_bucket_size_bl];
    request_ptr * reqs2 = new request_ptr[max_bucket_size_bl];
    type_key_ * refs1 = new type_key_[max_bucket_size_rec];
    type_key_ * refs2 = new type_key_[max_bucket_size_rec];
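
    // Double buffering: the reads for the first two buckets are submitted now; while
    // bucket k is being sorted from blocks1/reqs1, bucket k+1 is already being read into
    // blocks2/reqs2, and the read of bucket k+2 is submitted at the end of each iteration.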
    unsigned_type nbucket_blocks = div_and_round_up(bucket_sizes[0], block_type::size);
    for (i = 0; i < nbucket_blocks; i++)
        reqs1[i] = blocks1[i].read(bucket_bids[0][i]);

    nbucket_blocks = div_and_round_up(bucket_sizes[1], block_type::size);
    for (i = 0; i < nbucket_blocks; i++)
        reqs2[i] = blocks2[i].read(bucket_bids[1][i]);
    key_type offset = 0;
    const unsigned log_k1 =
        (std::max)(static_cast<unsigned>(ceil(log2(double(max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE)))), 1U);
    unsigned_type k1 = 1 << log_k1;
    int_type * bucket1 = new int_type[k1];

    const unsigned shift = sizeof(key_type) * 8 - lognbuckets;
    const unsigned shift1 = shift - log_k1;
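
    // Within a bucket the sort proceeds by two further levels of classification: log_k1
    // is chosen so that one sub-bucket of type_key_ entries is roughly of L2 cache size,
    // and shift1 selects the next log_k1 key bits below the top-level bucket bits.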
    STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec <<
                               " block size:" << block_type::size << " log_k1:" << log_k1);
    for (unsigned_type k = 0; k < nbuckets; k++)
    {
        nbucket_blocks = div_and_round_up(bucket_sizes[k], block_type::size);
        const unsigned log_k1_k =
            (std::max)(static_cast<unsigned>(ceil(log2(double(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE)))), 1U);
        assert(log_k1_k <= log_k1);
        std::fill(bucket1, bucket1 + k1, 0);

        STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] <<
                                   " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k);
        // classify the first nbucket_blocks-1 blocks, they are full
        type_key_ * ref_ptr = refs1;
        key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
        for (i = 0; i < nbucket_blocks - 1; i++)
        {
            reqs1[i]->wait();
            stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(), ref_ptr, bucket1, offset1, shift1);
        }
        // the last block of the bucket may be only partially filled
        const unsigned_type last_block_size = bucket_sizes[k] - int64(nbucket_blocks - 1) * block_type::size;
        reqs1[i]->wait();
        stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size, ref_ptr, bucket1, offset1, shift1);
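
        // Counting sort of the whole bucket: bucket1 now holds the sub-bucket counts,
        // exclusive_prefix_sum turns them into start offsets, and classify scatters the
        // key/pointer pairs from refs1 into refs2 in sub-bucket order.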
        exclusive_prefix_sum(bucket1, k1);
        classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);

        type_key_ * c = refs2;
        type_key_ * d = refs1;
        for (i = 0; i < k1; i++)
        {
            type_key_ * cEnd = refs2 + bucket1[i];
            type_key_ * dEnd = refs1 + bucket1[i];

            const unsigned log_k2 = static_cast<unsigned>(log2(double(bucket1[i]))) - 1;   // adaptive sub-bucket count
            const unsigned_type k2 = 1 << log_k2;
            int_type * bucket2 = new int_type[k2];
            const unsigned shift2 = shift1 - log_k2;

            l1sort(c, cEnd, d, bucket2, k2,
                   offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
                   shift2);

            // write out the records of the sorted sub-bucket in order
            for (type_key_ * p = d; p < dEnd; p++)
                out << (*(p->ptr));

            delete[] bucket2;
            c = cEnd;
            d = dEnd;
        }
        const unsigned_type bucket2submit = k + 2;
        if (bucket2submit < nbuckets)
        {
            nbucket_blocks = div_and_round_up(bucket_sizes[bucket2submit], block_type::size);
            for (i = 0; i < nbucket_blocks; i++)
                reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
        }

        std::swap(blocks1, blocks2);
        std::swap(reqs1, reqs2);
    }

    delete[] bucket1;
    delete[] refs1;
    delete[] refs2;
    delete[] blocks1;
    delete[] blocks2;
    delete[] reqs1;
    delete[] reqs2;
    delete[] bucket_bids;
    delete[] bucket_sizes;
    if (last.block_offset())
    {
        // copy the tail of the last (partial) block unchanged
        block_type * block = new block_type;
        request_ptr req = block->read(*last.bid());
        req->wait();
        for (i = last.block_offset(); i < block_type::size; i++)
            out << block->elem[i];
        delete block;
    }
    end = timestamp();

    STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Distribution time: " <<
                  dist_end - begin << " s");
    STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s");
    STXXL_VERBOSE(*stats::get_instance());
    UNUSED(begin + dist_end + io_wait_after_d);
}
__STXXL_END_NAMESPACE

#endif // !STXXL_STABLE_KSORT_HEADER