OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
quantile.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #pragma once
26 
27 #include "DoubleSort.h"
28 #include "SimpleAllocator.h"
29 #include "VectorView.h"
30 #include "gpu_enabled.h"
31 
32 #ifndef __CUDACC__
33 #include <iomanip>
34 #include <mutex>
35 #include <ostream>
36 #endif
37 
38 #include <cmath>
39 #include <limits>
40 #include <memory>
41 #include <numeric>
42 #include <optional>
43 #include <type_traits>
44 
45 namespace quantile {
46 
47 namespace detail {
48 
49 template <typename RealType, typename IndexType>
50 struct Centroid {
51  RealType sum_;
52  IndexType count_;
53 };
54 
55 template <typename RealType, typename IndexType>
56 struct Centroids {
57  IndexType curr_idx_; // used during mergeCentroids algorithm
58  IndexType next_idx_; // used during mergeCentroids algorithm
59  int inc_; // 1 or -1 : forward or reverse iteration
62  static constexpr RealType infinity = std::numeric_limits<RealType>::infinity();
63  static constexpr RealType nan = std::numeric_limits<RealType>::quiet_NaN();
64  RealType max_{-infinity};
65  RealType min_{infinity};
66 
67  Centroids() = default;
68 
69  DEVICE Centroids(RealType* sums, IndexType* counts, IndexType const size)
70  : sums_(sums, size, size), counts_(counts, size, size) {}
71 
73  : sums_(sums), counts_(counts) {}
74 
76 
77  DEVICE IndexType capacity() const { return sums_.capacity(); }
78 
79  DEVICE void clear() {
80  sums_.clear();
81  counts_.clear();
82  max_ = -infinity;
83  min_ = infinity;
84  }
85 
86  DEVICE IndexType currCount() const { return counts_[curr_idx_]; }
87 
88  DEVICE RealType currMean() const { return mean(curr_idx_); }
89 
90  DEVICE bool hasCurr() const { return curr_idx_ < size(); }
91 
92  DEVICE bool hasNext() const { return next_idx_ < size(); }
93 
94  DEVICE RealType mean(IndexType const i) const { return sums_[i] / counts_[i]; }
95 
96  // Return true if centroid was merged, false if not.
97  DEVICE bool mergeIfFits(Centroids& centroid, IndexType const max_count);
98 
100 
101  DEVICE IndexType nextCount() const { return counts_[next_idx_]; }
102 
103  DEVICE RealType nextSum() const { return sums_[next_idx_]; }
104 
105  // Assumes this->hasNext() and b.hasNext().
106  // Order by (nextMean(), -nextCount()).
107  DEVICE bool operator<(Centroids const& b) const {
108  // lhs < rhs is same as nextMean() < b.nextMean() without division
109  RealType const lhs = nextSum() * b.nextCount();
110  RealType const rhs = b.nextSum() * nextCount();
111  return lhs < rhs || (lhs == rhs && b.nextCount() < nextCount());
112  }
113 
114  DEVICE void push_back(RealType const value, RealType const count) {
115  sums_.push_back(value);
116  counts_.push_back(count);
117  }
118 
119  DEVICE void resetIndices(bool const forward);
120 
121  DEVICE size_t size() const { return sums_.size(); }
122 
123  DEVICE IndexType totalWeight() const {
124  return gpu_enabled::accumulate(counts_.begin(), counts_.end(), IndexType(0));
125  }
126 
127 #ifndef __CUDACC__
128  template <typename RealType2, typename IndexType2>
129  friend std::ostream& operator<<(std::ostream&, Centroids<RealType2, IndexType2> const&);
130 #endif
131 };
132 
133 template <typename RealType, typename IndexType>
138  IndexType prefix_sum_{0}; // Prefix-sum of centroid counts.
139  IndexType const total_weight_;
140  bool const forward_;
141 
142  DEVICE void mergeMinMax();
143  DEVICE void setCurrCentroid();
144 
145  public:
148  bool const forward);
149 
150  // nullptr if !hasNext()
152 
153  DEVICE bool hasNext() const { return buf_->hasNext() || centroids_->hasNext(); }
154 
155  // Merge as many centroids as possible for count <= max_count.
156  DEVICE void merge(IndexType const max_count);
157 
158  // Assume curr_centroid_ is fully merged.
159  DEVICE void next();
160 
161  DEVICE IndexType prefixSum() const { return prefix_sum_; }
162 
163  DEVICE IndexType totalWeight() const { return total_weight_; }
164 };
165 
166 // Instantiated on host for reserving memory for Centroids.
167 template <typename RealType, typename IndexType>
169  // TODO CUDA memory containers
170  std::vector<RealType> sums_;
171  std::vector<IndexType> counts_;
172 
173  public:
174  explicit CentroidsMemory(size_t const size) : sums_(size), counts_(size) {}
175  size_t nbytes() const { return sums_.size() * (sizeof(RealType) + sizeof(IndexType)); }
176  size_t size() const { return sums_.size(); }
177  VectorView<RealType> sums() { return {sums_.data(), sums_.size(), sums_.size()}; }
179  return {counts_.data(), counts_.size(), counts_.size()};
180  }
181 };
182 
183 template <typename RealType, typename IndexType = size_t>
184 class TDigest {
185  // buf_ has two purposes, unless an externally managed buffer is used instead:
186  // * Used as the incoming buffer that gets filled before merging into centroids_.
187  // * After all data has been merged, it is used as a partial sum for centroids_.
190  bool forward_{true}; // alternate direction on each call to mergeCentroids().
191 #ifndef __CUDACC__
193 #endif
195 
196  // simple_allocator_, buf_allocate_, centroids_allocate_ are used only by allocate().
197  std::optional<RealType> q_{std::nullopt}; // Optional preset quantile parameter.
200  IndexType buf_allocate_{0};
201  IndexType centroids_allocate_{0};
202 
203  DEVICE RealType max() const {
204  return centroids_.max_;
205  }
206  DEVICE RealType min() const {
207  return centroids_.min_;
208  }
209 
210  DEVICE IndexType maxCardinality(IndexType const sum,
211  IndexType const total_weight,
212  RealType const c);
213 
214  // Require: centroids are sorted by (mean, -count)
216 
217  DEVICE RealType firstCentroid(RealType const x) const;
218  DEVICE RealType interiorCentroid(RealType const x,
219  IndexType const idx1,
220  IndexType const prefix_sum) const;
221  DEVICE RealType lastCentroid(RealType const x, IndexType const N) const;
222  DEVICE RealType oneCentroid(RealType const x) const;
223 
224  DEVICE RealType slope(IndexType const idx1, IndexType const idx2) const;
225 
226  public:
228 
229  TDigest() = default;
230 
232  : centroids_(mem.sums().data(), mem.counts().data(), mem.size()) {
233  centroids_.clear();
234  }
235 
236  DEVICE TDigest(RealType q,
237  SimpleAllocator* simple_allocator,
238  IndexType buf_allocate,
239  IndexType centroids_allocate)
240  : q_(q)
241  , use_linear_scaling_function_(q_ && 0.1 <= *q_ && *q_ <= 0.9)
242  , simple_allocator_(simple_allocator)
243  , buf_allocate_(buf_allocate)
244  , centroids_allocate_(centroids_allocate) {}
245 
246  // Called by approx_quantile_jit_rt(). Move everything except the unmovable mutex.
248  buf_ = std::move(rhs.buf_);
249  centroids_ = std::move(rhs.centroids_);
250  forward_ = std::move(rhs.forward_);
251  merge_buffer_final_called_ = std::move(rhs.merge_buffer_final_called_);
252  q_ = std::move(rhs.q_);
253  use_linear_scaling_function_ = std::move(rhs.use_linear_scaling_function_);
254  simple_allocator_ = std::move(rhs.simple_allocator_);
255  buf_allocate_ = std::move(rhs.buf_allocate_);
256  centroids_allocate_ = std::move(rhs.centroids_allocate_);
257  return *this;
258  }
259 
260  // Size of reserved buffer+centroids space used for each TDigest in bytes.
261  static IndexType nbytes(IndexType buf_allocate, IndexType centroids_allocate) {
262  return (buf_allocate + centroids_allocate) * (sizeof(RealType) + sizeof(IndexType));
263  }
264 
266  return centroids_;
267  }
268 
269  // Store value to buf_, and merge when full.
270  DEVICE void add(RealType value);
271 
272  // Allocate memory if needed by simple_allocator_.
273  DEVICE void allocate();
274 
275  DEVICE void mergeBuffer();
276 
277  // Calls mergeBuffer() w/ std::call_once and turns buf_ into a partial sum of counts.
278  DEVICE void mergeBufferFinal();
279 
280  // Import from sorted memory range. Ok to change values during merge.
281  DEVICE void mergeSorted(RealType* sums, IndexType* counts, IndexType size);
282 
283  DEVICE void mergeTDigest(TDigest& t_digest) {
284  mergeBuffer();
285  t_digest.mergeBuffer();
286  mergeCentroids(t_digest.centroids_);
287  }
288 
289  // Write partial sum of centroids_.counts_ into buf and return as a VectorView.
290  DEVICE VectorView<IndexType const> partialSumOfCounts(IndexType* const buf) const;
291 
293  RealType const q) const;
294 
295  // Must be called after mergeBufferFinal().
296  DEVICE RealType quantile(RealType const q) const {
297  return quantile({buf_.counts_.data(), centroids_.size()}, q);
298  }
299 
300  DEVICE RealType quantile() {
301  return q_ ? quantile(*q_) : centroids_.nan;
302  }
303 
304  // Assumes mem is externally managed.
305  DEVICE void setBuffer(Memory& mem) {
307  mem.sums().data(), mem.counts().data(), mem.size());
308  buf_.clear();
309  }
310 
311  // Assumes mem is externally managed.
314  mem.sums().data(), mem.counts().data(), mem.size());
315  centroids_.clear();
316  }
317 
319  VectorView<IndexType> const counts) {
321  centroids_.clear();
322  }
323 
324  // Total number of data points in centroids_.
325  DEVICE IndexType totalWeight() const {
326  return centroids_.totalWeight();
327  }
328 };
329 
330 // Class template member definitions
331 
332 // Class Centroids<>
333 
334 namespace {
335 
336 // Order by (mean, -count)
337 template <typename RealType, typename IndexType>
340  // Assume value1() is positive and use multiplication instead of division.
341  DEVICE bool operator()(Value const& a, Value const& b) const {
342  auto const lhs = a.value0() * b.value1();
343  auto const rhs = b.value0() * a.value1();
344  return lhs < rhs || (lhs == rhs && b.value1() < a.value1());
345  }
346 };
347 
348 } // namespace
349 
350 template <typename RealType, typename IndexType>
352  if (inc_ == -1 && curr_idx_ != 0) {
353  // Shift data to the left left by curr_idx_.
354  // Prefer to copy, but thrust::copy doesn't support overlapping ranges.
355  // Reverse instead, which gets sorted below.
356  // gpu_enabled::copy(sums_.begin() + curr_idx_, sums_.end(), sums_.begin());
357  // gpu_enabled::copy(counts_.begin() + curr_idx_, counts_.end(), counts_.begin());
358  gpu_enabled::reverse(sums_.begin(), sums_.end());
359  gpu_enabled::reverse(counts_.begin(), counts_.end());
360  }
361  // Shift VectorViews to the right by buff.curr_idx_.
362  IndexType const offset = inc_ == 1 ? 0 : buff.curr_idx_;
363  IndexType const buff_size =
364  inc_ == 1 ? buff.curr_idx_ + 1 : buff.size() - buff.curr_idx_;
365  VectorView<RealType> buff_sums(buff.sums_.begin() + offset, buff_size, buff_size);
366  VectorView<IndexType> buff_counts(buff.counts_.begin() + offset, buff_size, buff_size);
367  // Copy buff into sums_ and counts_.
368  IndexType const curr_size = inc_ == 1 ? curr_idx_ + 1 : size() - curr_idx_;
369  IndexType const total_size = curr_size + buff_sums.size();
370  assert(total_size <= sums_.capacity()); // TODO proof of inequality
371  sums_.resize(total_size);
372  gpu_enabled::copy(buff_sums.begin(), buff_sums.end(), sums_.begin() + curr_size);
373  assert(total_size <= counts_.capacity());
374  counts_.resize(total_size);
375  gpu_enabled::copy(buff_counts.begin(), buff_counts.end(), counts_.begin() + curr_size);
376 
377  // Sort sums_ and counts_ by (mean, -count).
378  double_sort::Iterator<RealType, IndexType> const begin(sums_.begin(), counts_.begin());
379  double_sort::Iterator<RealType, IndexType> const end(sums_.end(), counts_.end());
380  gpu_enabled::sort(begin, end, OrderByMeanAscCountDesc<RealType, IndexType>());
381 }
382 
383 // Return true if centroid was merged, false if not.
384 template <typename RealType, typename IndexType>
386  IndexType const max_count) {
387  if (counts_[curr_idx_] + centroid.nextCount() <= max_count) {
388  sums_[curr_idx_] += centroid.nextSum();
389  counts_[curr_idx_] += centroid.nextCount();
390  centroid.next_idx_ += centroid.inc_;
391  return true;
392  }
393  return false;
394 }
395 
396 template <typename RealType, typename IndexType>
398  curr_idx_ += inc_;
399  if (curr_idx_ != next_idx_) {
400  sums_[curr_idx_] = sums_[next_idx_];
401  counts_[curr_idx_] = counts_[next_idx_];
402  }
403  next_idx_ += inc_;
404 }
405 
406 template <typename RealType, typename IndexType>
408  if (forward) {
409  inc_ = 1;
410  curr_idx_ = ~IndexType(0);
411  } else {
412  inc_ = -1;
413  curr_idx_ = size();
414  }
415  static_assert(std::is_unsigned<IndexType>::value,
416  "IndexType must be an unsigned type.");
417  next_idx_ = curr_idx_ + inc_;
418 }
419 
420 #ifndef __CUDACC__
421 template <typename RealType, typename IndexType>
422 std::ostream& operator<<(std::ostream& out,
423  Centroids<RealType, IndexType> const& centroids) {
424  out << "Centroids<" << typeid(RealType).name() << ',' << typeid(IndexType).name()
425  << ">(size(" << centroids.size() << ") curr_idx_(" << centroids.curr_idx_
426  << ") next_idx_(" << centroids.next_idx_ << ") sums_(";
427  for (IndexType i = 0; i < centroids.sums_.size(); ++i) {
428  out << (i ? " " : "") << std::setprecision(20) << centroids.sums_[i];
429  }
430  out << ") counts_(";
431  for (IndexType i = 0; i < centroids.counts_.size(); ++i) {
432  out << (i ? " " : "") << centroids.counts_[i];
433  }
434  return out << "))";
435 }
436 #endif
437 
438 // Class CentroidsMerger<>
439 
440 template <typename RealType, typename IndexType>
444  bool const forward)
445  : buf_(buf)
446  , centroids_(centroids)
447  , total_weight_(centroids->totalWeight() + buf->totalWeight())
448  , forward_(forward) {
449  buf_->resetIndices(forward_);
450  centroids_->resetIndices(forward_);
451  mergeMinMax();
452  setCurrCentroid();
453 }
454 
455 template <typename RealType, typename IndexType>
458  if (buf_->hasNext()) {
459  if (centroids_->hasNext()) {
460  return (*buf_ < *centroids_) == forward_ ? buf_ : centroids_;
461  }
462  return buf_;
463  } else if (centroids_->hasNext()) {
464  return centroids_;
465  }
466  return nullptr;
467 }
468 
469 namespace {
470 
471 // Helper struct for mergeCentroids() for tracking skipped centroids.
472 // A consecutive sequence of incoming centroids with the same mean
473 // and exceed max_count are skipped before expensive reshuffling of memory.
474 template <typename RealType, typename IndexType>
475 class Skipped {
476  struct Data {
477  Centroids<RealType, IndexType>* centroid_{nullptr};
478  IndexType start_{0};
479  IndexType count_merged_{0};
480  IndexType count_skipped_{0};
481  } data_[2];
483 
484  DEVICE static void shiftCentroids(Data& data) {
485  if (data.count_merged_) {
486  shiftRange(data.centroid_->sums_.begin() + data.start_,
487  data.count_skipped_,
488  data.count_merged_,
489  data.centroid_->inc_);
490  shiftRange(data.centroid_->counts_.begin() + data.start_,
491  data.count_skipped_,
492  data.count_merged_,
493  data.centroid_->inc_);
494  data.start_ += data.centroid_->inc_ * data.count_merged_;
495  }
496  }
497  template <typename T>
498  DEVICE static void shiftRange(T* const begin,
499  IndexType skipped,
500  IndexType const merged,
501  int const inc) {
502 #ifdef __CUDACC__
503  T* src = begin + inc * (skipped - 1);
504  T* dst = src + inc * merged;
505  for (; skipped; --skipped, src -= inc, dst -= inc) {
506  *dst = *src;
507  }
508 #else
509  if (inc == 1) {
510  std::copy_backward(begin, begin + skipped, begin + skipped + merged);
511  } else {
512  std::copy(begin + 1 - skipped, begin + 1, begin + 1 - skipped - merged);
513  }
514 #endif
515  }
516 
517  public:
519  return data_[0].centroid_ != centroid;
520  }
522  return mean_.sum_ * next_centroid->nextCount() !=
523  next_centroid->nextSum() * mean_.count_;
524  }
526  IndexType const idx = index(next_centroid);
527  if (data_[idx].count_skipped_) {
528  ++data_[idx].count_merged_;
529  }
530  }
531  DEVICE operator bool() const {
532  return data_[0].centroid_;
533  }
534  // Shift skipped centroids over merged centroids, and rewind next_idx_.
536  shiftCentroids(data_[0]);
537  data_[0].centroid_->next_idx_ = data_[0].start_;
538  if (data_[1].centroid_) {
539  shiftCentroids(data_[1]);
540  data_[1].centroid_->next_idx_ = data_[1].start_;
541  }
542  }
544  mean_ = Centroid<RealType, IndexType>{next_centroid->nextSum(),
545  next_centroid->nextCount()};
546  data_[0] = {next_centroid, next_centroid->next_idx_, 0, 1};
547  next_centroid->next_idx_ += next_centroid->inc_;
548  }
550  IndexType const idx = index(next_centroid);
551  if (idx == 1 && data_[1].centroid_ == nullptr) {
552  data_[1] = {next_centroid, next_centroid->next_idx_, 0, 1};
553  } else {
554  if (data_[idx].count_merged_) {
555  shiftCentroids(data_[idx]);
556  data_[idx].count_merged_ = 0;
557  }
558  ++data_[idx].count_skipped_;
559  }
560  next_centroid->next_idx_ += next_centroid->inc_;
561  }
562 };
563 
564 } // namespace
565 
566 // Merge as many centroids as possible for count <= max_count.
567 template <typename RealType, typename IndexType>
568 DEVICE void CentroidsMerger<RealType, IndexType>::merge(IndexType const max_count) {
569  Skipped<RealType, IndexType> skipped;
570  while (auto* next_centroid = getNextCentroid()) {
571  if (skipped) {
572  if (skipped.isDifferentMean(next_centroid)) {
573  break;
574  } else if (curr_centroid_->mergeIfFits(*next_centroid, max_count)) {
575  skipped.merged(next_centroid);
576  } else {
577  skipped.skipSubsequent(next_centroid);
578  }
579  } else if (!curr_centroid_->mergeIfFits(*next_centroid, max_count)) {
580  skipped.skipFirst(next_centroid);
581  }
582  }
583  if (skipped) {
584  skipped.shiftCentroidsAndSetNext();
585  }
586 }
587 
588 // Track min/max without assuming the data points exist in any particular centroids.
589 // Otherwise to assume and track their existence in the min/max centroids introduces
590 // significant complexity. For example, the min centroid may not remain the min
591 // centroid if it merges into it the 3rd smallest centroid, skipping the 2nd due to
592 // the scaling function constraint.
593 // When the quantile() is calculated, the min/max data points are assumed to exist
594 // in the min/max centroids for the purposes of calculating the quantile within those
595 // centroids.
596 template <typename RealType, typename IndexType>
598  if (centroids_->max_ < buf_->max_) {
599  centroids_->max_ = buf_->max_;
600  }
601  if (buf_->min_ < centroids_->min_) {
602  centroids_->min_ = buf_->min_;
603  }
604 }
605 
606 // Assume curr_centroid_ is fully merged.
607 template <typename RealType, typename IndexType>
609  prefix_sum_ += curr_centroid_->currCount();
610  setCurrCentroid();
611 }
612 
613 template <typename RealType, typename IndexType>
615  if ((curr_centroid_ = getNextCentroid())) {
616  curr_centroid_->moveNextToCurrent();
617  }
618 }
619 
620 // class TDigest<>
621 
622 template <typename RealType, typename IndexType>
624  if (buf_.sums_.full()) {
625  mergeBuffer();
626  }
627  buf_.sums_.push_back(value);
628  buf_.counts_.push_back(1);
629 }
630 
631 // Assumes buf_ is allocated iff centroids_ is allocated.
632 template <typename RealType, typename IndexType>
634  if (buf_.capacity() == 0) {
635  auto* p0 = simple_allocator_->allocate(buf_allocate_ * sizeof(RealType));
636  auto* p1 = simple_allocator_->allocate(buf_allocate_ * sizeof(IndexType));
638  VectorView<RealType>(reinterpret_cast<RealType*>(p0), 0, buf_allocate_),
639  VectorView<IndexType>(reinterpret_cast<IndexType*>(p1), 0, buf_allocate_));
640  p0 = simple_allocator_->allocate(centroids_allocate_ * sizeof(RealType));
641  p1 = simple_allocator_->allocate(centroids_allocate_ * sizeof(IndexType));
642  centroids_ = Centroids<RealType, IndexType>(
643  VectorView<RealType>(reinterpret_cast<RealType*>(p0), 0, centroids_allocate_),
644  VectorView<IndexType>(reinterpret_cast<IndexType*>(p1), 0, centroids_allocate_));
645  }
646 }
647 
648 // Return total_weight * (f_inv(f(q_{i-1})+1) - q_{i-1})
649 template <typename RealType, typename IndexType>
650 DEVICE IndexType
652  IndexType const total_weight,
653  RealType const c) {
654  IndexType const max_bins = centroids_.capacity(); // "compression parameter" delta
655  if (total_weight <= max_bins) {
656  return 0;
657  } else if (use_linear_scaling_function_) {
658  // Whitepaper scaling function k=0.
659  return 2 * total_weight / max_bins;
660  } else {
661  // Whitepaper scaling function k=1.
662  RealType const x = 2.0 * sum / total_weight - 1; // = 2 q_{i-1} - 1
663  RealType const f_inv = 0.5 + 0.5 * std::sin(c + std::asin(x));
664  constexpr RealType eps = 1e-5; // round-off epsilon for floor().
665  IndexType const dsum = static_cast<IndexType>(total_weight * f_inv + eps);
666  return dsum < sum ? 0 : dsum - sum;
667  }
668 }
669 
670 // Assumes buf_ consists only of singletons.
671 template <typename RealType, typename IndexType>
673  if (buf_.size()) {
674  gpu_enabled::sort(buf_.sums_.begin(), buf_.sums_.end());
675  buf_.min_ = buf_.sums_.front();
676  buf_.max_ = buf_.sums_.back();
677  mergeCentroids(buf_);
678  }
679 }
680 
681 // [QE-383] Make concurrent calls to mergeBufferFinal() thread-safe.
682 template <typename RealType, typename IndexType>
684 #ifndef __CUDACC__
685  std::lock_guard<std::mutex> lock_guard(merge_buffer_final_called_mutex_);
686 #endif
687  if (!merge_buffer_final_called_) {
688  mergeBuffer();
689  assert(centroids_.size() <= buf_.capacity());
690  partialSumOfCounts(buf_.counts_.data());
691  merge_buffer_final_called_ = true;
692  }
693 }
694 
695 template <typename RealType, typename IndexType>
697  IndexType* counts,
698  IndexType size) {
699  if (size) {
700  if (buf_.capacity() == 0) {
701  buf_ = Centroids<RealType, IndexType>(sums, counts, size); // Set capacity and size
702  } else {
703  buf_.sums_.set(sums, size); // Does not change capacity
704  buf_.counts_.set(counts, size);
705  }
706  gpu_enabled::fill(buf_.counts_.begin(), buf_.counts_.end(), IndexType(1));
707  buf_.min_ = buf_.sums_.front();
708  buf_.max_ = buf_.sums_.back();
709  mergeCentroids(buf_);
710  }
711 }
712 
713 // Require:
714 // * buf centroids are not empty and sorted by (mean, -count)
715 // * buf.min_ and buf.max_ are correctly set.
716 // During filling stage, buf=buf_.
717 // During reduction, buf_=centroids_[i]
718 template <typename RealType, typename IndexType>
721  constexpr RealType two_pi = 6.283185307179586476925286766559005768e+00;
722  // Hoisted constant used in maxCardinality().
723  RealType const c = two_pi / centroids_.capacity();
724  // Loop over sorted sequence of buf and centroids_.
725  // Some latter centroids may be merged into the current centroid, so the number
726  // of iterations is only at most equal to the number of initial centroids.
728  for (CM cm(&buf, &centroids_, forward_); cm.hasNext(); cm.next()) {
729  // cm.prefixSum() == 0 on first iteration.
730  // Max cardinality for current centroid to be fully merged based on scaling function.
731  IndexType const max_cardinality = maxCardinality(cm.prefixSum(), cm.totalWeight(), c);
732  cm.merge(max_cardinality);
733  }
734  // Combine sorted centroids buf[0..curr_idx_] + centroids_[0..curr_idx_] if forward
735  centroids_.appendAndSortCurrent(buf);
736  buf.clear();
737  forward_ ^= true; // alternate merge direction on each call
738 }
739 
740 namespace {
741 template <typename CountsIterator>
742 DEVICE bool isSingleton(CountsIterator itr) {
743  return *itr == 1;
744 }
745 } // namespace
746 
747 // Assumes x < centroids_.counts_.front().
748 template <typename RealType, typename IndexType>
749 DEVICE RealType TDigest<RealType, IndexType>::firstCentroid(RealType const x) const {
750  if (x < 1) {
751  return min();
752  } else if (centroids_.size() == 1) {
753  return oneCentroid(x);
754  } else if (centroids_.counts_.front() == 2) {
755  RealType const sum = centroids_.sums_.front();
756  return x == 1 ? 0.5 * sum : sum - min();
757  } else {
758  RealType const count = centroids_.counts_.front();
759  RealType const dx = x - RealType(0.5) * (1 + count);
760  RealType const mean = (centroids_.sums_.front() - min()) / (count - 1);
761  return mean + slope(0, 0 < dx) * dx;
762  }
763 }
764 
765 // x is between first and last centroids.
766 template <typename RealType, typename IndexType>
767 DEVICE RealType
769  IndexType const idx1,
770  IndexType const prefix_sum) const {
771  if (isSingleton(centroids_.counts_.begin() + idx1)) {
772  RealType const sum1 = centroids_.sums_[idx1];
773  if (x == prefix_sum - centroids_.counts_[idx1]) {
774  if (isSingleton(centroids_.counts_.begin() + idx1 - 1)) {
775  return 0.5 * (centroids_.sums_[idx1 - 1] + sum1);
776  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
777  return 0.5 * (centroids_.sums_[idx1 - 1] - min() + sum1);
778  }
779  }
780  return sum1;
781  } else {
782  RealType const dx = x + RealType(0.5) * centroids_.counts_[idx1] - prefix_sum;
783  IndexType const idx2 = idx1 + 2 * (0 < dx) - 1;
784  return centroids_.mean(idx1) + slope(idx1, idx2) * dx;
785  }
786 }
787 
788 // Assumes N - centroids_.counts_.back() <= x < N, and there is more than 1 centroid.
789 template <typename RealType, typename IndexType>
791  IndexType const N) const {
792  if (N - 1 < x) {
793  return max();
794  }
795  IndexType const idx1 = centroids_.size() - 1;
796  RealType const sum1 = centroids_.sums_[idx1];
797  IndexType const count1 = centroids_.counts_[idx1];
798  if (count1 == 1) { // => x == N - 1
799  if (isSingleton(centroids_.counts_.begin() + (idx1 - 1))) {
800  return 0.5 * (centroids_.sums_[idx1 - 1] + sum1);
801  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
802  return 0.5 * (centroids_.sums_[idx1 - 1] - min() + sum1);
803  } else {
804  return sum1;
805  }
806  } else if (count1 == 2) { // => 3 <= N
807  if (x == N - 1) {
808  return 0.5 * sum1;
809  } else if (x == N - 2) {
810  RealType const sum2 = centroids_.sums_[idx1 - 1];
811  if (isSingleton(centroids_.counts_.begin() + (idx1 - 1))) {
812  return 0.5 * (sum2 + sum1 - max());
813  } else if (idx1 == 1 && centroids_.counts_[0] == 2) {
814  return 0.5 * (sum2 - min() + sum1 - max());
815  }
816  }
817  return sum1 - max();
818  } else { // => 3 <= count1
819  RealType const dx = x + RealType(0.5) * (count1 + 1) - N;
820  RealType const mean = (sum1 - max()) / (count1 - 1);
821  return mean + slope(idx1, idx1 - (dx < 0)) * dx;
822  }
823 }
824 
825 // Assumes there is only 1 centroid, and 1 <= x < centroids_.counts_.front().
826 template <typename RealType, typename IndexType>
827 DEVICE RealType TDigest<RealType, IndexType>::oneCentroid(RealType const x) const {
828  IndexType const N = centroids_.counts_.front();
829  if (N - 1 < x) { // includes case N == 1
830  return max();
831  } else if (N == 2) { // x == 1
832  return 0.5 * centroids_.sums_.front();
833  } else if (N == 3) { // 1 <= x <= 2
834  if (x == 2) {
835  return 0.5 * (centroids_.sums_.front() - min());
836  } else {
837  RealType const s = centroids_.sums_.front() - max();
838  return x == 1 ? 0.5 * s : s - min();
839  }
840  } else { // 3 < N
841  RealType const dx = x - RealType(0.5) * N;
842  RealType const mean = (centroids_.sums_.front() - (min() + max())) / (N - 2);
843  RealType const slope = 2 * (0 < dx ? max() - mean : mean - min()) / (N - 2);
844  return mean + slope * dx;
845  }
846 }
847 
848 // Return a partial sum of centroids_.counts_ stored in buf.
849 template <typename RealType, typename IndexType>
851  IndexType* const buf) const {
852  gpu_enabled::partial_sum(centroids_.counts_.begin(), centroids_.counts_.end(), buf);
853  return {buf, centroids_.size()};
854 }
855 
856 template <typename RealType, typename IndexType>
857 DEVICE RealType
859  RealType const q) const {
860  if (centroids_.size()) {
861  IndexType const N = partial_sum.back();
862  RealType const x = q * N;
863  auto const it1 = gpu_enabled::upper_bound(partial_sum.begin(), partial_sum.end(), x);
864  if (it1 == partial_sum.begin()) {
865  return firstCentroid(x);
866  } else if (it1 == partial_sum.end()) { // <==> 1 <= q
867  return max();
868  } else if (it1 + 1 == partial_sum.end()) {
869  return lastCentroid(x, N);
870  } else {
871  return interiorCentroid(x, it1 - partial_sum.begin(), *it1);
872  }
873  } else {
874  return centroids_.nan;
875  }
876 }
877 
878 // Requirement: 1 < M and 0 <= idx1, idx2 < M where M = Number of centroids.
879 // Return slope of line segment connecting idx1 and idx2.
880 // If equal then assume it is the section contained within an extrema.
881 // Centroid for idx1 is not a singleton, but idx2 may be.
882 template <typename RealType, typename IndexType>
884  IndexType idx2) const {
885  IndexType const M = centroids_.size();
886  if (idx1 == idx2) { // Line segment is contained in either the first or last centroid.
887  RealType const n = static_cast<RealType>(centroids_.counts_[idx1]);
888  RealType const s = centroids_.sums_[idx1];
889  return idx1 == 0 ? 2 * (s - n * min()) / ((n - 1) * (n - 1))
890  : 2 * (n * max() - s) / ((n - 1) * (n - 1));
891  } else {
892  bool const min1 = idx1 == 0; // idx1 is the min centroid
893  bool const max1 = idx1 == M - 1; // idx1 is the max centroid
894  bool const min2 = idx2 == 0; // idx2 is the min centroid
895  bool const max2 = idx2 == M - 1; // idx2 is the max centroid
896  RealType const n1 = static_cast<RealType>(centroids_.counts_[idx1] - min1 - max1);
897  RealType const s1 = centroids_.sums_[idx1] - (min1 ? min() : max1 ? max() : 0);
898  RealType const s2 = centroids_.sums_[idx2] - (min2 ? min() : max2 ? max() : 0);
899  if (isSingleton(centroids_.counts_.begin() + idx2)) {
900  return (idx1 < idx2 ? 2 : -2) * (n1 * s2 - s1) / (n1 * n1);
901  } else {
902  RealType const n2 = static_cast<RealType>(centroids_.counts_[idx2] - min2 - max2);
903  return (idx1 < idx2 ? 2 : -2) * (n1 * s2 - n2 * s1) / (n1 * n2 * (n1 + n2));
904  }
905  }
906 }
907 
908 } // namespace detail
909 
911 
912 } // namespace quantile
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
std::lock_guard< T > lock_guard
static constexpr RealType infinity
Definition: quantile.h:62
DEVICE void push_back(RealType const value, RealType const count)
Definition: quantile.h:114
DEVICE void push_back(T const &value)
Definition: VectorView.h:73
std::vector< RealType > sums_
Definition: quantile.h:170
DEVICE size_type capacity() const
Definition: VectorView.h:60
Centroids< RealType, IndexType > * buf_
Definition: quantile.h:135
DEVICE void skipSubsequent(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:549
std::ostream & operator<<(std::ostream &out, Centroids< RealType, IndexType > const &centroids)
Definition: quantile.h:422
CentroidsMemory(size_t const size)
Definition: quantile.h:174
DEVICE RealType mean(IndexType const i) const
Definition: quantile.h:94
DEVICE bool index(Centroids< RealType, IndexType > *centroid) const
Definition: quantile.h:518
DEVICE void resetIndices(bool const forward)
Definition: quantile.h:407
DEVICE Centroids< RealType, IndexType > & centroids()
Definition: quantile.h:265
DEVICE void moveNextToCurrent()
Definition: quantile.h:397
DEVICE void add(RealType value)
Definition: quantile.h:623
DEVICE VectorView< IndexType const > partialSumOfCounts(IndexType *const buf) const
Definition: quantile.h:850
DEVICE RealType quantile(RealType const q) const
Definition: quantile.h:296
SimpleAllocator * simple_allocator_
Definition: quantile.h:199
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
DEVICE void merged(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:525
DEVICE bool mergeIfFits(Centroids &centroid, IndexType const max_count)
Definition: quantile.h:385
DEVICE bool hasNext() const
Definition: quantile.h:92
DEVICE TDigest(Memory &mem)
Definition: quantile.h:231
DEVICE void mergeBuffer()
Definition: quantile.h:672
DEVICE IndexType totalWeight() const
Definition: quantile.h:325
TDigest & operator=(TDigest &&rhs)
Definition: quantile.h:247
VectorView< RealType > sums()
Definition: quantile.h:177
DEVICE T * data()
Definition: VectorView.h:64
VectorView< RealType > sums_
Definition: quantile.h:60
VectorView< IndexType > counts_
Definition: quantile.h:61
std::vector< IndexType > counts_
Definition: quantile.h:171
DEVICE void clear()
Definition: VectorView.h:63
DEVICE void mergeBufferFinal()
Definition: quantile.h:683
#define DEVICE
constexpr double a
Definition: Utm.h:32
DEVICE RealType currMean() const
Definition: quantile.h:88
DEVICE RealType quantile()
Definition: quantile.h:300
DEVICE void setCentroids(Memory &mem)
Definition: quantile.h:312
DEVICE size_type size() const
Definition: VectorView.h:83
DEVICE TDigest(RealType q, SimpleAllocator *simple_allocator, IndexType buf_allocate, IndexType centroids_allocate)
Definition: quantile.h:236
static IndexType nbytes(IndexType buf_allocate, IndexType centroids_allocate)
Definition: quantile.h:261
Centroids< RealType, IndexType > * centroids_
Definition: quantile.h:136
DEVICE RealType max() const
Definition: quantile.h:203
DEVICE void fill(ARGS &&...args)
Definition: gpu_enabled.h:60
DEVICE void set(T *data, size_type const size)
Definition: VectorView.h:79
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
std::optional< RealType > q_
Definition: quantile.h:197
DEVICE Centroids(VectorView< RealType > sums, VectorView< IndexType > counts)
Definition: quantile.h:72
DEVICE T * begin() const
Definition: VectorView.h:59
DEVICE CentroidsMerger(Centroids< RealType, IndexType > *buf, Centroids< RealType, IndexType > *centroids, bool const forward)
Definition: quantile.h:441
DEVICE void mergeTDigest(TDigest &t_digest)
Definition: quantile.h:283
DEVICE IndexType nextCount() const
Definition: quantile.h:101
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
DEVICE void setCentroids(VectorView< RealType > const sums, VectorView< IndexType > const counts)
Definition: quantile.h:318
DEVICE RealType slope(IndexType const idx1, IndexType const idx2) const
Definition: quantile.h:883
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
Centroids< RealType, IndexType > buf_
Definition: quantile.h:188
DEVICE RealType nextSum() const
Definition: quantile.h:103
DEVICE RealType lastCentroid(RealType const x, IndexType const N) const
Definition: quantile.h:790
DEVICE void skipFirst(Centroids< RealType, IndexType > *next_centroid)
Definition: quantile.h:543
DEVICE void allocate()
Definition: quantile.h:633
static DEVICE void shiftRange(T *const begin, IndexType skipped, IndexType const merged, int const inc)
Definition: quantile.h:498
DEVICE void setBuffer(Memory &mem)
Definition: quantile.h:305
DEVICE bool operator()(Value const &a, Value const &b) const
Definition: quantile.h:341
static constexpr RealType nan
Definition: quantile.h:63
DEVICE Centroids< RealType, IndexType > * getNextCentroid() const
Definition: quantile.h:457
DEVICE IndexType prefixSum() const
Definition: quantile.h:161
std::mutex merge_buffer_final_called_mutex_
Definition: quantile.h:192
DEVICE Centroids(RealType *sums, IndexType *counts, IndexType const size)
Definition: quantile.h:69
DEVICE void merge(IndexType const max_count)
Definition: quantile.h:568
DEVICE bool operator<(Centroids const &b) const
Definition: quantile.h:107
DEVICE bool hasNext() const
Definition: quantile.h:153
Centroids< RealType, IndexType > * curr_centroid_
Definition: quantile.h:137
DEVICE IndexType maxCardinality(IndexType const sum, IndexType const total_weight, RealType const c)
Definition: quantile.h:651
DEVICE size_t size() const
Definition: quantile.h:121
DEVICE IndexType currCount() const
Definition: quantile.h:86
DEVICE IndexType totalWeight() const
Definition: quantile.h:123
DEVICE bool isDifferentMean(Centroids< RealType, IndexType > *next_centroid) const
Definition: quantile.h:521
DEVICE T0 value0() const
Definition: DoubleSort.h:87
constexpr unsigned N
Definition: Utm.h:110
DEVICE IndexType capacity() const
Definition: quantile.h:77
DEVICE IndexType totalWeight() const
Definition: quantile.h:163
DEVICE RealType interiorCentroid(RealType const x, IndexType const idx1, IndexType const prefix_sum) const
Definition: quantile.h:768
DEVICE RealType oneCentroid(RealType const x) const
Definition: quantile.h:827
IndexType centroids_allocate_
Definition: quantile.h:201
DEVICE T1 value1() const
Definition: DoubleSort.h:90
DEVICE void mergeCentroids(Centroids< RealType, IndexType > &)
Definition: quantile.h:719
string name
Definition: setup.in.py:72
constexpr double n
Definition: Utm.h:38
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
DEVICE T & back()
Definition: VectorView.h:57
DEVICE RealType min() const
Definition: quantile.h:206
DEVICE RealType firstCentroid(RealType const x) const
Definition: quantile.h:749
DEVICE bool isSingleton(CountsIterator itr)
Definition: quantile.h:742
DEVICE void appendAndSortCurrent(Centroids &buff)
Definition: quantile.h:351
DEVICE bool hasCurr() const
Definition: quantile.h:90
DEVICE void mergeSorted(RealType *sums, IndexType *counts, IndexType size)
Definition: quantile.h:696
VectorView< IndexType > counts()
Definition: quantile.h:178
Centroids< RealType, IndexType > centroids_
Definition: quantile.h:189
DEVICE T * end() const
Definition: VectorView.h:67