OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
quantile.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #pragma once
26 
27 #include "DoubleSort.h"
28 #include "SimpleAllocator.h"
29 #include "VectorView.h"
30 #include "gpu_enabled.h"
31 
32 #ifndef __CUDACC__
33 #include <iomanip>
34 #include <mutex>
35 #include <ostream>
36 #endif
37 
38 #include <cmath>
39 #include <limits>
40 #include <memory>
41 #include <numeric>
42 #include <optional>
43 #include <type_traits>
44 
45 namespace quantile {
46 
47 namespace detail {
48 
49 template <typename RealType, typename IndexType>
50 struct Centroid {
51  RealType sum_;
52  IndexType count_;
53 };
54 
55 template <typename RealType, typename IndexType>
56 struct Centroids {
57  IndexType curr_idx_; // used during mergeCentroids algorithm
58  IndexType next_idx_; // used during mergeCentroids algorithm
59  int inc_; // 1 or -1 : forward or reverse iteration
62  static constexpr RealType infinity = std::numeric_limits<RealType>::infinity();
63  static constexpr RealType nan = std::numeric_limits<RealType>::quiet_NaN();
64  RealType max_{-infinity};
65  RealType min_{infinity};
66 
67  Centroids() = default;
68 
69  DEVICE Centroids(RealType* sums, IndexType* counts, IndexType const size)
70  : sums_(sums, size, size), counts_(counts, size, size) {}
71 
73  : sums_(sums), counts_(counts) {}
74 
76 
77  DEVICE IndexType capacity() const { return sums_.capacity(); }
78 
79  DEVICE void clear() {
80  sums_.clear();
81  counts_.clear();
82  max_ = -infinity;
83  min_ = infinity;
84  }
85 
86  DEVICE IndexType currCount() const { return counts_[curr_idx_]; }
87 
88  DEVICE RealType currMean() const { return mean(curr_idx_); }
89 
90  DEVICE bool hasCurr() const { return curr_idx_ < size(); }
91 
92  DEVICE bool hasNext() const { return next_idx_ < size(); }
93 
94  DEVICE RealType mean(IndexType const i) const { return sums_[i] / counts_[i]; }
95 
96  // Return true if centroid was merged, false if not.
97  DEVICE bool mergeIfFits(Centroids& centroid, IndexType const max_count);
98 
100 
101  DEVICE IndexType nextCount() const { return counts_[next_idx_]; }
102 
103  DEVICE RealType nextSum() const { return sums_[next_idx_]; }
104 
105  // Assumes this->hasNext() and b.hasNext().
106  // Order by (nextMean(), -nextCount()).
107  DEVICE bool operator<(Centroids const& b) const {
108  // lhs < rhs is same as nextMean() < b.nextMean() without division
109  RealType const lhs = nextSum() * b.nextCount();
110  RealType const rhs = b.nextSum() * nextCount();
111  return lhs < rhs || (lhs == rhs && b.nextCount() < nextCount());
112  }
113 
114  DEVICE void push_back(RealType const value, RealType const count) {
115  sums_.push_back(value);
116  counts_.push_back(count);
117  }
118 
119  DEVICE void resetIndices(bool const forward);
120 
121  DEVICE size_t size() const { return sums_.size(); }
122 
123  DEVICE IndexType totalWeight() const {
124  return gpu_enabled::accumulate(counts_.begin(), counts_.end(), IndexType(0));
125  }
126 
127 #ifndef __CUDACC__
128  template <typename RealType2, typename IndexType2>
129  friend std::ostream& operator<<(std::ostream&, Centroids<RealType2, IndexType2> const&);
130 #endif
131 };
132 
133 template <typename RealType, typename IndexType>
138  IndexType prefix_sum_{0}; // Prefix-sum of centroid counts.
139  IndexType const total_weight_;
140  bool const forward_;
141 
142  DEVICE void mergeMinMax();
143  DEVICE void setCurrCentroid();
144 
145  public:
148  bool const forward);
149 
150  // nullptr if !hasNext()
152 
153  DEVICE bool hasNext() const { return buf_->hasNext() || centroids_->hasNext(); }
154 
155  // Merge as many centroids as possible for count <= max_count.
156  DEVICE void merge(IndexType const max_count);
157 
158  // Assume curr_centroid_ is fully merged.
159  DEVICE void next();
160 
161  DEVICE IndexType prefixSum() const { return prefix_sum_; }
162 
163  DEVICE IndexType totalWeight() const { return total_weight_; }
164 };
165 
166 // Instantiated on host for reserving memory for Centroids.
167 template <typename RealType, typename IndexType>
169  // TODO CUDA memory containers
170  std::vector<RealType> sums_;
171  std::vector<IndexType> counts_;
172 
173  public:
174  explicit CentroidsMemory(size_t const size) : sums_(size), counts_(size) {}
175  size_t nbytes() const { return sums_.size() * (sizeof(RealType) + sizeof(IndexType)); }
176  size_t size() const { return sums_.size(); }
177  VectorView<RealType> sums() { return {sums_.data(), sums_.size(), sums_.size()}; }
179  return {counts_.data(), counts_.size(), counts_.size()};
180  }
181 };
182 
183 template <typename RealType, typename IndexType = size_t>
184 class TDigest {
185  // buf_ has two purposes, unless an externally managed buffer is used instead:
186  // * Used as the incoming buffer that gets filled before merging into centroids_.
187  // * After all data has been merged, it is used as a partial sum for centroids_.
190  bool forward_{true}; // alternate direction on each call to mergeCentroids().
191 #ifndef __CUDACC__
192  std::once_flag merge_buffer_final_once_;
193 #endif
194 
195  // simple_allocator_, buf_allocate_, centroids_allocate_ are used only by allocate().
196  std::optional<RealType> const q_{std::nullopt}; // Optional preset quantile parameter.
197  bool const use_linear_scaling_function_{false};
199  IndexType const buf_allocate_{0};
200  IndexType const centroids_allocate_{0};
201 
202  DEVICE RealType max() const {
203  return centroids_.max_;
204  }
205  DEVICE RealType min() const {
206  return centroids_.min_;
207  }
208 
209  DEVICE IndexType maxCardinality(IndexType const sum,
210  IndexType const total_weight,
211  RealType const c);
212 
213  // Require: centroids are sorted by (mean, -count)
215 
216  DEVICE RealType firstCentroid(RealType const x) const;
217  DEVICE RealType interiorCentroid(RealType const x,
218  IndexType const idx1,
219  IndexType const prefix_sum) const;
220  DEVICE RealType lastCentroid(RealType const x, IndexType const N) const;
221  DEVICE RealType oneCentroid(RealType const x) const;
222 
223  DEVICE RealType slope(IndexType const idx1, IndexType const idx2) const;
224 
225  public:
227 
228  TDigest() = default;
229 
231  : centroids_(mem.sums().data(), mem.counts().data(), mem.size()) {
232  centroids_.clear();
233  }
234 
235  DEVICE TDigest(RealType q,
236  SimpleAllocator* simple_allocator,
237  IndexType buf_allocate,
238  IndexType centroids_allocate)
239  : q_(q)
240  , use_linear_scaling_function_(q_ && 0.1 <= *q_ && *q_ <= 0.9)
241  , simple_allocator_(simple_allocator)
242  , buf_allocate_(buf_allocate)
243  , centroids_allocate_(centroids_allocate) {}
244 
246  return centroids_;
247  }
248 
249  // Store value to buf_, and merge when full.
250  DEVICE void add(RealType value);
251 
252  // Allocate memory if needed by simple_allocator_.
253  DEVICE void allocate();
254 
255  DEVICE void mergeBuffer();
256 
257  // Calls mergeBuffer() w/ std::call_once and turns buf_ into a partial sum of counts.
258  DEVICE void mergeBufferFinal();
259 
260  // Import from sorted memory range. Ok to change values during merge.
261  DEVICE void mergeSorted(RealType* sums, IndexType* counts, IndexType size);
262 
263  DEVICE void mergeTDigest(TDigest& t_digest) {
264  mergeBuffer();
265  t_digest.mergeBuffer();
266  mergeCentroids(t_digest.centroids_);
267  }
268 
269  // Write partial sum of centroids_.counts_ into buf and return as a VectorView.
270  DEVICE VectorView<IndexType const> partialSumOfCounts(IndexType* const buf) const;
271 
273  RealType const q) const;
274 
275  // Must be called after mergeBufferFinal().
276  DEVICE RealType quantile(RealType const q) const {
277  return quantile({buf_.counts_.data(), centroids_.size()}, q);
278  }
279 
280  DEVICE RealType quantile() {
281  return q_ ? quantile(*q_) : centroids_.nan;
282  }
283 
284  // Assumes mem is externally managed.
285  DEVICE void setBuffer(Memory& mem) {
287  mem.sums().data(), mem.counts().data(), mem.size());
288  buf_.clear();
289  }
290 
291  // Assumes mem is externally managed.
294  mem.sums().data(), mem.counts().data(), mem.size());
295  centroids_.clear();
296  }
297 
299  VectorView<IndexType> const counts) {
301  centroids_.clear();
302  }
303 
304  // Total number of data points in centroids_.
305  DEVICE IndexType totalWeight() const {
306  return centroids_.totalWeight();
307  }
308 };
309 
310 // Class template member definitions
311 
312 // Class Centroids<>
313 
314 namespace {
315 
316 // Order by (mean, -count)
317 template <typename RealType, typename IndexType>
320  // Assume value1() is positive and use multiplication instead of division.
321  DEVICE bool operator()(Value const& a, Value const& b) const {
322  auto const lhs = a.value0() * b.value1();
323  auto const rhs = b.value0() * a.value1();
324  return lhs < rhs || (lhs == rhs && b.value1() < a.value1());
325  }
326 };
327 
328 } // namespace
329 
330 template <typename RealType, typename IndexType>
332  if (inc_ == -1 && curr_idx_ != 0) {
333  // Shift data to the left left by curr_idx_.
334  // Prefer to copy, but thrust::copy doesn't support overlapping ranges.
335  // Reverse instead, which gets sorted below.
336  // gpu_enabled::copy(sums_.begin() + curr_idx_, sums_.end(), sums_.begin());
337  // gpu_enabled::copy(counts_.begin() + curr_idx_, counts_.end(), counts_.begin());
338  gpu_enabled::reverse(sums_.begin(), sums_.end());
339  gpu_enabled::reverse(counts_.begin(), counts_.end());
340  }
341  // Shift VectorViews to the right by buff.curr_idx_.
342  IndexType const offset = inc_ == 1 ? 0 : buff.curr_idx_;
343  IndexType const buff_size =
344  inc_ == 1 ? buff.curr_idx_ + 1 : buff.size() - buff.curr_idx_;
345  VectorView<RealType> buff_sums(buff.sums_.begin() + offset, buff_size, buff_size);
346  VectorView<IndexType> buff_counts(buff.counts_.begin() + offset, buff_size, buff_size);
347  // Copy buff into sums_ and counts_.
348  IndexType const curr_size = inc_ == 1 ? curr_idx_ + 1 : size() - curr_idx_;
349  IndexType const total_size = curr_size + buff_sums.size();
350  assert(total_size <= sums_.capacity()); // TODO proof of inequality
351  sums_.resize(total_size);
352  gpu_enabled::copy(buff_sums.begin(), buff_sums.end(), sums_.begin() + curr_size);
353  assert(total_size <= counts_.capacity());
354  counts_.resize(total_size);
355  gpu_enabled::copy(buff_counts.begin(), buff_counts.end(), counts_.begin() + curr_size);
356 
357  // Sort sums_ and counts_ by (mean, -count).
358  double_sort::Iterator<RealType, IndexType> const begin(sums_.begin(), counts_.begin());
359  double_sort::Iterator<RealType, IndexType> const end(sums_.end(), counts_.end());
360  gpu_enabled::sort(begin, end, OrderByMeanAscCountDesc<RealType, IndexType>());
361 }
362 
363 // Return true if centroid was merged, false if not.
364 template <typename RealType, typename IndexType>
366  IndexType const max_count) {
367  if (counts_[curr_idx_] + centroid.nextCount() <= max_count) {
368  sums_[curr_idx_] += centroid.nextSum();
369  counts_[curr_idx_] += centroid.nextCount();
370  centroid.next_idx_ += centroid.inc_;
371  return true;
372  }
373  return false;
374 }
375 
376 template <typename RealType, typename IndexType>
378  curr_idx_ += inc_;
379  if (curr_idx_ != next_idx_) {
380  sums_[curr_idx_] = sums_[next_idx_];
381  counts_[curr_idx_] = counts_[next_idx_];
382  }
383  next_idx_ += inc_;
384 }
385 
386 template <typename RealType, typename IndexType>
388  if (forward) {
389  inc_ = 1;
390  curr_idx_ = ~IndexType(0);
391  } else {
392  inc_ = -1;
393  curr_idx_ = size();
394  }
395  static_assert(std::is_unsigned<IndexType>::value,
396  "IndexType must be an unsigned type.");
397  next_idx_ = curr_idx_ + inc_;
398 }
399 
400 #ifndef __CUDACC__
401 template <typename RealType, typename IndexType>
402 std::ostream& operator<<(std::ostream& out,
403  Centroids<RealType, IndexType> const& centroids) {
404  out << "Centroids<" << typeid(RealType).name() << ',' << typeid(IndexType).name()
405  << ">(size(" << centroids.size() << ") curr_idx_(" << centroids.curr_idx_
406  << ") next_idx_(" << centroids.next_idx_ << ") sums_(";
407  for (IndexType i = 0; i < centroids.sums_.size(); ++i) {
408  out << (i ? " " : "") << std::setprecision(20) << centroids.sums_[i];
409  }
410  out << ") counts_(";
411  for (IndexType i = 0; i < centroids.counts_.size(); ++i) {
412  out << (i ? " " : "") << centroids.counts_[i];
413  }
414  return out << "))";
415 }
416 #endif
417 
418 // Class CentroidsMerger<>
419 
420 template <typename RealType, typename IndexType>
424  bool const forward)
425  : buf_(buf)
426  , centroids_(centroids)
427  , total_weight_(centroids->totalWeight() + buf->totalWeight())
428  , forward_(forward) {
429  buf_->resetIndices(forward_);
430  centroids_->resetIndices(forward_);
431  mergeMinMax();
432  setCurrCentroid();
433 }
434 
435 template <typename RealType, typename IndexType>
438  if (buf_->hasNext()) {
439  if (centroids_->hasNext()) {
440  return (*buf_ < *centroids_) == forward_ ? buf_ : centroids_;
441  }
442  return buf_;
443  } else if (centroids_->hasNext()) {
444  return centroids_;
445  }
446  return nullptr;
447 }
448 
449 namespace {
450 
451 // Helper struct for mergeCentroids() for tracking skipped centroids.
452 // A consecutive sequence of incoming centroids with the same mean
453 // and exceed max_count are skipped before expensive reshuffling of memory.
454 template <typename RealType, typename IndexType>
455 class Skipped {
456  struct Data {
457  Centroids<RealType, IndexType>* centroid_{nullptr};
458  IndexType start_{0};
459  IndexType count_merged_{0};
460  IndexType count_skipped_{0};
461  } data_[2];
463 
464  DEVICE static void shiftCentroids(Data& data) {
465  if (data.count_merged_) {
466  shiftRange(data.centroid_->sums_.begin() + data.start_,
467  data.count_skipped_,
468  data.count_merged_,
469  data.centroid_->inc_);
470  shiftRange(data.centroid_->counts_.begin() + data.start_,
471  data.count_skipped_,
472  data.count_merged_,
473  data.centroid_->inc_);
474  data.start_ += data.centroid_->inc_ * data.count_merged_;
475  }
476  }
477  template <typename T>
478  DEVICE static void shiftRange(T* const begin,
479  IndexType skipped,
480  IndexType const merged,
481  int const inc) {
482 #ifdef __CUDACC__
483  T* src = begin + inc * (skipped - 1);
484  T* dst = src + inc * merged;
485  for (; skipped; --skipped, src -= inc, dst -= inc) {
486  *dst = *src;
487  }
488 #else
489  if (inc == 1) {
490  std::copy_backward(begin, begin + skipped, begin + skipped + merged);
491  } else {
492  std::copy(begin + 1 - skipped, begin + 1, begin + 1 - skipped - merged);
493  }
494 #endif
495  }
496 
497  public:
499  return data_[0].centroid_ != centroid;
500  }
502  return mean_.sum_ * next_centroid->nextCount() !=
503  next_centroid->nextSum() * mean_.count_;
504  }
506  IndexType const idx = index(next_centroid);
507  if (data_[idx].count_skipped_) {
508  ++data_[idx].count_merged_;
509  }
510  }
511  DEVICE operator bool() const {
512  return data_[0].centroid_;
513  }
514  // Shift skipped centroids over merged centroids, and rewind next_idx_.
516  shiftCentroids(data_[0]);
517  data_[0].centroid_->next_idx_ = data_[0].start_;
518  if (data_[1].centroid_) {
519  shiftCentroids(data_[1]);
520  data_[1].centroid_->next_idx_ = data_[1].start_;
521  }
522  }
524  mean_ = Centroid<RealType, IndexType>{next_centroid->nextSum(),
525  next_centroid->nextCount()};
526  data_[0] = {next_centroid, next_centroid->next_idx_, 0, 1};
527  next_centroid->next_idx_ += next_centroid->inc_;
528  }
530  IndexType const idx = index(next_centroid);
531  if (idx == 1 && data_[1].centroid_ == nullptr) {
532  data_[1] = {next_centroid, next_centroid->next_idx_, 0, 1};
533  } else {
534  if (data_[idx].count_merged_) {
535  shiftCentroids(data_[idx]);
536  data_[idx].count_merged_ = 0;
537  }
538  ++data_[idx].count_skipped_;
539  }
540  next_centroid->next_idx_ += next_centroid->inc_;
541  }
542 };
543 
544 } // namespace
545 
546 // Merge as many centroids as possible for count <= max_count.
547 template <typename RealType, typename IndexType>
548 DEVICE void CentroidsMerger<RealType, IndexType>::merge(IndexType const max_count) {
549  Skipped<RealType, IndexType> skipped;
550  while (auto* next_centroid = getNextCentroid()) {
551  if (skipped) {
552  if (skipped.isDifferentMean(next_centroid)) {
553  break;
554  } else if (curr_centroid_->mergeIfFits(*next_centroid, max_count)) {
555  skipped.merged(next_centroid);
556  } else {
557  skipped.skipSubsequent(next_centroid);
558  }
559  } else if (!curr_centroid_->mergeIfFits(*next_centroid, max_count)) {
560  skipped.skipFirst(next_centroid);
561  }
562  }
563  if (skipped) {
564  skipped.shiftCentroidsAndSetNext();
565  }
566 }
567 
568 // Track min/max without assuming the data points exist in any particular centroids.
569 // Otherwise to assume and track their existence in the min/max centroids introduces
570 // significant complexity. For example, the min centroid may not remain the min
571 // centroid if it merges into it the 3rd smallest centroid, skipping the 2nd due to
572 // the scaling function constraint.
573 // When the quantile() is calculated, the min/max data points are assumed to exist
574 // in the min/max centroids for the purposes of calculating the quantile within those
575 // centroids.
576 template <typename RealType, typename IndexType>
578  if (centroids_->max_ < buf_->max_) {
579  centroids_->max_ = buf_->max_;
580  }
581  if (buf_->min_ < centroids_->min_) {
582  centroids_->min_ = buf_->min_;
583  }
584 }
585 
586 // Assume curr_centroid_ is fully merged.
587 template <typename RealType, typename IndexType>
589  prefix_sum_ += curr_centroid_->currCount();
590  setCurrCentroid();
591 }
592 
593 template <typename RealType, typename IndexType>
595  if ((curr_centroid_ = getNextCentroid())) {
596  curr_centroid_->moveNextToCurrent();
597  }
598 }
599 
600 // class TDigest<>
601 
602 template <typename RealType, typename IndexType>
604  if (buf_.sums_.full()) {
605  mergeBuffer();
606  }
607  buf_.sums_.push_back(value);
608  buf_.counts_.push_back(1);
609 }
610 
611 // Assumes buf_ is allocated iff centroids_ is allocated.
612 template <typename RealType, typename IndexType>
614  if (buf_.capacity() == 0) {
615  auto* p0 = simple_allocator_->allocate(buf_allocate_ * sizeof(RealType));
616  auto* p1 = simple_allocator_->allocate(buf_allocate_ * sizeof(IndexType));
618  VectorView<RealType>(reinterpret_cast<RealType*>(p0), 0, buf_allocate_),
619  VectorView<IndexType>(reinterpret_cast<IndexType*>(p1), 0, buf_allocate_));
620  p0 = simple_allocator_->allocate(centroids_allocate_ * sizeof(RealType));
621  p1 = simple_allocator_->allocate(centroids_allocate_ * sizeof(IndexType));
622  centroids_ = Centroids<RealType, IndexType>(
623  VectorView<RealType>(reinterpret_cast<RealType*>(p0), 0, centroids_allocate_),
624  VectorView<IndexType>(reinterpret_cast<IndexType*>(p1), 0, centroids_allocate_));
625  }
626 }
627 
628 // Return total_weight * (f_inv(f(q_{i-1})+1) - q_{i-1})
629 template <typename RealType, typename IndexType>
630 DEVICE IndexType
632  IndexType const total_weight,
633  RealType const c) {
634  IndexType const max_bins = centroids_.capacity(); // "compression parameter" delta
635  if (total_weight <= max_bins) {
636  return 0;
637  } else if (use_linear_scaling_function_) {
638  // Whitepaper scaling function k=0.
639  return 2 * total_weight / max_bins;
640  } else {
641  // Whitepaper scaling function k=1.
642  RealType const x = 2.0 * sum / total_weight - 1; // = 2 q_{i-1} - 1
643  RealType const f_inv = 0.5 + 0.5 * std::sin(c + std::asin(x));
644  constexpr RealType eps = 1e-5; // round-off epsilon for floor().
645  IndexType const dsum = static_cast<IndexType>(total_weight * f_inv + eps);
646  return dsum < sum ? 0 : dsum - sum;
647  }
648 }
649 
650 // Assumes buf_ consists only of singletons.
651 template <typename RealType, typename IndexType>
653  if (buf_.size()) {
654  gpu_enabled::sort(buf_.sums_.begin(), buf_.sums_.end());
655  buf_.min_ = buf_.sums_.front();
656  buf_.max_ = buf_.sums_.back();
657  mergeCentroids(buf_);
658  }
659 }
660 
661 // [QE-383] Make concurrent calls to mergeBufferFinal() thread-safe.
662 template <typename RealType, typename IndexType>
664  auto const call_once = [this] {
665  mergeBuffer();
666  assert(centroids_.size() <= buf_.capacity());
667  partialSumOfCounts(buf_.counts_.data());
668  };
669 #ifdef __CUDACC__
670  call_once();
671 #else
672  std::call_once(merge_buffer_final_once_, call_once);
673 #endif
674 }
675 
676 template <typename RealType, typename IndexType>
678  IndexType* counts,
679  IndexType size) {
680  if (size) {
681  if (buf_.capacity() == 0) {
682  buf_ = Centroids<RealType, IndexType>(sums, counts, size); // Set capacity and size
683  } else {
684  buf_.sums_.set(sums, size); // Does not change capacity
685  buf_.counts_.set(counts, size);
686  }
687  gpu_enabled::fill(buf_.counts_.begin(), buf_.counts_.end(), IndexType(1));
688  buf_.min_ = buf_.sums_.front();
689  buf_.max_ = buf_.sums_.back();
690  mergeCentroids(buf_);
691  }
692 }
693 
694 // Require:
695 // * buf centroids are not empty and sorted by (mean, -count)
696 // * buf.min_ and buf.max_ are correctly set.
697 // During filling stage, buf=buf_.
698 // During reduction, buf_=centroids_[i]
699 template <typename RealType, typename IndexType>
702  constexpr RealType two_pi = 6.283185307179586476925286766559005768e+00;
703  // Hoisted constant used in maxCardinality().
704  RealType const c = two_pi / centroids_.capacity();
705  // Loop over sorted sequence of buf and centroids_.
706  // Some latter centroids may be merged into the current centroid, so the number
707  // of iterations is only at most equal to the number of initial centroids.
709  for (CM cm(&buf, &centroids_, forward_); cm.hasNext(); cm.next()) {
710  // cm.prefixSum() == 0 on first iteration.
711  // Max cardinality for current centroid to be fully merged based on scaling function.
712  IndexType const max_cardinality = maxCardinality(cm.prefixSum(), cm.totalWeight(), c);
713  cm.merge(max_cardinality);
714  }
715  // Combine sorted centroids buf[0..curr_idx_] + centroids_[0..curr_idx_] if forward
716  centroids_.appendAndSortCurrent(buf);
717  buf.clear();
718  forward_ ^= true; // alternate merge direction on each call
719 }
720 
721 namespace {
722 template <typename CountsIterator>
723 DEVICE bool isSingleton(CountsIterator itr) {
724  return *itr == 1;
725 }
726 } // namespace
727 
728 // Assumes x < centroids_.counts_.front().
729 template <typename RealType, typename IndexType>
730 DEVICE RealType TDigest<RealType, IndexType>::firstCentroid(RealType const x) const {
731  if (x < 1) {
732  return min();
733  } else if (centroids_.size() == 1) {
734  return oneCentroid(x);
735  } else if (centroids_.counts_.front() == 2) {
736  RealType const sum = centroids_.sums_.front();
737  return x == 1 ? 0.5 * sum : sum - min();
738  } else {
739  RealType const count = centroids_.counts_.front();
740  RealType const dx = x - RealType(0.5) * (1 + count);
741  RealType const mean = (centroids_.sums_.front() - min()) / (count - 1);
742  return mean + slope(0, 0 < dx) * dx;
743  }
744 }
745 
746 // x is between first and last centroids.
747 template <typename RealType, typename IndexType>
748 DEVICE RealType
750  IndexType const idx1,
751  IndexType const prefix_sum) const {
752  if (isSingleton(centroids_.counts_.begin() + idx1)) {
753  RealType const sum1 = centroids_.sums_[idx1];
754  if (x == prefix_sum - centroids_.counts_[idx1]) {
755  if (isSingleton(centroids_.counts_.begin() + idx1 - 1)) {
756  return 0.5 * (centroids_.sums_[idx1 - 1] + sum1);
757  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
758  return 0.5 * (centroids_.sums_[idx1 - 1] - min() + sum1);
759  }
760  }
761  return sum1;
762  } else {
763  RealType const dx = x + RealType(0.5) * centroids_.counts_[idx1] - prefix_sum;
764  IndexType const idx2 = idx1 + 2 * (0 < dx) - 1;
765  return centroids_.mean(idx1) + slope(idx1, idx2) * dx;
766  }
767 }
768 
769 // Assumes N - centroids_.counts_.back() <= x < N, and there is more than 1 centroid.
770 template <typename RealType, typename IndexType>
772  IndexType const N) const {
773  if (N - 1 < x) {
774  return max();
775  }
776  IndexType const idx1 = centroids_.size() - 1;
777  RealType const sum1 = centroids_.sums_[idx1];
778  IndexType const count1 = centroids_.counts_[idx1];
779  if (count1 == 1) { // => x == N - 1
780  if (isSingleton(centroids_.counts_.begin() + (idx1 - 1))) {
781  return 0.5 * (centroids_.sums_[idx1 - 1] + sum1);
782  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
783  return 0.5 * (centroids_.sums_[idx1 - 1] - min() + sum1);
784  } else {
785  return sum1;
786  }
787  } else if (count1 == 2) { // => 3 <= N
788  if (x == N - 1) {
789  return 0.5 * sum1;
790  } else if (x == N - 2) {
791  RealType const sum2 = centroids_.sums_[idx1 - 1];
792  if (isSingleton(centroids_.counts_.begin() + (idx1 - 1))) {
793  return 0.5 * (sum2 + sum1 - max());
794  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
795  return 0.5 * (sum2 - min() + sum1 - max());
796  }
797  }
798  return sum1 - max();
799  } else { // => 3 <= count1
800  RealType const dx = x + RealType(0.5) * (count1 + 1) - N;
801  RealType const mean = (sum1 - max()) / (count1 - 1);
802  return mean + slope(idx1, idx1 - (dx < 0)) * dx;
803  }
804 }
805 
806 // Assumes there is only 1 centroid, and 1 <= x < centroids_.counts_.front().
807 template <typename RealType, typename IndexType>
808 DEVICE RealType TDigest<RealType, IndexType>::oneCentroid(RealType const x) const {
809  IndexType const N = centroids_.counts_.front();
810  if (N - 1 < x) { // includes case N == 1
811  return max();
812  } else if (N == 2) { // x == 1
813  return 0.5 * centroids_.sums_.front();
814  } else if (N == 3) { // 1 <= x <= 2
815  if (x == 2) {
816  return 0.5 * (centroids_.sums_.front() - min());
817  } else {
818  RealType const s = centroids_.sums_.front() - max();
819  return x == 1 ? 0.5 * s : s - min();
820  }
821  } else { // 3 < N
822  RealType const dx = x - RealType(0.5) * N;
823  RealType const mean = (centroids_.sums_.front() - (min() + max())) / (N - 2);
824  RealType const slope = 2 * (0 < dx ? max() - mean : mean - min()) / (N - 2);
825  return mean + slope * dx;
826  }
827 }
828 
829 // Return a partial sum of centroids_.counts_ stored in buf.
830 template <typename RealType, typename IndexType>
832  IndexType* const buf) const {
833  gpu_enabled::partial_sum(centroids_.counts_.begin(), centroids_.counts_.end(), buf);
834  return {buf, centroids_.size()};
835 }
836 
837 template <typename RealType, typename IndexType>
838 DEVICE RealType
840  RealType const q) const {
841  if (centroids_.size()) {
842  IndexType const N = partial_sum.back();
843  RealType const x = q * N;
844  auto const it1 = gpu_enabled::upper_bound(partial_sum.begin(), partial_sum.end(), x);
845  if (it1 == partial_sum.begin()) {
846  return firstCentroid(x);
847  } else if (it1 == partial_sum.end()) { // <==> 1 <= q
848  return max();
849  } else if (it1 + 1 == partial_sum.end()) {
850  return lastCentroid(x, N);
851  } else {
852  return interiorCentroid(x, it1 - partial_sum.begin(), *it1);
853  }
854  } else {
855  return centroids_.nan;
856  }
857 }
858 
859 // Requirement: 1 < M and 0 <= idx1, idx2 < M where M = Number of centroids.
860 // Return slope of line segment connecting idx1 and idx2.
861 // If equal then assume it is the section contained within an extrema.
862 // Centroid for idx1 is not a singleton, but idx2 may be.
863 template <typename RealType, typename IndexType>
865  IndexType idx2) const {
866  IndexType const M = centroids_.size();
867  if (idx1 == idx2) { // Line segment is contained in either the first or last centroid.
868  RealType const n = static_cast<RealType>(centroids_.counts_[idx1]);
869  RealType const s = centroids_.sums_[idx1];
870  return idx1 == 0 ? 2 * (s - n * min()) / ((n - 1) * (n - 1))
871  : 2 * (n * max() - s) / ((n - 1) * (n - 1));
872  } else {
873  bool const min1 = idx1 == 0; // idx1 is the min centroid
874  bool const max1 = idx1 == M - 1; // idx1 is the max centroid
875  bool const min2 = idx2 == 0; // idx2 is the min centroid
876  bool const max2 = idx2 == M - 1; // idx2 is the max centroid
877  RealType const n1 = static_cast<RealType>(centroids_.counts_[idx1] - min1 - max1);
878  RealType const s1 = centroids_.sums_[idx1] - (min1 ? min() : max1 ? max() : 0);
879  RealType const s2 = centroids_.sums_[idx2] - (min2 ? min() : max2 ? max() : 0);
880  if (isSingleton(centroids_.counts_.begin() + idx2)) {
881  return (idx1 < idx2 ? 2 : -2) * (n1 * s2 - s1) / (n1 * n1);
882  } else {
883  RealType const n2 = static_cast<RealType>(centroids_.counts_[idx2] - min2 - max2);
884  return (idx1 < idx2 ? 2 : -2) * (n1 * s2 - n2 * s1) / (n1 * n2 * (n1 + n2));
885  }
886  }
887 }
888 
889 } // namespace detail
890 
892 
893 } // namespace quantile
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
static constexpr RealType infinity
Definition: quantile.h:62
DEVICE void push_back(RealType const value, RealType const count)
Definition: quantile.h:114
DEVICE void push_back(T const &value)
Definition: VectorView.h:73
std::vector< RealType > sums_
Definition: quantile.h:170
DEVICE size_type capacity() const
Definition: VectorView.h:60
Centroids< RealType, IndexType > * buf_
Definition: quantile.h:135
DEVICE void skipSubsequent(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:529
std::ostream & operator<<(std::ostream &out, Centroids< RealType, IndexType > const &centroids)
Definition: quantile.h:402
bool const use_linear_scaling_function_
Definition: quantile.h:197
CentroidsMemory(size_t const size)
Definition: quantile.h:174
DEVICE RealType mean(IndexType const i) const
Definition: quantile.h:94
DEVICE bool index(Centroids< RealType, IndexType > *centroid) const
Definition: quantile.h:498
DEVICE void resetIndices(bool const forward)
Definition: quantile.h:387
DEVICE Centroids< RealType, IndexType > & centroids()
Definition: quantile.h:245
DEVICE void moveNextToCurrent()
Definition: quantile.h:377
DEVICE void add(RealType value)
Definition: quantile.h:603
DEVICE VectorView< IndexType const > partialSumOfCounts(IndexType *const buf) const
Definition: quantile.h:831
DEVICE RealType quantile(RealType const q) const
Definition: quantile.h:276
std::once_flag merge_buffer_final_once_
Definition: quantile.h:192
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
DEVICE void merged(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:505
DEVICE bool mergeIfFits(Centroids &centroid, IndexType const max_count)
Definition: quantile.h:365
DEVICE bool hasNext() const
Definition: quantile.h:92
DEVICE TDigest(Memory &mem)
Definition: quantile.h:230
DEVICE void mergeBuffer()
Definition: quantile.h:652
DEVICE IndexType totalWeight() const
Definition: quantile.h:305
VectorView< RealType > sums()
Definition: quantile.h:177
DEVICE T * data()
Definition: VectorView.h:64
VectorView< RealType > sums_
Definition: quantile.h:60
VectorView< IndexType > counts_
Definition: quantile.h:61
std::vector< IndexType > counts_
Definition: quantile.h:171
DEVICE void clear()
Definition: VectorView.h:63
DEVICE void mergeBufferFinal()
Definition: quantile.h:663
#define DEVICE
constexpr double a
Definition: Utm.h:32
DEVICE RealType currMean() const
Definition: quantile.h:88
DEVICE RealType quantile()
Definition: quantile.h:280
DEVICE void setCentroids(Memory &mem)
Definition: quantile.h:292
DEVICE size_type size() const
Definition: VectorView.h:83
DEVICE TDigest(RealType q, SimpleAllocator *simple_allocator, IndexType buf_allocate, IndexType centroids_allocate)
Definition: quantile.h:235
Centroids< RealType, IndexType > * centroids_
Definition: quantile.h:136
DEVICE RealType max() const
Definition: quantile.h:202
DEVICE void fill(ARGS &&...args)
Definition: gpu_enabled.h:60
DEVICE void set(T *data, size_type const size)
Definition: VectorView.h:79
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
DEVICE Centroids(VectorView< RealType > sums, VectorView< IndexType > counts)
Definition: quantile.h:72
DEVICE T * begin() const
Definition: VectorView.h:59
DEVICE CentroidsMerger(Centroids< RealType, IndexType > *buf, Centroids< RealType, IndexType > *centroids, bool const forward)
Definition: quantile.h:421
DEVICE void mergeTDigest(TDigest &t_digest)
Definition: quantile.h:263
DEVICE IndexType nextCount() const
Definition: quantile.h:101
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
DEVICE void setCentroids(VectorView< RealType > const sums, VectorView< IndexType > const counts)
Definition: quantile.h:298
DEVICE RealType slope(IndexType const idx1, IndexType const idx2) const
Definition: quantile.h:864
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
Centroids< RealType, IndexType > buf_
Definition: quantile.h:188
IndexType const buf_allocate_
Definition: quantile.h:199
DEVICE RealType nextSum() const
Definition: quantile.h:103
DEVICE RealType lastCentroid(RealType const x, IndexType const N) const
Definition: quantile.h:771
std::optional< RealType > const q_
Definition: quantile.h:196
DEVICE void skipFirst(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:523
DEVICE void allocate()
Definition: quantile.h:613
static DEVICE void shiftRange(T *const begin, IndexType skipped, IndexType const merged, int const inc)
Definition: quantile.h:478
DEVICE void setBuffer(Memory &mem)
Definition: quantile.h:285
DEVICE bool operator()(Value const &a, Value const &b) const
Definition: quantile.h:321
static constexpr RealType nan
Definition: quantile.h:63
DEVICE Centroids< RealType, IndexType > * getNextCentroid() const
Definition: quantile.h:437
DEVICE IndexType prefixSum() const
Definition: quantile.h:161
DEVICE Centroids(RealType *sums, IndexType *counts, IndexType const size)
Definition: quantile.h:69
SimpleAllocator *const simple_allocator_
Definition: quantile.h:198
DEVICE void merge(IndexType const max_count)
Definition: quantile.h:548
DEVICE bool operator<(Centroids const &b) const
Definition: quantile.h:107
DEVICE bool hasNext() const
Definition: quantile.h:153
IndexType const centroids_allocate_
Definition: quantile.h:200
Centroids< RealType, IndexType > * curr_centroid_
Definition: quantile.h:137
DEVICE IndexType maxCardinality(IndexType const sum, IndexType const total_weight, RealType const c)
Definition: quantile.h:631
DEVICE size_t size() const
Definition: quantile.h:121
DEVICE IndexType currCount() const
Definition: quantile.h:86
DEVICE IndexType totalWeight() const
Definition: quantile.h:123
DEVICE bool isDifferentMean(Centroids< RealType, IndexType > *next_centroid) const
Definition: quantile.h:501
DEVICE T0 value0() const
Definition: DoubleSort.h:87
constexpr unsigned N
Definition: Utm.h:110
DEVICE IndexType capacity() const
Definition: quantile.h:77
DEVICE IndexType totalWeight() const
Definition: quantile.h:163
DEVICE RealType interiorCentroid(RealType const x, IndexType const idx1, IndexType const prefix_sum) const
Definition: quantile.h:749
DEVICE RealType oneCentroid(RealType const x) const
Definition: quantile.h:808
DEVICE T1 value1() const
Definition: DoubleSort.h:90
DEVICE void mergeCentroids(Centroids< RealType, IndexType > &)
Definition: quantile.h:700
string name
Definition: setup.in.py:72
constexpr double n
Definition: Utm.h:38
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
DEVICE T & back()
Definition: VectorView.h:57
DEVICE RealType min() const
Definition: quantile.h:205
DEVICE RealType firstCentroid(RealType const x) const
Definition: quantile.h:730
DEVICE bool isSingleton(CountsIterator itr)
Definition: quantile.h:723
DEVICE void appendAndSortCurrent(Centroids &buff)
Definition: quantile.h:331
DEVICE bool hasCurr() const
Definition: quantile.h:90
DEVICE void mergeSorted(RealType *sums, IndexType *counts, IndexType size)
Definition: quantile.h:677
VectorView< IndexType > counts()
Definition: quantile.h:178
Centroids< RealType, IndexType > centroids_
Definition: quantile.h:189
DEVICE T * end() const
Definition: VectorView.h:67