OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
quantile.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #pragma once
26 
27 #include "DoubleSort.h"
28 #include "SimpleAllocator.h"
29 #include "VectorView.h"
30 #include "gpu_enabled.h"
31 
32 #ifndef __CUDACC__
33 #include <iomanip>
34 #include <mutex>
35 #include <ostream>
36 #endif
37 
38 #include <cmath>
39 #include <limits>
40 #include <memory>
41 #include <numeric>
42 #include <optional>
43 #include <type_traits>
44 
45 namespace quantile {
46 
47 namespace detail {
48 
49 template <typename RealType, typename IndexType>
50 struct Centroid {
51  RealType sum_;
52  IndexType count_;
53 };
54 
55 template <typename RealType, typename IndexType>
56 struct Centroids {
57  IndexType curr_idx_; // used during mergeCentroids algorithm
58  IndexType next_idx_; // used during mergeCentroids algorithm
59  int inc_; // 1 or -1 : forward or reverse iteration
62  static constexpr RealType infinity = std::numeric_limits<RealType>::infinity();
63  static constexpr RealType nan = std::numeric_limits<RealType>::quiet_NaN();
64  RealType max_{-infinity};
65  RealType min_{infinity};
66 
67  Centroids() = default;
68 
69  DEVICE Centroids(RealType* sums, IndexType* counts, IndexType const size)
70  : sums_(sums, size, size), counts_(counts, size, size) {}
71 
73  : sums_(sums), counts_(counts) {}
74 
76 
77  DEVICE IndexType capacity() const { return sums_.capacity(); }
78 
79  DEVICE void clear() {
80  sums_.clear();
81  counts_.clear();
82  max_ = -infinity;
83  min_ = infinity;
84  }
85 
86  DEVICE IndexType currCount() const { return counts_[curr_idx_]; }
87 
88  DEVICE RealType currMean() const { return mean(curr_idx_); }
89 
90  DEVICE bool hasCurr() const { return curr_idx_ < size(); }
91 
92  DEVICE bool hasNext() const { return next_idx_ < size(); }
93 
94  DEVICE RealType mean(IndexType const i) const { return sums_[i] / counts_[i]; }
95 
96  // Return true if centroid was merged, false if not.
97  DEVICE bool mergeIfFits(Centroids& centroid, IndexType const max_count);
98 
100 
101  DEVICE IndexType nextCount() const { return counts_[next_idx_]; }
102 
103  DEVICE RealType nextSum() const { return sums_[next_idx_]; }
104 
105  // Assumes this->hasNext() and b.hasNext().
106  // Order by (nextMean(), -nextCount()).
107  DEVICE bool operator<(Centroids const& b) const {
108  // lhs < rhs is same as nextMean() < b.nextMean() without division
109  RealType const lhs = nextSum() * b.nextCount();
110  RealType const rhs = b.nextSum() * nextCount();
111  return lhs < rhs || (lhs == rhs && b.nextCount() < nextCount());
112  }
113 
114  DEVICE void push_back(RealType const value, RealType const count) {
115  sums_.push_back(value);
116  counts_.push_back(count);
117  }
118 
119  DEVICE void resetIndices(bool const forward);
120 
121  DEVICE size_t size() const { return sums_.size(); }
122 
123  DEVICE IndexType totalWeight() const {
124  return gpu_enabled::accumulate(counts_.begin(), counts_.end(), IndexType(0));
125  }
126 
127 #ifndef __CUDACC__
128  template <typename RealType2, typename IndexType2>
129  friend std::ostream& operator<<(std::ostream&, Centroids<RealType2, IndexType2> const&);
130 #endif
131 };
132 
133 template <typename RealType, typename IndexType>
138  IndexType prefix_sum_{0}; // Prefix-sum of centroid counts.
139  IndexType const total_weight_;
140  bool const forward_;
141 
142  DEVICE void mergeMinMax();
143  DEVICE void setCurrCentroid();
144 
145  public:
148  bool const forward);
149 
150  // nullptr if !hasNext()
152 
153  DEVICE bool hasNext() const { return buf_->hasNext() || centroids_->hasNext(); }
154 
155  // Merge as many centroids as possible for count <= max_count.
156  DEVICE void merge(IndexType const max_count);
157 
158  // Assume curr_centroid_ is fully merged.
159  DEVICE void next();
160 
161  DEVICE IndexType prefixSum() const { return prefix_sum_; }
162 
163  DEVICE IndexType totalWeight() const { return total_weight_; }
164 };
165 
166 // Instantiated on host for reserving memory for Centroids.
167 template <typename RealType, typename IndexType>
169  // TODO CUDA memory containers
170  std::vector<RealType> sums_;
171  std::vector<IndexType> counts_;
172 
173  public:
174  explicit CentroidsMemory(size_t const size) : sums_(size), counts_(size) {}
175  size_t nbytes() const { return sums_.size() * (sizeof(RealType) + sizeof(IndexType)); }
176  size_t size() const { return sums_.size(); }
177  VectorView<RealType> sums() { return {sums_.data(), sums_.size(), sums_.size()}; }
179  return {counts_.data(), counts_.size(), counts_.size()};
180  }
181 };
182 
183 template <typename RealType, typename IndexType = size_t>
184 class TDigest {
185  // buf_ has two purposes, unless an externally managed buffer is used instead:
186  // * Used as the incoming buffer that gets filled before merging into centroids_.
187  // * After all data has been merged, it is used as a partial sum for centroids_.
190  bool forward_{true}; // alternate direction on each call to mergeCentroids().
191 #ifndef __CUDACC__
192  std::once_flag merge_buffer_final_once_;
193 #endif
194 
195  // simple_allocator_, buf_allocate_, centroids_allocate_ are used only by allocate().
196  std::optional<RealType> const q_{std::nullopt}; // Optional preset quantile parameter.
197  bool const use_linear_scaling_function_{false};
199  IndexType const buf_allocate_{0};
200  IndexType const centroids_allocate_{0};
201 
202  DEVICE RealType max() const { return centroids_.max_; }
203  DEVICE RealType min() const { return centroids_.min_; }
204 
205  DEVICE IndexType maxCardinality(IndexType const sum,
206  IndexType const total_weight,
207  RealType const c);
208 
209  // Require: centroids are sorted by (mean, -count)
211 
212  DEVICE RealType firstCentroid(RealType const x) const;
213  DEVICE RealType interiorCentroid(RealType const x,
214  IndexType const idx1,
215  IndexType const prefix_sum) const;
216  DEVICE RealType lastCentroid(RealType const x, IndexType const N) const;
217  DEVICE RealType oneCentroid(RealType const x) const;
218 
219  DEVICE RealType slope(IndexType const idx1, IndexType const idx2) const;
220 
221  public:
223 
224  TDigest() = default;
225 
227  : centroids_(mem.sums().data(), mem.counts().data(), mem.size()) {
228  centroids_.clear();
229  }
230 
231  DEVICE TDigest(RealType q,
232  SimpleAllocator* simple_allocator,
233  IndexType buf_allocate,
234  IndexType centroids_allocate)
235  : q_(q)
236  , use_linear_scaling_function_(q_ && 0.1 <= *q_ && *q_ <= 0.9)
237  , simple_allocator_(simple_allocator)
238  , buf_allocate_(buf_allocate)
239  , centroids_allocate_(centroids_allocate) {}
240 
242 
243  // Store value to buf_, and merge when full.
244  DEVICE void add(RealType value);
245 
246  // Allocate memory if needed by simple_allocator_.
247  DEVICE void allocate();
248 
249  DEVICE void mergeBuffer();
250 
251  // Calls mergeBuffer() w/ std::call_once and turns buf_ into a partial sum of counts.
252  DEVICE void mergeBufferFinal();
253 
254  // Import from sorted memory range. Ok to change values during merge.
255  DEVICE void mergeSorted(RealType* sums, IndexType* counts, IndexType size);
256 
257  DEVICE void mergeTDigest(TDigest& t_digest) {
258  mergeBuffer();
259  t_digest.mergeBuffer();
260  mergeCentroids(t_digest.centroids_);
261  }
262 
263  // Write partial sum of centroids_.counts_ into buf and return as a VectorView.
264  DEVICE VectorView<IndexType const> partialSumOfCounts(IndexType* const buf) const;
265 
267  RealType const q) const;
268 
269  // Must be called after mergeBufferFinal().
270  DEVICE RealType quantile(RealType const q) const {
271  return quantile({buf_.counts_.data(), centroids_.size()}, q);
272  }
273 
274  DEVICE RealType quantile() { return q_ ? quantile(*q_) : centroids_.nan; }
275 
276  // Assumes mem is externally managed.
277  DEVICE void setBuffer(Memory& mem) {
279  mem.sums().data(), mem.counts().data(), mem.size());
280  buf_.clear();
281  }
282 
283  // Assumes mem is externally managed.
286  mem.sums().data(), mem.counts().data(), mem.size());
287  centroids_.clear();
288  }
289 
291  VectorView<IndexType> const counts) {
293  centroids_.clear();
294  }
295 
296  // Total number of data points in centroids_.
297  DEVICE IndexType totalWeight() const { return centroids_.totalWeight(); }
298 };
299 
300 // Class template member definitions
301 
302 // Class Centroids<>
303 
304 namespace {
305 
306 // Order by (mean, -count)
307 template <typename RealType, typename IndexType>
310  // Assume value1() is positive and use multiplication instead of division.
311  DEVICE bool operator()(Value const& a, Value const& b) const {
312  auto const lhs = a.value0() * b.value1();
313  auto const rhs = b.value0() * a.value1();
314  return lhs < rhs || (lhs == rhs && b.value1() < a.value1());
315  }
316 };
317 
318 } // namespace
319 
320 template <typename RealType, typename IndexType>
322  if (inc_ == -1 && curr_idx_ != 0) {
323  // Shift data to the left left by curr_idx_.
324  // Prefer to copy, but thrust::copy doesn't support overlapping ranges.
325  // Reverse instead, which gets sorted below.
326  // gpu_enabled::copy(sums_.begin() + curr_idx_, sums_.end(), sums_.begin());
327  // gpu_enabled::copy(counts_.begin() + curr_idx_, counts_.end(), counts_.begin());
328  gpu_enabled::reverse(sums_.begin(), sums_.end());
329  gpu_enabled::reverse(counts_.begin(), counts_.end());
330  }
331  // Shift VectorViews to the right by buff.curr_idx_.
332  IndexType const offset = inc_ == 1 ? 0 : buff.curr_idx_;
333  IndexType const buff_size =
334  inc_ == 1 ? buff.curr_idx_ + 1 : buff.size() - buff.curr_idx_;
335  VectorView<RealType> buff_sums(buff.sums_.begin() + offset, buff_size, buff_size);
336  VectorView<IndexType> buff_counts(buff.counts_.begin() + offset, buff_size, buff_size);
337  // Copy buff into sums_ and counts_.
338  IndexType const curr_size = inc_ == 1 ? curr_idx_ + 1 : size() - curr_idx_;
339  IndexType const total_size = curr_size + buff_sums.size();
340  assert(total_size <= sums_.capacity()); // TODO proof of inequality
341  sums_.resize(total_size);
342  gpu_enabled::copy(buff_sums.begin(), buff_sums.end(), sums_.begin() + curr_size);
343  assert(total_size <= counts_.capacity());
344  counts_.resize(total_size);
345  gpu_enabled::copy(buff_counts.begin(), buff_counts.end(), counts_.begin() + curr_size);
346 
347  // Sort sums_ and counts_ by (mean, -count).
348  double_sort::Iterator<RealType, IndexType> const begin(sums_.begin(), counts_.begin());
349  double_sort::Iterator<RealType, IndexType> const end(sums_.end(), counts_.end());
350  gpu_enabled::sort(begin, end, OrderByMeanAscCountDesc<RealType, IndexType>());
351 }
352 
353 // Return true if centroid was merged, false if not.
354 template <typename RealType, typename IndexType>
356  IndexType const max_count) {
357  if (counts_[curr_idx_] + centroid.nextCount() <= max_count) {
358  sums_[curr_idx_] += centroid.nextSum();
359  counts_[curr_idx_] += centroid.nextCount();
360  centroid.next_idx_ += centroid.inc_;
361  return true;
362  }
363  return false;
364 }
365 
366 template <typename RealType, typename IndexType>
368  curr_idx_ += inc_;
369  if (curr_idx_ != next_idx_) {
370  sums_[curr_idx_] = sums_[next_idx_];
371  counts_[curr_idx_] = counts_[next_idx_];
372  }
373  next_idx_ += inc_;
374 }
375 
376 template <typename RealType, typename IndexType>
378  if (forward) {
379  inc_ = 1;
380  curr_idx_ = ~IndexType(0);
381  } else {
382  inc_ = -1;
383  curr_idx_ = size();
384  }
385  static_assert(std::is_unsigned<IndexType>::value,
386  "IndexType must be an unsigned type.");
387  next_idx_ = curr_idx_ + inc_;
388 }
389 
390 #ifndef __CUDACC__
391 template <typename RealType, typename IndexType>
392 std::ostream& operator<<(std::ostream& out,
393  Centroids<RealType, IndexType> const& centroids) {
394  out << "Centroids<" << typeid(RealType).name() << ',' << typeid(IndexType).name()
395  << ">(size(" << centroids.size() << ") curr_idx_(" << centroids.curr_idx_
396  << ") next_idx_(" << centroids.next_idx_ << ") sums_(";
397  for (IndexType i = 0; i < centroids.sums_.size(); ++i) {
398  out << (i ? " " : "") << std::setprecision(20) << centroids.sums_[i];
399  }
400  out << ") counts_(";
401  for (IndexType i = 0; i < centroids.counts_.size(); ++i) {
402  out << (i ? " " : "") << centroids.counts_[i];
403  }
404  return out << "))";
405 }
406 #endif
407 
408 // Class CentroidsMerger<>
409 
410 template <typename RealType, typename IndexType>
414  bool const forward)
415  : buf_(buf)
416  , centroids_(centroids)
417  , total_weight_(centroids->totalWeight() + buf->totalWeight())
418  , forward_(forward) {
419  buf_->resetIndices(forward_);
420  centroids_->resetIndices(forward_);
421  mergeMinMax();
422  setCurrCentroid();
423 }
424 
425 template <typename RealType, typename IndexType>
428  if (buf_->hasNext()) {
429  if (centroids_->hasNext()) {
430  return (*buf_ < *centroids_) == forward_ ? buf_ : centroids_;
431  }
432  return buf_;
433  } else if (centroids_->hasNext()) {
434  return centroids_;
435  }
436  return nullptr;
437 }
438 
439 namespace {
440 
441 // Helper struct for mergeCentroids() for tracking skipped centroids.
442 // A consecutive sequence of incoming centroids with the same mean
443 // and exceed max_count are skipped before expensive reshuffling of memory.
444 template <typename RealType, typename IndexType>
445 class Skipped {
446  struct Data {
447  Centroids<RealType, IndexType>* centroid_{nullptr};
448  IndexType start_{0};
449  IndexType count_merged_{0};
450  IndexType count_skipped_{0};
451  } data_[2];
453 
454  DEVICE static void shiftCentroids(Data& data) {
455  if (data.count_merged_) {
456  shiftRange(data.centroid_->sums_.begin() + data.start_,
457  data.count_skipped_,
458  data.count_merged_,
459  data.centroid_->inc_);
460  shiftRange(data.centroid_->counts_.begin() + data.start_,
461  data.count_skipped_,
462  data.count_merged_,
463  data.centroid_->inc_);
464  data.start_ += data.centroid_->inc_ * data.count_merged_;
465  }
466  }
467  template <typename T>
468  DEVICE static void shiftRange(T* const begin,
469  IndexType skipped,
470  IndexType const merged,
471  int const inc) {
472 #ifdef __CUDACC__
473  T* src = begin + inc * (skipped - 1);
474  T* dst = src + inc * merged;
475  for (; skipped; --skipped, src -= inc, dst -= inc) {
476  *dst = *src;
477  }
478 #else
479  if (inc == 1) {
480  std::copy_backward(begin, begin + skipped, begin + skipped + merged);
481  } else {
482  std::copy(begin + 1 - skipped, begin + 1, begin + 1 - skipped - merged);
483  }
484 #endif
485  }
486 
487  public:
489  return data_[0].centroid_ != centroid;
490  }
492  return mean_.sum_ * next_centroid->nextCount() !=
493  next_centroid->nextSum() * mean_.count_;
494  }
496  IndexType const idx = index(next_centroid);
497  if (data_[idx].count_skipped_) {
498  ++data_[idx].count_merged_;
499  }
500  }
501  DEVICE operator bool() const { return data_[0].centroid_; }
502  // Shift skipped centroids over merged centroids, and rewind next_idx_.
504  shiftCentroids(data_[0]);
505  data_[0].centroid_->next_idx_ = data_[0].start_;
506  if (data_[1].centroid_) {
507  shiftCentroids(data_[1]);
508  data_[1].centroid_->next_idx_ = data_[1].start_;
509  }
510  }
512  mean_ = Centroid<RealType, IndexType>{next_centroid->nextSum(),
513  next_centroid->nextCount()};
514  data_[0] = {next_centroid, next_centroid->next_idx_, 0, 1};
515  next_centroid->next_idx_ += next_centroid->inc_;
516  }
518  IndexType const idx = index(next_centroid);
519  if (idx == 1 && data_[1].centroid_ == nullptr) {
520  data_[1] = {next_centroid, next_centroid->next_idx_, 0, 1};
521  } else {
522  if (data_[idx].count_merged_) {
523  shiftCentroids(data_[idx]);
524  data_[idx].count_merged_ = 0;
525  }
526  ++data_[idx].count_skipped_;
527  }
528  next_centroid->next_idx_ += next_centroid->inc_;
529  }
530 };
531 
532 } // namespace
533 
534 // Merge as many centroids as possible for count <= max_count.
535 template <typename RealType, typename IndexType>
536 DEVICE void CentroidsMerger<RealType, IndexType>::merge(IndexType const max_count) {
537  Skipped<RealType, IndexType> skipped;
538  while (auto* next_centroid = getNextCentroid()) {
539  if (skipped) {
540  if (skipped.isDifferentMean(next_centroid)) {
541  break;
542  } else if (curr_centroid_->mergeIfFits(*next_centroid, max_count)) {
543  skipped.merged(next_centroid);
544  } else {
545  skipped.skipSubsequent(next_centroid);
546  }
547  } else if (!curr_centroid_->mergeIfFits(*next_centroid, max_count)) {
548  skipped.skipFirst(next_centroid);
549  }
550  }
551  if (skipped) {
552  skipped.shiftCentroidsAndSetNext();
553  }
554 }
555 
556 // Track min/max without assuming the data points exist in any particular centroids.
557 // Otherwise to assume and track their existence in the min/max centroids introduces
558 // significant complexity. For example, the min centroid may not remain the min
559 // centroid if it merges into it the 3rd smallest centroid, skipping the 2nd due to
560 // the scaling function constraint.
561 // When the quantile() is calculated, the min/max data points are assumed to exist
562 // in the min/max centroids for the purposes of calculating the quantile within those
563 // centroids.
564 template <typename RealType, typename IndexType>
566  if (centroids_->max_ < buf_->max_) {
567  centroids_->max_ = buf_->max_;
568  }
569  if (buf_->min_ < centroids_->min_) {
570  centroids_->min_ = buf_->min_;
571  }
572 }
573 
574 // Assume curr_centroid_ is fully merged.
575 template <typename RealType, typename IndexType>
577  prefix_sum_ += curr_centroid_->currCount();
578  setCurrCentroid();
579 }
580 
581 template <typename RealType, typename IndexType>
583  if ((curr_centroid_ = getNextCentroid())) {
584  curr_centroid_->moveNextToCurrent();
585  }
586 }
587 
588 // class TDigest<>
589 
590 template <typename RealType, typename IndexType>
592  if (buf_.sums_.full()) {
593  mergeBuffer();
594  }
595  buf_.sums_.push_back(value);
596  buf_.counts_.push_back(1);
597 }
598 
599 // Assumes buf_ is allocated iff centroids_ is allocated.
600 template <typename RealType, typename IndexType>
602  if (buf_.capacity() == 0) {
603  auto* p0 = simple_allocator_->allocate(buf_allocate_ * sizeof(RealType));
604  auto* p1 = simple_allocator_->allocate(buf_allocate_ * sizeof(IndexType));
606  VectorView<RealType>(reinterpret_cast<RealType*>(p0), 0, buf_allocate_),
607  VectorView<IndexType>(reinterpret_cast<IndexType*>(p1), 0, buf_allocate_));
608  p0 = simple_allocator_->allocate(centroids_allocate_ * sizeof(RealType));
609  p1 = simple_allocator_->allocate(centroids_allocate_ * sizeof(IndexType));
610  centroids_ = Centroids<RealType, IndexType>(
611  VectorView<RealType>(reinterpret_cast<RealType*>(p0), 0, centroids_allocate_),
612  VectorView<IndexType>(reinterpret_cast<IndexType*>(p1), 0, centroids_allocate_));
613  }
614 }
615 
616 // Return total_weight * (f_inv(f(q_{i-1})+1) - q_{i-1})
617 template <typename RealType, typename IndexType>
618 DEVICE IndexType
620  IndexType const total_weight,
621  RealType const c) {
622  IndexType const max_bins = centroids_.capacity(); // "compression parameter" delta
623  if (total_weight <= max_bins) {
624  return 0;
625  } else if (use_linear_scaling_function_) {
626  // Whitepaper scaling function k=0.
627  return 2 * total_weight / max_bins;
628  } else {
629  // Whitepaper scaling function k=1.
630  RealType const x = 2.0 * sum / total_weight - 1; // = 2 q_{i-1} - 1
631  RealType const f_inv = 0.5 + 0.5 * std::sin(c + std::asin(x));
632  constexpr RealType eps = 1e-5; // round-off epsilon for floor().
633  IndexType const dsum = static_cast<IndexType>(total_weight * f_inv + eps);
634  return dsum < sum ? 0 : dsum - sum;
635  }
636 }
637 
638 // Assumes buf_ consists only of singletons.
639 template <typename RealType, typename IndexType>
641  if (buf_.size()) {
642  gpu_enabled::sort(buf_.sums_.begin(), buf_.sums_.end());
643  buf_.min_ = buf_.sums_.front();
644  buf_.max_ = buf_.sums_.back();
645  mergeCentroids(buf_);
646  }
647 }
648 
649 // [QE-383] Make concurrent calls to mergeBufferFinal() thread-safe.
650 template <typename RealType, typename IndexType>
652  auto const call_once = [this] {
653  mergeBuffer();
654  assert(centroids_.size() <= buf_.capacity());
655  partialSumOfCounts(buf_.counts_.data());
656  };
657 #ifdef __CUDACC__
658  call_once();
659 #else
660  std::call_once(merge_buffer_final_once_, call_once);
661 #endif
662 }
663 
664 template <typename RealType, typename IndexType>
666  IndexType* counts,
667  IndexType size) {
668  if (size) {
669  if (buf_.capacity() == 0) {
670  buf_ = Centroids<RealType, IndexType>(sums, counts, size); // Set capacity and size
671  } else {
672  buf_.sums_.set(sums, size); // Does not change capacity
673  buf_.counts_.set(counts, size);
674  }
675  gpu_enabled::fill(buf_.counts_.begin(), buf_.counts_.end(), IndexType(1));
676  buf_.min_ = buf_.sums_.front();
677  buf_.max_ = buf_.sums_.back();
678  mergeCentroids(buf_);
679  }
680 }
681 
682 // Require:
683 // * buf centroids are not empty and sorted by (mean, -count)
684 // * buf.min_ and buf.max_ are correctly set.
685 // During filling stage, buf=buf_.
686 // During reduction, buf_=centroids_[i]
687 template <typename RealType, typename IndexType>
690  constexpr RealType two_pi = 6.283185307179586476925286766559005768e+00;
691  // Hoisted constant used in maxCardinality().
692  RealType const c = two_pi / centroids_.capacity();
693  // Loop over sorted sequence of buf and centroids_.
694  // Some latter centroids may be merged into the current centroid, so the number
695  // of iterations is only at most equal to the number of initial centroids.
697  for (CM cm(&buf, &centroids_, forward_); cm.hasNext(); cm.next()) {
698  // cm.prefixSum() == 0 on first iteration.
699  // Max cardinality for current centroid to be fully merged based on scaling function.
700  IndexType const max_cardinality = maxCardinality(cm.prefixSum(), cm.totalWeight(), c);
701  cm.merge(max_cardinality);
702  }
703  // Combine sorted centroids buf[0..curr_idx_] + centroids_[0..curr_idx_] if forward
704  centroids_.appendAndSortCurrent(buf);
705  buf.clear();
706  forward_ ^= true; // alternate merge direction on each call
707 }
708 
709 namespace {
710 template <typename CountsIterator>
711 DEVICE bool isSingleton(CountsIterator itr) {
712  return *itr == 1;
713 }
714 } // namespace
715 
716 // Assumes x < centroids_.counts_.front().
717 template <typename RealType, typename IndexType>
718 DEVICE RealType TDigest<RealType, IndexType>::firstCentroid(RealType const x) const {
719  if (x < 1) {
720  return min();
721  } else if (centroids_.size() == 1) {
722  return oneCentroid(x);
723  } else if (centroids_.counts_.front() == 2) {
724  RealType const sum = centroids_.sums_.front();
725  return x == 1 ? 0.5 * sum : sum - min();
726  } else {
727  RealType const count = centroids_.counts_.front();
728  RealType const dx = x - RealType(0.5) * (1 + count);
729  RealType const mean = (centroids_.sums_.front() - min()) / (count - 1);
730  return mean + slope(0, 0 < dx) * dx;
731  }
732 }
733 
734 // x is between first and last centroids.
735 template <typename RealType, typename IndexType>
736 DEVICE RealType
738  IndexType const idx1,
739  IndexType const prefix_sum) const {
740  if (isSingleton(centroids_.counts_.begin() + idx1)) {
741  RealType const sum1 = centroids_.sums_[idx1];
742  if (x == prefix_sum - centroids_.counts_[idx1]) {
743  if (isSingleton(centroids_.counts_.begin() + idx1 - 1)) {
744  return 0.5 * (centroids_.sums_[idx1 - 1] + sum1);
745  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
746  return 0.5 * (centroids_.sums_[idx1 - 1] - min() + sum1);
747  }
748  }
749  return sum1;
750  } else {
751  RealType const dx = x + RealType(0.5) * centroids_.counts_[idx1] - prefix_sum;
752  IndexType const idx2 = idx1 + 2 * (0 < dx) - 1;
753  return centroids_.mean(idx1) + slope(idx1, idx2) * dx;
754  }
755 }
756 
757 // Assumes N - centroids_.counts_.back() <= x < N, and there is more than 1 centroid.
758 template <typename RealType, typename IndexType>
760  IndexType const N) const {
761  if (N - 1 < x) {
762  return max();
763  }
764  IndexType const idx1 = centroids_.size() - 1;
765  RealType const sum1 = centroids_.sums_[idx1];
766  IndexType const count1 = centroids_.counts_[idx1];
767  if (count1 == 1) { // => x == N - 1
768  if (isSingleton(centroids_.counts_.begin() + (idx1 - 1))) {
769  return 0.5 * (centroids_.sums_[idx1 - 1] + sum1);
770  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
771  return 0.5 * (centroids_.sums_[idx1 - 1] - min() + sum1);
772  } else {
773  return sum1;
774  }
775  } else if (count1 == 2) { // => 3 <= N
776  if (x == N - 1) {
777  return 0.5 * sum1;
778  } else if (x == N - 2) {
779  RealType const sum2 = centroids_.sums_[idx1 - 1];
780  if (isSingleton(centroids_.counts_.begin() + (idx1 - 1))) {
781  return 0.5 * (sum2 + sum1 - max());
782  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
783  return 0.5 * (sum2 - min() + sum1 - max());
784  }
785  }
786  return sum1 - max();
787  } else { // => 3 <= count1
788  RealType const dx = x + RealType(0.5) * (count1 + 1) - N;
789  RealType const mean = (sum1 - max()) / (count1 - 1);
790  return mean + slope(idx1, idx1 - (dx < 0)) * dx;
791  }
792 }
793 
794 // Assumes there is only 1 centroid, and 1 <= x < centroids_.counts_.front().
795 template <typename RealType, typename IndexType>
796 DEVICE RealType TDigest<RealType, IndexType>::oneCentroid(RealType const x) const {
797  IndexType const N = centroids_.counts_.front();
798  if (N - 1 < x) { // includes case N == 1
799  return max();
800  } else if (N == 2) { // x == 1
801  return 0.5 * centroids_.sums_.front();
802  } else if (N == 3) { // 1 <= x <= 2
803  if (x == 2) {
804  return 0.5 * (centroids_.sums_.front() - min());
805  } else {
806  RealType const s = centroids_.sums_.front() - max();
807  return x == 1 ? 0.5 * s : s - min();
808  }
809  } else { // 3 < N
810  RealType const dx = x - RealType(0.5) * N;
811  RealType const mean = (centroids_.sums_.front() - (min() + max())) / (N - 2);
812  RealType const slope = 2 * (0 < dx ? max() - mean : mean - min()) / (N - 2);
813  return mean + slope * dx;
814  }
815 }
816 
817 // Return a partial sum of centroids_.counts_ stored in buf.
818 template <typename RealType, typename IndexType>
820  IndexType* const buf) const {
821  gpu_enabled::partial_sum(centroids_.counts_.begin(), centroids_.counts_.end(), buf);
822  return {buf, centroids_.size()};
823 }
824 
825 template <typename RealType, typename IndexType>
826 DEVICE RealType
828  RealType const q) const {
829  if (centroids_.size()) {
830  IndexType const N = partial_sum.back();
831  RealType const x = q * N;
832  auto const it1 = gpu_enabled::upper_bound(partial_sum.begin(), partial_sum.end(), x);
833  if (it1 == partial_sum.begin()) {
834  return firstCentroid(x);
835  } else if (it1 == partial_sum.end()) { // <==> 1 <= q
836  return max();
837  } else if (it1 + 1 == partial_sum.end()) {
838  return lastCentroid(x, N);
839  } else {
840  return interiorCentroid(x, it1 - partial_sum.begin(), *it1);
841  }
842  } else {
843  return centroids_.nan;
844  }
845 }
846 
847 // Requirement: 1 < M and 0 <= idx1, idx2 < M where M = Number of centroids.
848 // Return slope of line segment connecting idx1 and idx2.
849 // If equal then assume it is the section contained within an extrema.
850 // Centroid for idx1 is not a singleton, but idx2 may be.
851 template <typename RealType, typename IndexType>
853  IndexType idx2) const {
854  IndexType const M = centroids_.size();
855  if (idx1 == idx2) { // Line segment is contained in either the first or last centroid.
856  RealType const n = static_cast<RealType>(centroids_.counts_[idx1]);
857  RealType const s = centroids_.sums_[idx1];
858  return idx1 == 0 ? 2 * (s - n * min()) / ((n - 1) * (n - 1))
859  : 2 * (n * max() - s) / ((n - 1) * (n - 1));
860  } else {
861  bool const min1 = idx1 == 0; // idx1 is the min centroid
862  bool const max1 = idx1 == M - 1; // idx1 is the max centroid
863  bool const min2 = idx2 == 0; // idx2 is the min centroid
864  bool const max2 = idx2 == M - 1; // idx2 is the max centroid
865  RealType const n1 = static_cast<RealType>(centroids_.counts_[idx1] - min1 - max1);
866  RealType const s1 = centroids_.sums_[idx1] - (min1 ? min() : max1 ? max() : 0);
867  RealType const s2 = centroids_.sums_[idx2] - (min2 ? min() : max2 ? max() : 0);
868  if (isSingleton(centroids_.counts_.begin() + idx2)) {
869  return (idx1 < idx2 ? 2 : -2) * (n1 * s2 - s1) / (n1 * n1);
870  } else {
871  RealType const n2 = static_cast<RealType>(centroids_.counts_[idx2] - min2 - max2);
872  return (idx1 < idx2 ? 2 : -2) * (n1 * s2 - n2 * s1) / (n1 * n2 * (n1 + n2));
873  }
874  }
875 }
876 
877 } // namespace detail
878 
880 
881 } // namespace quantile
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
static constexpr RealType infinity
Definition: quantile.h:62
DEVICE void push_back(RealType const value, RealType const count)
Definition: quantile.h:114
DEVICE void push_back(T const &value)
Definition: VectorView.h:73
std::vector< RealType > sums_
Definition: quantile.h:170
DEVICE size_type capacity() const
Definition: VectorView.h:60
Centroids< RealType, IndexType > * buf_
Definition: quantile.h:135
DEVICE void skipSubsequent(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:517
std::ostream & operator<<(std::ostream &out, Centroids< RealType, IndexType > const &centroids)
Definition: quantile.h:392
bool const use_linear_scaling_function_
Definition: quantile.h:197
CentroidsMemory(size_t const size)
Definition: quantile.h:174
DEVICE RealType mean(IndexType const i) const
Definition: quantile.h:94
DEVICE bool index(Centroids< RealType, IndexType > *centroid) const
Definition: quantile.h:488
DEVICE void resetIndices(bool const forward)
Definition: quantile.h:377
DEVICE Centroids< RealType, IndexType > & centroids()
Definition: quantile.h:241
DEVICE void moveNextToCurrent()
Definition: quantile.h:367
DEVICE void add(RealType value)
Definition: quantile.h:591
DEVICE VectorView< IndexType const > partialSumOfCounts(IndexType *const buf) const
Definition: quantile.h:819
DEVICE RealType quantile(RealType const q) const
Definition: quantile.h:270
std::once_flag merge_buffer_final_once_
Definition: quantile.h:192
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
DEVICE void merged(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:495
DEVICE bool mergeIfFits(Centroids &centroid, IndexType const max_count)
Definition: quantile.h:355
DEVICE bool hasNext() const
Definition: quantile.h:92
DEVICE TDigest(Memory &mem)
Definition: quantile.h:226
DEVICE void mergeBuffer()
Definition: quantile.h:640
DEVICE IndexType totalWeight() const
Definition: quantile.h:297
VectorView< RealType > sums()
Definition: quantile.h:177
DEVICE T * data()
Definition: VectorView.h:64
VectorView< RealType > sums_
Definition: quantile.h:60
VectorView< IndexType > counts_
Definition: quantile.h:61
std::vector< IndexType > counts_
Definition: quantile.h:171
DEVICE void clear()
Definition: VectorView.h:63
DEVICE void mergeBufferFinal()
Definition: quantile.h:651
#define DEVICE
constexpr double a
Definition: Utm.h:32
DEVICE RealType currMean() const
Definition: quantile.h:88
DEVICE RealType quantile()
Definition: quantile.h:274
DEVICE void setCentroids(Memory &mem)
Definition: quantile.h:284
DEVICE size_type size() const
Definition: VectorView.h:83
DEVICE TDigest(RealType q, SimpleAllocator *simple_allocator, IndexType buf_allocate, IndexType centroids_allocate)
Definition: quantile.h:231
Centroids< RealType, IndexType > * centroids_
Definition: quantile.h:136
DEVICE RealType max() const
Definition: quantile.h:202
DEVICE void fill(ARGS &&...args)
Definition: gpu_enabled.h:60
DEVICE void set(T *data, size_type const size)
Definition: VectorView.h:79
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
DEVICE Centroids(VectorView< RealType > sums, VectorView< IndexType > counts)
Definition: quantile.h:72
DEVICE T * begin() const
Definition: VectorView.h:59
DEVICE CentroidsMerger(Centroids< RealType, IndexType > *buf, Centroids< RealType, IndexType > *centroids, bool const forward)
Definition: quantile.h:411
DEVICE void mergeTDigest(TDigest &t_digest)
Definition: quantile.h:257
DEVICE IndexType nextCount() const
Definition: quantile.h:101
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
DEVICE void setCentroids(VectorView< RealType > const sums, VectorView< IndexType > const counts)
Definition: quantile.h:290
DEVICE RealType slope(IndexType const idx1, IndexType const idx2) const
Definition: quantile.h:852
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
Centroids< RealType, IndexType > buf_
Definition: quantile.h:188
IndexType const buf_allocate_
Definition: quantile.h:199
DEVICE RealType nextSum() const
Definition: quantile.h:103
DEVICE RealType lastCentroid(RealType const x, IndexType const N) const
Definition: quantile.h:759
std::optional< RealType > const q_
Definition: quantile.h:196
DEVICE void skipFirst(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:511
DEVICE void allocate()
Definition: quantile.h:601
static DEVICE void shiftRange(T *const begin, IndexType skipped, IndexType const merged, int const inc)
Definition: quantile.h:468
DEVICE void setBuffer(Memory &mem)
Definition: quantile.h:277
DEVICE bool operator()(Value const &a, Value const &b) const
Definition: quantile.h:311
static constexpr RealType nan
Definition: quantile.h:63
DEVICE Centroids< RealType, IndexType > * getNextCentroid() const
Definition: quantile.h:427
DEVICE IndexType prefixSum() const
Definition: quantile.h:161
DEVICE Centroids(RealType *sums, IndexType *counts, IndexType const size)
Definition: quantile.h:69
SimpleAllocator *const simple_allocator_
Definition: quantile.h:198
DEVICE void merge(IndexType const max_count)
Definition: quantile.h:536
DEVICE bool operator<(Centroids const &b) const
Definition: quantile.h:107
DEVICE bool hasNext() const
Definition: quantile.h:153
IndexType const centroids_allocate_
Definition: quantile.h:200
Centroids< RealType, IndexType > * curr_centroid_
Definition: quantile.h:137
DEVICE IndexType maxCardinality(IndexType const sum, IndexType const total_weight, RealType const c)
Definition: quantile.h:619
DEVICE size_t size() const
Definition: quantile.h:121
DEVICE IndexType currCount() const
Definition: quantile.h:86
DEVICE IndexType totalWeight() const
Definition: quantile.h:123
DEVICE bool isDifferentMean(Centroids< RealType, IndexType > *next_centroid) const
Definition: quantile.h:491
DEVICE T0 value0() const
Definition: DoubleSort.h:87
constexpr unsigned N
Definition: Utm.h:110
DEVICE IndexType capacity() const
Definition: quantile.h:77
DEVICE IndexType totalWeight() const
Definition: quantile.h:163
DEVICE RealType interiorCentroid(RealType const x, IndexType const idx1, IndexType const prefix_sum) const
Definition: quantile.h:737
DEVICE RealType oneCentroid(RealType const x) const
Definition: quantile.h:796
DEVICE T1 value1() const
Definition: DoubleSort.h:88
DEVICE void mergeCentroids(Centroids< RealType, IndexType > &)
Definition: quantile.h:688
string name
Definition: setup.in.py:72
constexpr double n
Definition: Utm.h:38
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
DEVICE T & back()
Definition: VectorView.h:57
DEVICE RealType min() const
Definition: quantile.h:203
DEVICE RealType firstCentroid(RealType const x) const
Definition: quantile.h:718
DEVICE bool isSingleton(CountsIterator itr)
Definition: quantile.h:711
DEVICE void appendAndSortCurrent(Centroids &buff)
Definition: quantile.h:321
DEVICE bool hasCurr() const
Definition: quantile.h:90
DEVICE void mergeSorted(RealType *sums, IndexType *counts, IndexType size)
Definition: quantile.h:665
VectorView< IndexType > counts()
Definition: quantile.h:178
Centroids< RealType, IndexType > centroids_
Definition: quantile.h:189
DEVICE T * end() const
Definition: VectorView.h:67