Stxxl
1.2.1
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/algo/ksort.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00008 * 00009 * Distributed under the Boost Software License, Version 1.0. 00010 * (See accompanying file LICENSE_1_0.txt or copy at 00011 * http://www.boost.org/LICENSE_1_0.txt) 00012 **************************************************************************/ 00013 00014 #ifndef STXXL_KSORT_HEADER 00015 #define STXXL_KSORT_HEADER 00016 00017 #include <list> 00018 00019 #include <stxxl/bits/mng/mng.h> 00020 #include <stxxl/bits/common/rand.h> 00021 #include <stxxl/bits/mng/adaptor.h> 00022 #include <stxxl/bits/common/simple_vector.h> 00023 #include <stxxl/bits/common/switch.h> 00024 #include <stxxl/bits/mng/block_alloc_interleaved.h> 00025 #include <stxxl/bits/algo/intksort.h> 00026 #include <stxxl/bits/algo/adaptor.h> 00027 #include <stxxl/bits/algo/async_schedule.h> 00028 #include <stxxl/bits/mng/block_prefetcher.h> 00029 #include <stxxl/bits/mng/buf_writer.h> 00030 #include <stxxl/bits/algo/run_cursor.h> 00031 #include <stxxl/bits/algo/losertree.h> 00032 #include <stxxl/bits/algo/inmemsort.h> 00033 00034 00035 //#define SORT_OPTIMAL_PREFETCHING 00036 //#define INTERLEAVED_ALLOC 00037 00038 #define OPT_MERGING 00039 00040 __STXXL_BEGIN_NAMESPACE 00041 00044 00049 00052 namespace ksort_local 00053 { 00054 template <typename _BIDTp, typename _KeyTp> 00055 struct trigger_entry 00056 { 00057 typedef _BIDTp bid_type; 00058 typedef _KeyTp key_type; 00059 00060 bid_type bid; 00061 key_type key; 00062 00063 operator bid_type () 00064 { 00065 return bid; 00066 } 00067 }; 00068 00069 00070 template <typename _BIDTp, typename _KeyTp> 00071 inline bool operator < (const trigger_entry<_BIDTp, _KeyTp> & a, 00072 const trigger_entry<_BIDTp, _KeyTp> & b) 00073 { 00074 return (a.key < b.key); 00075 } 00076 00077 template <typename _BIDTp, typename _KeyTp> 00078 inline bool operator > (const trigger_entry<_BIDTp, _KeyTp> & a, 00079 const trigger_entry<_BIDTp, _KeyTp> & b) 00080 { 00081 return (a.key > b.key); 00082 } 00083 00084 template <typename type, typename key_type1> 00085 struct type_key 00086 { 00087 typedef key_type1 key_type; 00088 key_type key; 00089 type * ptr; 00090 00091 type_key() { } 00092 type_key(key_type k, type * p) : key(k), ptr(p) 00093 { } 00094 }; 00095 00096 template <typename type, typename key1> 00097 bool operator < (const type_key<type, key1> & a, const type_key<type, key1> & b) 00098 { 00099 return a.key < b.key; 00100 } 00101 00102 template <typename type, typename key1> 00103 bool operator > (const type_key<type, key1> & a, const type_key<type, key1> & b) 00104 { 00105 return a.key > b.key; 00106 } 00107 00108 00109 template <typename block_type, typename bid_type> 00110 struct write_completion_handler 00111 { 00112 block_type * block; 00113 bid_type bid; 00114 request_ptr * req; 00115 void operator () (request * /*completed_req*/) 00116 { 00117 * req = block->read(bid); 00118 } 00119 }; 00120 00121 template <typename type_key_, 00122 typename block_type, 00123 typename run_type, 00124 typename input_bid_iterator, 00125 typename key_extractor> 00126 inline void write_out( 00127 type_key_ * begin, 00128 type_key_ * end, 00129 block_type * & cur_blk, 00130 const block_type * end_blk, 00131 int_type & out_block, 00132 int_type & out_pos, 00133 run_type & run, 00134 write_completion_handler<block_type, typename block_type::bid_type> * & next_read, 00135 typename block_type::bid_type * & bids, 00136 request_ptr * write_reqs, 00137 request_ptr * read_reqs, 00138 input_bid_iterator & it, 00139 key_extractor keyobj) 00140 { 00141 typedef typename block_type::bid_type bid_type; 00142 typedef typename block_type::type type; 00143 00144 type * elem = cur_blk->elem; 00145 for (type_key_ * p = begin; p < end; p++) 00146 { 00147 elem[out_pos++] = *(p->ptr); 00148 00149 if (out_pos >= block_type::size) 00150 { 00151 run[out_block].key = keyobj(*(cur_blk->elem)); 00152 00153 if (cur_blk < end_blk) 00154 { 00155 next_read->block = cur_blk; 00156 next_read->req = read_reqs + out_block; 00157 read_reqs[out_block] = NULL; 00158 bids[out_block] = next_read->bid = *(it++); 00159 00160 write_reqs[out_block] = cur_blk->write( 00161 run[out_block].bid, 00162 // postpone read of block from next run 00163 // after write of block from this run 00164 *(next_read++)); 00165 } 00166 else 00167 { 00168 write_reqs[out_block] = cur_blk->write(run[out_block].bid); 00169 } 00170 00171 cur_blk++; 00172 elem = cur_blk->elem; 00173 out_block++; 00174 out_pos = 0; 00175 } 00176 } 00177 } 00178 00179 template < 00180 typename block_type, 00181 typename run_type, 00182 typename input_bid_iterator, 00183 typename key_extractor> 00184 void 00185 create_runs( 00186 input_bid_iterator it, 00187 run_type ** runs, 00188 const unsigned_type nruns, 00189 const unsigned_type m2, 00190 key_extractor keyobj) 00191 { 00192 typedef typename block_type::value_type type; 00193 typedef typename block_type::bid_type bid_type; 00194 typedef typename key_extractor::key_type key_type; 00195 typedef type_key<type, key_type> type_key_; 00196 00197 block_manager * bm = block_manager::get_instance(); 00198 block_type * Blocks1 = new block_type[m2]; 00199 block_type * Blocks2 = new block_type[m2]; 00200 bid_type * bids = new bid_type[m2]; 00201 type_key_ * refs1 = new type_key_[m2 * Blocks1->size]; 00202 type_key_ * refs2 = new type_key_[m2 * Blocks1->size]; 00203 request_ptr * read_reqs = new request_ptr[m2]; 00204 request_ptr * write_reqs = new request_ptr[m2]; 00205 write_completion_handler<block_type, bid_type> * next_run_reads = 00206 new write_completion_handler<block_type, bid_type>[m2]; 00207 00208 run_type * run; 00209 run = *runs; 00210 int_type run_size = (*runs)->size(); 00211 key_type offset = 0; 00212 const int log_k1 = 00213 static_cast<int>(ceil(log2((m2 * block_type::size * sizeof(type_key_) / STXXL_L2_SIZE) ? 00214 (double(m2 * block_type::size * sizeof(type_key_) / STXXL_L2_SIZE)) : 2.))); 00215 const int log_k2 = int(log2(double(m2 * Blocks1->size))) - log_k1 - 1; 00216 STXXL_VERBOSE("log_k1: " << log_k1 << " log_k2:" << log_k2); 00217 const int_type k1 = 1 << log_k1; 00218 const int_type k2 = 1 << log_k2; 00219 int_type * bucket1 = new int_type[k1]; 00220 int_type * bucket2 = new int_type[k2]; 00221 int_type i; 00222 00223 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE); 00224 00225 for (i = 0; i < run_size; i++) 00226 { 00227 bids[i] = *(it++); 00228 read_reqs[i] = Blocks1[i].read(bids[i]); 00229 } 00230 00231 unsigned_type k = 0; 00232 const int shift1 = sizeof(key_type) * 8 - log_k1; 00233 const int shift2 = shift1 - log_k2; 00234 STXXL_VERBOSE("shift1: " << shift1 << " shift2:" << shift2); 00235 00236 for ( ; k < nruns; k++) 00237 { 00238 run = runs[k]; 00239 run_size = run->size(); 00240 00241 std::fill(bucket1, bucket1 + k1, 0); 00242 00243 type_key_ * ref_ptr = refs1; 00244 for (i = 0; i < run_size; i++) 00245 { 00246 if (k) 00247 write_reqs[i]->wait(); 00248 00249 read_reqs[i]->wait(); 00250 bm->delete_block(bids[i]); 00251 00252 classify_block(Blocks1[i].begin(), Blocks1[i].end(), ref_ptr, bucket1, offset, shift1, keyobj); 00253 } 00254 00255 exclusive_prefix_sum(bucket1, k1); 00256 classify(refs1, refs1 + run_size * Blocks1->size, refs2, bucket1, 00257 offset, shift1); 00258 00259 int_type out_block = 0; 00260 int_type out_pos = 0; 00261 unsigned_type next_run_size = (k < nruns - 1) ? (runs[k + 1]->size()) : 0; 00262 00263 // recurse on each bucket 00264 type_key_ * c = refs2; 00265 type_key_ * d = refs1; 00266 block_type * cur_blk = Blocks2; 00267 block_type * end_blk = Blocks2 + next_run_size; 00268 write_completion_handler<block_type, bid_type> * next_read = next_run_reads; 00269 00270 for (i = 0; i < k1; i++) 00271 { 00272 type_key_ * cEnd = refs2 + bucket1[i]; 00273 type_key_ * dEnd = refs1 + bucket1[i]; 00274 00275 l1sort(c, cEnd, d, bucket2, k2, 00276 offset + (key_type(1) << key_type(shift1)) * key_type(i), shift2); // key_type,key_type,... paranoia 00277 00278 write_out( 00279 d, dEnd, cur_blk, end_blk, 00280 out_block, out_pos, *run, next_read, bids, 00281 write_reqs, read_reqs, it, keyobj); 00282 00283 c = cEnd; 00284 d = dEnd; 00285 } 00286 00287 std::swap(Blocks1, Blocks2); 00288 } 00289 00290 wait_all(write_reqs, m2); 00291 00292 delete[] bucket1; 00293 delete[] bucket2; 00294 delete[] refs1; 00295 delete[] refs2; 00296 delete[] Blocks1; 00297 delete[] Blocks2; 00298 delete[] bids; 00299 delete[] next_run_reads; 00300 delete[] read_reqs; 00301 delete[] write_reqs; 00302 } 00303 00304 template <typename block_type, 00305 typename prefetcher_type, 00306 typename key_extractor> 00307 struct run_cursor2_cmp 00308 { 00309 typedef run_cursor2<block_type, prefetcher_type> cursor_type; 00310 key_extractor keyobj; 00311 run_cursor2_cmp(key_extractor keyobj_) 00312 { 00313 keyobj = keyobj_; 00314 } 00315 inline bool operator () (const cursor_type & a, const cursor_type & b) 00316 { 00317 if (UNLIKELY(b.empty())) 00318 return true; 00319 // sentinel emulation 00320 if (UNLIKELY(a.empty())) 00321 return false; 00322 //sentinel emulation 00323 00324 return (keyobj(a.current()) < keyobj(b.current())); 00325 } 00326 00327 private: 00328 run_cursor2_cmp() { } 00329 }; 00330 00331 00332 template <typename record_type, typename key_extractor> 00333 class key_comparison : public std::binary_function<record_type, record_type, bool> 00334 { 00335 key_extractor ke; 00336 00337 public: 00338 key_comparison() { } 00339 key_comparison(key_extractor ke_) : ke(ke_) { } 00340 bool operator () (const record_type & a, const record_type & b) const 00341 { 00342 return ke(a) < ke(b); 00343 } 00344 }; 00345 00346 00347 template <typename block_type, typename run_type, typename key_ext_> 00348 bool check_ksorted_runs(run_type ** runs, 00349 unsigned_type nruns, 00350 unsigned_type m, 00351 key_ext_ keyext) 00352 { 00353 typedef typename block_type::value_type value_type; 00354 00355 //STXXL_VERBOSE1("check_sorted_runs Runs: "<<nruns); 00356 STXXL_MSG("check_sorted_runs Runs: " << nruns); 00357 unsigned_type irun = 0; 00358 for (irun = 0; irun < nruns; ++irun) 00359 { 00360 const unsigned_type nblocks_per_run = runs[irun]->size(); 00361 unsigned_type blocks_left = nblocks_per_run; 00362 block_type * blocks = new block_type[m]; 00363 request_ptr * reqs = new request_ptr[m]; 00364 value_type last = keyext.min_value(); 00365 00366 for (unsigned_type off = 0; off < nblocks_per_run; off += m) 00367 { 00368 const unsigned_type nblocks = STXXL_MIN(blocks_left, m); 00369 const unsigned_type nelements = nblocks * block_type::size; 00370 blocks_left -= nblocks; 00371 00372 for (unsigned_type j = 0; j < nblocks; ++j) 00373 { 00374 reqs[j] = blocks[j].read((*runs[irun])[off + j].bid); 00375 } 00376 wait_all(reqs, reqs + nblocks); 00377 00378 if (off && (keyext(blocks[0][0]) < keyext(last))) 00379 { 00380 STXXL_MSG("check_sorted_runs wrong first value in the run " << irun); 00381 STXXL_MSG(" first value: " << blocks[0][0] << " with key" << keyext(blocks[0][0])); 00382 STXXL_MSG(" last value: " << last << " with key" << keyext(last)); 00383 for (unsigned_type k = 0; k < block_type::size; ++k) 00384 STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k] << " key: " << keyext(blocks[0][k])); 00385 return false; 00386 } 00387 00388 for (unsigned_type j = 0; j < nblocks; ++j) 00389 { 00390 if (keyext(blocks[j][0]) != (*runs[irun])[off + j].key) 00391 { 00392 STXXL_MSG("check_sorted_runs wrong trigger in the run " << irun << " block " << (off + j)); 00393 STXXL_MSG(" trigger value: " << (*runs[irun])[off + j].key); 00394 STXXL_MSG("Data in the block:"); 00395 for (unsigned_type k = 0; k < block_type::size; ++k) 00396 STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k] << " with key: " << keyext(blocks[j][k])); 00397 00398 STXXL_MSG("BIDS:"); 00399 for (unsigned_type k = 0; k < nblocks; ++k) 00400 { 00401 if (k == j) 00402 STXXL_MSG("Bad one comes next."); 00403 STXXL_MSG("BID " << (off + k) << " is: " << ((*runs[irun])[off + k].bid)); 00404 } 00405 00406 return false; 00407 } 00408 } 00409 if (!stxxl::is_sorted( 00410 #if 1 00411 ArrayOfSequencesIterator< 00412 block_type, typename block_type::value_type, block_type::size 00413 >(blocks, 0), 00414 ArrayOfSequencesIterator< 00415 block_type, typename block_type::value_type, block_type::size 00416 >(blocks, nelements), 00417 #else 00418 TwoToOneDimArrayRowAdaptor< 00419 block_type, value_type, block_type::size 00420 >(blocks, 0), 00421 TwoToOneDimArrayRowAdaptor< 00422 block_type, value_type, block_type::size 00423 >(blocks, nelements), 00424 #endif 00425 key_comparison<value_type, key_ext_>())) 00426 { 00427 STXXL_MSG("check_sorted_runs wrong order in the run " << irun); 00428 STXXL_MSG("Data in blocks:"); 00429 for (unsigned_type j = 0; j < nblocks; ++j) 00430 { 00431 for (unsigned_type k = 0; k < block_type::size; ++k) 00432 STXXL_MSG(" Element " << k << " in block " << (off + j) << " is :" << blocks[j][k] << " with key: " << keyext(blocks[j][k])); 00433 } 00434 STXXL_MSG("BIDS:"); 00435 for (unsigned_type k = 0; k < nblocks; ++k) 00436 { 00437 STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid)); 00438 } 00439 00440 return false; 00441 } 00442 last = blocks[nblocks - 1][block_type::size - 1]; 00443 } 00444 00445 assert(blocks_left == 0); 00446 delete[] reqs; 00447 delete[] blocks; 00448 } 00449 00450 return true; 00451 } 00452 00453 00454 template <typename block_type, typename run_type, typename key_extractor> 00455 void merge_runs(run_type ** in_runs, unsigned_type nruns, run_type * out_run, unsigned_type _m, key_extractor keyobj) 00456 { 00457 typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type; 00458 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type; 00459 00460 unsigned_type i; 00461 run_type consume_seq(out_run->size()); 00462 00463 int_type * prefetch_seq = new int_type[out_run->size()]; 00464 00465 typename run_type::iterator copy_start = consume_seq.begin(); 00466 for (i = 0; i < nruns; i++) 00467 { 00468 // TODO: try to avoid copy 00469 copy_start = std::copy( 00470 in_runs[i]->begin(), 00471 in_runs[i]->end(), 00472 copy_start); 00473 } 00474 std::stable_sort(consume_seq.begin(), consume_seq.end()); 00475 00476 unsigned disks_number = config::get_instance()->disks_number(); 00477 00478 #ifdef PLAY_WITH_OPT_PREF 00479 const int_type n_write_buffers = 4 * disks_number; 00480 #else 00481 const int_type n_prefetch_buffers = STXXL_MAX(int_type(2 * disks_number), (3 * (int_type(_m) - int_type(nruns)) / 4)); 00482 STXXL_VERBOSE("Prefetch buffers " << n_prefetch_buffers); 00483 const int_type n_write_buffers = STXXL_MAX(int_type(2 * disks_number), int_type(_m) - int_type(nruns) - int_type(n_prefetch_buffers)); 00484 STXXL_VERBOSE("Write buffers " << n_write_buffers); 00485 // heuristic 00486 const int_type n_opt_prefetch_buffers = 2 * int_type(disks_number) + (3 * (int_type(n_prefetch_buffers) - int_type(2 * disks_number))) / 10; 00487 STXXL_VERBOSE("Prefetch buffers " << n_opt_prefetch_buffers); 00488 #endif 00489 00490 #ifdef SORT_OPTIMAL_PREFETCHING 00491 compute_prefetch_schedule( 00492 consume_seq, 00493 prefetch_seq, 00494 n_opt_prefetch_buffers, 00495 disks_number); 00496 #else 00497 for (i = 0; i < out_run->size(); i++) 00498 prefetch_seq[i] = i; 00499 00500 #endif 00501 00502 00503 prefetcher_type prefetcher(consume_seq.begin(), 00504 consume_seq.end(), 00505 prefetch_seq, 00506 nruns + n_prefetch_buffers); 00507 00508 buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2); 00509 00510 unsigned_type out_run_size = out_run->size(); 00511 00512 run_cursor2_cmp<block_type, prefetcher_type, key_extractor> cmp(keyobj); 00513 loser_tree< 00514 run_cursor_type, 00515 run_cursor2_cmp<block_type, prefetcher_type, key_extractor>, 00516 block_type::size> losers(&prefetcher, nruns, cmp); 00517 00518 00519 block_type * out_buffer = writer.get_free_block(); 00520 00521 for (i = 0; i < out_run_size; i++) 00522 { 00523 losers.multi_merge(out_buffer->elem); 00524 (*out_run)[i].key = keyobj(out_buffer->elem[0]); 00525 out_buffer = writer.write(out_buffer, (*out_run)[i].bid); 00526 } 00527 00528 delete[] prefetch_seq; 00529 00530 block_manager * bm = block_manager::get_instance(); 00531 for (i = 0; i < nruns; i++) 00532 { 00533 unsigned_type sz = in_runs[i]->size(); 00534 for (unsigned_type j = 0; j < sz; j++) 00535 bm->delete_block((*in_runs[i])[j].bid); 00536 00537 delete in_runs[i]; 00538 } 00539 } 00540 00541 00542 template <typename block_type, 00543 typename alloc_strategy, 00544 typename input_bid_iterator, 00545 typename key_extractor> 00546 00547 simple_vector<trigger_entry<typename block_type::bid_type, typename key_extractor::key_type> > * 00548 ksort_blocks(input_bid_iterator input_bids, unsigned_type _n, unsigned_type _m, key_extractor keyobj) 00549 { 00550 typedef typename block_type::value_type type; 00551 typedef typename key_extractor::key_type key_type; 00552 typedef typename block_type::bid_type bid_type; 00553 typedef trigger_entry<bid_type, typename key_extractor::key_type> trigger_entry_type; 00554 typedef simple_vector<trigger_entry_type> run_type; 00555 typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy; 00556 00557 unsigned_type m2 = div_and_round_up(_m, 2); 00558 const unsigned_type m2_rf = m2 * block_type::raw_size / 00559 (block_type::raw_size + block_type::size * sizeof(type_key<type, key_type>)); 00560 STXXL_VERBOSE("Reducing number of blocks in a run from " << m2 << " to " << 00561 m2_rf << " due to key size: " << sizeof(typename key_extractor::key_type) << " bytes"); 00562 m2 = m2_rf; 00563 unsigned_type full_runs = _n / m2; 00564 unsigned_type partial_runs = ((_n % m2) ? 1 : 0); 00565 unsigned_type nruns = full_runs + partial_runs; 00566 unsigned_type i; 00567 00568 config * cfg = config::get_instance(); 00569 block_manager * mng = block_manager::get_instance(); 00570 int ndisks = cfg->disks_number(); 00571 00572 STXXL_VERBOSE("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs); 00573 00574 double begin = timestamp(), after_runs_creation, end; 00575 00576 run_type ** runs = new run_type *[nruns]; 00577 00578 for (i = 0; i < full_runs; i++) 00579 runs[i] = new run_type(m2); 00580 00581 00582 #ifdef INTERLEAVED_ALLOC 00583 if (partial_runs) 00584 { 00585 unsigned_type last_run_size = _n - full_runs * m2; 00586 runs[i] = new run_type(last_run_size); 00587 00588 mng->new_blocks(interleaved_alloc_strategy(nruns, 0, ndisks), 00589 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type> 00590 (runs, 0, nruns, last_run_size), 00591 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type> 00592 (runs, _n, nruns, last_run_size)); 00593 } 00594 else 00595 mng->new_blocks(interleaved_alloc_strategy(nruns, 0, ndisks), 00596 RunsToBIDArrayAdaptor<block_type::raw_size, run_type> 00597 (runs, 0, nruns), 00598 RunsToBIDArrayAdaptor<block_type::raw_size, run_type> 00599 (runs, _n, nruns)); 00600 00601 #else 00602 if (partial_runs) 00603 runs[i] = new run_type(_n - full_runs * m2); 00604 00605 00606 for (i = 0; i < nruns; i++) 00607 { 00608 mng->new_blocks(alloc_strategy(0, ndisks), 00609 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->begin()), 00610 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->end())); 00611 } 00612 #endif 00613 00614 create_runs<block_type, 00615 run_type, 00616 input_bid_iterator, 00617 key_extractor>(input_bids, runs, nruns, m2, keyobj); 00618 00619 after_runs_creation = timestamp(); 00620 00621 double io_wait_after_rf = stats::get_instance()->get_io_wait_time(); 00622 00623 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE); 00624 00625 // Optimal merging: merge r = pow(nruns,1/ceil(log(nruns)/log(m))) at once 00626 00627 const int_type merge_factor = static_cast<int_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) / log(double(_m)))))); 00628 run_type ** new_runs; 00629 00630 while (nruns > 1) 00631 { 00632 int_type new_nruns = div_and_round_up(nruns, merge_factor); 00633 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns << 00634 " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns); 00635 00636 new_runs = new run_type *[new_nruns]; 00637 00638 int_type runs_left = nruns; 00639 int_type cur_out_run = 0; 00640 int_type blocks_in_new_run = 0; 00641 00642 while (runs_left > 0) 00643 { 00644 int_type runs2merge = STXXL_MIN(runs_left, merge_factor); 00645 blocks_in_new_run = 0; 00646 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++) 00647 blocks_in_new_run += runs[i]->size(); 00648 00649 // allocate run 00650 new_runs[cur_out_run++] = new run_type(blocks_in_new_run); 00651 runs_left -= runs2merge; 00652 } 00653 // allocate blocks in the new runs 00654 if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && (input_bids->storage->get_id() == -1)) 00655 { 00656 // if we sort a file we can reuse the input bids for the output 00657 input_bid_iterator cur = input_bids; 00658 for (int_type i = 0; cur != (input_bids + _n); ++cur) 00659 { 00660 (*new_runs[0])[i++].bid = *cur; 00661 } 00662 00663 bid_type & firstBID = (*new_runs[0])[0].bid; 00664 if (firstBID.storage->get_id() != -1) 00665 { 00666 // the first block does not belong to the file 00667 // need to reallocate it 00668 mng->new_blocks(FR(), &firstBID, (&firstBID) + 1); 00669 } 00670 bid_type & lastBID = (*new_runs[0])[_n - 1].bid; 00671 if (lastBID.storage->get_id() != -1) 00672 { 00673 // the first block does not belong to the file 00674 // need to reallocate it 00675 mng->new_blocks(FR(), &lastBID, (&lastBID) + 1); 00676 } 00677 } 00678 else 00679 { 00680 mng->new_blocks(interleaved_alloc_strategy(new_nruns, 0, ndisks), 00681 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run), 00682 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run)); 00683 } 00684 00685 00686 // merge all 00687 runs_left = nruns; 00688 cur_out_run = 0; 00689 while (runs_left > 0) 00690 { 00691 int_type runs2merge = STXXL_MIN(runs_left, merge_factor); 00692 #ifdef STXXL_CHECK_ORDER_IN_SORTS 00693 assert((check_ksorted_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left, runs2merge, m2, keyobj))); 00694 #endif 00695 STXXL_VERBOSE("Merging " << runs2merge << " runs"); 00696 merge_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left, 00697 runs2merge, *(new_runs + (cur_out_run++)), _m, keyobj); 00698 runs_left -= runs2merge; 00699 } 00700 00701 nruns = new_nruns; 00702 delete[] runs; 00703 runs = new_runs; 00704 } 00705 00706 run_type * result = *runs; 00707 delete[] runs; 00708 00709 end = timestamp(); 00710 00711 STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Run creation time: " << 00712 after_runs_creation - begin << " s"); 00713 STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s"); 00714 STXXL_VERBOSE(*stats::get_instance()); 00715 UNUSED(begin + io_wait_after_rf); 00716 00717 return result; 00718 } 00719 } 00720 00721 00757 00758 00759 00760 00761 00762 00763 00764 template <typename ExtIterator_, typename KeyExtractor_> 00765 void ksort(ExtIterator_ first_, ExtIterator_ last_, KeyExtractor_ keyobj, unsigned_type M__) 00766 { 00767 typedef simple_vector<ksort_local::trigger_entry<typename ExtIterator_::bid_type, 00768 typename KeyExtractor_::key_type> > run_type; 00769 typedef typename ExtIterator_::vector_type::value_type value_type; 00770 typedef typename ExtIterator_::block_type block_type; 00771 00772 00773 unsigned_type n = 0; 00774 block_manager * mng = block_manager::get_instance(); 00775 00776 first_.flush(); 00777 00778 if ((last_ - first_) * sizeof(value_type) < M__) 00779 { 00780 stl_in_memory_sort(first_, last_, ksort_local::key_comparison<value_type, KeyExtractor_>(keyobj)); 00781 } 00782 else 00783 { 00784 assert(2 * block_type::raw_size <= M__); 00785 00786 if (first_.block_offset()) 00787 { 00788 if (last_.block_offset()) // first and last element reside 00789 // not in the beginning of the block 00790 { 00791 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type; 00792 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type; 00793 typename ExtIterator_::bid_type first_bid, last_bid; 00794 request_ptr req; 00795 00796 req = first_block->read(*first_.bid()); 00797 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1); // try to overlap 00798 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1); 00799 req->wait(); 00800 00801 00802 req = last_block->read(*last_.bid()); 00803 00804 unsigned_type i = 0; 00805 for ( ; i < first_.block_offset(); i++) 00806 { 00807 first_block->elem[i] = keyobj.min_value(); 00808 } 00809 00810 req->wait(); 00811 00812 req = first_block->write(first_bid); 00813 for (i = last_.block_offset(); i < block_type::size; i++) 00814 { 00815 last_block->elem[i] = keyobj.max_value(); 00816 } 00817 00818 req->wait(); 00819 00820 req = last_block->write(last_bid); 00821 00822 n = last_.bid() - first_.bid() + 1; 00823 00824 std::swap(first_bid, *first_.bid()); 00825 std::swap(last_bid, *last_.bid()); 00826 00827 req->wait(); 00828 00829 delete first_block; 00830 delete last_block; 00831 00832 run_type * out = 00833 ksort_local::ksort_blocks< 00834 typename ExtIterator_::block_type, 00835 typename ExtIterator_::vector_type::alloc_strategy, 00836 typename ExtIterator_::bids_container_iterator, 00837 KeyExtractor_> 00838 (first_.bid(), n, M__ / block_type::raw_size, keyobj); 00839 00840 00841 first_block = new typename ExtIterator_::block_type; 00842 last_block = new typename ExtIterator_::block_type; 00843 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type; 00844 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type; 00845 request_ptr * reqs = new request_ptr[2]; 00846 00847 reqs[0] = first_block->read(first_bid); 00848 reqs[1] = sorted_first_block->read((*(out->begin())).bid); 00849 wait_all(reqs, 2); 00850 00851 reqs[0] = last_block->read(last_bid); 00852 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid); 00853 00854 for (i = first_.block_offset(); i < block_type::size; i++) 00855 { 00856 first_block->elem[i] = sorted_first_block->elem[i]; 00857 } 00858 wait_all(reqs, 2); 00859 00860 req = first_block->write(first_bid); 00861 00862 for (i = 0; i < last_.block_offset(); i++) 00863 { 00864 last_block->elem[i] = sorted_last_block->elem[i]; 00865 } 00866 00867 req->wait(); 00868 00869 00870 req = last_block->write(last_bid); 00871 00872 mng->delete_block(out->begin()->bid); 00873 mng->delete_block((*out)[out->size() - 1].bid); 00874 00875 *first_.bid() = first_bid; 00876 *last_.bid() = last_bid; 00877 00878 typename run_type::iterator it = out->begin(); 00879 it++; 00880 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid(); 00881 cur_bid++; 00882 00883 for ( ; cur_bid != last_.bid(); cur_bid++, it++) 00884 { 00885 *cur_bid = (*it).bid; 00886 } 00887 00888 delete first_block; 00889 delete sorted_first_block; 00890 delete sorted_last_block; 00891 delete[] reqs; 00892 delete out; 00893 00894 req->wait(); 00895 00896 delete last_block; 00897 } 00898 else 00899 { 00900 // first element resides 00901 // not in the beginning of the block 00902 00903 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type; 00904 typename ExtIterator_::bid_type first_bid; 00905 request_ptr req; 00906 00907 req = first_block->read(*first_.bid()); 00908 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1); // try to overlap 00909 req->wait(); 00910 00911 00912 unsigned_type i = 0; 00913 for ( ; i < first_.block_offset(); i++) 00914 { 00915 first_block->elem[i] = keyobj.min_value(); 00916 } 00917 00918 req = first_block->write(first_bid); 00919 00920 n = last_.bid() - first_.bid(); 00921 00922 std::swap(first_bid, *first_.bid()); 00923 00924 req->wait(); 00925 00926 delete first_block; 00927 00928 run_type * out = 00929 ksort_local::ksort_blocks< 00930 typename ExtIterator_::block_type, 00931 typename ExtIterator_::vector_type::alloc_strategy, 00932 typename ExtIterator_::bids_container_iterator, 00933 KeyExtractor_> 00934 (first_.bid(), n, M__ / block_type::raw_size, keyobj); 00935 00936 00937 first_block = new typename ExtIterator_::block_type; 00938 00939 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type; 00940 00941 request_ptr * reqs = new request_ptr[2]; 00942 00943 reqs[0] = first_block->read(first_bid); 00944 reqs[1] = sorted_first_block->read((*(out->begin())).bid); 00945 wait_all(reqs, 2); 00946 00947 for (i = first_.block_offset(); i < block_type::size; i++) 00948 { 00949 first_block->elem[i] = sorted_first_block->elem[i]; 00950 } 00951 00952 req = first_block->write(first_bid); 00953 00954 mng->delete_block(out->begin()->bid); 00955 00956 *first_.bid() = first_bid; 00957 00958 typename run_type::iterator it = out->begin(); 00959 it++; 00960 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid(); 00961 cur_bid++; 00962 00963 for ( ; cur_bid != last_.bid(); cur_bid++, it++) 00964 { 00965 *cur_bid = (*it).bid; 00966 } 00967 00968 *cur_bid = (*it).bid; 00969 00970 delete sorted_first_block; 00971 delete[] reqs; 00972 delete out; 00973 00974 req->wait(); 00975 00976 delete first_block; 00977 } 00978 } 00979 else 00980 { 00981 if (last_.block_offset()) // last element resides 00982 // not in the beginning of the block 00983 { 00984 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type; 00985 typename ExtIterator_::bid_type last_bid; 00986 request_ptr req; 00987 unsigned_type i; 00988 00989 req = last_block->read(*last_.bid()); 00990 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1); 00991 req->wait(); 00992 00993 for (i = last_.block_offset(); i < block_type::size; i++) 00994 { 00995 last_block->elem[i] = keyobj.max_value(); 00996 } 00997 00998 req = last_block->write(last_bid); 00999 01000 n = last_.bid() - first_.bid() + 1; 01001 01002 std::swap(last_bid, *last_.bid()); 01003 01004 req->wait(); 01005 01006 delete last_block; 01007 01008 run_type * out = 01009 ksort_local::ksort_blocks< 01010 typename ExtIterator_::block_type, 01011 typename ExtIterator_::vector_type::alloc_strategy, 01012 typename ExtIterator_::bids_container_iterator, 01013 KeyExtractor_> 01014 (first_.bid(), n, M__ / block_type::raw_size, keyobj); 01015 01016 01017 last_block = new typename ExtIterator_::block_type; 01018 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type; 01019 request_ptr * reqs = new request_ptr[2]; 01020 01021 reqs[0] = last_block->read(last_bid); 01022 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid); 01023 wait_all(reqs, 2); 01024 01025 for (i = 0; i < last_.block_offset(); i++) 01026 { 01027 last_block->elem[i] = sorted_last_block->elem[i]; 01028 } 01029 01030 req = last_block->write(last_bid); 01031 01032 mng->delete_block((*out)[out->size() - 1].bid); 01033 01034 *last_.bid() = last_bid; 01035 01036 typename run_type::iterator it = out->begin(); 01037 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid(); 01038 01039 for ( ; cur_bid != last_.bid(); cur_bid++, it++) 01040 { 01041 *cur_bid = (*it).bid; 01042 } 01043 01044 delete sorted_last_block; 01045 delete[] reqs; 01046 delete out; 01047 01048 req->wait(); 01049 01050 delete last_block; 01051 } 01052 else 01053 { 01054 // first and last element reside in the beginning of blocks 01055 n = last_.bid() - first_.bid(); 01056 01057 run_type * out = 01058 ksort_local::ksort_blocks< 01059 typename ExtIterator_::block_type, 01060 typename ExtIterator_::vector_type::alloc_strategy, 01061 typename ExtIterator_::bids_container_iterator, 01062 KeyExtractor_> 01063 (first_.bid(), n, M__ / block_type::raw_size, keyobj); 01064 01065 typename run_type::iterator it = out->begin(); 01066 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid(); 01067 01068 for ( ; cur_bid != last_.bid(); cur_bid++, it++) 01069 { 01070 *cur_bid = (*it).bid; 01071 } 01072 01073 delete out; 01074 } 01075 } 01076 } 01077 01078 #ifdef STXXL_CHECK_ORDER_IN_SORTS 01079 assert(stxxl::is_sorted(first_, last_, ksort_local::key_comparison<value_type, KeyExtractor_>())); 01080 #endif 01081 } 01082 01083 template <typename record_type> 01084 struct ksort_defaultkey 01085 { 01086 typedef typename record_type::key_type key_type; 01087 key_type operator () (const record_type & obj) const 01088 { 01089 return obj.key(); 01090 } 01091 record_type max_value() const 01092 { 01093 return record_type::max_value(); 01094 } 01095 record_type min_value() const 01096 { 01097 return record_type::min_value(); 01098 } 01099 }; 01100 01101 01108 01117 template <typename ExtIterator_> 01118 void ksort(ExtIterator_ first_, ExtIterator_ last_, unsigned_type M__) 01119 { 01120 ksort(first_, last_, 01121 ksort_defaultkey<typename ExtIterator_::vector_type::value_type>(), M__); 01122 } 01123 01124 01126 01127 __STXXL_END_NAMESPACE 01128 01129 #endif // !STXXL_KSORT_HEADER 01130 // vim: et:ts=4:sw=4