OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
quantile.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file quantile.h
19  * @author Matt Pulver <matt.pulver@omnisci.com>
20  * @description Calculate approximate median and general quantiles, based on
21  * "Computing Extremely Accurate Quantiles Using t-Digests" by T. Dunning et al.
22  * https://arxiv.org/abs/1902.04023
23  *
24  */
25 
26 #pragma once
27 
28 #include "DoubleSort.h"
29 #include "SimpleAllocator.h"
30 #include "VectorView.h"
31 #include "gpu_enabled.h"
32 
33 #ifndef __CUDACC__
34 #include <iomanip>
35 #include <mutex>
36 #include <ostream>
37 #endif
38 
39 #include <cmath>
40 #include <limits>
41 #include <memory>
42 #include <numeric>
43 #include <optional>
44 #include <type_traits>
45 
46 namespace quantile {
47 
48 namespace detail {
49 
50 template <typename RealType, typename IndexType>
51 struct Centroid {
52  RealType sum_;
53  IndexType count_;
54 };
55 
56 template <typename RealType, typename IndexType>
57 struct Centroids {
58  IndexType curr_idx_; // used during mergeCentroids algorithm
59  IndexType next_idx_; // used during mergeCentroids algorithm
60  int inc_; // 1 or -1 : forward or reverse iteration
63  static constexpr RealType infinity = std::numeric_limits<RealType>::infinity();
64  static constexpr RealType nan = std::numeric_limits<RealType>::quiet_NaN();
65  RealType max_{-infinity};
66  RealType min_{infinity};
67 
68  Centroids() = default;
69 
70  DEVICE Centroids(RealType* sums, IndexType* counts, IndexType const size)
71  : sums_(sums, size, size), counts_(counts, size, size) {}
72 
74  : sums_(sums), counts_(counts) {}
75 
77 
78  DEVICE IndexType capacity() const { return sums_.capacity(); }
79 
80  DEVICE void clear() {
81  sums_.clear();
82  counts_.clear();
83  max_ = -infinity;
84  min_ = infinity;
85  }
86 
87  DEVICE IndexType currCount() const { return counts_[curr_idx_]; }
88 
89  DEVICE RealType currMean() const { return mean(curr_idx_); }
90 
91  DEVICE bool hasCurr() const { return curr_idx_ < size(); }
92 
93  DEVICE bool hasNext() const { return next_idx_ < size(); }
94 
95  DEVICE RealType mean(IndexType const i) const { return sums_[i] / counts_[i]; }
96 
97  // Return true if centroid was merged, false if not.
98  DEVICE bool mergeIfFits(Centroids& centroid, IndexType const max_count);
99 
100  DEVICE void moveNextToCurrent();
101 
102  DEVICE IndexType nextCount() const { return counts_[next_idx_]; }
103 
104  DEVICE RealType nextSum() const { return sums_[next_idx_]; }
105 
106  // Assumes this->hasNext() and b.hasNext().
107  // Order by (nextMean(), -nextCount()).
108  DEVICE bool operator<(Centroids const& b) const {
109  // lhs < rhs is same as nextMean() < b.nextMean() without division
110  RealType const lhs = nextSum() * b.nextCount();
111  RealType const rhs = b.nextSum() * nextCount();
112  return lhs < rhs || (lhs == rhs && b.nextCount() < nextCount());
113  }
114 
115  DEVICE void push_back(RealType const value, RealType const count) {
116  sums_.push_back(value);
117  counts_.push_back(count);
118  }
119 
120  DEVICE void resetIndices(bool const forward);
121 
122  DEVICE size_t size() const { return sums_.size(); }
123 
124  DEVICE IndexType totalWeight() const {
125  return gpu_enabled::accumulate(counts_.begin(), counts_.end(), IndexType(0));
126  }
127 
128 #ifndef __CUDACC__
129  template <typename RealType2, typename IndexType2>
130  friend std::ostream& operator<<(std::ostream&, Centroids<RealType2, IndexType2> const&);
131 #endif
132 };
133 
134 template <typename RealType, typename IndexType>
139  IndexType prefix_sum_{0}; // Prefix-sum of centroid counts.
140  IndexType const total_weight_;
141  bool const forward_;
142 
143  DEVICE void mergeMinMax();
144  DEVICE void setCurrCentroid();
145 
146  public:
149  bool const forward);
150 
151  // nullptr if !hasNext()
153 
154  DEVICE bool hasNext() const { return buf_->hasNext() || centroids_->hasNext(); }
155 
156  // Merge as many centroids as possible for count <= max_count.
157  DEVICE void merge(IndexType const max_count);
158 
159  // Assume curr_centroid_ is fully merged.
160  DEVICE void next();
161 
162  DEVICE IndexType prefixSum() const { return prefix_sum_; }
163 
164  DEVICE IndexType totalWeight() const { return total_weight_; }
165 };
166 
167 // Instantiated on host for reserving memory for Centroids.
168 template <typename RealType, typename IndexType>
170  // TODO CUDA memory containers
171  std::vector<RealType> sums_;
172  std::vector<IndexType> counts_;
173 
174  public:
175  explicit CentroidsMemory(size_t const size) : sums_(size), counts_(size) {}
176  size_t nbytes() const { return sums_.size() * (sizeof(RealType) + sizeof(IndexType)); }
177  size_t size() const { return sums_.size(); }
178  VectorView<RealType> sums() { return {sums_.data(), sums_.size(), sums_.size()}; }
180  return {counts_.data(), counts_.size(), counts_.size()};
181  }
182 };
183 
184 template <typename RealType, typename IndexType = size_t>
185 class TDigest {
186  // buf_ has two purposes, unless an externally managed buffer is used instead:
187  // * Used as the incoming buffer that gets filled before merging into centroids_.
188  // * After all data has been merged, it is used as a partial sum for centroids_.
191  bool forward_{true}; // alternate direction on each call to mergeCentroids().
192 #ifndef __CUDACC__
193  std::once_flag merge_buffer_final_once_;
194 #endif
195 
196  // simple_allocator_, buf_allocate_, centroids_allocate_ are used only by allocate().
197  std::optional<RealType> const q_{std::nullopt}; // Optional preset quantile parameter.
198  bool const use_linear_scaling_function_{false};
200  IndexType const buf_allocate_{0};
201  IndexType const centroids_allocate_{0};
202 
203  DEVICE RealType max() const { return centroids_.max_; }
204  DEVICE RealType min() const { return centroids_.min_; }
205 
206  DEVICE IndexType maxCardinality(IndexType const sum,
207  IndexType const total_weight,
208  RealType const c);
209 
210  // Require: centroids are sorted by (mean, -count)
212 
213  DEVICE RealType firstCentroid(RealType const x) const;
214  DEVICE RealType interiorCentroid(RealType const x,
215  IndexType const idx1,
216  IndexType const prefix_sum) const;
217  DEVICE RealType lastCentroid(RealType const x, IndexType const N) const;
218  DEVICE RealType oneCentroid(RealType const x) const;
219 
220  DEVICE RealType slope(IndexType const idx1, IndexType const idx2) const;
221 
222  public:
224 
225  TDigest() = default;
226 
228  : centroids_(mem.sums().data(), mem.counts().data(), mem.size()) {
229  centroids_.clear();
230  }
231 
232  DEVICE TDigest(RealType q,
233  SimpleAllocator* simple_allocator,
234  IndexType buf_allocate,
235  IndexType centroids_allocate)
236  : q_(q)
237  , use_linear_scaling_function_(q_ && 0.1 <= *q_ && *q_ <= 0.9)
238  , simple_allocator_(simple_allocator)
239  , buf_allocate_(buf_allocate)
240  , centroids_allocate_(centroids_allocate) {}
241 
243 
244  // Store value to buf_, and merge when full.
245  DEVICE void add(RealType value);
246 
247  // Allocate memory if needed by simple_allocator_.
248  DEVICE void allocate();
249 
250  DEVICE void mergeBuffer();
251 
252  // Calls mergeBuffer() w/ std::call_once and turns buf_ into a partial sum of counts.
253  DEVICE void mergeBufferFinal();
254 
255  // Import from sorted memory range. Ok to change values during merge.
256  DEVICE void mergeSorted(RealType* sums, IndexType* counts, IndexType size);
257 
258  DEVICE void mergeTDigest(TDigest& t_digest) {
259  mergeBuffer();
260  t_digest.mergeBuffer();
261  mergeCentroids(t_digest.centroids_);
262  }
263 
264  // Write partial sum of centroids_.counts_ into buf and return as a VectorView.
265  DEVICE VectorView<IndexType const> partialSumOfCounts(IndexType* const buf) const;
266 
268  RealType const q) const;
269 
270  // Must be called after mergeBufferFinal().
271  DEVICE RealType quantile(RealType const q) const {
272  return quantile({buf_.counts_.data(), centroids_.size()}, q);
273  }
274 
275  DEVICE RealType quantile() { return q_ ? quantile(*q_) : centroids_.nan; }
276 
277  // Assumes mem is externally managed.
278  DEVICE void setBuffer(Memory& mem) {
280  mem.sums().data(), mem.counts().data(), mem.size());
281  buf_.clear();
282  }
283 
284  // Assumes mem is externally managed.
287  mem.sums().data(), mem.counts().data(), mem.size());
288  centroids_.clear();
289  }
290 
292  VectorView<IndexType> const counts) {
294  centroids_.clear();
295  }
296 
297  // Total number of data points in centroids_.
298  DEVICE IndexType totalWeight() const { return centroids_.totalWeight(); }
299 };
300 
301 // Class template member definitions
302 
303 // Class Centroids<>
304 
305 namespace {
306 
307 // Order by (mean, -count)
308 template <typename RealType, typename IndexType>
311  // Assume value1() is positive and use multiplication instead of division.
312  DEVICE bool operator()(Value const& a, Value const& b) const {
313  auto const lhs = a.value0() * b.value1();
314  auto const rhs = b.value0() * a.value1();
315  return lhs < rhs || (lhs == rhs && b.value1() < a.value1());
316  }
317 };
318 
319 } // namespace
320 
321 template <typename RealType, typename IndexType>
323  if (inc_ == -1 && curr_idx_ != 0) {
324  // Shift data to the left left by curr_idx_.
325  // Prefer to copy, but thrust::copy doesn't support overlapping ranges.
326  // Reverse instead, which gets sorted below.
327  // gpu_enabled::copy(sums_.begin() + curr_idx_, sums_.end(), sums_.begin());
328  // gpu_enabled::copy(counts_.begin() + curr_idx_, counts_.end(), counts_.begin());
329  gpu_enabled::reverse(sums_.begin(), sums_.end());
330  gpu_enabled::reverse(counts_.begin(), counts_.end());
331  }
332  // Shift VectorViews to the right by buff.curr_idx_.
333  IndexType const offset = inc_ == 1 ? 0 : buff.curr_idx_;
334  IndexType const buff_size =
335  inc_ == 1 ? buff.curr_idx_ + 1 : buff.size() - buff.curr_idx_;
336  VectorView<RealType> buff_sums(buff.sums_.begin() + offset, buff_size, buff_size);
337  VectorView<IndexType> buff_counts(buff.counts_.begin() + offset, buff_size, buff_size);
338  // Copy buff into sums_ and counts_.
339  IndexType const curr_size = inc_ == 1 ? curr_idx_ + 1 : size() - curr_idx_;
340  IndexType const total_size = curr_size + buff_sums.size();
341  assert(total_size <= sums_.capacity()); // TODO proof of inequality
342  sums_.resize(total_size);
343  gpu_enabled::copy(buff_sums.begin(), buff_sums.end(), sums_.begin() + curr_size);
344  assert(total_size <= counts_.capacity());
345  counts_.resize(total_size);
346  gpu_enabled::copy(buff_counts.begin(), buff_counts.end(), counts_.begin() + curr_size);
347 
348  // Sort sums_ and counts_ by (mean, -count).
349  double_sort::Iterator<RealType, IndexType> const begin(sums_.begin(), counts_.begin());
350  double_sort::Iterator<RealType, IndexType> const end(sums_.end(), counts_.end());
351  gpu_enabled::sort(begin, end, OrderByMeanAscCountDesc<RealType, IndexType>());
352 }
353 
354 // Return true if centroid was merged, false if not.
355 template <typename RealType, typename IndexType>
357  IndexType const max_count) {
358  if (counts_[curr_idx_] + centroid.nextCount() <= max_count) {
359  sums_[curr_idx_] += centroid.nextSum();
360  counts_[curr_idx_] += centroid.nextCount();
361  centroid.next_idx_ += centroid.inc_;
362  return true;
363  }
364  return false;
365 }
366 
367 template <typename RealType, typename IndexType>
369  curr_idx_ += inc_;
370  if (curr_idx_ != next_idx_) {
371  sums_[curr_idx_] = sums_[next_idx_];
372  counts_[curr_idx_] = counts_[next_idx_];
373  }
374  next_idx_ += inc_;
375 }
376 
377 template <typename RealType, typename IndexType>
379  if (forward) {
380  inc_ = 1;
381  curr_idx_ = ~IndexType(0);
382  } else {
383  inc_ = -1;
384  curr_idx_ = size();
385  }
386  static_assert(std::is_unsigned<IndexType>::value,
387  "IndexType must be an unsigned type.");
388  next_idx_ = curr_idx_ + inc_;
389 }
390 
391 #ifndef __CUDACC__
392 template <typename RealType, typename IndexType>
393 std::ostream& operator<<(std::ostream& out,
394  Centroids<RealType, IndexType> const& centroids) {
395  out << "Centroids<" << typeid(RealType).name() << ',' << typeid(IndexType).name()
396  << ">(size(" << centroids.size() << ") curr_idx_(" << centroids.curr_idx_
397  << ") next_idx_(" << centroids.next_idx_ << ") sums_(";
398  for (IndexType i = 0; i < centroids.sums_.size(); ++i) {
399  out << (i ? " " : "") << std::setprecision(20) << centroids.sums_[i];
400  }
401  out << ") counts_(";
402  for (IndexType i = 0; i < centroids.counts_.size(); ++i) {
403  out << (i ? " " : "") << centroids.counts_[i];
404  }
405  return out << "))";
406 }
407 #endif
408 
409 // Class CentroidsMerger<>
410 
411 template <typename RealType, typename IndexType>
415  bool const forward)
416  : buf_(buf)
417  , centroids_(centroids)
418  , total_weight_(centroids->totalWeight() + buf->totalWeight())
419  , forward_(forward) {
420  buf_->resetIndices(forward_);
421  centroids_->resetIndices(forward_);
422  mergeMinMax();
423  setCurrCentroid();
424 }
425 
426 template <typename RealType, typename IndexType>
429  if (buf_->hasNext()) {
430  if (centroids_->hasNext()) {
431  return (*buf_ < *centroids_) == forward_ ? buf_ : centroids_;
432  }
433  return buf_;
434  } else if (centroids_->hasNext()) {
435  return centroids_;
436  }
437  return nullptr;
438 }
439 
440 namespace {
441 
442 // Helper struct for mergeCentroids() for tracking skipped centroids.
443 // A consecutive sequence of incoming centroids with the same mean
444 // and exceed max_count are skipped before expensive reshuffling of memory.
445 template <typename RealType, typename IndexType>
446 class Skipped {
447  struct Data {
448  Centroids<RealType, IndexType>* centroid_{nullptr};
449  IndexType start_{0};
450  IndexType count_merged_{0};
451  IndexType count_skipped_{0};
452  } data_[2];
454 
455  DEVICE static void shiftCentroids(Data& data) {
456  if (data.count_merged_) {
457  shiftRange(data.centroid_->sums_.begin() + data.start_,
458  data.count_skipped_,
459  data.count_merged_,
460  data.centroid_->inc_);
461  shiftRange(data.centroid_->counts_.begin() + data.start_,
462  data.count_skipped_,
463  data.count_merged_,
464  data.centroid_->inc_);
465  data.start_ += data.centroid_->inc_ * data.count_merged_;
466  }
467  }
468  template <typename T>
469  DEVICE static void shiftRange(T* const begin,
470  IndexType skipped,
471  IndexType const merged,
472  int const inc) {
473 #ifdef __CUDACC__
474  T* src = begin + inc * (skipped - 1);
475  T* dst = src + inc * merged;
476  for (; skipped; --skipped, src -= inc, dst -= inc) {
477  *dst = *src;
478  }
479 #else
480  if (inc == 1) {
481  std::copy_backward(begin, begin + skipped, begin + skipped + merged);
482  } else {
483  std::copy(begin + 1 - skipped, begin + 1, begin + 1 - skipped - merged);
484  }
485 #endif
486  }
487 
488  public:
490  return data_[0].centroid_ != centroid;
491  }
493  return mean_.sum_ * next_centroid->nextCount() !=
494  next_centroid->nextSum() * mean_.count_;
495  }
497  IndexType const idx = index(next_centroid);
498  if (data_[idx].count_skipped_) {
499  ++data_[idx].count_merged_;
500  }
501  }
502  DEVICE operator bool() const { return data_[0].centroid_; }
503  // Shift skipped centroids over merged centroids, and rewind next_idx_.
505  shiftCentroids(data_[0]);
506  data_[0].centroid_->next_idx_ = data_[0].start_;
507  if (data_[1].centroid_) {
508  shiftCentroids(data_[1]);
509  data_[1].centroid_->next_idx_ = data_[1].start_;
510  }
511  }
513  mean_ = Centroid<RealType, IndexType>{next_centroid->nextSum(),
514  next_centroid->nextCount()};
515  data_[0] = {next_centroid, next_centroid->next_idx_, 0, 1};
516  next_centroid->next_idx_ += next_centroid->inc_;
517  }
519  IndexType const idx = index(next_centroid);
520  if (idx == 1 && data_[1].centroid_ == nullptr) {
521  data_[1] = {next_centroid, next_centroid->next_idx_, 0, 1};
522  } else {
523  if (data_[idx].count_merged_) {
524  shiftCentroids(data_[idx]);
525  data_[idx].count_merged_ = 0;
526  }
527  ++data_[idx].count_skipped_;
528  }
529  next_centroid->next_idx_ += next_centroid->inc_;
530  }
531 };
532 
533 } // namespace
534 
535 // Merge as many centroids as possible for count <= max_count.
536 template <typename RealType, typename IndexType>
537 DEVICE void CentroidsMerger<RealType, IndexType>::merge(IndexType const max_count) {
538  Skipped<RealType, IndexType> skipped;
539  while (auto* next_centroid = getNextCentroid()) {
540  if (skipped) {
541  if (skipped.isDifferentMean(next_centroid)) {
542  break;
543  } else if (curr_centroid_->mergeIfFits(*next_centroid, max_count)) {
544  skipped.merged(next_centroid);
545  } else {
546  skipped.skipSubsequent(next_centroid);
547  }
548  } else if (!curr_centroid_->mergeIfFits(*next_centroid, max_count)) {
549  skipped.skipFirst(next_centroid);
550  }
551  }
552  if (skipped) {
553  skipped.shiftCentroidsAndSetNext();
554  }
555 }
556 
557 // Track min/max without assuming the data points exist in any particular centroids.
558 // Otherwise to assume and track their existence in the min/max centroids introduces
559 // significant complexity. For example, the min centroid may not remain the min
560 // centroid if it merges into it the 3rd smallest centroid, skipping the 2nd due to
561 // the scaling function constraint.
562 // When the quantile() is calculated, the min/max data points are assumed to exist
563 // in the min/max centroids for the purposes of calculating the quantile within those
564 // centroids.
565 template <typename RealType, typename IndexType>
567  if (centroids_->max_ < buf_->max_) {
568  centroids_->max_ = buf_->max_;
569  }
570  if (buf_->min_ < centroids_->min_) {
571  centroids_->min_ = buf_->min_;
572  }
573 }
574 
575 // Assume curr_centroid_ is fully merged.
576 template <typename RealType, typename IndexType>
578  prefix_sum_ += curr_centroid_->currCount();
579  setCurrCentroid();
580 }
581 
582 template <typename RealType, typename IndexType>
584  if ((curr_centroid_ = getNextCentroid())) {
585  curr_centroid_->moveNextToCurrent();
586  }
587 }
588 
589 // class TDigest<>
590 
591 template <typename RealType, typename IndexType>
593  if (buf_.sums_.full()) {
594  mergeBuffer();
595  }
596  buf_.sums_.push_back(value);
597  buf_.counts_.push_back(1);
598 }
599 
600 // Assumes buf_ is allocated iff centroids_ is allocated.
601 template <typename RealType, typename IndexType>
603  if (buf_.capacity() == 0) {
604  auto* p0 = simple_allocator_->allocate(buf_allocate_ * sizeof(RealType));
605  auto* p1 = simple_allocator_->allocate(buf_allocate_ * sizeof(IndexType));
607  VectorView<RealType>(reinterpret_cast<RealType*>(p0), 0, buf_allocate_),
608  VectorView<IndexType>(reinterpret_cast<IndexType*>(p1), 0, buf_allocate_));
609  p0 = simple_allocator_->allocate(centroids_allocate_ * sizeof(RealType));
610  p1 = simple_allocator_->allocate(centroids_allocate_ * sizeof(IndexType));
611  centroids_ = Centroids<RealType, IndexType>(
612  VectorView<RealType>(reinterpret_cast<RealType*>(p0), 0, centroids_allocate_),
613  VectorView<IndexType>(reinterpret_cast<IndexType*>(p1), 0, centroids_allocate_));
614  }
615 }
616 
617 // Return total_weight * (f_inv(f(q_{i-1})+1) - q_{i-1})
618 template <typename RealType, typename IndexType>
619 DEVICE IndexType
621  IndexType const total_weight,
622  RealType const c) {
623  IndexType const max_bins = centroids_.capacity(); // "compression parameter" delta
624  if (total_weight <= max_bins) {
625  return 0;
626  } else if (use_linear_scaling_function_) {
627  // Whitepaper scaling function k=0.
628  return 2 * total_weight / max_bins;
629  } else {
630  // Whitepaper scaling function k=1.
631  RealType const x = 2.0 * sum / total_weight - 1; // = 2 q_{i-1} - 1
632  RealType const f_inv = 0.5 + 0.5 * std::sin(c + std::asin(x));
633  constexpr RealType eps = 1e-5; // round-off epsilon for floor().
634  IndexType const dsum = static_cast<IndexType>(total_weight * f_inv + eps);
635  return dsum < sum ? 0 : dsum - sum;
636  }
637 }
638 
639 // Assumes buf_ consists only of singletons.
640 template <typename RealType, typename IndexType>
642  if (buf_.size()) {
643  gpu_enabled::sort(buf_.sums_.begin(), buf_.sums_.end());
644  buf_.min_ = buf_.sums_.front();
645  buf_.max_ = buf_.sums_.back();
646  mergeCentroids(buf_);
647  }
648 }
649 
650 // [QE-383] Make concurrent calls to mergeBufferFinal() thread-safe.
651 template <typename RealType, typename IndexType>
653  auto const call_once = [this] {
654  mergeBuffer();
655  assert(centroids_.size() <= buf_.capacity());
656  partialSumOfCounts(buf_.counts_.data());
657  };
658 #ifdef __CUDACC__
659  call_once();
660 #else
661  std::call_once(merge_buffer_final_once_, call_once);
662 #endif
663 }
664 
665 template <typename RealType, typename IndexType>
667  IndexType* counts,
668  IndexType size) {
669  if (size) {
670  if (buf_.capacity() == 0) {
671  buf_ = Centroids<RealType, IndexType>(sums, counts, size); // Set capacity and size
672  } else {
673  buf_.sums_.set(sums, size); // Does not change capacity
674  buf_.counts_.set(counts, size);
675  }
676  gpu_enabled::fill(buf_.counts_.begin(), buf_.counts_.end(), IndexType(1));
677  buf_.min_ = buf_.sums_.front();
678  buf_.max_ = buf_.sums_.back();
679  mergeCentroids(buf_);
680  }
681 }
682 
683 // Require:
684 // * buf centroids are not empty and sorted by (mean, -count)
685 // * buf.min_ and buf.max_ are correctly set.
686 // During filling stage, buf=buf_.
687 // During reduction, buf_=centroids_[i]
688 template <typename RealType, typename IndexType>
691  constexpr RealType two_pi = 6.283185307179586476925286766559005768e+00;
692  // Hoisted constant used in maxCardinality().
693  RealType const c = two_pi / centroids_.capacity();
694  // Loop over sorted sequence of buf and centroids_.
695  // Some latter centroids may be merged into the current centroid, so the number
696  // of iterations is only at most equal to the number of initial centroids.
698  for (CM cm(&buf, &centroids_, forward_); cm.hasNext(); cm.next()) {
699  // cm.prefixSum() == 0 on first iteration.
700  // Max cardinality for current centroid to be fully merged based on scaling function.
701  IndexType const max_cardinality = maxCardinality(cm.prefixSum(), cm.totalWeight(), c);
702  cm.merge(max_cardinality);
703  }
704  // Combine sorted centroids buf[0..curr_idx_] + centroids_[0..curr_idx_] if forward
705  centroids_.appendAndSortCurrent(buf);
706  buf.clear();
707  forward_ ^= true; // alternate merge direction on each call
708 }
709 
710 namespace {
711 template <typename CountsIterator>
712 DEVICE bool isSingleton(CountsIterator itr) {
713  return *itr == 1;
714 }
715 } // namespace
716 
717 // Assumes x < centroids_.counts_.front().
718 template <typename RealType, typename IndexType>
719 DEVICE RealType TDigest<RealType, IndexType>::firstCentroid(RealType const x) const {
720  if (x < 1) {
721  return min();
722  } else if (centroids_.size() == 1) {
723  return oneCentroid(x);
724  } else if (centroids_.counts_.front() == 2) {
725  RealType const sum = centroids_.sums_.front();
726  return x == 1 ? 0.5 * sum : sum - min();
727  } else {
728  RealType const count = centroids_.counts_.front();
729  RealType const dx = x - RealType(0.5) * (1 + count);
730  RealType const mean = (centroids_.sums_.front() - min()) / (count - 1);
731  return mean + slope(0, 0 < dx) * dx;
732  }
733 }
734 
735 // x is between first and last centroids.
736 template <typename RealType, typename IndexType>
737 DEVICE RealType
739  IndexType const idx1,
740  IndexType const prefix_sum) const {
741  if (isSingleton(centroids_.counts_.begin() + idx1)) {
742  RealType const sum1 = centroids_.sums_[idx1];
743  if (x == prefix_sum - centroids_.counts_[idx1]) {
744  if (isSingleton(centroids_.counts_.begin() + idx1 - 1)) {
745  return 0.5 * (centroids_.sums_[idx1 - 1] + sum1);
746  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
747  return 0.5 * (centroids_.sums_[idx1 - 1] - min() + sum1);
748  }
749  }
750  return sum1;
751  } else {
752  RealType const dx = x + RealType(0.5) * centroids_.counts_[idx1] - prefix_sum;
753  IndexType const idx2 = idx1 + 2 * (0 < dx) - 1;
754  return centroids_.mean(idx1) + slope(idx1, idx2) * dx;
755  }
756 }
757 
758 // Assumes N - centroids_.counts_.back() <= x < N, and there is more than 1 centroid.
759 template <typename RealType, typename IndexType>
761  IndexType const N) const {
762  if (N - 1 < x) {
763  return max();
764  }
765  IndexType const idx1 = centroids_.size() - 1;
766  RealType const sum1 = centroids_.sums_[idx1];
767  IndexType const count1 = centroids_.counts_[idx1];
768  if (count1 == 1) { // => x == N - 1
769  if (isSingleton(centroids_.counts_.begin() + (idx1 - 1))) {
770  return 0.5 * (centroids_.sums_[idx1 - 1] + sum1);
771  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
772  return 0.5 * (centroids_.sums_[idx1 - 1] - min() + sum1);
773  } else {
774  return sum1;
775  }
776  } else if (count1 == 2) { // => 3 <= N
777  if (x == N - 1) {
778  return 0.5 * sum1;
779  } else if (x == N - 2) {
780  RealType const sum2 = centroids_.sums_[idx1 - 1];
781  if (isSingleton(centroids_.counts_.begin() + (idx1 - 1))) {
782  return 0.5 * (sum2 + sum1 - max());
783  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
784  return 0.5 * (sum2 - min() + sum1 - max());
785  }
786  }
787  return sum1 - max();
788  } else { // => 3 <= count1
789  RealType const dx = x + RealType(0.5) * (count1 + 1) - N;
790  RealType const mean = (sum1 - max()) / (count1 - 1);
791  return mean + slope(idx1, idx1 - (dx < 0)) * dx;
792  }
793 }
794 
795 // Assumes there is only 1 centroid, and 1 <= x < centroids_.counts_.front().
796 template <typename RealType, typename IndexType>
797 DEVICE RealType TDigest<RealType, IndexType>::oneCentroid(RealType const x) const {
798  IndexType const N = centroids_.counts_.front();
799  if (N - 1 < x) { // includes case N == 1
800  return max();
801  } else if (N == 2) { // x == 1
802  return 0.5 * centroids_.sums_.front();
803  } else if (N == 3) { // 1 <= x <= 2
804  if (x == 2) {
805  return 0.5 * (centroids_.sums_.front() - min());
806  } else {
807  RealType const s = centroids_.sums_.front() - max();
808  return x == 1 ? 0.5 * s : s - min();
809  }
810  } else { // 3 < N
811  RealType const dx = x - RealType(0.5) * N;
812  RealType const mean = (centroids_.sums_.front() - (min() + max())) / (N - 2);
813  RealType const slope = 2 * (0 < dx ? max() - mean : mean - min()) / (N - 2);
814  return mean + slope * dx;
815  }
816 }
817 
818 // Return a partial sum of centroids_.counts_ stored in buf.
819 template <typename RealType, typename IndexType>
821  IndexType* const buf) const {
822  gpu_enabled::partial_sum(centroids_.counts_.begin(), centroids_.counts_.end(), buf);
823  return {buf, centroids_.size()};
824 }
825 
826 template <typename RealType, typename IndexType>
827 DEVICE RealType
829  RealType const q) const {
830  if (centroids_.size()) {
831  IndexType const N = partial_sum.back();
832  RealType const x = q * N;
833  auto const it1 = gpu_enabled::upper_bound(partial_sum.begin(), partial_sum.end(), x);
834  if (it1 == partial_sum.begin()) {
835  return firstCentroid(x);
836  } else if (it1 == partial_sum.end()) { // <==> 1 <= q
837  return max();
838  } else if (it1 + 1 == partial_sum.end()) {
839  return lastCentroid(x, N);
840  } else {
841  return interiorCentroid(x, it1 - partial_sum.begin(), *it1);
842  }
843  } else {
844  return centroids_.nan;
845  }
846 }
847 
848 // Requirement: 1 < M and 0 <= idx1, idx2 < M where M = Number of centroids.
849 // Return slope of line segment connecting idx1 and idx2.
850 // If equal then assume it is the section contained within an extrema.
851 // Centroid for idx1 is not a singleton, but idx2 may be.
852 template <typename RealType, typename IndexType>
854  IndexType idx2) const {
855  IndexType const M = centroids_.size();
856  if (idx1 == idx2) { // Line segment is contained in either the first or last centroid.
857  RealType const n = static_cast<RealType>(centroids_.counts_[idx1]);
858  RealType const s = centroids_.sums_[idx1];
859  return idx1 == 0 ? 2 * (s - n * min()) / ((n - 1) * (n - 1))
860  : 2 * (n * max() - s) / ((n - 1) * (n - 1));
861  } else {
862  bool const min1 = idx1 == 0; // idx1 is the min centroid
863  bool const max1 = idx1 == M - 1; // idx1 is the max centroid
864  bool const min2 = idx2 == 0; // idx2 is the min centroid
865  bool const max2 = idx2 == M - 1; // idx2 is the max centroid
866  RealType const n1 = static_cast<RealType>(centroids_.counts_[idx1] - min1 - max1);
867  RealType const s1 = centroids_.sums_[idx1] - (min1 ? min() : max1 ? max() : 0);
868  RealType const s2 = centroids_.sums_[idx2] - (min2 ? min() : max2 ? max() : 0);
869  if (isSingleton(centroids_.counts_.begin() + idx2)) {
870  return (idx1 < idx2 ? 2 : -2) * (n1 * s2 - s1) / (n1 * n1);
871  } else {
872  RealType const n2 = static_cast<RealType>(centroids_.counts_[idx2] - min2 - max2);
873  return (idx1 < idx2 ? 2 : -2) * (n1 * s2 - n2 * s1) / (n1 * n2 * (n1 + n2));
874  }
875  }
876 }
877 
878 } // namespace detail
879 
881 
882 } // namespace quantile
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
static constexpr RealType infinity
Definition: quantile.h:63
DEVICE void push_back(RealType const value, RealType const count)
Definition: quantile.h:115
DEVICE void push_back(T const &value)
Definition: VectorView.h:74
std::vector< RealType > sums_
Definition: quantile.h:171
DEVICE size_type capacity() const
Definition: VectorView.h:61
Centroids< RealType, IndexType > * buf_
Definition: quantile.h:136
DEVICE void skipSubsequent(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:518
std::ostream & operator<<(std::ostream &out, Centroids< RealType, IndexType > const &centroids)
Definition: quantile.h:393
bool const use_linear_scaling_function_
Definition: quantile.h:198
CentroidsMemory(size_t const size)
Definition: quantile.h:175
DEVICE RealType mean(IndexType const i) const
Definition: quantile.h:95
DEVICE bool index(Centroids< RealType, IndexType > *centroid) const
Definition: quantile.h:489
DEVICE void resetIndices(bool const forward)
Definition: quantile.h:378
DEVICE Centroids< RealType, IndexType > & centroids()
Definition: quantile.h:242
DEVICE void moveNextToCurrent()
Definition: quantile.h:368
DEVICE void add(RealType value)
Definition: quantile.h:592
DEVICE VectorView< IndexType const > partialSumOfCounts(IndexType *const buf) const
Definition: quantile.h:820
DEVICE RealType quantile(RealType const q) const
Definition: quantile.h:271
std::once_flag merge_buffer_final_once_
Definition: quantile.h:193
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
DEVICE void merged(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:496
DEVICE bool mergeIfFits(Centroids &centroid, IndexType const max_count)
Definition: quantile.h:356
DEVICE bool hasNext() const
Definition: quantile.h:93
DEVICE TDigest(Memory &mem)
Definition: quantile.h:227
DEVICE void mergeBuffer()
Definition: quantile.h:641
DEVICE IndexType totalWeight() const
Definition: quantile.h:298
VectorView< RealType > sums()
Definition: quantile.h:178
DEVICE T * data()
Definition: VectorView.h:65
VectorView< RealType > sums_
Definition: quantile.h:61
VectorView< IndexType > counts_
Definition: quantile.h:62
std::vector< IndexType > counts_
Definition: quantile.h:172
DEVICE void clear()
Definition: VectorView.h:64
DEVICE void mergeBufferFinal()
Definition: quantile.h:652
#define DEVICE
constexpr double a
Definition: Utm.h:32
DEVICE RealType currMean() const
Definition: quantile.h:89
DEVICE RealType quantile()
Definition: quantile.h:275
DEVICE void setCentroids(Memory &mem)
Definition: quantile.h:285
DEVICE size_type size() const
Definition: VectorView.h:84
DEVICE TDigest(RealType q, SimpleAllocator *simple_allocator, IndexType buf_allocate, IndexType centroids_allocate)
Definition: quantile.h:232
Centroids< RealType, IndexType > * centroids_
Definition: quantile.h:137
DEVICE RealType max() const
Definition: quantile.h:203
DEVICE void fill(ARGS &&...args)
Definition: gpu_enabled.h:60
DEVICE void set(T *data, size_type const size)
Definition: VectorView.h:80
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
DEVICE Centroids(VectorView< RealType > sums, VectorView< IndexType > counts)
Definition: quantile.h:73
DEVICE T * begin() const
Definition: VectorView.h:60
DEVICE CentroidsMerger(Centroids< RealType, IndexType > *buf, Centroids< RealType, IndexType > *centroids, bool const forward)
Definition: quantile.h:412
DEVICE void mergeTDigest(TDigest &t_digest)
Definition: quantile.h:258
DEVICE IndexType nextCount() const
Definition: quantile.h:102
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
DEVICE void setCentroids(VectorView< RealType > const sums, VectorView< IndexType > const counts)
Definition: quantile.h:291
DEVICE RealType slope(IndexType const idx1, IndexType const idx2) const
Definition: quantile.h:853
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
Centroids< RealType, IndexType > buf_
Definition: quantile.h:189
IndexType const buf_allocate_
Definition: quantile.h:200
DEVICE RealType nextSum() const
Definition: quantile.h:104
DEVICE RealType lastCentroid(RealType const x, IndexType const N) const
Definition: quantile.h:760
std::optional< RealType > const q_
Definition: quantile.h:197
DEVICE void skipFirst(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:512
DEVICE void allocate()
Definition: quantile.h:602
static DEVICE void shiftRange(T *const begin, IndexType skipped, IndexType const merged, int const inc)
Definition: quantile.h:469
DEVICE void setBuffer(Memory &mem)
Definition: quantile.h:278
DEVICE bool operator()(Value const &a, Value const &b) const
Definition: quantile.h:312
static constexpr RealType nan
Definition: quantile.h:64
DEVICE Centroids< RealType, IndexType > * getNextCentroid() const
Definition: quantile.h:428
DEVICE IndexType prefixSum() const
Definition: quantile.h:162
DEVICE Centroids(RealType *sums, IndexType *counts, IndexType const size)
Definition: quantile.h:70
SimpleAllocator *const simple_allocator_
Definition: quantile.h:199
DEVICE void merge(IndexType const max_count)
Definition: quantile.h:537
DEVICE bool operator<(Centroids const &b) const
Definition: quantile.h:108
DEVICE bool hasNext() const
Definition: quantile.h:154
IndexType const centroids_allocate_
Definition: quantile.h:201
Centroids< RealType, IndexType > * curr_centroid_
Definition: quantile.h:138
DEVICE IndexType maxCardinality(IndexType const sum, IndexType const total_weight, RealType const c)
Definition: quantile.h:620
DEVICE size_t size() const
Definition: quantile.h:122
DEVICE IndexType currCount() const
Definition: quantile.h:87
DEVICE IndexType totalWeight() const
Definition: quantile.h:124
DEVICE bool isDifferentMean(Centroids< RealType, IndexType > *next_centroid) const
Definition: quantile.h:492
DEVICE T0 value0() const
Definition: DoubleSort.h:87
constexpr unsigned N
Definition: Utm.h:110
DEVICE IndexType capacity() const
Definition: quantile.h:78
DEVICE IndexType totalWeight() const
Definition: quantile.h:164
DEVICE RealType interiorCentroid(RealType const x, IndexType const idx1, IndexType const prefix_sum) const
Definition: quantile.h:738
DEVICE RealType oneCentroid(RealType const x) const
Definition: quantile.h:797
DEVICE T1 value1() const
Definition: DoubleSort.h:88
DEVICE void mergeCentroids(Centroids< RealType, IndexType > &)
Definition: quantile.h:689
string name
Definition: setup.in.py:72
constexpr double n
Definition: Utm.h:38
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
DEVICE T & back()
Definition: VectorView.h:58
DEVICE RealType min() const
Definition: quantile.h:204
DEVICE RealType firstCentroid(RealType const x) const
Definition: quantile.h:719
DEVICE bool isSingleton(CountsIterator itr)
Definition: quantile.h:712
DEVICE void appendAndSortCurrent(Centroids &buff)
Definition: quantile.h:322
DEVICE bool hasCurr() const
Definition: quantile.h:91
DEVICE void mergeSorted(RealType *sums, IndexType *counts, IndexType size)
Definition: quantile.h:666
VectorView< IndexType > counts()
Definition: quantile.h:179
Centroids< RealType, IndexType > centroids_
Definition: quantile.h:190
DEVICE T * end() const
Definition: VectorView.h:68