OmniSciDB  8fa3bf436f
OverlapsJoinHashTable.cpp
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
20 #include "QueryEngine/Execute.h"
27 
33 
41 
43 std::shared_ptr<OverlapsJoinHashTable> OverlapsJoinHashTable::getInstance(
44  const std::shared_ptr<Analyzer::BinOper> condition,
45  const std::vector<InputTableInfo>& query_infos,
46  const Data_Namespace::MemoryLevel memory_level,
47  const int device_count,
48  ColumnCacheMap& column_cache,
49  Executor* executor,
50  const RegisteredQueryHint& query_hint) {
51  decltype(std::chrono::steady_clock::now()) ts1, ts2;
52  auto inner_outer_pairs = normalize_column_pairs(
53  condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
54 
55  const auto getHashTableType =
56  [](const std::shared_ptr<Analyzer::BinOper> condition,
57  const std::vector<InnerOuter>& inner_outer_pairs) -> HashType {
58  HashType layout = HashType::OneToMany;
59  if (condition->is_overlaps_oper()) {
60  CHECK_EQ(inner_outer_pairs.size(), size_t(1));
61  if (inner_outer_pairs[0].first->get_type_info().is_array() &&
62  inner_outer_pairs[0].second->get_type_info().is_array() &&
63  // Bounds vs constructed points, former should yield ManyToMany
64  inner_outer_pairs[0].second->get_type_info().get_size() == 32) {
65  layout = HashType::ManyToMany;
66  }
67  }
68  return layout;
69  };
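// Layout selection note: the lambda above defaults to OneToMany and only switches to
// ManyToMany when both sides of the overlaps qual are arrays and the outer side is a
// fixed 32-byte array, i.e. a bounds box of 4 doubles rather than a constructed point,
// since a single bounds box can probe many hash buckets.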
70 
71  auto layout = getHashTableType(condition, inner_outer_pairs);
72 
73  if (VLOGGING(1)) {
74  VLOG(1) << "Building geo hash table " << getHashTypeString(layout)
75  << " for qual: " << condition->toString();
76  ts1 = std::chrono::steady_clock::now();
77  }
78 
79  const auto qi_0 = query_infos[0].info.getNumTuplesUpperBound();
80  const auto qi_1 = query_infos[1].info.getNumTuplesUpperBound();
81 
82  VLOG(1) << "table_id = " << query_infos[0].table_id << " has " << qi_0 << " tuples.";
83  VLOG(1) << "table_id = " << query_infos[1].table_id << " has " << qi_1 << " tuples.";
84 
85  const auto& query_info =
86  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
87  .info;
88  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
89  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
90  throw TooManyHashEntries();
91  }
92 
93  auto join_hash_table = std::make_shared<OverlapsJoinHashTable>(condition,
94  query_infos,
95  memory_level,
96  column_cache,
97  executor,
98  inner_outer_pairs,
99  device_count);
100  if (query_hint.isAnyQueryHintDelivered()) {
101  join_hash_table->registerQueryHint(query_hint);
102  }
103  try {
104  join_hash_table->reify(layout);
105  } catch (const HashJoinFail& e) {
106  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
107  "involved in overlaps join | ") +
108  e.what());
109  } catch (const ColumnarConversionNotSupported& e) {
110  throw HashJoinFail(std::string("Could not build hash tables for overlaps join | "
111  "Inner table too big. Attempt manual table reordering "
112  "or create a single fragment inner table. | ") +
113  e.what());
114  } catch (const std::exception& e) {
115  throw HashJoinFail(std::string("Failed to build hash tables for overlaps join | ") +
116  e.what());
117  }
118  if (VLOGGING(1)) {
119  ts2 = std::chrono::steady_clock::now();
120  VLOG(1) << "Built geo hash table " << getHashTypeString(layout) << " in "
121  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
122  << " ms";
123  }
124  return join_hash_table;
125 }
126 
127 namespace {
128 
129 std::vector<double> correct_uninitialized_bucket_sizes_to_thresholds(
130  const std::vector<double>& bucket_sizes,
131  const std::vector<double>& bucket_thresholds,
132  const double initial_value) {
133  std::vector<double> corrected_bucket_sizes(bucket_sizes);
134  for (size_t i = 0; i != bucket_sizes.size(); ++i) {
135  if (bucket_sizes[i] == initial_value) {
136  corrected_bucket_sizes[i] = bucket_thresholds[i];
137  }
138  }
139  return corrected_bucket_sizes;
140 }
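// Any dimension whose bucket size was never updated from initial_value falls back to
// the caller-supplied threshold for that dimension. For example, bucket_sizes
// {0.0, 0.15} with bucket_thresholds {0.1, 0.2} and initial_value 0.0 corrects to
// {0.1, 0.15}.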
141 
142 std::vector<double> compute_bucket_sizes(
143  const std::vector<double>& bucket_thresholds,
144  const Data_Namespace::MemoryLevel effective_memory_level,
145  const JoinColumn& join_column,
146  const JoinColumnTypeInfo& join_column_type,
147  const std::vector<InnerOuter>& inner_outer_pairs,
148  const Executor* executor) {
149  // No coalesced keys for overlaps joins yet
150  CHECK_EQ(inner_outer_pairs.size(), 1u);
151 
152  const auto col = inner_outer_pairs[0].first;
153  CHECK(col);
154  const auto col_ti = col->get_type_info();
155  CHECK(col_ti.is_array());
156 
157  // TODO: Compute the number of dimensions for this overlaps key
158  const size_t num_dims{2};
159  const double initial_bin_value{0.0};
160  std::vector<double> bucket_sizes(num_dims, initial_bin_value);
161  CHECK_EQ(bucket_thresholds.size(), num_dims);
162 
163  VLOG(1)
164  << "Computing x and y bucket sizes for overlaps hash join with maximum bucket size "
165  << std::to_string(bucket_thresholds[0]) << ", "
166  << std::to_string(bucket_thresholds[1]);
167 
168  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
169  const int thread_count = cpu_threads();
170  compute_bucket_sizes_on_cpu(
171  bucket_sizes, join_column, join_column_type, bucket_thresholds, thread_count);
172  }
173 #ifdef HAVE_CUDA
174  else {
175  // Note that we compute the bucket sizes using only a single GPU
176  const int device_id = 0;
177  auto& data_mgr = executor->getCatalog()->getDataMgr();
178  CudaAllocator allocator(&data_mgr, device_id);
179  auto device_bucket_sizes_gpu =
180  transfer_vector_of_flat_objects_to_gpu(bucket_sizes, allocator);
181  auto join_column_gpu = transfer_flat_object_to_gpu(join_column, allocator);
182  auto join_column_type_gpu = transfer_flat_object_to_gpu(join_column_type, allocator);
183  auto device_bucket_thresholds_gpu =
184  transfer_vector_of_flat_objects_to_gpu(bucket_thresholds, allocator);
185 
186  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
187  join_column_gpu,
188  join_column_type_gpu,
189  device_bucket_thresholds_gpu);
190  allocator.copyFromDevice(reinterpret_cast<int8_t*>(bucket_sizes.data()),
191  reinterpret_cast<int8_t*>(device_bucket_sizes_gpu),
192  bucket_sizes.size() * sizeof(double));
193  }
194 #endif
195  const auto corrected_bucket_sizes = correct_uninitialized_bucket_sizes_to_thresholds(
196  bucket_sizes, bucket_thresholds, initial_bin_value);
197 
198  VLOG(1) << "Computed x and y bucket sizes for overlaps hash join: ("
199  << corrected_bucket_sizes[0] << ", " << corrected_bucket_sizes[1] << ")";
200 
201  return corrected_bucket_sizes;
202 }
203 
204 struct HashTableProps {
205  HashTableProps(const size_t entry_count,
206  const size_t emitted_keys_count,
207  const size_t hash_table_size,
208  const std::vector<double>& bucket_sizes)
209  : entry_count(entry_count)
210  , emitted_keys_count(emitted_keys_count)
211  , keys_per_bin(entry_count == 0 ? std::numeric_limits<double>::max()
212  : emitted_keys_count / (entry_count / 2.0))
213  , hash_table_size(hash_table_size)
214  , bucket_sizes(bucket_sizes) {}
215 
216  static HashTableProps invalid() { return HashTableProps(0, 0, 0, {}); }
217 
218  size_t entry_count;
219  size_t emitted_keys_count;
220  double keys_per_bin;
221  size_t hash_table_size;
222  std::vector<double> bucket_sizes;
223 };
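// keys_per_bin is emitted_keys_count / (entry_count / 2): entry_count is sized at
// roughly twice the estimated number of distinct bucket keys (see
// computeHashTableCounts below), so entry_count / 2 approximates the bin count.
// A zero entry_count maps to +infinity keys per bin.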
224 
225 std::ostream& operator<<(std::ostream& os, const HashTableProps& props) {
226  os << " entry_count: " << props.entry_count << ", emitted_keys "
227  << props.emitted_keys_count << ", hash table size " << props.hash_table_size
228  << ", keys per bin " << props.keys_per_bin;
229  return os;
230 }
231 
232 struct TuningState {
233  TuningState(const size_t overlaps_max_table_size_bytes,
234  const double overlaps_target_entries_per_bin)
235  : crt_props(HashTableProps::invalid())
236  , prev_props(HashTableProps::invalid())
237  , chosen_overlaps_threshold(-1)
238  , crt_step(0)
239  , crt_reverse_search_iteration(0)
240  , overlaps_max_table_size_bytes(overlaps_max_table_size_bytes)
241  , overlaps_target_entries_per_bin(overlaps_target_entries_per_bin) {}
242 
243  // current and previous props, allows for easy backtracking
244  HashTableProps crt_props;
245  HashTableProps prev_props;
246 
247  // value we are tuning for
248  double chosen_overlaps_threshold;
249  enum class TuningDirection { SMALLER, LARGER };
250  TuningDirection tuning_direction{TuningDirection::SMALLER};
251 
252  // various constants / state
253  size_t crt_step; // 1 indexed
254  size_t crt_reverse_search_iteration; // 1 indexed
255  const size_t overlaps_max_table_size_bytes;
256  const double overlaps_target_entries_per_bin;
257  const size_t max_reverse_search_iterations{8};
258 
263  bool operator()(const HashTableProps& new_props, const bool new_overlaps_threshold) {
264  prev_props = crt_props;
265  crt_props = new_props;
266  crt_step++;
267 
268  if (hashTableTooBig() || keysPerBinIncreasing()) {
269  if (hashTableTooBig()) {
270  VLOG(1) << "Reached hash table size limit: " << overlaps_max_table_size_bytes
271  << " with " << crt_props.hash_table_size << " byte hash table, "
272  << crt_props.keys_per_bin << " keys per bin.";
273  } else if (keysPerBinIncreasing()) {
274  VLOG(1) << "Keys per bin increasing from " << prev_props.keys_per_bin << " to "
275  << crt_props.keys_per_bin;
276  CHECK(previousIterationValid());
277  }
278  if (previousIterationValid()) {
279  VLOG(1) << "Using previous threshold value " << chosen_overlaps_threshold;
280  crt_props = prev_props;
281  return false;
282  } else {
283  CHECK(hashTableTooBig());
284  crt_reverse_search_iteration++;
285  chosen_overlaps_threshold = new_overlaps_threshold;
286 
287  if (crt_reverse_search_iteration == max_reverse_search_iterations) {
288  VLOG(1) << "Hit maximum number (" << max_reverse_search_iterations
289  << ") of reverse tuning iterations. Aborting tuning";
290  // use the crt props, but don't bother trying to tune any farther
291  return false;
292  }
293 
294  if (crt_reverse_search_iteration > 1 &&
295  crt_props.hash_table_size == prev_props.hash_table_size) {
296  // hash table size is not changing, bail
297  VLOG(1) << "Hash table size not decreasing (" << crt_props.hash_table_size
298  << " bytes) and still above maximum allowed size ("
299  << overlaps_max_table_size_bytes << " bytes). Aborting tuning";
300  return false;
301  }
302 
303  // if the hash table is too big on the very first step, change direction towards
304  // larger bins to see if a slightly smaller hash table will fit
305  if (crt_step == 1 && crt_reverse_search_iteration == 1) {
306  VLOG(1)
307  << "First iteration of overlaps tuning led to hash table size over "
308  "limit. Reversing search to try larger bin sizes (previous threshold: "
309  << chosen_overlaps_threshold << ")";
310  // Need to change direction of tuning to tune "up" towards larger bins
311  tuning_direction = TuningDirection::LARGER;
312  }
313  return true;
314  }
315  UNREACHABLE();
316  }
317 
318  chosen_overlaps_threshold = new_overlaps_threshold;
319 
320  if (keysPerBinUnderThreshold()) {
321  VLOG(1) << "Hash table reached size " << crt_props.hash_table_size
322  << " with keys per bin " << crt_props.keys_per_bin << " under threshold "
323  << overlaps_target_entries_per_bin << ". Terminating bucket size loop.";
324  return false;
325  }
326 
327  if (crt_reverse_search_iteration > 0) {
328  // We always take the first tuning iteration that succeeds when reversing
329  // direction, as if we're here we haven't had a successful iteration and we're
330  // "backtracking" our search by making bin sizes larger
331  VLOG(1) << "On reverse (larger tuning direction) search found workable "
332  << " hash table size of " << crt_props.hash_table_size
333  << " with keys per bin " << crt_props.keys_per_bin
334  << ". Terminating bucket size loop.";
335  return false;
336  }
337 
338  return true;
339  }
340 
341  bool hashTableTooBig() const {
342  return crt_props.hash_table_size > overlaps_max_table_size_bytes;
343  }
344 
345  bool keysPerBinIncreasing() const {
346  return crt_props.keys_per_bin > prev_props.keys_per_bin;
347  }
348 
349  bool previousIterationValid() const {
350  return tuning_direction == TuningDirection::SMALLER && crt_step > 1;
351  }
352 
353  bool keysPerBinUnderThreshold() const {
354  return crt_props.keys_per_bin < overlaps_target_entries_per_bin;
355  }
356 };
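// TuningState is the accept/reject step of the bucket-size search. Each call with the
// latest HashTableProps either keeps shrinking the buckets (table under
// overlaps_max_table_size_bytes and keys_per_bin still falling), backtracks to the
// previous threshold once keys_per_bin rises or the size cap is hit after a valid
// step, or, if the very first step already overflows the cap, reverses direction and
// grows the buckets for at most max_reverse_search_iterations attempts. Returning
// false ends the tuning loop with chosen_overlaps_threshold / crt_props as the result.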
357 
358 class BucketSizeTuner {
359  public:
360  BucketSizeTuner(const double bucket_threshold,
361  const double step,
362  const double min_threshold,
363  const Data_Namespace::MemoryLevel effective_memory_level,
364  const std::vector<ColumnsForDevice>& columns_per_device,
365  const std::vector<InnerOuter>& inner_outer_pairs,
366  const size_t table_tuple_count,
367  const Executor* executor)
368  : num_dims_(2) // Todo: allow varying number of dims
369  , bucket_thresholds_(/*count=*/num_dims_, /*value=*/bucket_threshold)
370  , step_(step)
371  , min_threshold_(min_threshold)
372  , effective_memory_level_(effective_memory_level)
373  , columns_per_device_(columns_per_device)
374  , inner_outer_pairs_(inner_outer_pairs)
375  , table_tuple_count_(table_tuple_count)
376  , executor_(executor) {
377  CHECK(!columns_per_device_.empty());
378  }
379 
380  bool tuneOneStep() { return tuneOneStep(TuningState::TuningDirection::SMALLER, step_); }
381 
382  bool tuneOneStep(const TuningState::TuningDirection tuning_direction) {
383  return tuneOneStep(tuning_direction, step_);
384  }
385 
386  bool tuneOneStep(const TuningState::TuningDirection tuning_direction,
387  const double step_overide) {
388  if (table_tuple_count_ == 0) {
389  return false;
390  }
391  if (tuning_direction == TuningState::TuningDirection::SMALLER) {
392  return tuneSmallerOneStep(step_overide);
393  }
394  return tuneLargerOneStep(step_overide);
395  }
396 
397  auto getMinBucketSize() const {
398  return *std::min_element(bucket_thresholds_.begin(), bucket_thresholds_.end());
399  }
400 
407  std::vector<double> getInverseBucketSizes() {
408  if (num_steps_ == 0) {
409  CHECK_EQ(current_bucket_sizes_.size(), static_cast<size_t>(0));
410  current_bucket_sizes_ = computeBucketSizes();
411  }
412  CHECK_EQ(current_bucket_sizes_.size(), num_dims_);
413  std::vector<double> inverse_bucket_sizes;
414  for (const auto s : current_bucket_sizes_) {
415  inverse_bucket_sizes.emplace_back(1.0 / s);
416  }
417  return inverse_bucket_sizes;
418  }
419 
420  private:
421  bool bucketThresholdsBelowMinThreshold() const {
422  for (const auto& t : bucket_thresholds_) {
423  if (t < min_threshold_) {
424  return true;
425  }
426  }
427  return false;
428  }
429 
430  std::vector<double> computeBucketSizes() const {
431  if (table_tuple_count_ == 0) {
432  return std::vector<double>(/*count=*/num_dims_, /*val=*/0);
433  }
434  return compute_bucket_sizes(bucket_thresholds_,
435  effective_memory_level_,
436  columns_per_device_.front().join_columns[0],
437  columns_per_device_.front().join_column_types[0],
438  inner_outer_pairs_,
439  executor_);
440  }
441 
442  bool tuneSmallerOneStep(const double step_overide) {
443  if (!current_bucket_sizes_.empty()) {
444  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
445  bucket_thresholds_ = current_bucket_sizes_;
446  for (auto& t : bucket_thresholds_) {
447  t /= step_overide;
448  }
449  }
450  if (bucketThresholdsBelowMinThreshold()) {
451  VLOG(1) << "Aborting overlaps tuning as at least one bucket size is below min "
452  "threshold";
453  return false;
454  }
455  const auto next_bucket_sizes = computeBucketSizes();
456  if (next_bucket_sizes == current_bucket_sizes_) {
457  VLOG(1) << "Aborting overlaps tuning as bucket size is no longer changing.";
458  return false;
459  }
460 
461  current_bucket_sizes_ = next_bucket_sizes;
462  num_steps_++;
463  return true;
464  }
465 
466  bool tuneLargerOneStep(const double step_overide) {
467  if (!current_bucket_sizes_.empty()) {
468  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
469  bucket_thresholds_ = current_bucket_sizes_;
470  }
471  // If current_bucket_sizes was empty, we will start from our initial threshold
472  for (auto& t : bucket_thresholds_) {
473  t *= step_overide;
474  }
475  // When tuning up, do not dynamically compute bucket_sizes, as compute_bucket_sizes as
476  // written will pick the largest bin size below the threshold, meaning our bucket_size
477  // will never increase beyond the size of the largest polygon. This could mean that we
478  // can never make the bucket sizes large enough to get our hash table below the
479  // maximum size. Possible todo: enable templated version of compute_bucket_sizes that
480  // allows for optionally finding smallest extent above threshold, to mirror default
481  // behavior finding largest extent below threshold, and use former variant here
482  current_bucket_sizes_ = bucket_thresholds_;
483  num_steps_++;
484  return true;
485  }
486 
487  size_t num_dims_;
488  std::vector<double> bucket_thresholds_;
489  size_t num_steps_{0};
490  const double step_;
491  const double min_threshold_;
492  const Data_Namespace::MemoryLevel effective_memory_level_;
493  const std::vector<ColumnsForDevice>& columns_per_device_;
494  const std::vector<InnerOuter>& inner_outer_pairs_;
495  const size_t table_tuple_count_;
496  const Executor* executor_;
497 
498  std::vector<double> current_bucket_sizes_;
499 
500  friend std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner);
501 };
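// BucketSizeTuner owns the candidate bucket thresholds. Tuning "smaller" seeds the
// thresholds from the last computed bucket sizes, divides them by the step and
// recomputes data-driven bucket sizes; tuning "larger" multiplies the thresholds
// directly (see the comment in tuneLargerOneStep for why sizes are not recomputed in
// that direction). With the auto-tune step of 2.0 used below, each downward step
// roughly halves the bucket size until TuningState stops the loop or a threshold
// drops under min_threshold_.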
502 
503 std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner) {
504  os << "Step Num: " << tuner.num_steps_ << ", Threshold: " << std::fixed
505  << tuner.bucket_thresholds_[0] << ", Step Size: " << std::fixed << tuner.step_
506  << ", Min: " << std::fixed << tuner.min_threshold_;
507  return os;
508 }
509 
510 } // namespace
511 
512 void OverlapsJoinHashTable::reifyWithLayout(const HashType layout) {
513  auto timer = DEBUG_TIMER(__func__);
514  CHECK(layoutRequiresAdditionalBuffers(layout));
515  const auto& query_info =
516  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs_), query_infos_)
517  .info;
518  VLOG(1) << "Reify with layout " << getHashTypeString(layout)
519  << "for table_id: " << HashJoin::getInnerTableId(inner_outer_pairs_);
520  if (query_info.fragments.empty()) {
521  return;
522  }
523 
524  auto overlaps_max_table_size_bytes = g_overlaps_max_table_size_bytes;
525  std::optional<double> overlaps_threshold_override;
526  double overlaps_target_entries_per_bin = g_overlaps_target_entries_per_bin;
527  auto query_hint = getRegisteredQueryHint();
528  auto skip_hashtable_caching = false;
529  if (query_hint.isHintRegistered(QueryHint::kOverlapsBucketThreshold)) {
530  VLOG(1) << "Setting overlaps bucket threshold "
531  "\'overlaps_hashjoin_bucket_threshold\' via "
532  "query hint: "
533  << query_hint.overlaps_bucket_threshold;
534  overlaps_threshold_override = query_hint.overlaps_bucket_threshold;
535  }
536  if (query_hint.isHintRegistered(QueryHint::kOverlapsMaxSize)) {
537  std::ostringstream oss;
538  oss << "User requests to change a threshold \'overlaps_max_table_size_bytes\' via "
539  "query hint";
540  if (!overlaps_threshold_override.has_value()) {
541  oss << ": " << overlaps_max_table_size_bytes << " -> "
542  << query_hint.overlaps_max_size;
543  overlaps_max_table_size_bytes = query_hint.overlaps_max_size;
544  } else {
545  oss << ", but is skipped since the query hint also changes the threshold "
546  "\'overlaps_hashjoin_bucket_threshold\'";
547  }
548  VLOG(1) << oss.str();
549  }
550  if (query_hint.isHintRegistered(QueryHint::kOverlapsNoCache)) {
551  VLOG(1) << "User requests to skip caching overlaps join hashtable and its tuned "
552  "parameters for this query";
553  skip_hashtable_caching = true;
554  }
555  if (query_hint.isHintRegistered(QueryHint::kOverlapsKeysPerBin)) {
556  VLOG(1) << "User requests to change a threshold \'overlaps_keys_per_bin\' via query "
557  "hint: "
558  << overlaps_target_entries_per_bin << " -> "
559  << query_hint.overlaps_keys_per_bin;
560  overlaps_target_entries_per_bin = query_hint.overlaps_keys_per_bin;
561  }
562 
563  std::vector<ColumnsForDevice> columns_per_device;
564  const auto catalog = executor_->getCatalog();
565  CHECK(catalog);
566  auto& data_mgr = catalog->getDataMgr();
567  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
568  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
569  for (int device_id = 0; device_id < device_count_; ++device_id) {
570  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(&data_mgr, device_id));
571  }
572  }
573  const auto shard_count = shardCount();
574  size_t total_num_tuples = 0;
575  for (int device_id = 0; device_id < device_count_; ++device_id) {
576  const auto fragments =
577  shard_count
578  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
579  : query_info.fragments;
580  const size_t crt_num_tuples =
581  std::accumulate(fragments.begin(),
582  fragments.end(),
583  size_t(0),
584  [](const auto& sum, const auto& fragment) {
585  return sum + fragment.getNumTuples();
586  });
587  total_num_tuples += crt_num_tuples;
588  const auto columns_for_device =
589  fetchColumnsForDevice(fragments,
590  device_id,
591  memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
592  ? dev_buff_owners[device_id].get()
593  : nullptr);
594  columns_per_device.push_back(columns_for_device);
595  }
596 
597  // Prepare to calculate the size of the hash table.
598  const auto composite_key_info =
599  HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
600 
601  auto cache_key_contains_intermediate_table = [](const auto cache_key) {
602  for (auto key : cache_key.chunk_keys) {
603  CHECK_GE(key.size(), size_t(2));
604  if (key[1] < 0) {
605  return true;
606  }
607  }
608  return false;
609  };
610 
611  if (overlaps_threshold_override) {
612  // compute bucket sizes based on the user provided threshold
613  BucketSizeTuner tuner(/*initial_threshold=*/*overlaps_threshold_override,
614  /*step=*/1.0,
615  /*min_threshold=*/0.0,
616  getEffectiveMemoryLevel(inner_outer_pairs_),
617  columns_per_device,
618  inner_outer_pairs_,
619  total_num_tuples,
620  executor_);
621  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
622 
623  auto [entry_count, emitted_keys_count] =
624  computeHashTableCounts(shard_count,
625  inverse_bucket_sizes,
626  columns_per_device,
627  overlaps_max_table_size_bytes,
628  *overlaps_threshold_override);
629  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
630  // reifyImpl will check the hash table cache for an appropriate hash table w/ those
631  // bucket sizes (or within tolerances) if a hash table exists use it, otherwise build
632  // one
633  reifyImpl(columns_per_device,
634  query_info,
635  layout,
636  shard_count,
637  entry_count,
638  emitted_keys_count,
639  skip_hashtable_caching,
640  overlaps_max_table_size_bytes,
641  *overlaps_threshold_override);
642  } else {
643  double overlaps_bucket_threshold = std::numeric_limits<double>::max();
644  OverlapsHashTableCacheKey cache_key{
645  columns_per_device.front().join_columns.front().num_elems,
646  composite_key_info.cache_key_chunks,
647  condition_->get_optype(),
648  overlaps_max_table_size_bytes,
649  overlaps_bucket_threshold};
650  auto cached_bucket_threshold_opt = auto_tuner_cache_->get(cache_key);
651  if (cached_bucket_threshold_opt) {
652  overlaps_bucket_threshold = cached_bucket_threshold_opt->first;
653  auto inverse_bucket_sizes = cached_bucket_threshold_opt->second;
654 
655  OverlapsHashTableCacheKey hash_table_cache_key(cache_key,
656  overlaps_max_table_size_bytes,
657  overlaps_bucket_threshold,
658  inverse_bucket_sizes);
659  if (auto hash_table_cache_opt =
660  hash_table_cache_->getWithKey(hash_table_cache_key)) {
661  // if we already have a built hash table, we can skip the scans required for
662  // computing bucket size and tuple count
663  auto key = hash_table_cache_opt->first;
664  // reset as the hash table sizes can vary a bit
665  setInverseBucketSizeInfo(
666  key.inverse_bucket_sizes, columns_per_device, device_count_);
667  auto hash_table = hash_table_cache_opt->second;
668  CHECK(hash_table);
669 
670  VLOG(1) << "Using cached hash table bucket size";
671 
672  reifyImpl(columns_per_device,
673  query_info,
674  layout,
675  shard_count,
676  hash_table->getEntryCount(),
677  hash_table->getEmittedKeysCount(),
678  skip_hashtable_caching,
679  overlaps_max_table_size_bytes,
680  overlaps_bucket_threshold);
681  } else {
682  VLOG(1) << "Computing bucket size for cached bucket threshold";
683  // compute bucket size using our cached tuner value
684  BucketSizeTuner tuner(/*initial_threshold=*/overlaps_bucket_threshold,
685  /*step=*/1.0,
686  /*min_threshold=*/0.0,
687  getEffectiveMemoryLevel(inner_outer_pairs_),
688  columns_per_device,
689  inner_outer_pairs_,
690  total_num_tuples,
691  executor_);
692 
693  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
694 
695  auto [entry_count, emitted_keys_count] =
696  computeHashTableCounts(shard_count,
697  inverse_bucket_sizes,
698  columns_per_device,
699  overlaps_max_table_size_bytes,
700  overlaps_bucket_threshold);
701  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
702 
703  reifyImpl(columns_per_device,
704  query_info,
705  layout,
706  shard_count,
707  entry_count,
708  emitted_keys_count,
709  skip_hashtable_caching,
710  overlaps_max_table_size_bytes,
711  overlaps_bucket_threshold);
712  }
713  } else {
714  // compute bucket size using the auto tuner
715  BucketSizeTuner tuner(
716  /*initial_threshold=*/overlaps_bucket_threshold,
717  /*step=*/2.0,
718  /*min_threshold=*/1e-7,
719  getEffectiveMemoryLevel(inner_outer_pairs_),
720  columns_per_device,
721  inner_outer_pairs_,
722  total_num_tuples,
723  executor_);
724 
725  VLOG(1) << "Running overlaps join size auto tune with parameters: " << tuner;
726 
727  // manages the tuning state machine
728  TuningState tuning_state(overlaps_max_table_size_bytes,
729  overlaps_target_entries_per_bin);
730  while (tuner.tuneOneStep(tuning_state.tuning_direction)) {
731  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
732 
733  const auto [crt_entry_count, crt_emitted_keys_count] =
734  computeHashTableCounts(shard_count,
735  inverse_bucket_sizes,
736  columns_per_device,
737  tuning_state.overlaps_max_table_size_bytes,
738  tuning_state.chosen_overlaps_threshold);
739  const size_t hash_table_size = calculateHashTableSize(
740  inverse_bucket_sizes.size(), crt_emitted_keys_count, crt_entry_count);
741  HashTableProps crt_props(crt_entry_count,
742  crt_emitted_keys_count,
743  hash_table_size,
744  inverse_bucket_sizes);
745  VLOG(1) << "Tuner output: " << tuner << " with properties " << crt_props;
746 
747  const auto should_continue = tuning_state(crt_props, tuner.getMinBucketSize());
748  setInverseBucketSizeInfo(
749  tuning_state.crt_props.bucket_sizes, columns_per_device, device_count_);
750  if (!should_continue) {
751  break;
752  }
753  }
754 
755  const auto& crt_props = tuning_state.crt_props;
756  // sanity check that the hash table size has not changed. this is a fairly
757  // inexpensive check to ensure the above algorithm is consistent
758  const size_t hash_table_size =
759  calculateHashTableSize(inverse_bucket_sizes_for_dimension_.size(),
760  crt_props.emitted_keys_count,
761  crt_props.entry_count);
762  CHECK_EQ(crt_props.hash_table_size, hash_table_size);
763 
765  hash_table_size > overlaps_max_table_size_bytes) {
766  VLOG(1) << "Could not find suitable overlaps join parameters to create hash "
767  "table under max allowed size ("
768  << overlaps_max_table_size_bytes << ") bytes.";
769  throw OverlapsHashTableTooBig(overlaps_max_table_size_bytes);
770  }
771 
772  VLOG(1) << "Final tuner output: " << tuner << " with properties " << crt_props;
774  VLOG(1) << "Final bucket sizes: ";
775  for (size_t dim = 0; dim < inverse_bucket_sizes_for_dimension_.size(); dim++) {
776  VLOG(1) << "dim[" << dim
777  << "]: " << 1.0 / inverse_bucket_sizes_for_dimension_[dim];
778  }
779  CHECK_GE(tuning_state.chosen_overlaps_threshold, double(0));
780  if (!cache_key_contains_intermediate_table(cache_key)) {
781  if (skip_hashtable_caching) {
782  VLOG(1) << "Skip to add tuned parameters to auto tuner";
783  } else {
784  auto cache_value = std::make_pair(tuning_state.chosen_overlaps_threshold,
785  inverse_bucket_sizes_for_dimension_);
786  auto_tuner_cache_->insert(cache_key, cache_value);
787  }
788  }
789  overlaps_bucket_threshold = tuning_state.chosen_overlaps_threshold;
790  reifyImpl(columns_per_device,
791  query_info,
792  layout,
793  shard_count,
794  crt_props.entry_count,
795  crt_props.emitted_keys_count,
796  skip_hashtable_caching,
797  overlaps_max_table_size_bytes,
798  overlaps_bucket_threshold);
799  }
800  }
801 }
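// reifyWithLayout reaches reifyImpl() through one of three paths: an explicit
// overlaps_hashjoin_bucket_threshold query hint (no tuning), a hit in
// auto_tuner_cache_ reusing a previously tuned threshold (and, when the full hash
// table is cached too, skipping size estimation entirely), or the BucketSizeTuner /
// TuningState loop, which searches for a threshold that keeps the table under
// overlaps_max_table_size_bytes and near overlaps_target_entries_per_bin keys per bin.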
802 
803 size_t OverlapsJoinHashTable::calculateHashTableSize(size_t number_of_dimensions,
804  size_t emitted_keys_count,
805  size_t entry_count) const {
806  const auto key_component_width = getKeyComponentWidth();
807  const auto key_component_count = number_of_dimensions;
808  const auto entry_size = key_component_count * key_component_width;
809  const auto keys_for_all_rows = emitted_keys_count;
810  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
811  const size_t hash_table_size =
812  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
813  return hash_table_size;
814 }
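// Size model: entry_count keys of (number_of_dimensions * key_component_width) bytes
// each, plus 2 * entry_count int32 slots for the one-to-many offset/count buffers and
// one int32 per emitted key for the row-id payload. As a rough worked example with
// the 8-byte key width and 2 dimensions used here, entry_count = 1,000,000 and
// emitted_keys_count = 4,000,000 gives 16 MB of keys plus
// (2,000,000 + 4,000,000) * 4 B = 24 MB of payload, about 40 MB total.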
815 
816 ColumnsForDevice OverlapsJoinHashTable::fetchColumnsForDevice(
817  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
818  const int device_id,
819  DeviceAllocator* dev_buff_owner) {
820  const auto& catalog = *executor_->getCatalog();
821  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
822 
823  std::vector<JoinColumn> join_columns;
824  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
825  std::vector<JoinColumnTypeInfo> join_column_types;
826  std::vector<std::shared_ptr<void>> malloc_owner;
827  for (const auto& inner_outer_pair : inner_outer_pairs_) {
828  const auto inner_col = inner_outer_pair.first;
829  const auto inner_cd = get_column_descriptor_maybe(
830  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
831  if (inner_cd && inner_cd->isVirtualCol) {
832  throw FailedToJoinOnVirtualColumn();
833  }
834  join_columns.emplace_back(fetchJoinColumn(inner_col,
835  fragments,
836  effective_memory_level,
837  device_id,
838  chunks_owner,
839  dev_buff_owner,
840  malloc_owner,
841  executor_,
842  &column_cache_));
843  const auto& ti = inner_col->get_type_info();
844  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
845  0,
846  0,
847  inline_int_null_value<int64_t>(),
848  false,
849  0,
850  get_join_column_type_kind(ti)});
851  CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
852  }
853  return {join_columns, join_column_types, chunks_owner, {}, malloc_owner};
854 }
855 
856 std::pair<size_t, size_t> OverlapsJoinHashTable::computeHashTableCounts(
857  const size_t shard_count,
858  const std::vector<double>& inverse_bucket_sizes_for_dimension,
859  std::vector<ColumnsForDevice>& columns_per_device,
860  const size_t chosen_max_hashtable_size,
861  const double chosen_bucket_threshold) {
862  CHECK(!inverse_bucket_sizes_for_dimension.empty());
863  const auto [tuple_count, emitted_keys_count] =
864  approximateTupleCount(inverse_bucket_sizes_for_dimension,
865  columns_per_device,
866  chosen_max_hashtable_size,
867  chosen_bucket_threshold);
868  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
869 
870  return std::make_pair(
871  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
872  emitted_keys_count);
873 }
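// The entry count is padded to twice the approximate number of distinct bucket keys
// (and at least 2) to leave slack for collisions in the baseline hash layout;
// get_entries_per_device then splits that total across shards and devices. The
// emitted keys count is the total number of (bucket key, row) pairs the inner table
// will insert.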
874 
875 std::pair<size_t, size_t> OverlapsJoinHashTable::approximateTupleCount(
876  const std::vector<double>& inverse_bucket_sizes_for_dimension,
877  std::vector<ColumnsForDevice>& columns_per_device,
878  const size_t chosen_max_hashtable_size,
879  const double chosen_bucket_threshold) {
880  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
881  CountDistinctDescriptor count_distinct_desc{
882  CountDistinctImplType::Bitmap,
883  0,
884  11,
885  true,
886  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
887  ? ExecutorDeviceType::GPU
888  : ExecutorDeviceType::CPU,
889  1};
890  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
891 
892  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
893  if (columns_per_device.front().join_columns.front().num_elems == 0) {
894  return std::make_pair(0, 0);
895  }
896 
897  // TODO: state management in here should be revisited, but this should be safe enough
898  // for now
899  // re-compute bucket counts per device based on global bucket size
900  for (size_t device_id = 0; device_id < columns_per_device.size(); ++device_id) {
901  auto& columns_for_device = columns_per_device[device_id];
902  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
903  inner_outer_pairs_);
904  }
905 
906  // Number of keys must match dimension of buckets
907  CHECK_EQ(columns_per_device.front().join_columns.size(),
908  columns_per_device.front().join_buckets.size());
909  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
910  // Note that this path assumes each device has the same hash table (for GPU hash join
911  // w/ hash table built on CPU)
912  const auto composite_key_info =
913  HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
914  OverlapsHashTableCacheKey cache_key{
915  columns_per_device.front().join_columns.front().num_elems,
916  composite_key_info.cache_key_chunks,
917  condition_->get_optype(),
918  chosen_max_hashtable_size,
919  chosen_bucket_threshold,
920  inverse_bucket_sizes_for_dimension};
921  const auto cached_count_info = getApproximateTupleCountFromCache(cache_key);
922  if (cached_count_info) {
923  VLOG(1) << "Using a cached tuple count: " << cached_count_info->first
924  << ", emitted keys count: " << cached_count_info->second;
925  return *cached_count_info;
926  }
927  int thread_count = cpu_threads();
928  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
929  auto hll_result = &hll_buffer_all_cpus[0];
930 
931  std::vector<int32_t> num_keys_for_row;
932  // TODO(adb): support multi-column overlaps join
933  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
934 
935  approximate_distinct_tuples_overlaps(hll_result,
936  num_keys_for_row,
937  count_distinct_desc.bitmap_sz_bits,
938  padded_size_bytes,
939  columns_per_device.front().join_columns,
940  columns_per_device.front().join_column_types,
941  columns_per_device.front().join_buckets,
942  thread_count);
943  for (int i = 1; i < thread_count; ++i) {
944  hll_unify(hll_result,
945  hll_result + i * padded_size_bytes,
946  1 << count_distinct_desc.bitmap_sz_bits);
947  }
948  return std::make_pair(
949  hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
950  static_cast<size_t>(num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0));
951  }
952 #ifdef HAVE_CUDA
953  auto& data_mgr = executor_->getCatalog()->getDataMgr();
954  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
955  for (auto& host_hll_buffer : host_hll_buffers) {
956  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
957  }
958  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
959  std::vector<std::future<void>> approximate_distinct_device_threads;
960  for (int device_id = 0; device_id < device_count_; ++device_id) {
961  approximate_distinct_device_threads.emplace_back(std::async(
962  std::launch::async,
963  [device_id,
964  &columns_per_device,
965  &count_distinct_desc,
966  &data_mgr,
967  &host_hll_buffers,
968  &emitted_keys_count_device_threads] {
969  CudaAllocator allocator(&data_mgr, device_id);
970  auto device_hll_buffer =
971  allocator.alloc(count_distinct_desc.bitmapPaddedSizeBytes());
972  data_mgr.getCudaMgr()->zeroDeviceMem(
973  device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
974  const auto& columns_for_device = columns_per_device[device_id];
975  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
976  columns_for_device.join_columns, allocator);
977 
978  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
979  const auto& inverse_bucket_sizes_for_dimension =
980  columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
981  auto inverse_bucket_sizes_gpu =
982  allocator.alloc(inverse_bucket_sizes_for_dimension.size() * sizeof(double));
983  copy_to_gpu(&data_mgr,
984  reinterpret_cast<CUdeviceptr>(inverse_bucket_sizes_gpu),
985  inverse_bucket_sizes_for_dimension.data(),
986  inverse_bucket_sizes_for_dimension.size() * sizeof(double),
987  device_id);
988  const size_t row_counts_buffer_sz =
989  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
990  auto row_counts_buffer = allocator.alloc(row_counts_buffer_sz);
991  data_mgr.getCudaMgr()->zeroDeviceMem(
992  row_counts_buffer, row_counts_buffer_sz, device_id);
993  const auto key_handler =
994  OverlapsKeyHandler(inverse_bucket_sizes_for_dimension.size(),
995  join_columns_gpu,
996  reinterpret_cast<double*>(inverse_bucket_sizes_gpu));
997  const auto key_handler_gpu =
998  transfer_flat_object_to_gpu(key_handler, allocator);
999  approximate_distinct_tuples_on_device_overlaps(
1000  reinterpret_cast<uint8_t*>(device_hll_buffer),
1001  count_distinct_desc.bitmap_sz_bits,
1002  reinterpret_cast<int32_t*>(row_counts_buffer),
1003  key_handler_gpu,
1004  columns_for_device.join_columns[0].num_elems);
1005 
1006  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
1007  copy_from_gpu(&data_mgr,
1008  &host_emitted_keys_count,
1009  reinterpret_cast<CUdeviceptr>(
1010  row_counts_buffer +
1011  (columns_per_device.front().join_columns[0].num_elems - 1) *
1012  sizeof(int32_t)),
1013  sizeof(int32_t),
1014  device_id);
1015 
1016  auto& host_hll_buffer = host_hll_buffers[device_id];
1017  copy_from_gpu(&data_mgr,
1018  &host_hll_buffer[0],
1019  reinterpret_cast<CUdeviceptr>(device_hll_buffer),
1020  count_distinct_desc.bitmapPaddedSizeBytes(),
1021  device_id);
1022  }));
1023  }
1024  for (auto& child : approximate_distinct_device_threads) {
1025  child.get();
1026  }
1027  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
1028  auto& result_hll_buffer = host_hll_buffers.front();
1029  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
1030  for (int device_id = 1; device_id < device_count_; ++device_id) {
1031  auto& host_hll_buffer = host_hll_buffers[device_id];
1032  hll_unify(hll_result,
1033  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
1034  1 << count_distinct_desc.bitmap_sz_bits);
1035  }
1036  const size_t emitted_keys_count =
1037  std::accumulate(emitted_keys_count_device_threads.begin(),
1038  emitted_keys_count_device_threads.end(),
1039  0);
1040  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
1041  emitted_keys_count);
1042 #else
1043  UNREACHABLE();
1044  return {0, 0};
1045 #endif // HAVE_CUDA
1046 }
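// Both paths estimate the number of distinct bucket keys with HyperLogLog: each
// worker fills an HLL bitmap with 2^11 registers while also tracking, per inner row,
// how many overlap buckets that row emits. The per-thread (CPU) or per-device (GPU)
// bitmaps are merged with hll_unify and read out with hll_size; the emitted-keys
// total comes from the last element of the per-row count buffer, which the runtime
// keeps as a running total.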
1047 
1048 void OverlapsJoinHashTable::setInverseBucketSizeInfo(
1049  const std::vector<double>& inverse_bucket_sizes,
1050  std::vector<ColumnsForDevice>& columns_per_device,
1051  const size_t device_count) {
1052  // set global bucket size
1053  inverse_bucket_sizes_for_dimension_ = inverse_bucket_sizes;
1054 
1055  // re-compute bucket counts per device based on global bucket size
1056  CHECK_EQ(columns_per_device.size(), size_t(device_count));
1057  for (size_t device_id = 0; device_id < device_count; ++device_id) {
1058  auto& columns_for_device = columns_per_device[device_id];
1059  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension_,
1060  inner_outer_pairs_);
1061  }
1062 }
1063 
1064 size_t OverlapsJoinHashTable::getKeyComponentWidth() const {
1065  return 8;
1066 }
1067 
1068 size_t OverlapsJoinHashTable::getKeyComponentCount() const {
1069  CHECK(!inverse_bucket_sizes_for_dimension_.empty());
1070  return inverse_bucket_sizes_for_dimension_.size();
1071 }
1072 
1073 void OverlapsJoinHashTable::reify(const HashType preferred_layout) {
1074  auto timer = DEBUG_TIMER(__func__);
1075  CHECK_LT(0, device_count_);
1076  const auto composite_key_info =
1077  HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
1078 
1079  CHECK(condition_->is_overlaps_oper());
1080  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
1081  HashType layout;
1082  if (inner_outer_pairs_[0].second->get_type_info().is_fixlen_array() &&
1083  inner_outer_pairs_[0].second->get_type_info().get_size() == 32) {
1084  // bounds array
1085  layout = HashType::ManyToMany;
1086  } else {
1087  layout = HashType::OneToMany;
1088  }
1089  try {
1090  reifyWithLayout(layout);
1091  return;
1092  } catch (const std::exception& e) {
1093  VLOG(1) << "Caught exception while building overlaps baseline hash table: "
1094  << e.what();
1095  throw;
1096  }
1097 }
1098 
1099 void OverlapsJoinHashTable::reifyImpl(std::vector<ColumnsForDevice>& columns_per_device,
1100  const Fragmenter_Namespace::TableInfo& query_info,
1101  const HashType layout,
1102  const size_t shard_count,
1103  const size_t entry_count,
1104  const size_t emitted_keys_count,
1105  const bool skip_hashtable_caching,
1106  const size_t chosen_max_hashtable_size,
1107  const double chosen_bucket_threshold) {
1108  std::vector<std::future<void>> init_threads;
1109  for (int device_id = 0; device_id < device_count_; ++device_id) {
1110  const auto fragments =
1111  shard_count
1112  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
1113  : query_info.fragments;
1114  init_threads.push_back(std::async(std::launch::async,
1115  &OverlapsJoinHashTable::reifyForDevice,
1116  this,
1117  columns_per_device[device_id],
1118  layout,
1119  entry_count,
1120  emitted_keys_count,
1121  skip_hashtable_caching,
1122  chosen_max_hashtable_size,
1123  chosen_bucket_threshold,
1124  device_id,
1125  logger::thread_id()));
1126  }
1127  for (auto& init_thread : init_threads) {
1128  init_thread.wait();
1129  }
1130  for (auto& init_thread : init_threads) {
1131  init_thread.get();
1132  }
1133 }
1134 
1135 void OverlapsJoinHashTable::reifyForDevice(const ColumnsForDevice& columns_for_device,
1136  const HashType layout,
1137  const size_t entry_count,
1138  const size_t emitted_keys_count,
1139  const bool skip_hashtable_caching,
1140  const size_t chosen_max_hashtable_size,
1141  const double chosen_bucket_threshold,
1142  const int device_id,
1143  const logger::ThreadId parent_thread_id) {
1144  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
1145  CHECK_EQ(getKeyComponentWidth(), size_t(8));
1147  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
1148 
1149  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1150  VLOG(1) << "Building overlaps join hash table on CPU.";
1151  auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
1152  columns_for_device.join_column_types,
1153  columns_for_device.join_buckets,
1154  layout,
1155  entry_count,
1156  emitted_keys_count,
1157  skip_hashtable_caching,
1158  chosen_max_hashtable_size,
1159  chosen_bucket_threshold);
1160  CHECK(hash_table);
1161 
1162 #ifdef HAVE_CUDA
1163  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
1164  auto gpu_hash_table = copyCpuHashTableToGpu(
1165  std::move(hash_table), layout, entry_count, emitted_keys_count, device_id);
1166  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
1167  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
1168  } else {
1169 #else
1170  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1171 #endif
1172  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
1173  hash_tables_for_device_[0] = std::move(hash_table);
1174 #ifdef HAVE_CUDA
1175  }
1176 #endif
1177  } else {
1178 #ifdef HAVE_CUDA
1179  auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
1180  columns_for_device.join_column_types,
1181  columns_for_device.join_buckets,
1182  layout,
1183  entry_count,
1184  emitted_keys_count,
1185  device_id);
1186  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
1187  hash_tables_for_device_[device_id] = std::move(hash_table);
1188 #else
1189  UNREACHABLE();
1190 #endif
1191  }
1192 }
1193 
1194 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::initHashTableOnCpu(
1195  const std::vector<JoinColumn>& join_columns,
1196  const std::vector<JoinColumnTypeInfo>& join_column_types,
1197  const std::vector<JoinBucketInfo>& join_bucket_info,
1198  const HashType layout,
1199  const size_t entry_count,
1200  const size_t emitted_keys_count,
1201  const bool skip_hashtable_caching,
1202  const size_t chosen_max_hashtable_size,
1203  const double chosen_bucket_threshold) {
1204  auto timer = DEBUG_TIMER(__func__);
1205  const auto composite_key_info =
1206  HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
1207  CHECK(!join_columns.empty());
1208  CHECK(!join_bucket_info.empty());
1209  OverlapsHashTableCacheKey cache_key{join_columns.front().num_elems,
1210  composite_key_info.cache_key_chunks,
1211  condition_->get_optype(),
1212  chosen_max_hashtable_size,
1213  chosen_bucket_threshold,
1214  inverse_bucket_sizes_for_dimension_};
1215 
1216  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1217  if (auto generic_hash_table = initHashTableOnCpuFromCache(cache_key)) {
1218  if (auto hash_table =
1219  std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
1220  VLOG(1) << "Using cached CPU hash table for initialization.";
1221  // See if a hash table of a different layout was returned.
1222  // If it was OneToMany, we can reuse it on ManyToMany.
1223  if (layout == HashType::ManyToMany &&
1224  hash_table->getLayout() == HashType::OneToMany) {
1225  // use the cached hash table
1226  layout_override_ = HashType::ManyToMany;
1227  }
1228  return hash_table;
1229  }
1230  }
1232  const auto key_component_count =
1233  join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();
1234 
1235  const auto key_handler =
1236  OverlapsKeyHandler(key_component_count,
1237  &join_columns[0],
1238  join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());
1239  const auto catalog = executor_->getCatalog();
1240  BaselineJoinHashTableBuilder builder(catalog);
1241  const auto err = builder.initHashTableOnCpu(&key_handler,
1242  composite_key_info,
1243  join_columns,
1244  join_column_types,
1245  join_bucket_info,
1246  entry_count,
1247  emitted_keys_count,
1248  layout,
1249  getKeyComponentWidth(),
1250  getKeyComponentCount());
1251  if (err) {
1252  throw HashJoinFail(
1253  std::string("Unrecognized error when initializing CPU overlaps hash table (") +
1254  std::to_string(err) + std::string(")"));
1255  }
1256  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
1258  if (skip_hashtable_caching) {
1259  VLOG(1) << "Skip to cache overlaps join hashtable";
1260  } else {
1261  putHashTableOnCpuToCache(cache_key, hash_table);
1262  }
1263  }
1264  return hash_table;
1265 }
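// The CPU build is cached under a key made of the inner column chunks, operator type,
// max table size, bucket threshold and bucket sizes. A cached OneToMany table can
// also serve a ManyToMany request because both layouts share the same underlying
// buffers; only the probe-side code generation differs, so the cached table's layout
// is simply treated as ManyToMany.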
1266 
1267 #ifdef HAVE_CUDA
1268 
1269 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::initHashTableOnGpu(
1270  const std::vector<JoinColumn>& join_columns,
1271  const std::vector<JoinColumnTypeInfo>& join_column_types,
1272  const std::vector<JoinBucketInfo>& join_bucket_info,
1273  const HashType layout,
1274  const size_t entry_count,
1275  const size_t emitted_keys_count,
1276  const size_t device_id) {
1278 
1279  VLOG(1) << "Building overlaps join hash table on GPU.";
1280 
1281  const auto catalog = executor_->getCatalog();
1282  CHECK(catalog);
1283  BaselineJoinHashTableBuilder builder(catalog);
1284 
1285  auto& data_mgr = catalog->getDataMgr();
1286  CudaAllocator allocator(&data_mgr, device_id);
1287  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
1288  CHECK_EQ(join_columns.size(), 1u);
1289  CHECK(!join_bucket_info.empty());
1290  auto& inverse_bucket_sizes_for_dimension =
1291  join_bucket_info[0].inverse_bucket_sizes_for_dimension;
1292  auto inverse_bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
1293  inverse_bucket_sizes_for_dimension, allocator);
1294  const auto key_handler = OverlapsKeyHandler(inverse_bucket_sizes_for_dimension.size(),
1295  join_columns_gpu,
1296  inverse_bucket_sizes_gpu);
1297 
1298  const auto err = builder.initHashTableOnGpu(&key_handler,
1299  join_columns,
1300  layout,
1301  getKeyComponentWidth(),
1302  getKeyComponentCount(),
1303  entry_count,
1304  emitted_keys_count,
1305  device_id);
1306  if (err) {
1307  throw HashJoinFail(
1308  std::string("Unrecognized error when initializing GPU overlaps hash table (") +
1309  std::to_string(err) + std::string(")"));
1310  }
1311  return builder.getHashTable();
1312 }
1313 
1314 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::copyCpuHashTableToGpu(
1315  std::shared_ptr<BaselineHashTable>&& cpu_hash_table,
1316  const HashType layout,
1317  const size_t entry_count,
1318  const size_t emitted_keys_count,
1319  const size_t device_id) {
1321 
1322  auto catalog = executor_->getCatalog();
1323  CHECK(catalog);
1324  auto& data_mgr = catalog->getDataMgr();
1325 
1326  // copy hash table to GPU
1327  BaselineJoinHashTableBuilder gpu_builder(executor_->catalog_);
1328  gpu_builder.allocateDeviceMemory(layout,
1329  getKeyComponentWidth(),
1330  getKeyComponentCount(),
1331  entry_count,
1332  emitted_keys_count,
1333  device_id);
1334  std::shared_ptr<BaselineHashTable> gpu_hash_table = gpu_builder.getHashTable();
1335  CHECK(gpu_hash_table);
1336  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
1337  CHECK(gpu_buffer_ptr);
1338 
1339  CHECK_LE(cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
1340  gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));
1341  copy_to_gpu(&data_mgr,
1342  reinterpret_cast<CUdeviceptr>(gpu_buffer_ptr),
1343  cpu_hash_table->getCpuBuffer(),
1344  cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
1345  device_id);
1346  return gpu_hash_table;
1347 }
1348 
1349 #endif // HAVE_CUDA
1350 
1351 #define LL_CONTEXT executor_->cgen_state_->context_
1352 #define LL_BUILDER executor_->cgen_state_->ir_builder_
1353 #define LL_INT(v) executor_->cgen_state_->llInt(v)
1354 #define LL_FP(v) executor_->cgen_state_->llFp(v)
1355 #define ROW_FUNC executor_->cgen_state_->row_func_
1356 
1357 llvm::Value* OverlapsJoinHashTable::codegenKey(const CompilationOptions& co) {
1358  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1359  const auto key_component_width = getKeyComponentWidth();
1360  CHECK(key_component_width == 4 || key_component_width == 8);
1361  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
1362  llvm::Value* key_buff_lv{nullptr};
1363  switch (key_component_width) {
1364  case 4:
1365  key_buff_lv =
1366  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
1367  break;
1368  case 8:
1369  key_buff_lv =
1370  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
1371  break;
1372  default:
1373  CHECK(false);
1374  }
1375 
1376  const auto& inner_outer_pair = inner_outer_pairs_[0];
1377  const auto outer_geo = inner_outer_pair.second;
1378  const auto outer_geo_ti = outer_geo->get_type_info();
1379 
1380  llvm::Value* arr_ptr = nullptr;
1381  CodeGenerator code_generator(executor_);
1382  CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
1383 
1384  if (outer_geo_ti.is_geometry()) {
1385  // TODO(adb): for points we will use the coords array, but for other geometries we
1386  // will need to use the bounding box. For now only support points.
1387  CHECK_EQ(outer_geo_ti.get_type(), kPOINT);
1388 
1389  if (const auto outer_geo_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_geo)) {
1390  const auto outer_geo_col_lvs = code_generator.codegen(outer_geo_col, true, co);
1391  CHECK_EQ(outer_geo_col_lvs.size(), size_t(1));
1392  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
1393  outer_geo_col->get_table_id(), outer_geo_col->get_column_id() + 1);
1394  CHECK(coords_cd);
1395 
1396  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1397  "array_buff",
1398  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1399  {outer_geo_col_lvs.front(), code_generator.posArg(outer_geo_col)});
1400  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
1401  << "Only TINYINT coordinates columns are supported in geo overlaps hash join.";
1402  arr_ptr = code_generator.castArrayPointer(array_ptr,
1403  coords_cd->columnType.get_elem_type());
1404  }
1405  } else if (outer_geo_ti.is_fixlen_array()) {
1406  // Process dynamically constructed points
1407  const auto outer_geo_cast_coord_array =
1408  dynamic_cast<const Analyzer::UOper*>(outer_geo);
1409  CHECK(outer_geo_cast_coord_array->get_optype() == kCAST);
1410  const auto outer_geo_coord_array = dynamic_cast<const Analyzer::ArrayExpr*>(
1411  outer_geo_cast_coord_array->get_operand());
1412  CHECK(outer_geo_coord_array);
1413  CHECK(outer_geo_coord_array->isLocalAlloc());
1414  CHECK(outer_geo_coord_array->getElementCount() == 2);
1415  CHECK(outer_geo_ti.get_size() == 2 * sizeof(double));
1416  const auto outer_geo_constructed_lvs = code_generator.codegen(outer_geo, true, co);
1417  // CHECK_EQ(outer_geo_constructed_lvs.size(), size_t(2)); // Pointer and size
1418  const auto array_ptr = outer_geo_constructed_lvs.front(); // Just need the pointer
1419  arr_ptr = LL_BUILDER.CreateGEP(array_ptr, LL_INT(0));
1420  arr_ptr = code_generator.castArrayPointer(array_ptr, SQLTypeInfo(kTINYINT, true));
1421  }
1422  if (!arr_ptr) {
1423  LOG(FATAL) << "Overlaps key currently only supported for geospatial columns and "
1424  "constructed points.";
1425  }
1426 
1427  for (size_t i = 0; i < 2; i++) {
1428  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));
1429 
1430  // Note that get_bucket_key_for_range_compressed will need to be specialized for
1431  // future compression schemes
1432  auto bucket_key =
1433  outer_geo_ti.get_compression() == kENCODING_GEOINT
1434  ? executor_->cgen_state_->emitExternalCall(
1435  "get_bucket_key_for_range_compressed",
1436  get_int_type(64, LL_CONTEXT),
1437  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])})
1438  : executor_->cgen_state_->emitExternalCall(
1439  "get_bucket_key_for_range_double",
1440  get_int_type(64, LL_CONTEXT),
1441  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])});
1442  const auto col_lv = LL_BUILDER.CreateSExt(
1443  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
1444  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
1445  }
1446  return key_buff_lv;
1447 }
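// codegenKey emits, per dimension, a runtime call that maps the outer point's
// coordinate to a bucket index, effectively floor(coordinate * inverse_bucket_size)
// (with a decompressing variant for GEOINT-compressed coordinates), and stores the
// two 64-bit bucket indices into the stack-allocated key buffer. For example, with a
// bucket size of 2.0 (inverse 0.5), an x coordinate of 10.3 falls in bucket 5.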
1448 
1449 std::vector<llvm::Value*> OverlapsJoinHashTable::codegenManyKey(
1450  const CompilationOptions& co) {
1451  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1452  const auto key_component_width = getKeyComponentWidth();
1453  CHECK(key_component_width == 4 || key_component_width == 8);
1454  auto hash_table = getHashTableForDevice(size_t(0));
1455  CHECK(hash_table);
1457 
1458  VLOG(1) << "Performing codegen for ManyToMany";
1459  const auto& inner_outer_pair = inner_outer_pairs_[0];
1460  const auto outer_col = inner_outer_pair.second;
1461 
1462  CodeGenerator code_generator(executor_);
1463  const auto col_lvs = code_generator.codegen(outer_col, true, co);
1464  CHECK_EQ(col_lvs.size(), size_t(1));
1465 
1466  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
1467  CHECK(outer_col_var);
1468  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
1469  outer_col_var->get_table_id(), outer_col_var->get_column_id());
1470  CHECK(coords_cd);
1471 
1472  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1473  "array_buff",
1474  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1475  {col_lvs.front(), code_generator.posArg(outer_col)});
1476 
1477  // TODO(jclay): this seems to cast to double, and causes the GPU build to fail.
1478  // const auto arr_ptr =
1479  // code_generator.castArrayPointer(array_ptr,
1480  // coords_cd->columnType.get_elem_type());
1481  array_ptr->setName("array_ptr");
1482 
1483  auto num_keys_lv = executor_->cgen_state_->emitExternalCall(
1484  "get_num_buckets_for_bounds",
1485  get_int_type(32, LL_CONTEXT),
1486  {array_ptr,
1487  LL_INT(0),
1488  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1489  LL_FP(inverse_bucket_sizes_for_dimension_[1])});
1490  num_keys_lv->setName("num_keys_lv");
1491 
1492  return {num_keys_lv, array_ptr};
1493 }
1494 
1495 HashJoinMatchingSet OverlapsJoinHashTable::codegenMatchingSet(
1496  const CompilationOptions& co,
1497  const size_t index) {
1498  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1499  if (getHashType() == HashType::ManyToMany) {
1500  VLOG(1) << "Building codegenMatchingSet for ManyToMany";
1501  const auto key_component_width = getKeyComponentWidth();
1502  CHECK(key_component_width == 4 || key_component_width == 8);
1503  auto many_to_many_args = codegenManyKey(co);
1504  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1505  const auto composite_dict_ptr_type =
1506  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1507  const auto composite_key_dict =
1508  hash_ptr->getType()->isPointerTy()
1509  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1510  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1511  const auto key_component_count = getKeyComponentCount();
1512 
1513  auto one_to_many_ptr = hash_ptr;
1514 
1515  if (one_to_many_ptr->getType()->isPointerTy()) {
1516  one_to_many_ptr =
1517  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1518  } else {
1519  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1520  }
1521 
1522  const auto composite_key_dict_size = offsetBufferOff();
1523  one_to_many_ptr =
1524  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1525 
1526  // NOTE(jclay): A fixed array of size 200 is allocated on the stack.
1527  // this is likely the maximum value we can do that is safe to use across
1528  // all supported GPU architectures.
1529  const int max_array_size = 200;
1530  const auto arr_type = get_int_array_type(32, max_array_size, LL_CONTEXT);
1531  const auto out_arr_lv = LL_BUILDER.CreateAlloca(arr_type);
1532  out_arr_lv->setName("out_arr");
1533 
1534  const auto casted_out_arr_lv =
1535  LL_BUILDER.CreatePointerCast(out_arr_lv, arr_type->getPointerTo());
1536 
1537  const auto element_ptr = LL_BUILDER.CreateGEP(arr_type, casted_out_arr_lv, LL_INT(0));
1538 
1539  auto rowid_ptr_i32 =
1540  LL_BUILDER.CreatePointerCast(element_ptr, llvm::Type::getInt32PtrTy(LL_CONTEXT));
1541 
1542  const auto candidate_count_lv = executor_->cgen_state_->emitExternalCall(
1543  "get_candidate_rows",
1544  llvm::Type::getInt64Ty(LL_CONTEXT),
1545  {
1546  rowid_ptr_i32,
1547  LL_INT(max_array_size),
1548  many_to_many_args[1],
1549  LL_INT(0),
1550  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1551  LL_FP(inverse_bucket_sizes_for_dimension_[1]),
1552  many_to_many_args[0],
1553  LL_INT(key_component_count), // key_component_count
1554  composite_key_dict, // ptr to hash table
1555  LL_INT(getEntryCount()), // entry_count
1556  LL_INT(composite_key_dict_size), // offset_buffer_ptr_offset
1557  LL_INT(getEntryCount() * sizeof(int32_t)) // sub_buff_size
1558  });
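// Inferred from the argument list above (not from the runtime source): get_candidate_rows
// walks the grid buckets spanned by the probe bounds, resolves each composite key against
// the hash table, writes matching row ids into rowid_ptr_i32, and returns the number of
// candidates written, bounded by max_array_size.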
1559 
1560  const auto slot_lv = LL_INT(int64_t(0));
1561 
1562  return {rowid_ptr_i32, candidate_count_lv, slot_lv};
1563  } else {
1564  VLOG(1) << "Building codegenMatchingSet for Baseline";
1565  // TODO: duplicated w/ BaselineJoinHashTable -- push into the hash table builder?
1566  const auto key_component_width = getKeyComponentWidth();
1567  CHECK(key_component_width == 4 || key_component_width == 8);
1568  auto key_buff_lv = codegenKey(co);
1570  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1571  const auto composite_dict_ptr_type =
1572  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1573  const auto composite_key_dict =
1574  hash_ptr->getType()->isPointerTy()
1575  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1576  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1577  const auto key_component_count = getKeyComponentCount();
1578  const auto key = executor_->cgen_state_->emitExternalCall(
1579  "get_composite_key_index_" + std::to_string(key_component_width * 8),
1580  get_int_type(64, LL_CONTEXT),
1581  {key_buff_lv,
1582  LL_INT(key_component_count),
1583  composite_key_dict,
1584  LL_INT(getEntryCount())});
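// get_composite_key_index_{32,64} resolves the concatenated key in key_buff_lv to its
// slot in the composite key dictionary; that slot then indexes the one-to-many offset
// and count buffers computed below (a reading of the call sequence, not a specification).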
1585  auto one_to_many_ptr = hash_ptr;
1586  if (one_to_many_ptr->getType()->isPointerTy()) {
1587  one_to_many_ptr =
1588  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1589  } else {
1590  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1591  }
1592  const auto composite_key_dict_size = offsetBufferOff();
1593  one_to_many_ptr =
1594  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1595  return HashJoin::codegenMatchingSet(
1596  std::vector<llvm::Value*>{
1597  one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(getEntryCount() - 1)},
1598  false,
1599  false,
1600  false,
1601  getComponentBufferSize(),
1602  executor_);
1603  }
1604  UNREACHABLE();
1605  return HashJoinMatchingSet{};
1606 }
1607 
1608 std::string OverlapsJoinHashTable::toString(const ExecutorDeviceType device_type,
1609  const int device_id,
1610  bool raw) const {
1611  auto buffer = getJoinHashBuffer(device_type, device_id);
1612  CHECK_LT(device_id, hash_tables_for_device_.size());
1613  auto hash_table = hash_tables_for_device_[device_id];
1614  CHECK(hash_table);
1615  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1616 #ifdef HAVE_CUDA
1617  std::unique_ptr<int8_t[]> buffer_copy;
1618  if (device_type == ExecutorDeviceType::GPU) {
1619  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1620  CHECK(executor_);
1621  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1622 
1623  copy_from_gpu(&data_mgr,
1624  buffer_copy.get(),
1625  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1626  buffer_size,
1627  device_id);
1628  }
1629  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1630 #else
1631  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1632 #endif // HAVE_CUDA
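// The flattened hash table buffer is laid out as
//   [composite key dictionary | offset buffer | count buffer | payload buffer]
// (as implied by the offset accessors used below), so ptr2, ptr3 and ptr4 mark the
// start of the offset, count and payload sub-buffers relative to ptr1.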
1633  auto ptr2 = ptr1 + offsetBufferOff();
1634  auto ptr3 = ptr1 + countBufferOff();
1635  auto ptr4 = ptr1 + payloadBufferOff();
1636  CHECK(hash_table);
1637  const auto layout = getHashType();
1638  return HashTable::toString(
1639  "geo",
1640  getHashTypeString(layout),
1641  getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1642  getKeyComponentWidth(),
1643  hash_table->getEntryCount(),
1644  ptr1,
1645  ptr2,
1646  ptr3,
1647  ptr4,
1648  buffer_size,
1649  raw);
1650 }
1651 
1652 std::set<DecodedJoinHashBufferEntry> OverlapsJoinHashTable::toSet(
1653  const ExecutorDeviceType device_type,
1654  const int device_id) const {
1655  auto buffer = getJoinHashBuffer(device_type, device_id);
1656  auto hash_table = getHashTableForDevice(device_id);
1657  CHECK(hash_table);
1658  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1659 #ifdef HAVE_CUDA
1660  std::unique_ptr<int8_t[]> buffer_copy;
1661  if (device_type == ExecutorDeviceType::GPU) {
1662  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1663  CHECK(executor_);
1664  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1665  copy_from_gpu(&data_mgr,
1666  buffer_copy.get(),
1667  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1668  buffer_size,
1669  device_id);
1670  }
1671  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1672 #else
1673  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1674 #endif // HAVE_CUDA
1675  auto ptr2 = ptr1 + offsetBufferOff();
1676  auto ptr3 = ptr1 + countBufferOff();
1677  auto ptr4 = ptr1 + payloadBufferOff();
1678  const auto layout = getHashType();
1679  return HashTable::toSet(getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1680  getKeyComponentWidth(),
1681  hash_table->getEntryCount(),
1682  ptr1,
1683  ptr2,
1684  ptr3,
1685  ptr4,
1686  buffer_size);
1687 }
1688 
1689 Data_Namespace::MemoryLevel OverlapsJoinHashTable::getEffectiveMemoryLevel(
1690  const std::vector<InnerOuter>& inner_outer_pairs) const {
1691  // always build on CPU unless the overlaps_allow_gpu_build query hint is set
1692  if (query_hint_.isHintRegistered(QueryHint::kOverlapsAllowGpuBuild) &&
1693  query_hint_.overlaps_allow_gpu_build) {
1694  return memory_level_;
1695  }
1696  return Data_Namespace::MemoryLevel::CPU_LEVEL;
1697 }
1698 
1699 int OverlapsJoinHashTable::getInnerTableId() const noexcept {
1700  try {
1701  return HashJoin::getInnerTableId(inner_outer_pairs_);
1702  } catch (...) {
1703  CHECK(false);
1704  }
1705  return 0;
1706 }
1707 
1708 std::shared_ptr<HashTable> OverlapsJoinHashTable::initHashTableOnCpuFromCache(
1709  const OverlapsHashTableCacheKey& key) {
1710  auto timer = DEBUG_TIMER(__func__);
1711  VLOG(1) << "Checking CPU hash table cache.";
1713  auto hash_table_opt = hash_table_cache_->getWithKey(key);
1714  if (hash_table_opt) {
1715  CHECK(inverse_bucket_sizes_for_dimension_ ==
1716  hash_table_opt->first.inverse_bucket_sizes);
1717  return hash_table_opt->second;
1718  }
1719  return nullptr;
1720 }
1721 
1722 std::optional<std::pair<size_t, size_t>>
1723 OverlapsJoinHashTable::getApproximateTupleCountFromCache(
1724  const OverlapsHashTableCacheKey& key) {
1725  for (auto chunk_key : key.chunk_keys) {
1726  CHECK_GE(chunk_key.size(), size_t(2));
1727  if (chunk_key[1] < 0) {
1728  return std::nullopt;
1730  }
1731  }
1732 
1734  auto hash_table_opt = hash_table_cache_->getWithKey(key);
1735  if (hash_table_opt) {
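// Presumably undoes the 2x entry-count padding applied when the cached table was
// originally sized, so that getEntryCount() / 2 approximates the original tuple count.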
1736  auto hash_table = hash_table_opt->second;
1737  return std::make_pair(hash_table->getEntryCount() / 2,
1738  hash_table->getEmittedKeysCount());
1739  }
1740  return std::nullopt;
1741 }
1742 
1743 void OverlapsJoinHashTable::putHashTableOnCpuToCache(
1744  const OverlapsHashTableCacheKey& key,
1745  std::shared_ptr<HashTable> hash_table) {
1746  for (auto chunk_key : key.chunk_keys) {
1747  CHECK_GE(chunk_key.size(), size_t(2));
1748  if (chunk_key[1] < 0) {
1749  return;
1750  }
1751  }
1753  hash_table_cache_->insert(key, hash_table);
1754 }