Stxxl  1.2.1
ksort.h
00001 /***************************************************************************
00002  *  include/stxxl/bits/algo/ksort.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00008  *
00009  *  Distributed under the Boost Software License, Version 1.0.
00010  *  (See accompanying file LICENSE_1_0.txt or copy at
00011  *  http://www.boost.org/LICENSE_1_0.txt)
00012  **************************************************************************/
00013 
00014 #ifndef STXXL_KSORT_HEADER
00015 #define STXXL_KSORT_HEADER
00016 
00017 #include <list>
00018 
00019 #include <stxxl/bits/mng/mng.h>
00020 #include <stxxl/bits/common/rand.h>
00021 #include <stxxl/bits/mng/adaptor.h>
00022 #include <stxxl/bits/common/simple_vector.h>
00023 #include <stxxl/bits/common/switch.h>
00024 #include <stxxl/bits/mng/block_alloc_interleaved.h>
00025 #include <stxxl/bits/algo/intksort.h>
00026 #include <stxxl/bits/algo/adaptor.h>
00027 #include <stxxl/bits/algo/async_schedule.h>
00028 #include <stxxl/bits/mng/block_prefetcher.h>
00029 #include <stxxl/bits/mng/buf_writer.h>
00030 #include <stxxl/bits/algo/run_cursor.h>
00031 #include <stxxl/bits/algo/losertree.h>
00032 #include <stxxl/bits/algo/inmemsort.h>
00033 
00034 
00035 //#define SORT_OPTIMAL_PREFETCHING
00036 //#define INTERLEAVED_ALLOC
00037 
00038 #define OPT_MERGING
00039 
00040 __STXXL_BEGIN_NAMESPACE
00041 
00044 
00049 
00052 namespace ksort_local
00053 {
// Associates the identifier of an external-memory block (bid) with a key
// describing that block.  Elsewhere in this file the key is set to the key
// of the first element stored in the block.
//
//  _BIDTp  type of the block identifier
//  _KeyTp  type of the key; must provide operator < and operator >
template <typename _BIDTp, typename _KeyTp>
struct trigger_entry
{
    typedef _BIDTp bid_type;
    typedef _KeyTp key_type;

    bid_type bid;   // identifier of the block
    key_type key;   // key attached to the block; the only member compared

    // Implicit conversion to the block identifier.  The operator is
    // const-qualified so that entries reached through const references or
    // const iterators can be converted as well; non-const entries keep
    // converting exactly as before.
    operator bid_type () const
    {
        return bid;
    }
};


// Orders two trigger entries by key only; the bid does not take part.
template <typename _BIDTp, typename _KeyTp>
inline bool operator < (const trigger_entry<_BIDTp, _KeyTp> & a,
                        const trigger_entry<_BIDTp, _KeyTp> & b)
{
    return (a.key < b.key);
}

// Reverse ordering of two trigger entries, again by key only.
template <typename _BIDTp, typename _KeyTp>
inline bool operator > (const trigger_entry<_BIDTp, _KeyTp> & a,
                        const trigger_entry<_BIDTp, _KeyTp> & b)
{
    return (a.key > b.key);
}
00083 
// A (key, pointer) reference to a record: the key is cached next to a
// pointer to the record it belongs to, so that sorting can move these small
// references around instead of the records themselves.
//
// The default constructor intentionally initialises nothing.
template <typename ValueTp, typename KeyTp>
struct type_key
{
    typedef KeyTp key_type;

    key_type key;       // cached key
    ValueTp * ptr;      // record the key refers to

    type_key()
    { }

    type_key(key_type key_, ValueTp * record_)
        : key(key_),
          ptr(record_)
    { }
};

// Ascending comparison of two references by their cached keys.
template <typename ValueTp, typename KeyTp>
bool operator < (const type_key<ValueTp, KeyTp> & lhs,
                 const type_key<ValueTp, KeyTp> & rhs)
{
    return lhs.key < rhs.key;
}

// Descending comparison of two references by their cached keys.
template <typename ValueTp, typename KeyTp>
bool operator > (const type_key<ValueTp, KeyTp> & lhs,
                 const type_key<ValueTp, KeyTp> & rhs)
{
    return lhs.key > rhs.key;
}
00107 
00108 
00109     template <typename block_type, typename bid_type>
00110     struct write_completion_handler
00111     {
00112         block_type * block;
00113         bid_type bid;
00114         request_ptr * req;
00115         void operator () (request * /*completed_req*/)
00116         {
00117             * req = block->read(bid);
00118         }
00119     };
00120 
// Copies the records referenced by [begin, end) (through p->ptr) into the
// output buffers, starting at block cur_blk, position out_pos, and issues a
// write every time a buffer becomes full.
//
// For each full buffer the key of its first element is stored in
// run[out_block].key and the buffer is written to run[out_block].bid; the
// write request is kept in write_reqs[out_block].  While cur_blk lies before
// end_blk the write is additionally given the next write_completion_handler,
// prepared so that it reads the next input bid (*it) into the same buffer and
// stores that read request in read_reqs[out_block]; the bid is also recorded
// in bids[out_block].
//
// cur_blk, out_block, out_pos, next_read and it are references and are
// advanced here, so successive calls continue where the previous one stopped.
//
//  begin, end   range of (key, pointer) references to emit, in output order
//  cur_blk      current output buffer
//  end_blk      buffers before this one get a chained read attached
//  out_block    index of the current output block within run
//  out_pos      next free element position inside the current buffer
//  run          trigger entries (bid, key) of the run being written
//  next_read    next unused completion handler
//  bids         receives the input bids whose reads have been scheduled
//  write_reqs   receives the write requests, indexed by out_block
//  read_reqs    receives the chained read requests, indexed by out_block
//  it           iterator over the bids of the still unread input blocks
//  keyobj       key extractor
//
// NOTE(review): presumably block_type::write() invokes the handler once the
// write has completed -- confirm against the block/request API.
00121     template <typename type_key_,
00122               typename block_type,
00123               typename run_type,
00124               typename input_bid_iterator,
00125               typename key_extractor>
00126     inline void write_out(
00127         type_key_ * begin,
00128         type_key_ * end,
00129         block_type * & cur_blk,
00130         const block_type * end_blk,
00131         int_type & out_block,
00132         int_type & out_pos,
00133         run_type & run,
00134         write_completion_handler<block_type, typename block_type::bid_type> * & next_read,
00135         typename block_type::bid_type * & bids,
00136         request_ptr * write_reqs,
00137         request_ptr * read_reqs,
00138         input_bid_iterator & it,
00139         key_extractor keyobj)
00140     {
00141         typedef typename block_type::bid_type bid_type;
00142         typedef typename block_type::type type;
00143 
00144         type * elem = cur_blk->elem;
00145         for (type_key_ * p = begin; p < end; p++)
00146         {
00147             elem[out_pos++] = *(p->ptr);
00148 
                  // buffer is full: record its trigger key and write it out
00149             if (out_pos >= block_type::size)
00150             {
00151                 run[out_block].key = keyobj(*(cur_blk->elem));
00152 
00153                 if (cur_blk < end_blk)
00154                 {
                          // prepare the handler that refills this buffer
                          // with the next input block
00155                     next_read->block = cur_blk;
00156                     next_read->req = read_reqs + out_block;
00157                     read_reqs[out_block] = NULL;
00158                     bids[out_block] = next_read->bid = *(it++);
00159 
00160                     write_reqs[out_block] = cur_blk->write(
00161                         run[out_block].bid,
00162                         // postpone read of block from next run
00163                         // after write of block from this run
00164                         *(next_read++));
00165                 }
00166                 else
00167                 {
00168                     write_reqs[out_block] = cur_blk->write(run[out_block].bid);
00169                 }
00170 
                      // continue with the next buffer
00171                 cur_blk++;
00172                 elem = cur_blk->elem;
00173                 out_block++;
00174                 out_pos = 0;
00175             }
00176         }
00177     }
00178 
// Creates nruns sorted runs from the input blocks enumerated by it.
//
// For every run the blocks are awaited in Blocks1 and their input bids are
// released with block_manager::delete_block().  (key, pointer) references to
// the elements are then distributed into k1 buckets (classify_block,
// classify), each bucket is handed to l1sort, and the result is emitted by
// write_out into the blocks of runs[k].  Output is assembled in Blocks2;
// write_out attaches reads of the next run's input to the first
// next_run_size of those buffers, and the two buffer sets are swapped after
// each run.
//
//  it      iterator over the bids of the input blocks
//  runs    array of nruns runs; the size of each entry gives its block count
//  nruns   number of runs
//  m2      number of buffers per buffer set; all work arrays are sized by it
//  keyobj  key extractor
//
// NOTE(review): classify_block, classify, exclusive_prefix_sum and l1sort are
// defined outside this file; their exact contracts should be confirmed there.
00179     template <
00180         typename block_type,
00181         typename run_type,
00182         typename input_bid_iterator,
00183         typename key_extractor>
00184     void
00185     create_runs(
00186         input_bid_iterator it,
00187         run_type ** runs,
00188         const unsigned_type nruns,
00189         const unsigned_type m2,
00190         key_extractor keyobj)
00191     {
00192         typedef typename block_type::value_type type;
00193         typedef typename block_type::bid_type bid_type;
00194         typedef typename key_extractor::key_type key_type;
00195         typedef type_key<type, key_type> type_key_;
00196 
00197         block_manager * bm = block_manager::get_instance();
00198         block_type * Blocks1 = new block_type[m2];
00199         block_type * Blocks2 = new block_type[m2];
00200         bid_type * bids = new bid_type[m2];
00201         type_key_ * refs1 = new type_key_[m2 * Blocks1->size];
00202         type_key_ * refs2 = new type_key_[m2 * Blocks1->size];
00203         request_ptr * read_reqs = new request_ptr[m2];
00204         request_ptr * write_reqs = new request_ptr[m2];
00205         write_completion_handler<block_type, bid_type> * next_run_reads =
00206             new write_completion_handler<block_type, bid_type>[m2];
00207 
00208         run_type * run;
00209         run = *runs;
00210         int_type run_size = (*runs)->size();
00211         key_type offset = 0;
              // log_k1 is derived from the size of the reference array
              // relative to STXXL_L2_SIZE; log_k2 takes what remains of
              // log2(number of elements).
              // NOTE(review): log_k2 is not checked for being negative
              // before it is used as a shift count below.
00212         const int log_k1 =
00213             static_cast<int>(ceil(log2((m2 * block_type::size * sizeof(type_key_) / STXXL_L2_SIZE) ?
00214                                        (double(m2 * block_type::size * sizeof(type_key_) / STXXL_L2_SIZE)) : 2.)));
00215         const int log_k2 = int(log2(double(m2 * Blocks1->size))) - log_k1 - 1;
00216         STXXL_VERBOSE("log_k1: " << log_k1 << " log_k2:" << log_k2);
00217         const int_type k1 = 1 << log_k1;
00218         const int_type k2 = 1 << log_k2;
00219         int_type * bucket1 = new int_type[k1];
00220         int_type * bucket2 = new int_type[k2];
00221         int_type i;
00222 
00223         disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00224 
              // start reading the blocks of the first run
00225         for (i = 0; i < run_size; i++)
00226         {
00227             bids[i] = *(it++);
00228             read_reqs[i] = Blocks1[i].read(bids[i]);
00229         }
00230 
00231         unsigned_type k = 0;
              // shift1 selects the top log_k1 bits of a key, shift2 the
              // following log_k2 bits
00232         const int shift1 = sizeof(key_type) * 8 - log_k1;
00233         const int shift2 = shift1 - log_k2;
00234         STXXL_VERBOSE("shift1: " << shift1 << " shift2:" << shift2);
00235 
00236         for ( ; k < nruns; k++)
00237         {
00238             run = runs[k];
00239             run_size = run->size();
00240 
00241             std::fill(bucket1, bucket1 + k1, 0);
00242 
00243             type_key_ * ref_ptr = refs1;
00244             for (i = 0; i < run_size; i++)
00245             {
                      // from the second run on, the read of this buffer was
                      // chained to a write (see write_out), so wait for the
                      // write first
00246                 if (k)
00247                     write_reqs[i]->wait();
00248 
00249                 read_reqs[i]->wait();
                      // the input block is in memory now; release it
00250                 bm->delete_block(bids[i]);
00251 
00252                 classify_block(Blocks1[i].begin(), Blocks1[i].end(), ref_ptr, bucket1, offset, shift1, keyobj);
00253             }
00254 
00255             exclusive_prefix_sum(bucket1, k1);
00256             classify(refs1, refs1 + run_size * Blocks1->size, refs2, bucket1,
00257                      offset, shift1);
00258 
00259             int_type out_block = 0;
00260             int_type out_pos = 0;
                  // the last run has no successor, hence no chained reads
00261             unsigned_type next_run_size = (k < nruns - 1) ? (runs[k + 1]->size()) : 0;
00262 
00263             // recurse on each bucket
00264             type_key_ * c = refs2;
00265             type_key_ * d = refs1;
00266             block_type * cur_blk = Blocks2;
00267             block_type * end_blk = Blocks2 + next_run_size;
00268             write_completion_handler<block_type, bid_type> * next_read = next_run_reads;
00269 
00270             for (i = 0; i < k1; i++)
00271             {
00272                 type_key_ * cEnd = refs2 + bucket1[i];
00273                 type_key_ * dEnd = refs1 + bucket1[i];
00274 
00275                 l1sort(c, cEnd, d, bucket2, k2,
00276                        offset + (key_type(1) << key_type(shift1)) * key_type(i), shift2);         // key_type,key_type,... paranoia
00277 
00278                 write_out(
00279                     d, dEnd, cur_blk, end_blk,
00280                     out_block, out_pos, *run, next_read, bids,
00281                     write_reqs, read_reqs, it, keyobj);
00282 
00283                 c = cEnd;
00284                 d = dEnd;
00285             }
00286 
                  // the buffers just written now receive the next run's input
00287             std::swap(Blocks1, Blocks2);
00288         }
00289 
00290         wait_all(write_reqs, m2);
00291 
00292         delete[] bucket1;
00293         delete[] bucket2;
00294         delete[] refs1;
00295         delete[] refs2;
00296         delete[] Blocks1;
00297         delete[] Blocks2;
00298         delete[] bids;
00299         delete[] next_run_reads;
00300         delete[] read_reqs;
00301         delete[] write_reqs;
00302     }
00303 
00304     template <typename block_type,
00305               typename prefetcher_type,
00306               typename key_extractor>
00307     struct run_cursor2_cmp
00308     {
00309         typedef run_cursor2<block_type, prefetcher_type> cursor_type;
00310         key_extractor keyobj;
00311         run_cursor2_cmp(key_extractor keyobj_)
00312         {
00313             keyobj = keyobj_;
00314         }
00315         inline bool operator () (const cursor_type & a, const cursor_type & b)
00316         {
00317             if (UNLIKELY(b.empty()))
00318                 return true;
00319             // sentinel emulation
00320             if (UNLIKELY(a.empty()))
00321                 return false;
00322             //sentinel emulation
00323 
00324             return (keyobj(a.current()) < keyobj(b.current()));
00325         }
00326 
00327     private:
00328         run_cursor2_cmp() { }
00329     };
00330 
00331 
// Strict weak ordering on records induced by a key extractor: a precedes b
// when ke(a) < ke(b).
//
// The argument/result typedefs that used to be inherited from
// std::binary_function (deprecated in C++11, removed in C++17) are declared
// directly, so the class builds with old and new standard libraries alike.
//
//  record_type    type of the compared records
//  key_extractor  functor with a const operator() returning a record's key
template <typename record_type, typename key_extractor>
class key_comparison
{
    key_extractor ke;

public:
    typedef record_type first_argument_type;
    typedef record_type second_argument_type;
    typedef bool result_type;

    // Uses a default-constructed key extractor.
    key_comparison() { }
    // Uses a copy of the given key extractor.
    key_comparison(key_extractor ke_) : ke(ke_) { }
    bool operator () (const record_type & a, const record_type & b) const
    {
        return ke(a) < ke(b);
    }
};
00345 
00346 
00347     template <typename block_type, typename run_type, typename key_ext_>
00348     bool check_ksorted_runs(run_type ** runs,
00349                             unsigned_type nruns,
00350                             unsigned_type m,
00351                             key_ext_ keyext)
00352     {
00353         typedef typename block_type::value_type value_type;
00354 
00355         //STXXL_VERBOSE1("check_sorted_runs  Runs: "<<nruns);
00356         STXXL_MSG("check_sorted_runs  Runs: " << nruns);
00357         unsigned_type irun = 0;
00358         for (irun = 0; irun < nruns; ++irun)
00359         {
00360             const unsigned_type nblocks_per_run = runs[irun]->size();
00361             unsigned_type blocks_left = nblocks_per_run;
00362             block_type * blocks = new block_type[m];
00363             request_ptr * reqs = new request_ptr[m];
00364             value_type last = keyext.min_value();
00365 
00366             for (unsigned_type off = 0; off < nblocks_per_run; off += m)
00367             {
00368                 const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
00369                 const unsigned_type nelements = nblocks * block_type::size;
00370                 blocks_left -= nblocks;
00371 
00372                 for (unsigned_type j = 0; j < nblocks; ++j)
00373                 {
00374                     reqs[j] = blocks[j].read((*runs[irun])[off + j].bid);
00375                 }
00376                 wait_all(reqs, reqs + nblocks);
00377 
00378                 if (off && (keyext(blocks[0][0]) < keyext(last)))
00379                 {
00380                     STXXL_MSG("check_sorted_runs  wrong first value in the run " << irun);
00381                     STXXL_MSG(" first value: " << blocks[0][0] << " with key" << keyext(blocks[0][0]));
00382                     STXXL_MSG(" last  value: " << last << " with key" << keyext(last));
00383                     for (unsigned_type k = 0; k < block_type::size; ++k)
00384                         STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k] << " key: " << keyext(blocks[0][k]));
00385                     return false;
00386                 }
00387 
00388                 for (unsigned_type j = 0; j < nblocks; ++j)
00389                 {
00390                     if (keyext(blocks[j][0]) != (*runs[irun])[off + j].key)
00391                     {
00392                         STXXL_MSG("check_sorted_runs  wrong trigger in the run " << irun << " block " << (off + j));
00393                         STXXL_MSG("                   trigger value: " << (*runs[irun])[off + j].key);
00394                         STXXL_MSG("Data in the block:");
00395                         for (unsigned_type k = 0; k < block_type::size; ++k)
00396                             STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k] << " with key: " << keyext(blocks[j][k]));
00397 
00398                         STXXL_MSG("BIDS:");
00399                         for (unsigned_type k = 0; k < nblocks; ++k)
00400                         {
00401                             if (k == j)
00402                                 STXXL_MSG("Bad one comes next.");
00403                             STXXL_MSG("BID " << (off + k) << " is: " << ((*runs[irun])[off + k].bid));
00404                         }
00405 
00406                         return false;
00407                     }
00408                 }
00409                 if (!stxxl::is_sorted(
00410 #if 1
00411                         ArrayOfSequencesIterator<
00412                             block_type, typename block_type::value_type, block_type::size
00413                             >(blocks, 0),
00414                         ArrayOfSequencesIterator<
00415                             block_type, typename block_type::value_type, block_type::size
00416                             >(blocks, nelements),
00417 #else
00418                         TwoToOneDimArrayRowAdaptor<
00419                             block_type, value_type, block_type::size
00420                             >(blocks, 0),
00421                         TwoToOneDimArrayRowAdaptor<
00422                             block_type, value_type, block_type::size
00423                             >(blocks, nelements),
00424 #endif
00425                         key_comparison<value_type, key_ext_>()))
00426                 {
00427                     STXXL_MSG("check_sorted_runs  wrong order in the run " << irun);
00428                     STXXL_MSG("Data in blocks:");
00429                     for (unsigned_type j = 0; j < nblocks; ++j)
00430                     {
00431                         for (unsigned_type k = 0; k < block_type::size; ++k)
00432                             STXXL_MSG("     Element " << k << " in block " << (off + j) << " is :" << blocks[j][k] << " with key: " << keyext(blocks[j][k]));
00433                     }
00434                     STXXL_MSG("BIDS:");
00435                     for (unsigned_type k = 0; k < nblocks; ++k)
00436                     {
00437                         STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00438                     }
00439 
00440                     return false;
00441                 }
00442                 last = blocks[nblocks - 1][block_type::size - 1];
00443             }
00444 
00445             assert(blocks_left == 0);
00446             delete[] reqs;
00447             delete[] blocks;
00448         }
00449 
00450         return true;
00451     }
00452 
00453 
// Merges the nruns sorted runs in_runs[0 .. nruns) into out_run.
//
// The trigger entries of all input runs are copied into consume_seq and
// stable-sorted by key; that sequence, together with prefetch_seq, is handed
// to the block_prefetcher.  A loser_tree over the run cursors then produces
// one output block per iteration: the key of the block's first element is
// stored in the matching out_run entry and the block is written to that
// entry's bid through a buffered_writer.  Afterwards the blocks of the input
// runs are deleted and the input run objects themselves are freed.
//
//  in_runs  runs to merge; each run object is deleted by this function
//  nruns    number of input runs
//  out_run  output run; its entries must already carry allocated bids
//  _m       memory budget in blocks, used to size prefetch/write buffers
//  keyobj   key extractor
//
// NOTE(review): consume_seq is created with out_run->size() entries, so
// out_run is presumably expected to hold exactly as many blocks as all input
// runs together -- confirm with the caller.
00454     template <typename block_type, typename run_type, typename key_extractor>
00455     void merge_runs(run_type ** in_runs, unsigned_type nruns, run_type * out_run, unsigned_type _m, key_extractor keyobj)
00456     {
00457         typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00458         typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00459 
00460         unsigned_type i;
00461         run_type consume_seq(out_run->size());
00462 
00463         int_type * prefetch_seq = new int_type[out_run->size()];
00464 
00465         typename run_type::iterator copy_start = consume_seq.begin();
00466         for (i = 0; i < nruns; i++)
00467         {
00468             // TODO: try to avoid copy
00469             copy_start = std::copy(
00470                 in_runs[i]->begin(),
00471                 in_runs[i]->end(),
00472                 copy_start);
00473         }
              // order the blocks of all runs by their trigger key
00474         std::stable_sort(consume_seq.begin(), consume_seq.end());
00475 
00476         unsigned disks_number = config::get_instance()->disks_number();
00477 
              // NOTE(review): the PLAY_WITH_OPT_PREF branch defines only
              // n_write_buffers, yet n_prefetch_buffers is used below
              // regardless of that macro.
00478 #ifdef PLAY_WITH_OPT_PREF
00479         const int_type n_write_buffers = 4 * disks_number;
00480 #else
00481         const int_type n_prefetch_buffers = STXXL_MAX(int_type(2 * disks_number), (3 * (int_type(_m) - int_type(nruns)) / 4));
00482         STXXL_VERBOSE("Prefetch buffers " << n_prefetch_buffers);
00483         const int_type n_write_buffers = STXXL_MAX(int_type(2 * disks_number), int_type(_m) - int_type(nruns) - int_type(n_prefetch_buffers));
00484         STXXL_VERBOSE("Write buffers " << n_write_buffers);
00485         // heuristic
00486         const int_type n_opt_prefetch_buffers = 2 * int_type(disks_number) + (3 * (int_type(n_prefetch_buffers) - int_type(2 * disks_number))) / 10;
00487         STXXL_VERBOSE("Prefetch buffers " << n_opt_prefetch_buffers);
00488 #endif
00489 
00490 #ifdef SORT_OPTIMAL_PREFETCHING
00491         compute_prefetch_schedule(
00492             consume_seq,
00493             prefetch_seq,
00494             n_opt_prefetch_buffers,
00495             disks_number);
00496 #else
              // without a computed schedule: prefetch in consumption order
00497         for (i = 0; i < out_run->size(); i++)
00498             prefetch_seq[i] = i;
00499 
00500 #endif
00501 
00502 
00503         prefetcher_type prefetcher(consume_seq.begin(),
00504                                    consume_seq.end(),
00505                                    prefetch_seq,
00506                                    nruns + n_prefetch_buffers);
00507 
00508         buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
00509 
00510         unsigned_type out_run_size = out_run->size();
00511 
00512         run_cursor2_cmp<block_type, prefetcher_type, key_extractor> cmp(keyobj);
00513         loser_tree<
00514             run_cursor_type,
00515             run_cursor2_cmp<block_type, prefetcher_type, key_extractor>,
00516             block_type::size> losers(&prefetcher, nruns, cmp);
00517 
00518 
00519         block_type * out_buffer = writer.get_free_block();
00520 
              // one merged block per iteration
00521         for (i = 0; i < out_run_size; i++)
00522         {
00523             losers.multi_merge(out_buffer->elem);
00524             (*out_run)[i].key = keyobj(out_buffer->elem[0]);
00525             out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
00526         }
00527 
00528         delete[] prefetch_seq;
00529 
              // the input runs are consumed: free their blocks and objects
00530         block_manager * bm = block_manager::get_instance();
00531         for (i = 0; i < nruns; i++)
00532         {
00533             unsigned_type sz = in_runs[i]->size();
00534             for (unsigned_type j = 0; j < sz; j++)
00535                 bm->delete_block((*in_runs[i])[j].bid);
00536 
00537             delete in_runs[i];
00538         }
00539     }
00540 
00541 
// Sorts the _n blocks enumerated by input_bids by key and returns the run
// (sequence of trigger entries) that describes the sorted output.
//
// Phase 1: the input is split into runs of m2 blocks (half of _m, reduced to
// leave room for the per-element type_key references), blocks for the runs
// are allocated, and create_runs fills them with sorted data.
// Phase 2: while more than one run exists, groups of merge_factor runs are
// merged by merge_runs into newly allocated runs.  When a merge phase yields
// a single run covering all _n blocks and the input lives in a file
// (storage id -1), the input bids are reused for the output.
//
//  input_bids  iterator over the bids of the blocks to sort
//  _n          number of blocks to sort
//  _m          memory budget in blocks
//  keyobj      key extractor
//
// Returns a run allocated with new; it is not freed here, so presumably the
// caller takes ownership -- confirm at the call site.
00542     template <typename block_type,
00543               typename alloc_strategy,
00544               typename input_bid_iterator,
00545               typename key_extractor>
00546 
00547     simple_vector<trigger_entry<typename block_type::bid_type, typename key_extractor::key_type> > *
00548     ksort_blocks(input_bid_iterator input_bids, unsigned_type _n, unsigned_type _m, key_extractor keyobj)
00549     {
00550         typedef typename block_type::value_type type;
00551         typedef typename key_extractor::key_type key_type;
00552         typedef typename block_type::bid_type bid_type;
00553         typedef trigger_entry<bid_type, typename key_extractor::key_type> trigger_entry_type;
00554         typedef simple_vector<trigger_entry_type> run_type;
00555         typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
00556 
00557         unsigned_type m2 = div_and_round_up(_m, 2);
              // scale the run length by raw_size / (raw_size + size of the
              // type_key references needed for one block)
              // NOTE(review): m2_rf is not checked for zero before it is
              // used as a divisor below.
00558         const unsigned_type m2_rf = m2 * block_type::raw_size /
00559                                     (block_type::raw_size + block_type::size * sizeof(type_key<type, key_type>));
00560         STXXL_VERBOSE("Reducing number of blocks in a run from " << m2 << " to " <<
00561                       m2_rf << " due to key size: " << sizeof(typename key_extractor::key_type) << " bytes");
00562         m2 = m2_rf;
00563         unsigned_type full_runs = _n / m2;
00564         unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
00565         unsigned_type nruns = full_runs + partial_runs;
00566         unsigned_type i;
00567 
00568         config * cfg = config::get_instance();
00569         block_manager * mng = block_manager::get_instance();
00570         int ndisks = cfg->disks_number();
00571 
00572         STXXL_VERBOSE("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs);
00573 
00574         double begin = timestamp(), after_runs_creation, end;
00575 
00576         run_type ** runs = new run_type *[nruns];
00577 
00578         for (i = 0; i < full_runs; i++)
00579             runs[i] = new run_type(m2);
00580 
00581 
              // allocate the (possibly shorter) last run and the blocks of
              // all runs
00582 #ifdef INTERLEAVED_ALLOC
00583         if (partial_runs)
00584         {
00585             unsigned_type last_run_size = _n - full_runs * m2;
00586             runs[i] = new run_type(last_run_size);
00587 
00588             mng->new_blocks(interleaved_alloc_strategy(nruns, 0, ndisks),
00589                             RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>
00590                                                       (runs, 0, nruns, last_run_size),
00591                             RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>
00592                                                       (runs, _n, nruns, last_run_size));
00593         }
00594         else
00595             mng->new_blocks(interleaved_alloc_strategy(nruns, 0, ndisks),
00596                             RunsToBIDArrayAdaptor<block_type::raw_size, run_type>
00597                                                       (runs, 0, nruns),
00598                             RunsToBIDArrayAdaptor<block_type::raw_size, run_type>
00599                                                       (runs, _n, nruns));
00600 
00601 #else
00602         if (partial_runs)
00603             runs[i] = new run_type(_n - full_runs * m2);
00604 
00605 
00606         for (i = 0; i < nruns; i++)
00607         {
00608             mng->new_blocks(alloc_strategy(0, ndisks),
00609                             trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->begin()),
00610                             trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->end()));
00611         }
00612 #endif
00613 
00614         create_runs<block_type,
00615                     run_type,
00616                     input_bid_iterator,
00617                     key_extractor>(input_bids, runs, nruns, m2, keyobj);
00618 
00619         after_runs_creation = timestamp();
00620 
00621         double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
00622 
00623         disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00624 
00625         // Optimal merging: merge r = pow(nruns,1/ceil(log(nruns)/log(m))) at once
00626 
00627         const int_type merge_factor = static_cast<int_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) / log(double(_m))))));
00628         run_type ** new_runs;
00629 
00630         while (nruns > 1)
00631         {
00632             int_type new_nruns = div_and_round_up(nruns, merge_factor);
00633             STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
00634                           " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
00635 
00636             new_runs = new run_type *[new_nruns];
00637 
00638             int_type runs_left = nruns;
00639             int_type cur_out_run = 0;
00640             int_type blocks_in_new_run = 0;
00641 
                  // first pass: create the output run objects, sized by the
                  // total number of blocks of the runs they will receive
00642             while (runs_left > 0)
00643             {
00644                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00645                 blocks_in_new_run = 0;
00646                 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
00647                     blocks_in_new_run += runs[i]->size();
00648 
00649                 // allocate run
00650                 new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
00651                 runs_left -= runs2merge;
00652             }
00653             // allocate blocks in the new runs
00654             if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && (input_bids->storage->get_id() == -1))
00655             {
00656                 // if we sort a file we can reuse the input bids for the output
00657                 input_bid_iterator cur = input_bids;
00658                 for (int_type i = 0; cur != (input_bids + _n); ++cur)
00659                 {
00660                     (*new_runs[0])[i++].bid = *cur;
00661                 }
00662 
00663                 bid_type & firstBID = (*new_runs[0])[0].bid;
00664                 if (firstBID.storage->get_id() != -1)
00665                 {
00666                     // the first block does not belong to the file
00667                     // need to reallocate it
00668                     mng->new_blocks(FR(), &firstBID, (&firstBID) + 1);
00669                 }
00670                 bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
00671                 if (lastBID.storage->get_id() != -1)
00672                 {
00673                     // the last block does not belong to the file
00674                     // need to reallocate it
00675                     mng->new_blocks(FR(), &lastBID, (&lastBID) + 1);
00676                 }
00677             }
00678             else
00679             {
00680                 mng->new_blocks(interleaved_alloc_strategy(new_nruns, 0, ndisks),
00681                                 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
00682                                 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
00683             }
00684 
00685 
00686             // merge all
00687             runs_left = nruns;
00688             cur_out_run = 0;
00689             while (runs_left > 0)
00690             {
00691                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00692 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00693                 assert((check_ksorted_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left, runs2merge, m2, keyobj)));
00694 #endif
00695                 STXXL_VERBOSE("Merging " << runs2merge << " runs");
00696                 merge_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left,
00697                                                                 runs2merge, *(new_runs + (cur_out_run++)), _m, keyobj);
00698                 runs_left -= runs2merge;
00699             }
00700 
                  // merge_runs has freed the old run objects; only the
                  // pointer array is left to release
