tlx
parallel_sample_sort.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * tlx/sort/strings/parallel_sample_sort.hpp
3  *
4  * Parallel Super Scalar String Sample Sort (pS5)
5  *
6  * See also Timo Bingmann, Andreas Eberle, and Peter Sanders. "Engineering
7  * parallel string sorting." Algorithmica 77.1 (2017): 235-286.
8  *
9  * Part of tlx - http://panthema.net/tlx
10  *
11  * Copyright (C) 2013-2019 Timo Bingmann <tb@panthema.net>
12  *
13  * All rights reserved. Published under the Boost Software License, Version 1.0
14  ******************************************************************************/
15 
16 #ifndef TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
17 #define TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <cmath>
22 #include <cstdlib>
23 #include <cstring>
24 #include <random>
25 #include <vector>
26 
30 
31 #include <tlx/logger/core.hpp>
32 #include <tlx/math/clz.hpp>
33 #include <tlx/math/ctz.hpp>
34 #include <tlx/meta/enable_if.hpp>
35 #include <tlx/multi_timer.hpp>
36 #include <tlx/simple_vector.hpp>
37 #include <tlx/thread_pool.hpp>
38 #include <tlx/unused.hpp>
39 
40 namespace tlx {
41 namespace sort_strings_detail {
42 
43 class PS5SortStep;
44 
45 /******************************************************************************/
46 //! Parallel Super Scalar String Sample Sort Parameter Struct
47 
// NOTE(review): the class declaration line (Doxygen line 48) is missing from
// this extraction -- presumably `class PS5ParametersDefault`; confirm against
// the original tlx header.
49 {
50 public:
// compile-time switches controlling TLX_LOGC debug output in the sort steps
51  static const bool debug_steps = false;
52  static const bool debug_jobs = false;
53 
54  static const bool debug_bucket_size = false;
55  static const bool debug_recursion = false;
56  static const bool debug_lcp = false;
57 
58  static const bool debug_result = false;
59 
60  //! enable/disable various sorting levels
61  static const bool enable_parallel_sample_sort = true;
62  static const bool enable_sequential_sample_sort = true;
63  static const bool enable_sequential_mkqs = true;
64 
65  //! terminate sort after first parallel sample sort step
66  static const bool use_only_first_sortstep = false;
67 
68  //! enable work freeing
69  static const bool enable_work_sharing = true;
70 
71  //! whether the base sequential_threshold() on the remaining unsorted string
72  //! set or on the whole string set.
73  static const bool enable_rest_size = false;
74 
75  //! key type for sample sort: 32-bit or 64-bit
76  typedef size_t key_type;
77 
78  //! depth of classification tree used in sample sorts
79  static const unsigned TreeBits = 10;
80 
81  //! classification tree variant for sample sorts
// NOTE(review): the classifier typedef (Doxygen line 82) is missing from this
// extraction; the comment above says it names the classification tree variant.
83 
84  //! threshold to run sequential small sorts
85  static const size_t smallsort_threshold = 1024 * 1024;
86  //! threshold to switch to insertion sort
87  static const size_t inssort_threshold = 32;
88 };
89 
90 /******************************************************************************/
91 //! Parallel Super Scalar String Sample Sort Context
92 
// Shared per-sort state: sizes, counters, timers and the thread pool.
// Inherits all tuning switches from the Parameters policy class.
93 template <typename Parameters>
94 class PS5Context : public Parameters
95 {
96 public:
97  //! total size of input
98  size_t total_size;
99 
100  //! number of remaining strings to sort
101  std::atomic<size_t> rest_size;
102 
103  //! counters
// NOTE(review): the counter member declarations (Doxygen line 104) are missing
// from this extraction; later code uses ctx.para_ss_steps, ctx.sequ_ss_steps
// and ctx.base_sort_steps, so those members are presumably declared here.
105 
106  //! timers for individual sorting steps
// NOTE(review): the timer member declaration (Doxygen line 107) is missing;
// later code refers to ctx_.mtimer (a tlx MultiTimer, presumably).
108 
109  //! number of threads overall
110  size_t num_threads;
111 
112  //! thread pool
// NOTE(review): the pool member declaration (Doxygen line 113) is missing;
// later code refers to threads_ (a tlx ThreadPool, presumably).
114 
115  //! context constructor
116  PS5Context(size_t _thread_num)
// NOTE(review): the start of the initializer list (Doxygen line 117) is
// missing from this extraction; total_size/rest_size initialization unknown.
118  num_threads(_thread_num),
119  threads_(_thread_num)
120  { }
121 
122  //! enqueue a new job in the thread pool
123  template <typename StringPtr>
124  void enqueue(PS5SortStep* sstep, const StringPtr& strptr, size_t depth);
125 
126  //! return sequential sorting threshold
// NOTE(review): the function signature (Doxygen line 127) is missing --
// presumably `size_t sequential_threshold() {`.
128  size_t threshold = this->smallsort_threshold;
129  if (this->enable_rest_size) {
// base the threshold on the strings still unsorted (adaptive)
130  return std::max(threshold, rest_size / num_threads);
131  }
132  else {
// base the threshold on the whole input (static)
133  return std::max(threshold, total_size / num_threads);
134  }
135  }
136 
137  //! decrement number of unordered strings
138  void donesize(size_t n) {
139  if (this->enable_rest_size)
140  rest_size -= n;
141  }
142 };
143 
144 /******************************************************************************/
145 //! LCP calculation of Splitter Strings
146 
//! Length (in bytes) of the longest common prefix of two big-endian packed
//! keys: bytes shared by a and b become leading zero bits in their XOR, so
//! counting leading zero bits and dividing by eight yields the byte count.
template <typename KeyType>
static inline unsigned char
lcpKeyType(const KeyType& a, const KeyType& b) {
    const KeyType diff = a ^ b;
    return static_cast<unsigned char>(clz(diff) >> 3);
}
153 
//! Number of non-zero (significant) bytes at the front of a big-endian packed
//! key: trailing zero bits counted by ctz, converted to whole zero bytes, and
//! subtracted from the key width.
template <typename KeyType>
static inline unsigned char
lcpKeyDepth(const KeyType& a) {
    const unsigned zero_bytes = ctz(a) >> 3;
    return static_cast<unsigned char>(sizeof(KeyType) - zero_bytes);
}
160 
//! Extract the d-th character (0 = most significant byte) from a big-endian
//! packed key by shifting the wanted byte down to the lowest position.
template <typename KeyType>
static inline unsigned char
getCharAtDepth(const KeyType& a, unsigned char d) {
    const unsigned shift = 8 * (sizeof(KeyType) - 1 - d);
    return static_cast<unsigned char>(a >> shift);
}
167 
168 /******************************************************************************/
169 //! PS5SortStep Top-Level Class to Keep Track of Substeps
170 
// Base class tracking how many child sort steps are still running; when the
// count drops to zero the (pure virtual) substep_all_done() hook fires.
// NOTE(review): the class declaration line (Doxygen line 171) is missing from
// this extraction -- presumably `class PS5SortStep`.
172 {
173 private:
174  //! Number of substeps still running
175  std::atomic<size_t> substep_working_;
176 
177  //! Pure virtual function called by substep when all substeps are done.
178  virtual void substep_all_done() = 0;
179 
180 protected:
// NOTE(review): Doxygen line 181 is missing -- presumably the default
// constructor initializing substep_working_ to zero.
182 
183  virtual ~PS5SortStep() {
184  assert(substep_working_ == 0);
185  }
186 
187  //! Register new substep
188  void substep_add() {
// NOTE(review): the body line (Doxygen line 189) is missing -- presumably
// `++substep_working_;`.
190  }
191 
192 public:
193  //! Notify superstep that the currently substep is done.
// NOTE(review): the signature (Doxygen line 194) is missing -- presumably
// `void substep_notify_done() {`; likewise line 197 below presumably calls
// substep_all_done() when the counter reaches zero.
195  assert(substep_working_ > 0);
196  if (--substep_working_ == 0)
198  }
199 };
200 
201 /******************************************************************************/
202 //! LCP Calculation for Finished Sample Sort Steps
203 
// After a sample sort step has distributed strings into the 2k+1 buckets
// described by the prefix-sum array `bkt`, write the LCP value at each
// boundary between consecutive non-empty buckets. Odd buckets hold strings
// equal to a splitter (their key is the splitter itself); even buckets hold
// strings strictly between splitters (their key is read from the string set).
204 template <size_t bktnum, typename Context, typename Classify,
205  typename StringPtr, typename BktSizeType>
206 void ps5_sample_sort_lcp(const Context& ctx, const Classify& classifier,
207  const StringPtr& strptr, size_t depth,
208  const BktSizeType* bkt) {
209  assert(!strptr.flipped());
210 
211  const typename StringPtr::StringSet& strset = strptr.active();
212  typedef typename Context::key_type key_type;
213 
214  size_t b = 0; // current bucket number
215  key_type prevkey = 0; // previous key
216 
217  // the following while loops only check b < bktnum when b is odd,
218  // because bktnum is always odd. We need a goto to jump into the loop,
219  // as b == 0 start even.
220  goto even_first;
221 
222  // find first non-empty bucket
223  while (b < bktnum)
224  {
225  // odd bucket: = bkt
226  if (bkt[b] != bkt[b + 1]) {
227  prevkey = classifier.get_splitter(b / 2);
228  assert(prevkey == get_key_at<key_type>(strset, bkt[b + 1] - 1, depth));
229  break;
230  }
231  ++b;
232 even_first:
233  // even bucket: <, << or > bkt
234  if (bkt[b] != bkt[b + 1]) {
235  prevkey = get_key_at<key_type>(strset, bkt[b + 1] - 1, depth);
236  break;
237  }
238  ++b;
239  }
240  ++b;
241 
242  // goto depends on whether the first non-empty bucket was odd or
243  // even. the while loop below encodes this in the program counter.
244  if (b < bktnum && b % 2 == 0) goto even_bucket;
245 
246  // find next non-empty bucket
247  while (b < bktnum)
248  {
249  // odd bucket: = bkt
250  if (bkt[b] != bkt[b + 1]) {
251  key_type thiskey = classifier.get_splitter(b / 2);
252  assert(thiskey == get_key_at<key_type>(strset, bkt[b], depth));
253 
// LCP between last key of the previous non-empty bucket and this one
254  int rlcp = lcpKeyType(prevkey, thiskey);
255  strptr.set_lcp(bkt[b], depth + rlcp);
256  // strptr.set_cache(bkt[b], getCharAtDepth(thiskey, rlcp));
257 
258  TLX_LOGC(ctx.debug_lcp)
259  << "LCP at odd-bucket " << b
260  << " [" << bkt[b] << "," << bkt[b + 1] << ")"
261  << " is " << depth + rlcp;
262 
263  prevkey = thiskey;
264  assert(prevkey == get_key_at<key_type>(strset, bkt[b + 1] - 1, depth));
265  }
266  ++b;
267 even_bucket:
268  // even bucket: <, << or > bkt
269  if (bkt[b] != bkt[b + 1]) {
270  key_type thiskey = get_key_at<key_type>(strset, bkt[b], depth);
271 
272  int rlcp = lcpKeyType(prevkey, thiskey);
273  strptr.set_lcp(bkt[b], depth + rlcp);
274  // strptr.set_cache(bkt[b], getCharAtDepth(thiskey, rlcp));
275 
276  TLX_LOGC(ctx.debug_lcp)
277  << "LCP at even-bucket " << b
278  << " [" << bkt[b] << "," << bkt[b + 1] << ")"
279  << " is " << depth + rlcp;
280 
281  prevkey = get_key_at<key_type>(strset, bkt[b + 1] - 1, depth);
282  }
283  ++b;
284  }
285 }
286 
287 /******************************************************************************/
288 //! SampleSort: Non-Recursive In-Place Sequential Sample Sort for Small Sorts
289 
290 template <typename Context, typename StringPtr, typename BktSizeType>
292 {
293 public:
294  Context& ctx_;
295 
296  //! parent sort step
298 
300  size_t depth_;
302 
303  typedef typename Context::key_type key_type;
304  typedef typename StringPtr::StringSet StringSet;
305  typedef BktSizeType bktsize_type;
306 
// Construct a sequential small-sort job for the substring range `strptr`
// starting at character `depth`; `pstep` is the parent step to notify.
307  PS5SmallsortJob(Context& ctx, PS5SortStep* pstep,
308  const StringPtr& strptr, size_t depth)
309  : ctx_(ctx), pstep_(pstep), strptr_(strptr), depth_(depth) {
310  TLX_LOGC(ctx_.debug_steps)
311  << "enqueue depth=" << depth_
312  << " size=" << strptr_.size() << " flip=" << strptr_.flipped();
313  }
314 
316  mtimer_.stop();
317  ctx_.mtimer.add(mtimer_);
318  }
319 
321  size_t bktcache_size_ = 0;
322 
// Entry point of the job: dispatch to sequential sample sort for large
// inputs, otherwise to the MKQS-based path.
323  void run() {
324  mtimer_.start("sequ_ss");
325 
326  size_t n = strptr_.size();
327 
328  TLX_LOGC(ctx_.debug_jobs)
329  << "Process PS5SmallsortJob " << this << " of size " << n;
330 
331  // create anonymous wrapper job
332  this->substep_add();
333 
334  if (ctx_.enable_sequential_sample_sort && n >= ctx_.smallsort_threshold)
335  {
// one uint16_t bucket id per string
336  bktcache_.resize(n * sizeof(uint16_t));
// NOTE(review): the call line (Doxygen line 337) is missing -- presumably
// `sort_sample_sort(strptr_, depth_);`.
338  }
339  else
340  {
341  mtimer_.start("mkqs");
// NOTE(review): the call line (Doxygen line 342) is missing -- presumably
// `sort_mkqs_cache(strptr_, depth_);`.
343  }
344 
345  // finish wrapper job, handler delete's this
346  this->substep_notify_done();
347  }
348 
349  /*------------------------------------------------------------------------*/
350  //! Stack of Recursive Sample Sort Steps
351 
// One level of the explicit-stack sequential sample sort: its constructor
// performs splitter selection, classification, bucket counting, prefix sum
// and the out-of-place permutation; idx_ tracks which bucket of this level
// is processed next by the driver loop in sort_sample_sort().
// NOTE(review): the class declaration line (Doxygen line 352) is missing from
// this extraction -- presumably `class SeqSampleSortStep`.
353  {
354  public:
// NOTE(review): Doxygen line 355 is missing -- presumably the member
// declaration `StringPtr strptr_;` used throughout this class.
356  size_t idx_;
357  size_t depth_;
358 
359  using StringSet = typename StringPtr::StringSet;
360  using bktsize_type = BktSizeType;
361 
362  typename Context::Classify classifier;
363 
364  static const size_t num_splitters = Context::Classify::num_splitters;
365  static const size_t bktnum = 2 * num_splitters + 1;
366 
367  unsigned char splitter_lcp[num_splitters + 1];
// NOTE(review): Doxygen line 368 is missing -- presumably the bucket boundary
// array declaration `bktsize_type bkt[bktnum + 1];` used below.
369 
370  SeqSampleSortStep(Context& ctx, const StringPtr& strptr, size_t depth,
371  uint16_t* bktcache)
372  : strptr_(strptr), idx_(0), depth_(depth) {
373  size_t n = strptr_.size();
374 
375  // step 1: select splitters with oversampling
376 
377  const size_t oversample_factor = 2;
378  const size_t sample_size = oversample_factor * num_splitters;
379 
380  simple_vector<key_type> samples(sample_size);
381 
382  const StringSet& strset = strptr_.active();
383 
// seed from the sample buffer address -- cheap, varies per allocation
384  std::minstd_rand rng(reinterpret_cast<uintptr_t>(samples.data()));
385 
386  for (size_t i = 0; i < sample_size; ++i)
387  samples[i] = get_key_at<key_type>(strset, rng() % n, depth_);
388 
389  std::sort(samples.begin(), samples.end());
390 
391  classifier.build(samples.data(), sample_size, splitter_lcp);
392 
393  // step 2: classify all strings
394 
395  classifier.classify(
396  strset, strset.begin(), strset.end(), bktcache, depth_);
397 
398  // step 2.5: count bucket sizes
399 
400  bktsize_type bktsize[bktnum];
401  memset(bktsize, 0, bktnum * sizeof(bktsize_type));
402 
403  for (size_t si = 0; si < n; ++si)
404  ++bktsize[bktcache[si]];
405 
406  // step 3: inclusive prefix sum
407 
408  bkt[0] = bktsize[0];
409  for (unsigned int i = 1; i < bktnum; ++i) {
410  bkt[i] = bkt[i - 1] + bktsize[i];
411  }
412  assert(bkt[bktnum - 1] == n);
413  bkt[bktnum] = n;
414 
415  // step 4: premute out-of-place
416 
417  const StringSet& strB = strptr_.active();
418  // get alternative shadow pointer array
419  const StringSet& sorted = strptr_.shadow();
420  typename StringSet::Iterator sbegin = sorted.begin();
421 
// decrementing bkt[] while scattering turns it into the exclusive prefix sum
422  for (typename StringSet::Iterator str = strB.begin();
423  str != strB.end(); ++str, ++bktcache)
424  *(sbegin + --bkt[*bktcache]) = std::move(*str);
425 
426  // bkt is afterwards the exclusive prefix sum of bktsize
427 
428  // statistics
429 
430  ++ctx.sequ_ss_steps;
431  }
432 
433  void calculate_lcp(Context& ctx) {
434  TLX_LOGC(ctx.debug_lcp) << "Calculate LCP after sample sort step";
435  if (strptr_.with_lcp) {
436  ps5_sample_sort_lcp<bktnum>(ctx, classifier, strptr_, depth_, bkt);
437  }
438  }
439  };
440 
// Explicit-stack driver of the sequential sample sort "recursion": ss_front_
// is an artificial pop-front index so that completed levels can be handed off
// as independent jobs by sample_sort_free_work().
441  size_t ss_front_ = 0;
442  std::vector<SeqSampleSortStep> ss_stack_;
443 
444  void sort_sample_sort(const StringPtr& strptr, size_t depth) {
445  typedef SeqSampleSortStep Step;
446 
447  assert(ss_front_ == 0);
448  assert(ss_stack_.size() == 0);
449 
450  uint16_t* bktcache = reinterpret_cast<uint16_t*>(bktcache_.data());
451 
452  // sort first level
453  ss_stack_.emplace_back(ctx_, strptr, depth, bktcache);
454 
455  // step 5: "recursion"
456 
457  while (ss_stack_.size() > ss_front_)
458  {
459  Step& s = ss_stack_.back();
460  size_t i = s.idx_++; // process the bucket s.idx_
461 
462  if (i < Step::bktnum)
463  {
464  size_t bktsize = s.bkt[i + 1] - s.bkt[i];
465 
466  StringPtr sp = s.strptr_.flip(s.bkt[i], bktsize);
467 
468  // i is even -> bkt[i] is less-than bucket
469  if (i % 2 == 0)
470  {
471  if (bktsize == 0) {
472  // empty bucket
473  }
474  else if (bktsize < ctx_.smallsort_threshold)
475  {
476  assert(i / 2 <= Step::num_splitters);
477  if (i == Step::bktnum - 1)
478  TLX_LOGC(ctx_.debug_recursion)
479  << "Recurse[" << s.depth_ << "]: > bkt "
480  << i << " size " << bktsize << " no lcp";
481  else
482  TLX_LOGC(ctx_.debug_recursion)
483  << "Recurse[" << s.depth_ << "]: < bkt "
484  << i << " size " << bktsize << " lcp "
485  << int(s.splitter_lcp[i / 2] & 0x7F);
486 
487  ScopedMultiTimerSwitch sts_inssort(mtimer_, "mkqs");
// NOTE(review): the call-open line (Doxygen line 488) is missing -- presumably
// `sort_mkqs_cache(` given the arguments on the following line.
489  sp, s.depth_ + (s.splitter_lcp[i / 2] & 0x7F));
490  }
491  else
492  {
493  if (i == Step::bktnum - 1)
494  TLX_LOGC(ctx_.debug_recursion)
495  << "Recurse[" << s.depth_ << "]: > bkt "
496  << i << " size " << bktsize << " no lcp";
497  else
498  TLX_LOGC(ctx_.debug_recursion)
499  << "Recurse[" << s.depth_ << "]: < bkt "
500  << i << " size " << bktsize << " lcp "
501  << int(s.splitter_lcp[i / 2] & 0x7F);
502 
503  ss_stack_.emplace_back(
504  ctx_, sp, s.depth_ + (s.splitter_lcp[i / 2] & 0x7F), bktcache);
505  }
506  }
507  // i is odd -> bkt[i] is equal bucket
508  else
509  {
510  if (bktsize == 0) {
511  // empty bucket
512  }
513  else if (s.splitter_lcp[i / 2] & 0x80) {
514  // equal-bucket has nullptr-terminated key, done.
515  TLX_LOGC(ctx_.debug_recursion)
516  << "Recurse[" << s.depth_ << "]: = bkt "
517  << i << " size " << bktsize << " is done!";
518  StringPtr spb = sp.copy_back();
519 
520  if (sp.with_lcp) {
521  spb.fill_lcp(
522  s.depth_ + lcpKeyDepth(s.classifier.get_splitter(i / 2)));
523  }
524  ctx_.donesize(bktsize);
525  }
526  else if (bktsize < ctx_.smallsort_threshold)
527  {
528  TLX_LOGC(ctx_.debug_recursion)
529  << "Recurse[" << s.depth_ << "]: = bkt "
530  << i << " size " << bktsize << " lcp keydepth!";
531 
532  ScopedMultiTimerSwitch sts_inssort(mtimer_, "mkqs");
// equal bucket: all strings share the full key, skip sizeof(key_type) chars
533  sort_mkqs_cache(sp, s.depth_ + sizeof(key_type));
534  }
535  else
536  {
537  TLX_LOGC(ctx_.debug_recursion)
538  << "Recurse[" << s.depth_ << "]: = bkt "
539  << i << " size " << bktsize << " lcp keydepth!";
540 
541  ss_stack_.emplace_back(
542  ctx_, sp, s.depth_ + sizeof(key_type), bktcache);
543  }
544  }
545  }
546  else
547  {
548  // finished sort
549  assert(ss_stack_.size() > ss_front_);
550 
551  // after full sort: calculate LCPs at this level
552  ss_stack_.back().calculate_lcp(ctx_);
553 
554  ss_stack_.pop_back();
555  }
556 
557  if (ctx_.enable_work_sharing && ctx_.threads_.has_idle()) {
// NOTE(review): the call line (Doxygen line 558) is missing -- presumably
// `sample_sort_free_work();`.
559  }
560  }
561  }
562 
// Hand the oldest (bottom) level of the sample sort stack off to the thread
// pool as independent jobs so idle threads can steal work; falls through to
// mkqs_free_work() when the sample sort stack is already empty.
// NOTE(review): the function signature (Doxygen line 563) is missing from
// this extraction -- presumably `void sample_sort_free_work() {`.
564  assert(ss_stack_.size() >= ss_front_);
565 
566  if (ss_stack_.size() == ss_front_) {
567  // ss_stack_ is empty, check other stack
568  return mkqs_free_work();
569  }
570 
571  // convert top level of stack into independent jobs
572  TLX_LOGC(ctx_.debug_jobs)
573  << "Freeing top level of PS5SmallsortJob's sample_sort stack";
574 
575  typedef SeqSampleSortStep Step;
576  Step& s = ss_stack_[ss_front_];
577 
578  while (s.idx_ < Step::bktnum)
579  {
580  size_t i = s.idx_++; // process the bucket s.idx_
581 
582  size_t bktsize = s.bkt[i + 1] - s.bkt[i];
583 
584  StringPtr sp = s.strptr_.flip(s.bkt[i], bktsize);
585 
586  // i is even -> bkt[i] is less-than bucket
587  if (i % 2 == 0)
588  {
589  if (bktsize == 0) {
590  // empty bucket
591  }
592  else
593  {
594  if (i == Step::bktnum - 1)
595  TLX_LOGC(ctx_.debug_recursion)
596  << "Recurse[" << s.depth_ << "]: > bkt "
597  << i << " size " << bktsize << " no lcp";
598  else
599  TLX_LOGC(ctx_.debug_recursion)
600  << "Recurse[" << s.depth_ << "]: < bkt "
601  << i << " size " << bktsize << " lcp "
602  << int(s.splitter_lcp[i / 2] & 0x7F);
603 
604  this->substep_add();
605  ctx_.enqueue(this, sp,
606  s.depth_ + (s.splitter_lcp[i / 2] & 0x7F));
607  }
608  }
609  // i is odd -> bkt[i] is equal bucket
610  else
611  {
612  if (bktsize == 0) {
613  // empty bucket
614  }
615  else if (s.splitter_lcp[i / 2] & 0x80) {
616  // equal-bucket has nullptr-terminated key, done.
617  TLX_LOGC(ctx_.debug_recursion)
618  << "Recurse[" << s.depth_ << "]: = bkt "
619  << i << " size " << bktsize << " is done!";
620  StringPtr spb = sp.copy_back();
621 
622  if (sp.with_lcp) {
623  spb.fill_lcp(s.depth_ + lcpKeyDepth(
624  s.classifier.get_splitter(i / 2)));
625  }
626  ctx_.donesize(bktsize);
627  }
628  else
629  {
630  TLX_LOGC(ctx_.debug_recursion)
631  << "Recurse[" << s.depth_ << "]: = bkt "
632  << i << " size " << bktsize << " lcp keydepth!";
633 
634  this->substep_add();
635  ctx_.enqueue(this, sp, s.depth_ + sizeof(key_type));
636  }
637  }
638  }
639 
640  // shorten the current stack
641  ++ss_front_;
642  }
643 
644  /*------------------------------------------------------------------------*/
645  //! Stack of Recursive MKQS Steps
646 
647  static inline int cmp(const key_type& a, const key_type& b) {
648  return (a > b) ? 1 : (a < b) ? -1 : 0;
649  }
650 
//! Return the index (one of i, j, k) of the median of A[i], A[j], A[k].
//! Equality is short-circuited first: A[i] == A[j] yields i, and a value at
//! k equal to either endpoint yields k; otherwise the median is found by
//! ordinary three-element comparison.
template <typename Type>
static inline size_t
med3(Type* A, size_t i, size_t j, size_t k) {
    if (A[i] == A[j])
        return i;
    if (A[k] == A[i] || A[k] == A[j])
        return k;
    if (A[i] < A[j]) {
        if (A[j] < A[k])
            return j;
        return (A[i] < A[k]) ? k : i;
    }
    if (A[j] > A[k])
        return j;
    return (A[i] < A[k]) ? i : k;
}
667 
668  //! Insertion sort the strings only based on the cached characters.
669  static inline void
// NOTE(review): the function signature (Doxygen line 670) is missing from
// this extraction -- presumably
// `insertion_sort_cache_block(const StringPtr& strptr, key_type* cache) {`.
671  const StringSet& strings = strptr.active();
672  size_t n = strptr.size();
673  size_t pi, pj;
// classic binary-insertion-free inner loop: shift greater cache values right
// and keep string array and cache array permuted in lockstep
674  for (pi = 1; --n > 0; ++pi) {
675  typename StringSet::String tmps = std::move(strings.at(pi));
676  key_type tmpc = cache[pi];
677  for (pj = pi; pj > 0; --pj) {
678  if (cache[pj - 1] <= tmpc)
679  break;
680  strings.at(pj) = std::move(strings.at(pj - 1));
681  cache[pj] = cache[pj - 1];
682  }
683  strings.at(pj) = std::move(tmps);
684  cache[pj] = tmpc;
685  }
686  }
687 
688  //! Insertion sort, but use cached characters if possible.
// Sort a small range by its cached key characters, then recurse (via plain
// insertion_sort) into runs of equal cache values that are not yet fully
// distinguished, and set LCP values at run boundaries.
689  template <bool CacheDirty>
690  static inline void
691  insertion_sort_cache(const StringPtr& _strptr, key_type* cache, size_t depth) {
692  StringPtr strptr = _strptr.copy_back();
693 
694  if (strptr.size() <= 1) return;
695  if (CacheDirty)
696  return insertion_sort(strptr, depth, /* memory */ 0);
697 
698  insertion_sort_cache_block(strptr, cache);
699 
700  size_t start = 0, bktsize = 1;
701  for (size_t i = 0; i < strptr.size() - 1; ++i) {
702  // group areas with equal cache values
703  if (cache[i] == cache[i + 1]) {
704  ++bktsize;
705  continue;
706  }
707  // calculate LCP between group areas
708  if (start != 0 && strptr.with_lcp) {
709  int rlcp = lcpKeyType(cache[start - 1], cache[start]);
710  strptr.set_lcp(start, depth + rlcp);
711  // strptr.set_cache(start, getCharAtDepth(cache[start], rlcp));
712  }
713  // sort group areas deeper if needed
714  if (bktsize > 1) {
// lowest byte non-zero: strings are not yet nullptr-terminated at this depth
715  if (cache[start] & 0xFF) {
716  // need deeper sort
// NOTE(review): the call-open line (Doxygen line 717) is missing -- presumably
// `insertion_sort(` given the arguments on the following lines.
718  strptr.sub(start, bktsize), depth + sizeof(key_type),
719  /* memory */ 0);
720  }
721  else {
722  // cache contains nullptr-termination
723  strptr.sub(start, bktsize).fill_lcp(depth + lcpKeyDepth(cache[start]));
724  }
725  }
726  bktsize = 1;
727  start = i + 1;
728  }
729  // tail of loop for last item
730  if (start != 0 && strptr.with_lcp) {
731  int rlcp = lcpKeyType(cache[start - 1], cache[start]);
732  strptr.set_lcp(start, depth + rlcp);
733  // strptr.set_cache(start, getCharAtDepth(cache[start], rlcp));
734  }
735  if (bktsize > 1) {
736  if (cache[start] & 0xFF) {
737  // need deeper sort
// NOTE(review): the call-open line (Doxygen line 738) is missing -- presumably
// `insertion_sort(` as in the loop body above.
739  strptr.sub(start, bktsize), depth + sizeof(key_type),
740  /* memory */ 0);
741  }
742  else {
743  // cache contains nullptr-termination
744  strptr.sub(start, bktsize).fill_lcp(depth + lcpKeyDepth(cache[start]));
745  }
746  }
747  }
748 
// One level of the explicit-stack multikey quicksort: the constructor does a
// full Bentley/McIlroy-style three-way partition of the cached keys (equal
// elements parked at both ends, swapped to the middle afterwards) and records
// the <, =, > area sizes plus the LCP values against the pivot.
749  class MKQSStep
750  {
751  public:
// NOTE(review): the member declarations on Doxygen lines 752-754 and 758 are
// missing from this extraction; usage below implies StringPtr strptr_,
// key_type* cache_, size_t depth_, num_lt_/num_eq_/num_gt_ and the
// lcp_lt_/lcp_eq_/lcp_gt_ members.
755  size_t idx_;
756  unsigned char eq_recurse_;
757  // typename StringPtr::StringSet::Char dchar_eq_, dchar_gt_;
759 
760  MKQSStep(Context& ctx, const StringPtr& strptr,
761  key_type* cache, size_t depth, bool CacheDirty)
762  : strptr_(strptr), cache_(cache), depth_(depth), idx_(0) {
763  size_t n = strptr_.size();
764 
765  const StringSet& strset = strptr_.active();
766 
// refill the key cache when the caller cannot vouch for it
767  if (CacheDirty) {
768  typename StringSet::Iterator it = strset.begin();
769  for (size_t i = 0; i < n; ++i, ++it) {
770  cache_[i] = get_key<key_type>(strset, *it, depth);
771  }
772  }
773  // select median of 9
774  size_t p = med3(
775  cache_,
776  med3(cache_, 0, n / 8, n / 4),
777  med3(cache_, n / 2 - n / 8, n / 2, n / 2 + n / 8),
778  med3(cache_, n - 1 - n / 4, n - 1 - n / 8, n - 3));
779  // swap pivot to first position
780  std::swap(strset.at(0), strset.at(p));
781  std::swap(cache_[0], cache_[p]);
782  // save the pivot value
783  key_type pivot = cache_[0];
784  // for immediate LCP calculation
785  key_type max_lt = 0, min_gt = std::numeric_limits<key_type>::max();
786 
787  // indexes into array:
788  // 0 [pivot] 1 [===] leq [<<<] llt [???] rgt [>>>] req [===] n-1
789  size_t leq = 1, llt = 1, rgt = n - 1, req = n - 1;
790  while (true)
791  {
792  while (llt <= rgt)
793  {
794  int r = cmp(cache[llt], pivot);
795  if (r > 0) {
796  min_gt = std::min(min_gt, cache[llt]);
797  break;
798  }
799  else if (r == 0) {
// equal element: park at the left end
800  std::swap(strset.at(leq), strset.at(llt));
801  std::swap(cache[leq], cache[llt]);
802  leq++;
803  }
804  else {
805  max_lt = std::max(max_lt, cache[llt]);
806  }
807  ++llt;
808  }
809  while (llt <= rgt)
810  {
811  int r = cmp(cache[rgt], pivot);
812  if (r < 0) {
813  max_lt = std::max(max_lt, cache[rgt]);
814  break;
815  }
816  else if (r == 0) {
// equal element: park at the right end
817  std::swap(strset.at(req), strset.at(rgt));
818  std::swap(cache[req], cache[rgt]);
819  req--;
820  }
821  else {
822  min_gt = std::min(min_gt, cache[rgt]);
823  }
824  --rgt;
825  }
826  if (llt > rgt)
827  break;
828  std::swap(strset.at(llt), strset.at(rgt));
829  std::swap(cache[llt], cache[rgt]);
830  ++llt;
831  --rgt;
832  }
833  // calculate size of areas = < and >, save into struct
834  size_t num_leq = leq, num_req = n - 1 - req;
835  num_eq_ = num_leq + num_req;
836  num_lt_ = llt - leq;
837  num_gt_ = req - rgt;
838  assert(num_eq_ > 0);
839  assert(num_lt_ + num_eq_ + num_gt_ == n);
840 
841  // swap equal values from left to center
842  const size_t size1 = std::min(num_leq, num_lt_);
843  std::swap_ranges(strset.begin(), strset.begin() + size1,
844  strset.begin() + llt - size1);
845  std::swap_ranges(cache, cache + size1, cache + llt - size1);
846 
847  // swap equal values from right to center
848  const size_t size2 = std::min(num_req, num_gt_);
849  std::swap_ranges(strset.begin() + llt, strset.begin() + llt + size2,
850  strset.begin() + n - size2);
851  std::swap_ranges(cache + llt, cache + llt + size2,
852  cache + n - size2);
853 
854  // No recursive sorting if pivot has a zero byte
855  eq_recurse_ = (pivot & 0xFF);
856 
857  // save LCP values for writing into LCP array after sorting further
858  if (strptr_.with_lcp && num_lt_ > 0) {
859  assert(max_lt == *std::max_element(
860  cache_ + 0, cache + num_lt_));
861 
862  lcp_lt_ = lcpKeyType(max_lt, pivot);
863  // dchar_eq_ = getCharAtDepth(pivot, lcp_lt_);
864  TLX_LOGC(ctx.debug_lcp) << "LCP lt with pivot: " << depth_ + lcp_lt_;
865  }
866 
867  // calculate equal area lcp: +1 for the equal zero termination byte
868  lcp_eq_ = lcpKeyDepth(pivot);
869 
870  if (strptr_.with_lcp && num_gt_ > 0) {
871  assert(min_gt == *std::min_element(
872  cache_ + num_lt_ + num_eq_, cache_ + n));
873 
874  lcp_gt_ = lcpKeyType(pivot, min_gt);
875  // dchar_gt_ = getCharAtDepth(min_gt, lcp_gt_);
876  TLX_LOGC(ctx.debug_lcp) << "LCP pivot with gt: " << depth_ + lcp_gt_;
877  }
878 
879  ++ctx.base_sort_steps;
880  }
881 
// Write the saved boundary LCPs once the three areas are fully sorted.
882  void calculate_lcp() {
883  if (strptr_.with_lcp && num_lt_ > 0) {
// NOTE(review): Doxygen line 884 is missing -- presumably
// `strptr_.set_lcp(num_lt_, depth_ + lcp_lt_);`.
885  // strptr_.set_cache(num_lt_, dchar_eq_);
886  }
887 
888  if (strptr_.with_lcp && num_gt_ > 0) {
// NOTE(review): Doxygen line 889 is missing -- presumably
// `strptr_.set_lcp(num_lt_ + num_eq_, depth_ + lcp_gt_);`.
890  // strptr_.set_cache(num_lt_ + num_eq_, dchar_gt_);
891  }
892  }
893  };
894 
// Explicit-stack driver of cached multikey quicksort; ms_front_ is the
// artificial pop-front index used for work sharing, mirroring ss_front_.
895  size_t ms_front_ = 0;
896  std::vector<MKQSStep> ms_stack_;
897 
898  void sort_mkqs_cache(const StringPtr& strptr, size_t depth) {
899  assert(strcmp(mtimer_.running(), "mkqs") == 0);
900 
// tiny inputs: plain insertion sort, no key cache needed
901  if (!ctx_.enable_sequential_mkqs ||
902  strptr.size() < ctx_.inssort_threshold) {
903  TLX_LOGC(ctx_.debug_jobs)
904  << "insertion_sort() size "
905  << strptr.size() << " depth " << depth;
906 
907  ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
908  insertion_sort(strptr.copy_back(), depth, /* memory */ 0);
909  ctx_.donesize(strptr.size());
910  return;
911  }
912 
913  TLX_LOGC(ctx_.debug_jobs)
914  << "sort_mkqs_cache() size " << strptr.size() << " depth " << depth;
915 
916  if (bktcache_.size() < strptr.size() * sizeof(key_type)) {
917  bktcache_.destroy();
918  bktcache_.resize(strptr.size() * sizeof(key_type));
919  }
920 
921  // reuse bktcache as keycache
922  key_type* cache = reinterpret_cast<key_type*>(bktcache_.data());
923 
924  assert(ms_front_ == 0);
925  assert(ms_stack_.size() == 0);
926 
927  // std::deque is much slower than std::vector, so we use an artificial
928  // pop_front variable.
929  ms_stack_.emplace_back(ctx_, strptr, cache, depth, true);
930 
931  while (ms_stack_.size() > ms_front_)
932  {
933  MKQSStep& ms = ms_stack_.back();
934  ++ms.idx_; // increment here, because stack may change
935 
936  // process the lt-subsequence
937  if (ms.idx_ == 1) {
938  if (ms.num_lt_ == 0) {
939  // empty subsequence
940  }
941  else if (ms.num_lt_ < ctx_.inssort_threshold) {
942  ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
943  insertion_sort_cache<false>(ms.strptr_.sub(0, ms.num_lt_),
944  ms.cache_, ms.depth_);
945  ctx_.donesize(ms.num_lt_);
946  }
947  else {
948  ms_stack_.emplace_back(
949  ctx_,
950  ms.strptr_.sub(0, ms.num_lt_),
951  ms.cache_, ms.depth_, false);
952  }
953  }
954  // process the eq-subsequence
955  else if (ms.idx_ == 2) {
956  StringPtr sp = ms.strptr_.sub(ms.num_lt_, ms.num_eq_);
957 
958  assert(ms.num_eq_ > 0);
959 
// pivot ended in a zero byte: equal area is fully sorted already
960  if (!ms.eq_recurse_) {
961  StringPtr spb = sp.copy_back();
962  spb.fill_lcp(ms.depth_ + ms.lcp_eq_);
963  ctx_.donesize(spb.size());
964  }
965  else if (ms.num_eq_ < ctx_.inssort_threshold) {
966  ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
967  insertion_sort_cache<true>(sp, ms.cache_ + ms.num_lt_,
968  ms.depth_ + sizeof(key_type));
969  ctx_.donesize(ms.num_eq_);
970  }
971  else {
972  ms_stack_.emplace_back(
973  ctx_, sp,
974  ms.cache_ + ms.num_lt_,
975  ms.depth_ + sizeof(key_type), true);
976  }
977  }
978  // process the gt-subsequence
979  else if (ms.idx_ == 3) {
980  StringPtr sp = ms.strptr_.sub(
981  ms.num_lt_ + ms.num_eq_, ms.num_gt_);
982 
983  if (ms.num_gt_ == 0) {
984  // empty subsequence
985  }
986  else if (ms.num_gt_ < ctx_.inssort_threshold) {
987  ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
988  insertion_sort_cache<false>(
989  sp, ms.cache_ + ms.num_lt_ + ms.num_eq_, ms.depth_);
990  ctx_.donesize(ms.num_gt_);
991  }
992  else {
993  ms_stack_.emplace_back(
994  ctx_, sp,
995  ms.cache_ + ms.num_lt_ + ms.num_eq_,
996  ms.depth_, false);
997  }
998  }
999  // calculate lcps
1000  else {
1001  // finished sort
1002  assert(ms_stack_.size() > ms_front_);
1003 
1004  // calculate LCP after the three parts are sorted
1005  ms_stack_.back().calculate_lcp();
1006 
1007  ms_stack_.pop_back();
1008  }
1009 
1010  if (ctx_.enable_work_sharing && ctx_.threads_.has_idle()) {
// NOTE(review): the call line (Doxygen line 1011) is missing -- presumably
// `mkqs_free_work();`.
1012  }
1013  }
1014  }
1015 
// Convert up to eight bottom levels of the MKQS stack into independent
// thread-pool jobs so idle threads can help; each unprocessed area (lt, eq,
// gt -- depending on how far idx_ has advanced) becomes one enqueued job.
// NOTE(review): the function signature (Doxygen line 1016) is missing from
// this extraction -- presumably `void mkqs_free_work() {`.
1017  assert(ms_stack_.size() >= ms_front_);
1018 
1019  for (unsigned int fl = 0; fl < 8; ++fl)
1020  {
1021  if (ms_stack_.size() == ms_front_) {
1022  return;
1023  }
1024 
1025  TLX_LOGC(ctx_.debug_jobs)
1026  << "Freeing top level of PS5SmallsortJob's mkqs stack - size "
1027  << ms_stack_.size();
1028 
1029  // convert top level of stack into independent jobs
1030 
1031  MKQSStep& ms = ms_stack_[ms_front_];
1032 
1033  if (ms.idx_ == 0 && ms.num_lt_ != 0)
1034  {
1035  this->substep_add();
1036  ctx_.enqueue(this, ms.strptr_.sub(0, ms.num_lt_), ms.depth_);
1037  }
1038  if (ms.idx_ <= 1) // st.num_eq > 0 always
1039  {
1040  assert(ms.num_eq_ > 0);
1041 
1042  StringPtr sp = ms.strptr_.sub(ms.num_lt_, ms.num_eq_);
1043 
1044  if (ms.eq_recurse_) {
1045  this->substep_add();
1046  ctx_.enqueue(this, sp, ms.depth_ + sizeof(key_type));
1047  }
1048  else {
// pivot had a zero byte: equal area is done, only LCPs must be written
1049  StringPtr spb = sp.copy_back();
1050  spb.fill_lcp(ms.depth_ + ms.lcp_eq_);
1051  ctx_.donesize(ms.num_eq_);
1052  }
1053  }
1054  if (ms.idx_ <= 2 && ms.num_gt_ != 0)
1055  {
1056  this->substep_add();
1057  ctx_.enqueue(
1058  this, ms.strptr_.sub(ms.num_lt_ + ms.num_eq_, ms.num_gt_),
1059  ms.depth_);
1060  }
1061 
1062  // shorten the current stack
1063  ++ms_front_;
1064  }
1065  }
1066 
1067  /*------------------------------------------------------------------------*/
1068  // Called When PS5SmallsortJob is Finished
1069 
// Invoked (via PS5SortStep) once every enqueued child job has finished:
// write the deferred LCP values of all remaining stack levels bottom-up,
// then self-destruct.
1070  void substep_all_done() final {
1071  TLX_LOGC(ctx_.debug_recursion)
1072  << "SmallSort[" << depth_ << "] "
1073  << "all substeps done -> LCP calculation";
1074 
1075  while (ms_front_ > 0) {
1076  TLX_LOGC(ctx_.debug_lcp)
1077  << "SmallSort[" << depth_ << "] ms_front_: " << ms_front_;
1078  ms_stack_[--ms_front_].calculate_lcp();
1079  }
1080 
1081  while (ss_front_ > 0) {
1082  TLX_LOGC(ctx_.debug_lcp)
1083  << "SmallSort[" << depth_ << "] ss_front_: " << ss_front_;
1084  ss_stack_[--ss_front_].calculate_lcp(ctx_);
1085  }
1086 
// NOTE(review): Doxygen line 1087 is missing -- presumably the notification
// of the parent step, e.g. `if (pstep_) pstep_->substep_notify_done();`.
1088  delete this;
1089  }
1090 };
1091 
1092 /******************************************************************************/
1093 //! PS5BigSortStep Out-of-Place Parallel Sample Sort with Separate Jobs
1094 
1095 template <typename Context, typename StringPtr>
// NOTE(review): original line 1096 (the class head; per the index below:
// "PS5BigSortStep Out-of-Place Parallel Sample Sort with Separate Jobs")
// is missing from this listing.
1097 {
1098 public:
// NOTE(review): line 1099 is missing -- presumably the StringSet typedef
// that StrIterator below depends on.
1100  typedef typename StringSet::Iterator StrIterator;
1101  typedef typename Context::key_type key_type;
1102 
1103  //! context
1104  Context& ctx_;
1105  //! parent sort step for notification
// NOTE(review): the declaration itself is missing here; the member index
// at the end of this listing shows it as "PS5SortStep * pstep_".
1107 
1108  //! string pointers, size, and current sorting depth
// NOTE(review): the "StringPtr strptr_" declaration (line 1109, per the
// member index) is missing here.
1110  size_t depth_;
1111 
1112  //! number of parts into which the strings were split
1113  size_t parts_;
1114  //! size of all parts except the last
1115  size_t psize_;
1116  //! number of threads still working
1117  std::atomic<size_t> pwork_;
1118 
1119  //! classifier instance and variables (contains splitter tree)
1120  typename Context::Classify classifier_;
1121 
// constants taken from the classification tree; bktnum_: one less-than
// and one equal bucket per splitter, plus the final greater-than bucket
1122  static const size_t treebits_ = Context::Classify::treebits;
1123  static const size_t num_splitters_ = Context::Classify::num_splitters;
1124  static const size_t bktnum_ = 2 * num_splitters_ + 1;
1125 
1126  //! LCPs of splitters, needed for recursive calls
1127  unsigned char splitter_lcp_[num_splitters_ + 1];
1128 
1129  //! individual bucket array of threads, keep bkt[0] for DistributeJob
// NOTE(review): the bkt_ declaration is missing here; the member index
// shows it as "simple_vector< simple_vector< size_t > > bkt_".
1131  //! bucket ids cache, created by classifier and later counted
// NOTE(review): the bktcache_ declaration is missing here; the member
// index shows "simple_vector< simple_vector< uint16_t > > bktcache_".
1133 
1134  /*------------------------------------------------------------------------*/
1135  // Constructor
1136 
// Construct the parallel sort step: split the input into parts and
// enqueue the initial sampling job on the shared thread pool.
1137  PS5BigSortStep(Context& ctx, PS5SortStep* pstep,
1138  const StringPtr& strptr, size_t depth)
1139  : ctx_(ctx), pstep_(pstep), strptr_(strptr), depth_(depth) {
1140  // calculate number of parts
// oversubscribe by 2: two parts per sequential_threshold() worth of
// strings, but always at least one part
1141  parts_ = strptr_.size() / ctx.sequential_threshold() * 2;
1142  if (parts_ == 0) parts_ = 1;
1143 
1144  bkt_.resize(parts_);
1145  bktcache_.resize(parts_);
1146 
// ceiling division: every part except possibly the last gets psize_ strings
1147  psize_ = (strptr.size() + parts_ - 1) / parts_;
1148 
1149  TLX_LOGC(ctx_.debug_steps)
1150  << "enqueue depth=" << depth_
1151  << " size=" << strptr_.size()
1152  << " parts=" << parts_
1153  << " psize=" << psize_
1154  << " flip=" << strptr_.flipped();
1155 
// the asynchronous sample() step launches the count/distribute jobs
1156  ctx.threads_.enqueue([this]() { sample(); });
// statistics counter: number of parallel sample sort steps
1157  ++ctx.para_ss_steps;
1158  }
1159 
1160  virtual ~PS5BigSortStep() { }
1161 
1162  /*------------------------------------------------------------------------*/
1163  // Sample Step
1164 
1165  void sample() {
1166  ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1167  TLX_LOGC(ctx_.debug_jobs) << "Process SampleJob @ " << this;
1168 
1169  const size_t oversample_factor = 2;
1170  size_t sample_size = oversample_factor * num_splitters_;
1171 
1172  const StringSet& strset = strptr_.active();
1173  size_t n = strset.size();
1174 
1175  simple_vector<key_type> samples(sample_size);
1176 
1177  std::minstd_rand rng(reinterpret_cast<uintptr_t>(samples.data()));
1178 
1179  for (size_t i = 0; i < sample_size; ++i)
1180  samples[i] = get_key_at<key_type>(strset, rng() % n, depth_);
1181 
1182  std::sort(samples.begin(), samples.end());
1183 
1184  classifier_.build(samples.data(), sample_size, splitter_lcp_);
1185 
1186  // create new jobs
1187  pwork_ = parts_;
1188  for (unsigned int p = 0; p < parts_; ++p) {
1189  ctx_.threads_.enqueue([this, p]() { count(p); });
1190  }
1191  }
1192 
1193  /*------------------------------------------------------------------------*/
1194  // Counting Step
1195 
1196  void count(unsigned int p) {
1197  ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1198  TLX_LOGC(ctx_.debug_jobs) << "Process CountJob " << p << " @ " << this;
1199 
1200  const StringSet& strset = strptr_.active();
1201 
1202  StrIterator strB = strset.begin() + p * psize_;
1203  StrIterator strE = strset.begin() + std::min((p + 1) * psize_, strptr_.size());
1204  if (strE < strB) strE = strB;
1205 
1206  bktcache_[p].resize(strE - strB);
1207  uint16_t* bktcache = bktcache_[p].data();
1208  classifier_.classify(strset, strB, strE, bktcache, depth_);
1209 
1210  bkt_[p].resize(bktnum_ + (p == 0 ? 1 : 0));
1211  size_t* bkt = bkt_[p].data();
1212  memset(bkt, 0, bktnum_ * sizeof(size_t));
1213 
1214  for (uint16_t* bc = bktcache; bc != bktcache + (strE - strB); ++bc)
1215  ++bkt[*bc];
1216 
1217  if (--pwork_ == 0)
1218  count_finished();
1219  }
1220 
// NOTE(review): the original function header (line 1221, presumably
// "void count_finished()") is missing from this listing.
1222  ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1223  TLX_LOGC(ctx_.debug_jobs) << "Finishing CountJob " << this << " with prefixsum";
1224 
1225  // abort sorting if we're measuring only the top level
1226  if (ctx_.use_only_first_sortstep)
1227  return;
1228 
1229  // inclusive prefix sum over bkt
// bucket-major over all parts: afterwards bkt_[p][i] holds the inclusive
// prefix sum (end position) of part p's slice of bucket i, which
// distribute() pre-decrements to obtain write positions
1230  size_t sum = 0;
1231  for (unsigned int i = 0; i < bktnum_; ++i) {
1232  for (unsigned int p = 0; p < parts_; ++p) {
1233  bkt_[p][i] = (sum += bkt_[p][i]);
1234  }
1235  }
1236  assert(sum == strptr_.size());
1237 
1238  // create new jobs
1239  pwork_ = parts_;
1240  for (unsigned int p = 0; p < parts_; ++p) {
1241  ctx_.threads_.enqueue([this, p]() { distribute(p); });
1242  }
1243  }
1244 
1245  /*------------------------------------------------------------------------*/
1246  // Distribute Step
1247 
// Distribute Step: move the strings of part p into their buckets in the
// shadow pointer array, using the prefix-summed counters as write cursors.
1248  void distribute(unsigned int p) {
1249  ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1250  TLX_LOGC(ctx_.debug_jobs) << "Process DistributeJob " << p << " @ " << this;
1251 
1252  const StringSet& strset = strptr_.active();
1253 
// same [strB, strE) part boundaries as in count()
1254  StrIterator strB = strset.begin() + p * psize_;
1255  StrIterator strE = strset.begin() + std::min((p + 1) * psize_, strptr_.size());
1256  if (strE < strB) strE = strB;
1257 
1258  // get alternative shadow pointer array
1259  const StringSet& sorted = strptr_.shadow();
1260  typename StringSet::Iterator sbegin = sorted.begin();
1261 
1262  uint16_t* bktcache = bktcache_[p].data();
1263  size_t* bkt = bkt_[p].data();
1264 
// pre-decrementing bkt[...] fills each bucket slice from the back
1265  for (StrIterator str = strB; str != strE; ++str, ++bktcache)
1266  *(sbegin + --bkt[*bktcache]) = std::move(*str);
1267 
1268  if (p != 0) // p = 0 is needed for recursion into bkts
1269  bkt_[p].destroy();
1270 
1271  bktcache_[p].destroy();
1272 
// NOTE(review): original line 1274 is missing from this listing --
// presumably the call run by the last worker (distribute_finished());
// confirm against the upstream tlx sources.
1273  if (--pwork_ == 0)
1275  }
1276 
// NOTE(review): the original function header (line 1277, presumably
// "void distribute_finished()") is missing from this listing.
1278  TLX_LOGC(ctx_.debug_jobs)
1279  << "Finishing DistributeJob " << this << " with enqueuing subjobs";
1280 
1281  size_t* bkt = bkt_[0].data();
1282  assert(bkt);
1283 
1284  // first processor's bkt pointers are boundaries between bkts, just add sentinel:
1285  assert(bkt[0] == 0);
1286  bkt[bktnum_] = strptr_.size();
1287 
1288  // keep anonymous subjob handle while creating subjobs
1289  this->substep_add();
1290 
// buckets alternate: even index = less-than bucket, odd index = equal
// bucket; the final greater-than bucket is handled after the loop
1291  size_t i = 0;
1292  while (i < bktnum_ - 1)
1293  {
1294  // i is even -> bkt[i] is less-than bucket
1295  size_t bktsize = bkt[i + 1] - bkt[i];
1296  if (bktsize == 0) {
1297  // empty bucket
1298  }
1299  else if (bktsize == 1) { // just one string pointer, copyback
1300  strptr_.flip(bkt[i], 1).copy_back();
1301  ctx_.donesize(1);
1302  }
1303  else
1304  {
1305  TLX_LOGC(ctx_.debug_recursion)
1306  << "Recurse[" << depth_ << "]: < bkt " << bkt[i]
1307  << " size " << bktsize << " lcp "
1308  << int(splitter_lcp_[i / 2] & 0x7F);
1309  this->substep_add();
// low 7 bits of splitter_lcp_ hold the LCP added to the sorting depth
1310  ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize),
1311  depth_ + (splitter_lcp_[i / 2] & 0x7F));
1312  }
1313  ++i;
1314  // i is odd -> bkt[i] is equal bucket
1315  bktsize = bkt[i + 1] - bkt[i];
1316  if (bktsize == 0) {
1317  // empty bucket
1318  }
1319  else if (bktsize == 1) { // just one string pointer, copyback
1320  strptr_.flip(bkt[i], 1).copy_back();
1321  ctx_.donesize(1);
1322  }
1323  else
1324  {
// high bit 0x80 marks a splitter whose key is NUL-terminated: the equal
// bucket is then fully sorted, only LCP values remain to be filled in
1325  if (splitter_lcp_[i / 2] & 0x80) {
1326  // equal-bucket has nullptr-terminated key, done.
1327  TLX_LOGC(ctx_.debug_recursion)
1328  << "Recurse[" << depth_ << "]: = bkt " << bkt[i]
1329  << " size " << bktsize << " is done!";
1330  StringPtr sp = strptr_.flip(bkt[i], bktsize).copy_back();
1331  sp.fill_lcp(
1332  depth_ + lcpKeyDepth(classifier_.get_splitter(i / 2)));
1333  ctx_.donesize(bktsize);
1334  }
1335  else {
// equal bucket shares the whole splitter key: recurse sizeof(key_type)
// characters deeper
1336  TLX_LOGC(ctx_.debug_recursion)
1337  << "Recurse[" << depth_ << "]: = bkt " << bkt[i]
1338  << " size " << bktsize << " lcp keydepth!";
1339  this->substep_add();
1340  ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize),
1341  depth_ + sizeof(key_type));
1342  }
1343  }
1344  ++i;
1345  }
1346 
// last bucket: strings greater than all splitters, no LCP information
1347  size_t bktsize = bkt[i + 1] - bkt[i];
1348 
1349  if (bktsize == 0) {
1350  // empty bucket
1351  }
1352  else if (bktsize == 1) { // just one string pointer, copyback
1353  strptr_.flip(bkt[i], 1).copy_back();
1354  ctx_.donesize(1);
1355  }
1356  else {
1357  TLX_LOGC(ctx_.debug_recursion)
1358  << "Recurse[" << depth_ << "]: > bkt " << bkt[i]
1359  << " size " << bktsize << " no lcp";
1360  this->substep_add();
1361  ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize), depth_);
1362  }
1363 
1364  this->substep_notify_done(); // release anonymous subjob handle
1365 
// bkt_[0] is still needed by the LCP pass in substep_all_done() when
// LCPs are kept; otherwise it can be freed now
1366  if (!strptr_.with_lcp)
1367  bkt_[0].destroy();
1368  }
1369 
1370  /*------------------------------------------------------------------------*/
1371  // After Recursive Sorting
1372 
1373  void substep_all_done() final {
1374  ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1375  if (strptr_.with_lcp) {
1376  TLX_LOGC(ctx_.debug_steps)
1377  << "pSampleSortStep[" << depth_ << "]: all substeps done.";
1378 
// compute the LCPs at bucket boundaries from classifier/splitter data,
// then release the boundary array kept in bkt_[0]
1379  ps5_sample_sort_lcp<bktnum_>(
1380  ctx_, classifier_, strptr_, depth_, bkt_[0].data());
1381  bkt_[0].destroy();
1382  }
1383 
// NOTE(review): original line 1384 is missing from this listing --
// presumably the parent notification (via pstep_); confirm against the
// upstream tlx sources.
1385  delete this;
1386  }
1387 };
1388 
1389 /******************************************************************************/
1390 // PS5Context::enqueue()
1391 
1392 template <typename Parameters>
1393 template <typename StringPtr>
// NOTE(review): original line 1394 (the signature line; per the index:
// "void enqueue(PS5SortStep *sstep, const StringPtr &strptr, size_t
// depth)" on PS5Context) is missing from this listing.
1395  PS5SortStep* pstep, const StringPtr& strptr, size_t depth) {
// large inputs (or forced top-level measurement) go to the parallel
// out-of-place sample sort step ...
1396  if (this->enable_parallel_sample_sort &&
1397  (strptr.size() > sequential_threshold() ||
1398  this->use_only_first_sortstep)) {
1399  new PS5BigSortStep<PS5Context, StringPtr>(*this, pstep, strptr, depth);
1400  }
1401  else {
// ... smaller inputs run as a sequential small-sort job on the pool.
// NOTE(review): lines 1403 and 1408 are missing from this listing --
// presumably the PS5SmallsortJob allocations assigned to j (see its
// constructor in the index), chosen by whether 32-bit bucket sizes
// suffice; confirm against the upstream tlx sources.
1402  if (strptr.size() < (1LLU << 32)) {
1404  *this, pstep, strptr, depth);
1405  threads_.enqueue([j]() { j->run(); });
1406  }
1407  else {
1409  *this, pstep, strptr, depth);
1410  threads_.enqueue([j]() { j->run(); });
1411  }
1412  }
1413 }
1414 
1415 /******************************************************************************/
1416 // Externally Callable Sorting Methods
1417 
1418 //! Main Parallel Sample Sort Function. See below for more convenient wrappers.
1419 template <typename PS5Parameters, typename StringPtr>
1420 void parallel_sample_sort_base(const StringPtr& strptr, size_t depth) {
1421 
1422  using Context = PS5Context<PS5Parameters>;
1423  Context ctx(std::thread::hardware_concurrency());
1424  ctx.total_size = strptr.size();
1425  ctx.rest_size = strptr.size();
1426  ctx.num_threads = ctx.threads_.size();
1427 
1428  MultiTimer timer;
1429  timer.start("sort");
1430 
1431  ctx.enqueue(/* pstep */ nullptr, strptr, depth);
1432  ctx.threads_.loop_until_empty();
1433 
1434  timer.stop();
1435 
1436  assert(!ctx.enable_rest_size || ctx.rest_size == 0);
1437 
1438  using BigSortStep = PS5BigSortStep<Context, StringPtr>;
1439 
1440  TLX_LOGC(ctx.debug_result)
1441  << "RESULT"
1442  << " sizeof(key_type)=" << sizeof(typename PS5Parameters::key_type)
1443  << " splitter_treebits=" << size_t(BigSortStep::treebits_)
1444  << " num_splitters=" << size_t(BigSortStep::num_splitters_)
1445  << " num_threads=" << ctx.num_threads
1446  << " enable_work_sharing=" << size_t(ctx.enable_work_sharing)
1447  << " use_restsize=" << size_t(ctx.enable_rest_size)
1448  << " tm_para_ss=" << ctx.mtimer.get("para_ss")
1449  << " tm_seq_ss=" << ctx.mtimer.get("sequ_ss")
1450  << " tm_mkqs=" << ctx.mtimer.get("mkqs")
1451  << " tm_inssort=" << ctx.mtimer.get("inssort")
1452  << " tm_total=" << ctx.mtimer.total()
1453  << " tm_idle="
1454  << (ctx.num_threads * timer.total()) - ctx.mtimer.total()
1455  << " steps_para_sample_sort=" << ctx.para_ss_steps
1456  << " steps_seq_sample_sort=" << ctx.sequ_ss_steps
1457  << " steps_base_sort=" << ctx.base_sort_steps;
1458 }
1459 
1460 //! Parallel Sample Sort Function for a generic StringSet, this allocates the
1461 //! shadow array for flipping.
1462 template <typename PS5Parameters, typename StringPtr>
// NOTE(review): original lines 1463-1464 (the signature; per the index:
// "enable_if<!StringPtr::with_lcp, void >::type
// parallel_sample_sort_params(") are missing from this listing.
1465  const StringPtr& strptr, size_t depth, size_t memory = 0) {
// the memory limit parameter is accepted for interface compatibility
// but not used by this implementation
1466  tlx::unused(memory);
1467 
1468  typedef typename StringPtr::StringSet StringSet;
1469  const StringSet& strset = strptr.active();
1470 
// NOTE(review): line 1471 is missing -- presumably the typedef of the
// StringShadowPtr type used below (see string_ptr.hpp); confirm
// against the upstream tlx sources.
1472  typedef typename StringSet::Container Container;
1473 
1474  // allocate shadow pointer array
1475  Container shadow = strset.allocate(strset.size());
1476  StringShadowPtr new_strptr(strset, StringSet(shadow));
1477 
1478  parallel_sample_sort_base<PS5Parameters>(new_strptr, depth);
1479 
1480  StringSet::deallocate(shadow);
1481 }
1482 
1483 //! Parallel Sample Sort Function for a generic StringSet with LCPs, this
1484 //! allocates the shadow array for flipping.
1485 template <typename PS5Parameters, typename StringPtr>
// NOTE(review): original lines 1486-1487 (the signature of the
// with_lcp overload of parallel_sample_sort_params) are missing from
// this listing; confirm against the upstream tlx sources.
1488  const StringPtr& strptr, size_t depth, size_t memory = 0) {
// the memory limit parameter is accepted for interface compatibility
// but not used by this implementation
1489  tlx::unused(memory);
1490 
1491  typedef typename StringPtr::StringSet StringSet;
1492  typedef typename StringPtr::LcpType LcpType;
1493  const StringSet& strset = strptr.active();
1494 
// NOTE(review): line 1495 is missing -- presumably the typedef of the
// StringShadowLcpPtr type used below (see string_ptr.hpp).
1496  typedef typename StringSet::Container Container;
1497 
1498  // allocate shadow pointer array
1499  Container shadow = strset.allocate(strset.size());
1500  StringShadowLcpPtr new_strptr(strset, StringSet(shadow), strptr.lcp());
1501 
1502  parallel_sample_sort_base<PS5Parameters>(new_strptr, depth);
1503 
1504  StringSet::deallocate(shadow);
1505 }
1506 
1507 //! Parallel Sample Sort Function with default parameter size for a generic
1508 //! StringSet.
1509 template <typename StringPtr>
// NOTE(review): original line 1510 (the signature; per the index:
// "void parallel_sample_sort(") is missing from this listing.
1511  const StringPtr& strptr, size_t depth, size_t memory) {
// forward to the parameterized version with the default parameter set
1512  return parallel_sample_sort_params<PS5ParametersDefault>(
1513  strptr, depth, memory);
1514 }
1515 
1516 } // namespace sort_strings_detail
1517 } // namespace tlx
1518 
1519 #endif // !TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
1520 
1521 /******************************************************************************/
MultiTimer can be used to measure time usage of different phases in a program or algorithm.
Definition: multi_timer.hpp:37
void start(const char *timer)
start new timer phase, stop the currently running one.
Definition: multi_timer.cpp:66
void stop()
stop the currently running timer.
Definition: multi_timer.cpp:84
const char * running() const
return name of currently running timer.
double total() const
return total duration of all timers.
RAII Scoped MultiTimer switcher: switches the timer of a MultiTimer on construction and back to old o...
Independent RAII Scoped MultiTimer: contains a MultiTimer which is started with the given timer,...
Simpler non-growing vector without initialization.
iterator data() noexcept
return iterator to beginning of vector
iterator begin() noexcept
return mutable iterator to first element
iterator end() noexcept
return mutable iterator beyond last element
ThreadPool starts a fixed number p of std::threads which process Jobs that are enqueued into a concurrent job queue.
Definition: thread_pool.hpp:65
PS5BigSortStep Out-of-Place Parallel Sample Sort with Separate Jobs.
simple_vector< simple_vector< size_t > > bkt_
individual bucket array of threads, keep bkt[0] for DistributeJob
PS5SortStep * pstep_
parent sort step for notification
std::atomic< size_t > pwork_
number of threads still working
simple_vector< simple_vector< uint16_t > > bktcache_
bucket ids cache, created by classifier and later counted
size_t psize_
size of all parts except the last
StringPtr strptr_
string pointers, size, and current sorting depth
Context::Classify classifier_
classifier instance and variables (contains splitter tree)
void substep_all_done() final
Pure virtual function called by substep when all substeps are done.
PS5BigSortStep(Context &ctx, PS5SortStep *pstep, const StringPtr &strptr, size_t depth)
unsigned char splitter_lcp_[num_splitters_+1]
LCPs of splitters, needed for recursive calls.
size_t parts_
number of parts into which the strings were split
Parallel Super Scalar String Sample Sort Context.
void enqueue(PS5SortStep *sstep, const StringPtr &strptr, size_t depth)
enqueue a new job in the thread pool
void donesize(size_t n)
decrement number of unordered strings
std::atomic< size_t > rest_size
number of remaining strings to sort
PS5Context(size_t _thread_num)
context constructor
size_t sequential_threshold()
return sequential sorting threshold
size_t num_threads
number of threads overall
std::atomic< size_t > para_ss_steps
counters
MultiTimer mtimer
timers for individual sorting steps
Parallel Super Scalar String Sample Sort Parameter Struct.
static const size_t smallsort_threshold
threshold to run sequential small sorts
static const bool enable_parallel_sample_sort
enable/disable various sorting levels
static const unsigned TreeBits
depth of classification tree used in sample sorts
size_t key_type
key type for sample sort: 32-bit or 64-bit
static const size_t inssort_threshold
threshold to switch to insertion sort
static const bool enable_rest_size
whether to base sequential_threshold() on the remaining unsorted string set or on the whole string set
static const bool enable_work_sharing
enable work freeing
static const bool use_only_first_sortstep
terminate sort after first parallel sample sort step
MKQSStep(Context &ctx, const StringPtr &strptr, key_type *cache, size_t depth, bool CacheDirty)
SeqSampleSortStep(Context &ctx, const StringPtr &strptr, size_t depth, uint16_t *bktcache)
SampleSort: Non-Recursive In-Place Sequential Sample Sort for Small Sorts.
void sort_mkqs_cache(const StringPtr &strptr, size_t depth)
static void insertion_sort_cache(const StringPtr &_strptr, key_type *cache, size_t depth)
Insertion sort, but use cached characters if possible.
static size_t med3(Type *A, size_t i, size_t j, size_t k)
static void insertion_sort_cache_block(const StringPtr &strptr, key_type *cache)
Insertion sort the strings only based on the cached characters.
PS5SmallsortJob(Context &ctx, PS5SortStep *pstep, const StringPtr &strptr, size_t depth)
void substep_all_done() final
Pure virtual function called by substep when all substeps are done.
std::vector< SeqSampleSortStep > ss_stack_
void sort_sample_sort(const StringPtr &strptr, size_t depth)
static int cmp(const key_type &a, const key_type &b)
Stack of Recursive MKQS Steps.
PS5SortStep Top-Level Class to Keep Track of Substeps.
virtual void substep_all_done()=0
Pure virtual function called by substep when all substeps are done.
void substep_notify_done()
Notify the superstep that the current substep is done.
std::atomic< size_t > substep_working_
Number of substeps still running.
Sample Sort Classification Tree Unrolled, Interleaved, and with Perfect Tree Index Calculations.
Objectified string array pointer array.
Definition: string_ptr.hpp:48
size_t size() const
return valid length
Definition: string_ptr.hpp:66
const StringSet & active() const
return currently active array
Definition: string_ptr.hpp:63
StringPtr sub(size_t offset, size_t sub_size) const
Advance (both) pointers by given offset, return sub-array.
Definition: string_ptr.hpp:69
void fill_lcp(const LcpType &) const
fill entire LCP array with v, excluding the first lcp[0] position!
Definition: string_ptr.hpp:83
static const bool with_lcp
if we want to save the LCPs
Definition: string_ptr.hpp:75
void set_lcp(size_t, const LcpType &) const
set the i-th lcp to v and check its value
Definition: string_ptr.hpp:79
Objectified string array pointer and shadow pointer array for out-of-place swapping of pointers.
Definition: string_ptr.hpp:248
Objectified string array pointer and shadow pointer array for out-of-place swapping of pointers.
Definition: string_ptr.hpp:169
unsigned clz(Integral x)
unsigned ctz(Integral x)
#define TLX_LOGC(cond)
Explicitly specify the condition for logging.
Definition: core.hpp:137
static uint32_t min(uint32_t x, uint32_t y)
Definition: md5.cpp:32
static void sort(Iterator begin, Iterator end, Comparator cmp=Comparator())
Call best known sorting network for up to sixteen elements with given comparison method.
Definition: best.hpp:523
static unsigned char lcpKeyDepth(const KeyType &a)
enable_if<!StringPtr::with_lcp, void >::type parallel_sample_sort_params(const StringPtr &strptr, size_t depth, size_t memory=0)
Parallel Sample Sort Function for a generic StringSet, this allocates the shadow array for flipping.
static enable_if<!StringPtr::with_lcp, void >::type insertion_sort(const StringPtr &strptr, size_t depth, size_t)
Generic insertion sort for abstract string sets.
void ps5_sample_sort_lcp(const Context &ctx, const Classify &classifier, const StringPtr &strptr, size_t depth, const BktSizeType *bkt)
LCP Calculation for Finished Sample Sort Steps.
void parallel_sample_sort_base(const StringPtr &strptr, size_t depth)
Main Parallel Sample Sort Function. See below for more convenient wrappers.
static unsigned char lcpKeyType(const KeyType &a, const KeyType &b)
LCP calculation of Splitter Strings.
void parallel_sample_sort(const StringPtr &strptr, size_t depth, size_t memory)
Parallel Sample Sort Function with default parameter size for a generic StringSet.
static unsigned char getCharAtDepth(const KeyType &a, unsigned char d)
return the d-th character in the (swapped) key
void unused(Types &&...)
Definition: unused.hpp:20
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
swap enclosed object with another counting pointer (no reference counts need change)
SFINAE enable_if – copy of std::enable_if<> with less extra cruft.
Definition: enable_if.hpp:22