00701             nruns = new_nruns;
00702             delete[] runs;
00703             runs = new_runs;
00704         }
00705 
00706         run_type * result = *runs;
00707         delete[] runs;
00708 
00709         end = timestamp();
00710 
00711         STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Run creation time: " <<
00712                       after_runs_creation - begin << " s");
00713         STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
00714         STXXL_VERBOSE(*stats::get_instance());
00715         UNUSED(begin + io_wait_after_rf);
00716 
00717         return result;
00718     }
00719 }
00720 
00721 
00757 
00758 
00759 
00760 
00761 
00762 
00763 
//! \brief Sorts the external range [first_, last_) using keys obtained through keyobj.
//!
//! \param first_ iterator to the first element of the range (flush() is called on it up front)
//! \param last_  iterator past the last element of the range
//! \param keyobj key extractor; its min_value()/max_value() results are also used as padding
//! \param M__    memory budget: compared against the byte size of the range, and
//!               M__ / block_type::raw_size is forwarded to ksort_local::ksort_blocks
//!
//! Ranges whose byte size is below M__ go to stl_in_memory_sort. Otherwise the
//! blocks spanned by the range are handed to ksort_local::ksort_blocks. If
//! first_ and/or last_ do not sit at offset 0 of their block, the partially
//! covered block is temporarily replaced by a padded copy (slots outside the
//! range are filled with keyobj.min_value() at the front or keyobj.max_value()
//! at the back); after sorting, the in-range slots of the sorted boundary block
//! are copied back into the original block so that elements outside the range
//! are left as they were.
00764 template <typename ExtIterator_, typename KeyExtractor_>
00765 void ksort(ExtIterator_ first_, ExtIterator_ last_, KeyExtractor_ keyobj, unsigned_type M__)
00766 {
00767     typedef simple_vector<ksort_local::trigger_entry<typename ExtIterator_::bid_type,
00768                                                      typename KeyExtractor_::key_type> > run_type;
00769     typedef typename ExtIterator_::vector_type::value_type value_type;
00770     typedef typename ExtIterator_::block_type block_type;
00771
00772
          // number of blocks handed to ksort_blocks; set in each branch below
00773     unsigned_type n = 0;
00774     block_manager * mng = block_manager::get_instance();
00775
          // NOTE(review): presumably writes back data cached by the underlying
          // container before the blocks are accessed directly -- confirm
00776     first_.flush();
00777
          // small input: the whole range is smaller than the memory budget
00778     if ((last_ - first_) * sizeof(value_type) < M__)
00779     {
00780         stl_in_memory_sort(first_, last_, ksort_local::key_comparison<value_type, KeyExtractor_>(keyobj));
00781     }
00782     else
00783     {
              // the external path requires that at least two raw blocks fit into M__
00784         assert(2 * block_type::raw_size <= M__);
00785
00786         if (first_.block_offset())
00787         {
00788             if (last_.block_offset())            // first and last element reside
00789             // not in the beginning of the block
00790             {
00791                 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00792                 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00793                 typename ExtIterator_::bid_type first_bid, last_bid;
00794                 request_ptr req;
00795
                      // start reading the first block and allocate two fresh BIDs meanwhile
00796                 req = first_block->read(*first_.bid());
00797                 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1);                // try to overlap
00798                 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
00799                 req->wait();
00800
00801
00802                 req = last_block->read(*last_.bid());
00803
                      // slots in front of the range are overwritten with min_value()
                      // while the read of the last block is in flight
00804                 unsigned_type i = 0;
00805                 for ( ; i < first_.block_offset(); i++)
00806                 {
00807                     first_block->elem[i] = keyobj.min_value();
00808                 }
00809
                      // last block has arrived
00810                 req->wait();
00811
                      // write the padded first block to the fresh BID; pad the last block meanwhile
00812                 req = first_block->write(first_bid);
00813                 for (i = last_.block_offset(); i < block_type::size; i++)
00814                 {
00815                     last_block->elem[i] = keyobj.max_value();
00816                 }
00817
00818                 req->wait();
00819
00820                 req = last_block->write(last_bid);
00821
                      // both boundary blocks take part in the sort, hence the + 1
00822                 n = last_.bid() - first_.bid() + 1;
00823
                      // the BID container now refers to the padded copies, while
                      // first_bid / last_bid keep the original boundary blocks
00824                 std::swap(first_bid, *first_.bid());
00825                 std::swap(last_bid, *last_.bid());
00826
                      // the write of last_block must finish before its buffer is freed
00827                 req->wait();
00828
00829                 delete first_block;
00830                 delete last_block;
00831
                      // out: entries whose .bid fields are read below as the sorted blocks
00832                 run_type * out =
00833                     ksort_local::ksort_blocks<
00834                         typename ExtIterator_::block_type,
00835                         typename ExtIterator_::vector_type::alloc_strategy,
00836                         typename ExtIterator_::bids_container_iterator,
00837                         KeyExtractor_>
00838                                                   (first_.bid(), n, M__ / block_type::raw_size, keyobj);
00839
00840
00841                 first_block = new typename ExtIterator_::block_type;
00842                 last_block = new typename ExtIterator_::block_type;
00843                 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00844                 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
00845                 request_ptr * reqs = new request_ptr[2];
00846
                      // fetch the original first block together with the first sorted block
00847                 reqs[0] = first_block->read(first_bid);
00848                 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00849                 wait_all(reqs, 2);
00850
                      // same for the last block; overlapped with the front merge below
00851                 reqs[0] = last_block->read(last_bid);
00852                 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
00853
                      // only the in-range slots of the original first block are replaced
00854                 for (i = first_.block_offset(); i < block_type::size; i++)
00855                 {
00856                     first_block->elem[i] = sorted_first_block->elem[i];
00857                 }
00858                 wait_all(reqs, 2);
00859
00860                 req = first_block->write(first_bid);
00861
                      // only the in-range slots of the original last block are replaced
00862                 for (i = 0; i < last_.block_offset(); i++)
00863                 {
00864                     last_block->elem[i] = sorted_last_block->elem[i];
00865                 }
00866
00867                 req->wait();
00868
00869
00870                 req = last_block->write(last_bid);
00871
                      // the sorted boundary blocks have been merged back and are released
00872                 mng->delete_block(out->begin()->bid);
00873                 mng->delete_block((*out)[out->size() - 1].bid);
00874
                      // the original boundary blocks return to the BID container
00875                 *first_.bid() = first_bid;
00876                 *last_.bid() = last_bid;
00877
                      // interior blocks simply adopt the BIDs of the sorted blocks
00878                 typename run_type::iterator it = out->begin();
00879                 it++;
00880                 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
00881                 cur_bid++;
00882
00883                 for ( ; cur_bid != last_.bid(); cur_bid++, it++)
00884                 {
00885                     *cur_bid = (*it).bid;
00886                 }
00887
00888                 delete first_block;
00889                 delete sorted_first_block;
00890                 delete sorted_last_block;
00891                 delete[] reqs;
00892                 delete out;
00893
                      // last_block is still being written; wait before freeing it
00894                 req->wait();
00895
00896                 delete last_block;
00897             }
00898             else
00899             {
00900                 // first element resides
00901                 // not in the beginning of the block
00902
00903                 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00904                 typename ExtIterator_::bid_type first_bid;
00905                 request_ptr req;
00906
00907                 req = first_block->read(*first_.bid());
00908                 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1);                // try to overlap
00909                 req->wait();
00910
00911
                      // slots in front of the range are overwritten with min_value()
00912                 unsigned_type i = 0;
00913                 for ( ; i < first_.block_offset(); i++)
00914                 {
00915                     first_block->elem[i] = keyobj.min_value();
00916                 }
00917
00918                 req = first_block->write(first_bid);
00919
                      // last_ sits at offset 0, so its block is not counted
00920                 n = last_.bid() - first_.bid();
00921
                      // container refers to the padded copy; first_bid keeps the original block
00922                 std::swap(first_bid, *first_.bid());
00923
00924                 req->wait();
00925
00926                 delete first_block;
00927
00928                 run_type * out =
00929                     ksort_local::ksort_blocks<
00930                         typename ExtIterator_::block_type,
00931                         typename ExtIterator_::vector_type::alloc_strategy,
00932                         typename ExtIterator_::bids_container_iterator,
00933                         KeyExtractor_>
00934                                                   (first_.bid(), n, M__ / block_type::raw_size, keyobj);
00935
00936
00937                 first_block = new typename ExtIterator_::block_type;
00938
00939                 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00940
00941                 request_ptr * reqs = new request_ptr[2];
00942
                      // fetch the original first block together with the first sorted block
00943                 reqs[0] = first_block->read(first_bid);
00944                 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00945                 wait_all(reqs, 2);
00946
                      // only the in-range slots of the original first block are replaced
00947                 for (i = first_.block_offset(); i < block_type::size; i++)
00948                 {
00949                     first_block->elem[i] = sorted_first_block->elem[i];
00950                 }
00951
00952                 req = first_block->write(first_bid);
00953
00954                 mng->delete_block(out->begin()->bid);
00955
00956                 *first_.bid() = first_bid;
00957
00958                 typename run_type::iterator it = out->begin();
00959                 it++;
00960                 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
00961                 cur_bid++;
00962
00963                 for ( ; cur_bid != last_.bid(); cur_bid++, it++)
00964                 {
00965                     *cur_bid = (*it).bid;
00966                 }
00967
                      // NOTE(review): after the loop cur_bid == last_.bid(), and if out holds
                      // exactly n entries (as the other branches suggest) `it` equals
                      // out->end(). This statement would then read past the run and overwrite
                      // the BID of the block behind the sorted range. The block-aligned branch
                      // with the same n has no such statement -- confirm whether it is intended.
00968                 *cur_bid = (*it).bid;
00969
00970                 delete sorted_first_block;
00971                 delete[] reqs;
00972                 delete out;
00973
                      // first_block is still being written; wait before freeing it
00974                 req->wait();
00975
00976                 delete first_block;
00977             }
00978         }
00979         else
00980         {
00981             if (last_.block_offset())            // last element resides
00982             // not in the beginning of the block
00983             {
00984                 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00985                 typename ExtIterator_::bid_type last_bid;
00986                 request_ptr req;
00987                 unsigned_type i;
00988
                      // read the last block and allocate a fresh BID meanwhile
00989                 req = last_block->read(*last_.bid());
00990                 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
00991                 req->wait();
00992
                      // slots from last_ on are overwritten with max_value()
00993                 for (i = last_.block_offset(); i < block_type::size; i++)
00994                 {
00995                     last_block->elem[i] = keyobj.max_value();
00996                 }
00997
00998                 req = last_block->write(last_bid);
00999
                      // the partially covered last block takes part in the sort, hence the + 1
01000                 n = last_.bid() - first_.bid() + 1;
01001
                      // container refers to the padded copy; last_bid keeps the original block
01002                 std::swap(last_bid, *last_.bid());
01003
01004                 req->wait();
01005
01006                 delete last_block;
01007
01008                 run_type * out =
01009                     ksort_local::ksort_blocks<
01010                         typename ExtIterator_::block_type,
01011                         typename ExtIterator_::vector_type::alloc_strategy,
01012                         typename ExtIterator_::bids_container_iterator,
01013                         KeyExtractor_>
01014                                                   (first_.bid(), n, M__ / block_type::raw_size, keyobj);
01015
01016
01017                 last_block = new typename ExtIterator_::block_type;
01018                 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
01019                 request_ptr * reqs = new request_ptr[2];
01020
                      // fetch the original last block together with the last sorted block
01021                 reqs[0] = last_block->read(last_bid);
01022                 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
01023                 wait_all(reqs, 2);
01024
                      // only the in-range slots of the original last block are replaced
01025                 for (i = 0; i < last_.block_offset(); i++)
01026                 {
01027                     last_block->elem[i] = sorted_last_block->elem[i];
01028                 }
01029
01030                 req = last_block->write(last_bid);
01031
01032                 mng->delete_block((*out)[out->size() - 1].bid);
01033
01034                 *last_.bid() = last_bid;
01035
                      // all blocks in front of the last one adopt the BIDs of the sorted blocks
01036                 typename run_type::iterator it = out->begin();
01037                 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
01038
01039                 for ( ; cur_bid != last_.bid(); cur_bid++, it++)
01040                 {
01041                     *cur_bid = (*it).bid;
01042                 }
01043
01044                 delete sorted_last_block;
01045                 delete[] reqs;
01046                 delete out;
01047
                      // last_block is still being written; wait before freeing it
01048                 req->wait();
01049
01050                 delete last_block;
01051             }
01052             else
01053             {
01054                 // first and last element reside in the beginning of blocks
01055                 n = last_.bid() - first_.bid();
01056
01057                 run_type * out =
01058                     ksort_local::ksort_blocks<
01059                         typename ExtIterator_::block_type,
01060                         typename ExtIterator_::vector_type::alloc_strategy,
01061                         typename ExtIterator_::bids_container_iterator,
01062                         KeyExtractor_>
01063                                                   (first_.bid(), n, M__ / block_type::raw_size, keyobj);
01064
                      // no padding was needed: every block adopts the BID of its sorted counterpart
01065                 typename run_type::iterator it = out->begin();
01066                 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
01067
01068                 for ( ; cur_bid != last_.bid(); cur_bid++, it++)
01069                 {
01070                     *cur_bid = (*it).bid;
01071                 }
01072
01073                 delete out;
01074             }
01075         }
01076     }
01077
      // NOTE(review): the comparison object here is default-constructed, whereas the
      // in-memory path above passes keyobj to it -- confirm this is intended.
01078 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01079     assert(stxxl::is_sorted(first_, last_, ksort_local::key_comparison<value_type, KeyExtractor_>()));
01080 #endif
01081 }
01082 
//! \brief Key extractor that delegates every request to the record type itself.
//!
//! \tparam record_type must provide a nested \c key_type, a \c key() member
//!         function, and \c max_value() / \c min_value() callable as
//!         \c record_type::max_value() and \c record_type::min_value().
template <typename record_type>
struct ksort_defaultkey
{
    typedef typename record_type::key_type key_type;

    //! \return the key reported by the record via \c key()
    key_type operator () (const record_type & rec) const { return rec.key(); }

    //! \return whatever \c record_type::min_value() yields
    record_type min_value() const { return record_type::min_value(); }

    //! \return whatever \c record_type::max_value() yields
    record_type max_value() const { return record_type::max_value(); }
};
01100 
01101 
01108 
01117 template <typename ExtIterator_>
01118 void ksort(ExtIterator_ first_, ExtIterator_ last_, unsigned_type M__)
01119 {
01120     ksort(first_, last_,
01121           ksort_defaultkey<typename ExtIterator_::vector_type::value_type>(), M__);
01122 }
01123 
01124 
01126 
01127 __STXXL_END_NAMESPACE
01128 
01129 #endif // !STXXL_KSORT_HEADER
01130 // vim: et:ts=4:sw=4