OmniSciDB  085a039ca4
OverlapsJoinHashTable.cpp
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
21 #include "QueryEngine/Execute.h"
29 
30 std::unique_ptr<HashtableRecycler> OverlapsJoinHashTable::hash_table_cache_ =
31  std::make_unique<HashtableRecycler>(CacheItemType::OVERLAPS_HT,
32  DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
33 std::unique_ptr<OverlapsTuningParamRecycler> OverlapsJoinHashTable::auto_tuner_cache_ =
34  std::make_unique<OverlapsTuningParamRecycler>();
35 
37 std::shared_ptr<OverlapsJoinHashTable> OverlapsJoinHashTable::getInstance(
38  const std::shared_ptr<Analyzer::BinOper> condition,
39  const std::vector<InputTableInfo>& query_infos,
40  const Data_Namespace::MemoryLevel memory_level,
41  const JoinType join_type,
42  const int device_count,
43  ColumnCacheMap& column_cache,
44  Executor* executor,
45  const HashTableBuildDagMap& hashtable_build_dag_map,
46  const RegisteredQueryHint& query_hint,
47  const TableIdToNodeMap& table_id_to_node_map) {
48  decltype(std::chrono::steady_clock::now()) ts1, ts2;
49 
50  std::vector<InnerOuter> inner_outer_pairs;
51 
52  if (const auto range_expr =
53  dynamic_cast<const Analyzer::RangeOper*>(condition->get_right_operand())) {
54  return RangeJoinHashTable::getInstance(condition,
55  range_expr,
56  query_infos,
57  memory_level,
58  join_type,
59  device_count,
60  column_cache,
61  executor,
62  hashtable_build_dag_map,
63  query_hint,
64  table_id_to_node_map);
65  } else {
66  inner_outer_pairs =
67  HashJoin::normalizeColumnPairs(
68  condition.get(), *executor->getCatalog(), executor->getTemporaryTables())
69  .first;
70  }
71  CHECK(!inner_outer_pairs.empty());
72 
73  const auto getHashTableType =
74  [](const std::shared_ptr<Analyzer::BinOper> condition,
75  const std::vector<InnerOuter>& inner_outer_pairs) -> HashType {
76  HashType layout = HashType::OneToMany;
77  if (condition->is_overlaps_oper()) {
78  CHECK_EQ(inner_outer_pairs.size(), size_t(1));
79  if (inner_outer_pairs[0].first->get_type_info().is_array() &&
80  inner_outer_pairs[0].second->get_type_info().is_array() &&
81  // Bounds vs constructed points, former should yield ManyToMany
82  inner_outer_pairs[0].second->get_type_info().get_size() == 32) {
83  layout = HashType::ManyToMany;
84  }
85  }
86  return layout;
87  };
88 
89  const auto layout = getHashTableType(condition, inner_outer_pairs);
90 
91  if (VLOGGING(1)) {
92  VLOG(1) << "Building geo hash table " << getHashTypeString(layout)
93  << " for qual: " << condition->toString();
94  ts1 = std::chrono::steady_clock::now();
95  }
96 
97  const auto qi_0 = query_infos[0].info.getNumTuplesUpperBound();
98  const auto qi_1 = query_infos[1].info.getNumTuplesUpperBound();
99 
100  VLOG(1) << "table_id = " << query_infos[0].table_id << " has " << qi_0 << " tuples.";
101  VLOG(1) << "table_id = " << query_infos[1].table_id << " has " << qi_1 << " tuples.";
102 
103  const auto& query_info =
104  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
105  .info;
106  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
107  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
108  throw TooManyHashEntries();
109  }
110 
111  auto join_hash_table = std::make_shared<OverlapsJoinHashTable>(condition,
112  join_type,
113  query_infos,
114  memory_level,
115  column_cache,
116  executor,
117  inner_outer_pairs,
118  device_count,
119  hashtable_build_dag_map,
120  table_id_to_node_map);
121  if (query_hint.isAnyQueryHintDelivered()) {
122  join_hash_table->registerQueryHint(query_hint);
123  }
124  try {
125  join_hash_table->reify(layout);
126  } catch (const HashJoinFail& e) {
127  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
128  "involved in overlaps join | ") +
129  e.what());
130  } catch (const ColumnarConversionNotSupported& e) {
131  throw HashJoinFail(std::string("Could not build hash tables for overlaps join | "
132  "Inner table too big. Attempt manual table reordering "
133  "or create a single fragment inner table. | ") +
134  e.what());
135  } catch (const std::exception& e) {
136  throw HashJoinFail(std::string("Failed to build hash tables for overlaps join | ") +
137  e.what());
138  }
139  if (VLOGGING(1)) {
140  ts2 = std::chrono::steady_clock::now();
141  VLOG(1) << "Built geo hash table " << getHashTypeString(layout) << " in "
142  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
143  << " ms";
144  }
145  return join_hash_table;
146 }
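// Illustrative sketch (not part of the original file): a minimal, hypothetical caller of
// getInstance(). The executor normally supplies all of these arguments; every parameter
// below is assumed to come from that surrounding context.
[[maybe_unused]] static std::shared_ptr<OverlapsJoinHashTable>
build_overlaps_hash_table_sketch(const std::shared_ptr<Analyzer::BinOper>& condition,
                                 const std::vector<InputTableInfo>& query_infos,
                                 ColumnCacheMap& column_cache,
                                 Executor* executor,
                                 const HashTableBuildDagMap& hashtable_build_dag_map,
                                 const RegisteredQueryHint& query_hint,
                                 const TableIdToNodeMap& table_id_to_node_map) {
  // A CPU build with a single device keeps the sketch simple; GPU builds use the same
  // entry point with GPU_LEVEL and a larger device_count.
  return OverlapsJoinHashTable::getInstance(condition,
                                            query_infos,
                                            Data_Namespace::MemoryLevel::CPU_LEVEL,
                                            JoinType::INNER,
                                            /*device_count=*/1,
                                            column_cache,
                                            executor,
                                            hashtable_build_dag_map,
                                            query_hint,
                                            table_id_to_node_map);
}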
147 
148 namespace {
149 
150 std::vector<double> correct_uninitialized_bucket_sizes_to_thresholds(
151  const std::vector<double>& bucket_sizes,
152  const std::vector<double>& bucket_thresholds,
153  const double initial_value) {
154  std::vector<double> corrected_bucket_sizes(bucket_sizes);
155  for (size_t i = 0; i != bucket_sizes.size(); ++i) {
156  if (bucket_sizes[i] == initial_value) {
157  corrected_bucket_sizes[i] = bucket_thresholds[i];
158  }
159  }
160  return corrected_bucket_sizes;
161 }
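// Illustrative sketch (not part of the original file): any dimension whose size was never
// updated from the initial value falls back to its threshold. With the hypothetical
// inputs below, only dimension 0 is corrected.
[[maybe_unused]] static void correct_uninitialized_bucket_sizes_example() {
  const std::vector<double> bucket_sizes{0.0, 2.5};       // dim 0 left at initial value
  const std::vector<double> bucket_thresholds{1.0, 3.0};  // per-dimension upper bounds
  const auto corrected = correct_uninitialized_bucket_sizes_to_thresholds(
      bucket_sizes, bucket_thresholds, /*initial_value=*/0.0);
  CHECK_EQ(corrected[0], 1.0);  // replaced by its threshold
  CHECK_EQ(corrected[1], 2.5);  // computed size kept as-is
}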
162 
163 std::vector<double> compute_bucket_sizes(
164  const std::vector<double>& bucket_thresholds,
165  const Data_Namespace::MemoryLevel effective_memory_level,
166  const JoinColumn& join_column,
167  const JoinColumnTypeInfo& join_column_type,
168  const std::vector<InnerOuter>& inner_outer_pairs,
169  const Executor* executor) {
170  // No coalesced keys for overlaps joins yet
171  CHECK_EQ(inner_outer_pairs.size(), 1u);
172 
173  const auto col = inner_outer_pairs[0].first;
174  CHECK(col);
175  const auto col_ti = col->get_type_info();
176  CHECK(col_ti.is_array());
177 
178  // TODO: Compute the number of dimensions for this overlaps key
179  const size_t num_dims{2};
180  const double initial_bin_value{0.0};
181  std::vector<double> bucket_sizes(num_dims, initial_bin_value);
182  CHECK_EQ(bucket_thresholds.size(), num_dims);
183 
184  VLOG(1)
185  << "Computing x and y bucket sizes for overlaps hash join with maximum bucket size "
186  << std::to_string(bucket_thresholds[0]) << ", "
187  << std::to_string(bucket_thresholds[1]);
188 
189  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
190  const int thread_count = cpu_threads();
191  compute_bucket_sizes_on_cpu(
192  bucket_sizes, join_column, join_column_type, bucket_thresholds, thread_count);
193  }
194 #ifdef HAVE_CUDA
195  else {
196  // Note that we compute the bucket sizes using only a single GPU
197  const int device_id = 0;
198  auto data_mgr = executor->getDataMgr();
199  CudaAllocator allocator(
200  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
201  auto device_bucket_sizes_gpu =
202  transfer_vector_of_flat_objects_to_gpu(bucket_sizes, allocator);
203  auto join_column_gpu = transfer_flat_object_to_gpu(join_column, allocator);
204  auto join_column_type_gpu = transfer_flat_object_to_gpu(join_column_type, allocator);
205  auto device_bucket_thresholds_gpu =
206  transfer_vector_of_flat_objects_to_gpu(bucket_thresholds, allocator);
207 
208  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
209  join_column_gpu,
210  join_column_type_gpu,
211  device_bucket_thresholds_gpu);
212  allocator.copyFromDevice(reinterpret_cast<int8_t*>(bucket_sizes.data()),
213  reinterpret_cast<int8_t*>(device_bucket_sizes_gpu),
214  bucket_sizes.size() * sizeof(double));
215  }
216 #endif
217  const auto corrected_bucket_sizes = correct_uninitialized_bucket_sizes_to_thresholds(
218  bucket_sizes, bucket_thresholds, initial_bin_value);
219 
220  VLOG(1) << "Computed x and y bucket sizes for overlaps hash join: ("
221  << corrected_bucket_sizes[0] << ", " << corrected_bucket_sizes[1] << ")";
222 
223  return corrected_bucket_sizes;
224 }
225 
226 struct HashTableProps {
227  HashTableProps(const size_t entry_count,
228  const size_t emitted_keys_count,
229  const size_t hash_table_size,
230  const std::vector<double>& bucket_sizes)
231  : entry_count(entry_count)
232  , emitted_keys_count(emitted_keys_count)
233  , keys_per_bin(entry_count == 0 ? std::numeric_limits<double>::max()
234  : emitted_keys_count / (entry_count / 2.0))
235  , hash_table_size(hash_table_size)
236  , bucket_sizes(bucket_sizes) {}
237 
238  static HashTableProps invalid() { return HashTableProps(0, 0, 0, {}); }
239 
240  size_t entry_count;
241  size_t emitted_keys_count;
242  double keys_per_bin;
243  size_t hash_table_size;
244  std::vector<double> bucket_sizes;
245 };
246 
247 std::ostream& operator<<(std::ostream& os, const HashTableProps& props) {
248  os << " entry_count: " << props.entry_count << ", emitted_keys "
249  << props.emitted_keys_count << ", hash table size " << props.hash_table_size
250  << ", keys per bin " << props.keys_per_bin;
251  return os;
252 }
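// Illustrative sketch (not part of the original file): keys_per_bin divides the emitted
// key count by half the entry count, since the entry count of a one-to-many layout is
// twice the number of bins. With the hypothetical numbers below, 8000 emitted keys over
// 2000 / 2 = 1000 bins gives 8 keys per bin.
[[maybe_unused]] static void hash_table_props_example() {
  const HashTableProps props(/*entry_count=*/2000,
                             /*emitted_keys_count=*/8000,
                             /*hash_table_size=*/size_t(1) << 20,
                             /*bucket_sizes=*/{0.1, 0.1});
  CHECK_EQ(props.keys_per_bin, 8.0);
}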
253 
254 struct TuningState {
255  TuningState(const size_t overlaps_max_table_size_bytes,
256  const double overlaps_target_entries_per_bin)
257  : crt_props(HashTableProps::invalid())
258  , prev_props(HashTableProps::invalid())
259  , chosen_overlaps_threshold(-1)
260  , crt_step(0)
261  , crt_reverse_search_iteration(0)
262  , overlaps_max_table_size_bytes(overlaps_max_table_size_bytes)
263  , overlaps_target_entries_per_bin(overlaps_target_entries_per_bin) {}
264 
265  // current and previous props, allows for easy backtracking
266  HashTableProps crt_props;
267  HashTableProps prev_props;
268 
269  // value we are tuning for
270  double chosen_overlaps_threshold;
271  enum class TuningDirection { SMALLER, LARGER };
272  TuningDirection tuning_direction{TuningDirection::SMALLER};
273 
274  // various constants / state
275  size_t crt_step; // 1 indexed
276  size_t crt_reverse_search_iteration; // 1 indexed
277  const size_t overlaps_max_table_size_bytes;
278  const double overlaps_target_entries_per_bin;
279  const size_t max_reverse_search_iterations{8};
280 
285  bool operator()(const HashTableProps& new_props, const double new_overlaps_threshold) {
286  prev_props = crt_props;
287  crt_props = new_props;
288  crt_step++;
289 
290  if (hashTableTooBig() || keysPerBinIncreasing()) {
291  if (hashTableTooBig()) {
292  VLOG(1) << "Reached hash table size limit: " << overlaps_max_table_size_bytes
293  << " with " << crt_props.hash_table_size << " byte hash table, "
294  << crt_props.keys_per_bin << " keys per bin.";
295  } else if (keysPerBinIncreasing()) {
296  VLOG(1) << "Keys per bin increasing from " << prev_props.keys_per_bin << " to "
297  << crt_props.keys_per_bin;
298  CHECK(previousIterationValid());
299  }
300  if (previousIterationValid()) {
301  VLOG(1) << "Using previous threshold value " << chosen_overlaps_threshold;
302  crt_props = prev_props;
303  return false;
304  } else {
305  CHECK(hashTableTooBig());
306  crt_reverse_search_iteration++;
307  chosen_overlaps_threshold = new_overlaps_threshold;
308 
309  if (crt_reverse_search_iteration == max_reverse_search_iterations) {
310  VLOG(1) << "Hit maximum number (" << max_reverse_search_iterations
311  << ") of reverse tuning iterations. Aborting tuning";
312  // use the crt props, but don't bother trying to tune any farther
313  return false;
314  }
315 
316  if (crt_reverse_search_iteration > 1 &&
317  crt_props.hash_table_size == prev_props.hash_table_size) {
318  // hash table size is not changing, bail
319  VLOG(1) << "Hash table size not decreasing (" << crt_props.hash_table_size
320  << " bytes) and still above maximum allowed size ("
321  << overlaps_max_table_size_bytes << " bytes). Aborting tuning";
322  return false;
323  }
324 
325  // if the hash table is too big on the very first step, change direction towards
326  // larger bins to see if a slightly smaller hash table will fit
327  if (crt_step == 1 && crt_reverse_search_iteration == 1) {
328  VLOG(1)
329  << "First iteration of overlaps tuning led to hash table size over "
330  "limit. Reversing search to try larger bin sizes (previous threshold: "
331  << chosen_overlaps_threshold << ")";
332  // Need to change direction of tuning to tune "up" towards larger bins
333  tuning_direction = TuningDirection::LARGER;
334  }
335  return true;
336  }
337  UNREACHABLE();
338  }
339 
340  chosen_overlaps_threshold = new_overlaps_threshold;
341 
342  if (keysPerBinUnderThreshold()) {
343  VLOG(1) << "Hash table reached size " << crt_props.hash_table_size
344  << " with keys per bin " << crt_props.keys_per_bin << " under threshold "
345  << overlaps_target_entries_per_bin << ". Terminating bucket size loop.";
346  return false;
347  }
348 
349  if (crt_reverse_search_iteration > 0) {
350  // We always take the first tuning iteration that succeeds when reversing
351  // direction, since reaching this point means we have not yet had a successful
352  // iteration and we are "backtracking" our search by making bin sizes larger.
353  VLOG(1) << "On reverse (larger tuning direction) search found workable "
354  << " hash table size of " << crt_props.hash_table_size
355  << " with keys per bin " << crt_props.keys_per_bin
356  << ". Terminating bucket size loop.";
357  return false;
358  }
359 
360  return true;
361  }
362 
363  bool hashTableTooBig() const {
364  return crt_props.hash_table_size > overlaps_max_table_size_bytes;
365  }
366 
367  bool keysPerBinIncreasing() const {
368  return crt_props.keys_per_bin > prev_props.keys_per_bin;
369  }
370 
371  bool previousIterationValid() const {
372  return tuning_direction == TuningDirection::SMALLER && crt_step > 1;
373  }
374 
375  bool keysPerBinUnderThreshold() const {
376  return crt_props.keys_per_bin < overlaps_target_entries_per_bin;
377  }
378 };
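// Illustrative sketch (not part of the original file): the functor keeps tuning while the
// candidate tables improve, and backtracks to the previous properties once keys-per-bin
// stops decreasing. All numbers below are hypothetical.
[[maybe_unused]] static void tuning_state_backtrack_example() {
  TuningState tuning_state(/*overlaps_max_table_size_bytes=*/1000000,
                           /*overlaps_target_entries_per_bin=*/1.0);
  const HashTableProps first(/*entry_count=*/2000,
                             /*emitted_keys_count=*/8000,
                             /*hash_table_size=*/64000,
                             /*bucket_sizes=*/{1.0, 1.0});  // 8 keys per bin
  const HashTableProps second(/*entry_count=*/4000,
                              /*emitted_keys_count=*/40000,
                              /*hash_table_size=*/200000,
                              /*bucket_sizes=*/{0.5, 0.5});  // 20 keys per bin
  CHECK(tuning_state(first, /*new_overlaps_threshold=*/1.0));    // keep tuning
  CHECK(!tuning_state(second, /*new_overlaps_threshold=*/0.5));  // keys/bin rose: stop
  CHECK_EQ(tuning_state.crt_props.entry_count, size_t(2000));    // reverted to `first`
}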
379 
380 class BucketSizeTuner {
381  public:
382  BucketSizeTuner(const double bucket_threshold,
383  const double step,
384  const double min_threshold,
385  const Data_Namespace::MemoryLevel effective_memory_level,
386  const std::vector<ColumnsForDevice>& columns_per_device,
387  const std::vector<InnerOuter>& inner_outer_pairs,
388  const size_t table_tuple_count,
389  const Executor* executor)
390  : num_dims_(2) // Todo: allow varying number of dims
391  , bucket_thresholds_(/*count=*/num_dims_, /*value=*/bucket_threshold)
392  , step_(step)
393  , min_threshold_(min_threshold)
394  , effective_memory_level_(effective_memory_level)
395  , columns_per_device_(columns_per_device)
396  , inner_outer_pairs_(inner_outer_pairs)
397  , table_tuple_count_(table_tuple_count)
398  , executor_(executor) {
399  CHECK(!columns_per_device_.empty());
400  }
401 
402  bool tuneOneStep() { return tuneOneStep(TuningState::TuningDirection::SMALLER, step_); }
403 
404  bool tuneOneStep(const TuningState::TuningDirection tuning_direction) {
405  return tuneOneStep(tuning_direction, step_);
406  }
407 
408  bool tuneOneStep(const TuningState::TuningDirection tuning_direction,
409  const double step_overide) {
410  if (table_tuple_count_ == 0) {
411  return false;
412  }
413  if (tuning_direction == TuningState::TuningDirection::SMALLER) {
414  return tuneSmallerOneStep(step_overide);
415  }
416  return tuneLargerOneStep(step_overide);
417  }
418 
419  auto getMinBucketSize() const {
420  return *std::min_element(bucket_thresholds_.begin(), bucket_thresholds_.end());
421  }
422 
429  std::vector<double> getInverseBucketSizes() {
430  if (num_steps_ == 0) {
431  CHECK_EQ(current_bucket_sizes_.size(), static_cast<size_t>(0));
432  current_bucket_sizes_ = computeBucketSizes();
433  }
434  CHECK_EQ(current_bucket_sizes_.size(), num_dims_);
435  std::vector<double> inverse_bucket_sizes;
436  for (const auto s : current_bucket_sizes_) {
437  inverse_bucket_sizes.emplace_back(1.0 / s);
438  }
439  return inverse_bucket_sizes;
440  }
441 
442  private:
443  bool bucketThresholdsBelowMinThreshold() const {
444  for (const auto& t : bucket_thresholds_) {
445  if (t < min_threshold_) {
446  return true;
447  }
448  }
449  return false;
450  }
451 
452  std::vector<double> computeBucketSizes() const {
453  if (table_tuple_count_ == 0) {
454  return std::vector<double>(/*count=*/num_dims_, /*val=*/0);
455  }
456  return compute_bucket_sizes(bucket_thresholds_,
457  effective_memory_level_,
458  columns_per_device_.front().join_columns[0],
459  columns_per_device_.front().join_column_types[0],
460  inner_outer_pairs_,
461  executor_);
462  }
463 
464  bool tuneSmallerOneStep(const double step_overide) {
465  if (!current_bucket_sizes_.empty()) {
466  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
467  bucket_thresholds_ = current_bucket_sizes_;
468  for (auto& t : bucket_thresholds_) {
469  t /= step_overide;
470  }
471  }
472  if (bucketThresholdsBelowMinThreshold()) {
473  VLOG(1) << "Aborting overlaps tuning as at least one bucket size is below min "
474  "threshold";
475  return false;
476  }
477  const auto next_bucket_sizes = computeBucketSizes();
478  if (next_bucket_sizes == current_bucket_sizes_) {
479  VLOG(1) << "Aborting overlaps tuning as bucket size is no longer changing.";
480  return false;
481  }
482 
483  current_bucket_sizes_ = next_bucket_sizes;
484  num_steps_++;
485  return true;
486  }
487 
488  bool tuneLargerOneStep(const double step_overide) {
489  if (!current_bucket_sizes_.empty()) {
490  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
491  bucket_thresholds_ = current_bucket_sizes_;
492  }
493  // If current_bucket_sizes was empty, we will start from our initial threshold
494  for (auto& t : bucket_thresholds_) {
495  t *= step_overide;
496  }
497  // When tuning up, do not dynamically compute bucket_sizes: compute_bucket_sizes as
498  // written picks the largest bin size below the threshold, so our bucket_size would
499  // never increase beyond the size of the largest polygon. That could make it impossible
500  // to grow the bucket sizes enough to get the hash table below the maximum size.
501  // Possible todo: enable a templated version of compute_bucket_sizes that optionally
502  // finds the smallest extent above the threshold (mirroring the default behavior of
503  // finding the largest extent below the threshold), and use that variant here.
504  current_bucket_sizes_ = bucket_thresholds_;
505  num_steps_++;
506  return true;
507  }
508 
509  size_t num_dims_;
510  std::vector<double> bucket_thresholds_;
511  size_t num_steps_{0};
512  const double step_;
513  const double min_threshold_;
514  const Data_Namespace::MemoryLevel effective_memory_level_;
515  const std::vector<ColumnsForDevice>& columns_per_device_;
516  const std::vector<InnerOuter>& inner_outer_pairs_;
517  const size_t table_tuple_count_;
518  const Executor* executor_;
519 
520  std::vector<double> current_bucket_sizes_;
521 
522  friend std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner);
523 };
524 
525 std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner) {
526  os << "Step Num: " << tuner.num_steps_ << ", Threshold: " << std::fixed << "("
527  << tuner.bucket_thresholds_[0] << ", " << tuner.bucket_thresholds_[1] << ")"
528  << ", Step Size: " << std::fixed << tuner.step_ << ", Min: " << std::fixed
529  << tuner.min_threshold_;
530  return os;
531 }
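// Illustrative sketch (not part of the original file): the intended interplay between
// BucketSizeTuner and TuningState, mirroring the auto-tuning loop in reifyWithLayout()
// further below. size_candidate_table is a hypothetical callable standing in for the
// computeHashTableCounts() / calculateHashTableSize() pair.
template <typename SIZE_CANDIDATE_TABLE>
void overlaps_tuning_loop_sketch(BucketSizeTuner& tuner,
                                 const size_t max_table_size_bytes,
                                 const double target_keys_per_bin,
                                 SIZE_CANDIDATE_TABLE size_candidate_table) {
  TuningState tuning_state(max_table_size_bytes, target_keys_per_bin);
  while (tuner.tuneOneStep(tuning_state.tuning_direction)) {
    // Each step shrinks (or, when reversing, grows) the bucket thresholds, then sizes
    // the resulting candidate hash table.
    const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
    const HashTableProps crt_props = size_candidate_table(inverse_bucket_sizes);
    if (!tuning_state(crt_props, tuner.getMinBucketSize())) {
      break;  // tuning_state.crt_props now holds the accepted configuration
    }
  }
}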
532 
533 } // namespace
534 
535 void OverlapsJoinHashTable::reifyWithLayout(const HashType layout) {
536  auto timer = DEBUG_TIMER(__func__);
538  const auto& query_info =
539  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs_), query_infos_)
540  .info;
541  VLOG(1) << "Reify with layout " << getHashTypeString(layout)
542  << "for table_id: " << HashJoin::getInnerTableId(inner_outer_pairs_);
543  if (query_info.fragments.empty()) {
544  return;
545  }
546 
547  auto overlaps_max_table_size_bytes = g_overlaps_max_table_size_bytes;
548  std::optional<double> overlaps_threshold_override;
549  double overlaps_target_entries_per_bin = g_overlaps_target_entries_per_bin;
550  auto query_hint = getRegisteredQueryHint();
551  auto skip_hashtable_caching = false;
552  if (query_hint.isHintRegistered(QueryHint::kOverlapsBucketThreshold)) {
553  VLOG(1) << "Setting overlaps bucket threshold "
554  "\'overlaps_hashjoin_bucket_threshold\' via "
555  "query hint: "
556  << query_hint.overlaps_bucket_threshold;
557  overlaps_threshold_override = query_hint.overlaps_bucket_threshold;
558  }
559  if (query_hint.isHintRegistered(QueryHint::kOverlapsMaxSize)) {
560  std::ostringstream oss;
561  oss << "User requests to change a threshold \'overlaps_max_table_size_bytes\' via "
562  "query hint";
563  if (!overlaps_threshold_override.has_value()) {
564  oss << ": " << overlaps_max_table_size_bytes << " -> "
565  << query_hint.overlaps_max_size;
566  overlaps_max_table_size_bytes = query_hint.overlaps_max_size;
567  } else {
568  oss << ", but is skipped since the query hint also changes the threshold "
569  "\'overlaps_hashjoin_bucket_threshold\'";
570  }
571  VLOG(1) << oss.str();
572  }
573  if (query_hint.isHintRegistered(QueryHint::kOverlapsNoCache)) {
574  VLOG(1) << "User requests to skip caching overlaps join hashtable and its tuned "
575  "parameters for this query";
576  skip_hashtable_caching = true;
577  }
578  if (query_hint.isHintRegistered(QueryHint::kOverlapsKeysPerBin)) {
579  VLOG(1) << "User requests to change a threshold \'overlaps_keys_per_bin\' via query "
580  "hint: "
581  << overlaps_target_entries_per_bin << " -> "
582  << query_hint.overlaps_keys_per_bin;
583  overlaps_target_entries_per_bin = query_hint.overlaps_keys_per_bin;
584  }
585 
586  auto data_mgr = executor_->getDataMgr();
587  // We prioritize the CPU when building an overlaps join hashtable, but if a GPU is
588  // available and the user provides a hint allowing a GPU build, we selectively let the
589  // GPU build it. Even with a GPU present, however, we must not use it when the user
590  // forces CPU as the execution device type.
591  auto allow_gpu_hashtable_build =
594  if (allow_gpu_hashtable_build) {
595  if (data_mgr->gpusPresent() &&
597  VLOG(1) << "A user forces to build GPU hash table for this overlaps join operator";
598  } else {
599  allow_gpu_hashtable_build = false;
600  VLOG(1) << "A user forces to build GPU hash table for this overlaps join operator "
601  "but we "
602  "skip it since either GPU is not presented or CPU execution mode is set";
603  }
604  }
605 
606  std::vector<ColumnsForDevice> columns_per_device;
607  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
609  allow_gpu_hashtable_build) {
610  for (int device_id = 0; device_id < device_count_; ++device_id) {
611  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
612  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
613  }
614  }
615 
616  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
617  const auto shard_count = shardCount();
618  size_t total_num_tuples = 0;
619  for (int device_id = 0; device_id < device_count_; ++device_id) {
620  fragments_per_device.emplace_back(
621  shard_count
622  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
623  : query_info.fragments);
624  const size_t crt_num_tuples =
625  std::accumulate(fragments_per_device.back().begin(),
626  fragments_per_device.back().end(),
627  size_t(0),
628  [](const auto& sum, const auto& fragment) {
629  return sum + fragment.getNumTuples();
630  });
631  total_num_tuples += crt_num_tuples;
632  const auto columns_for_device =
633  fetchColumnsForDevice(fragments_per_device.back(),
634  device_id,
636  allow_gpu_hashtable_build
637  ? dev_buff_owners[device_id].get()
638  : nullptr);
639  columns_per_device.push_back(columns_for_device);
640  }
641 
642  // try to extract cache key for hash table and its relevant info
643  auto hashtable_access_path_info =
645  {},
646  condition_->get_optype(),
647  join_type_,
650  shard_count,
651  fragments_per_device,
652  executor_);
653  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
654  hashtable_cache_meta_info_ = hashtable_access_path_info.meta_info;
655  table_keys_ = hashtable_access_path_info.table_keys;
656 
657  if (overlaps_threshold_override) {
658  // compute bucket sizes based on the user provided threshold
659  BucketSizeTuner tuner(/*initial_threshold=*/*overlaps_threshold_override,
660  /*step=*/1.0,
661  /*min_threshold=*/0.0,
663  columns_per_device,
665  total_num_tuples,
666  executor_);
667  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
668 
669  auto [entry_count, emitted_keys_count] =
670  computeHashTableCounts(shard_count,
671  inverse_bucket_sizes,
672  columns_per_device,
673  overlaps_max_table_size_bytes,
674  *overlaps_threshold_override);
675  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
676  // reifyImpl will check the hash table cache for an appropriate hash table w/ those
677  // bucket sizes (or within tolerances) if a hash table exists use it, otherwise build
678  // one
679  generateCacheKey(overlaps_max_table_size_bytes,
680  *overlaps_threshold_override,
681  inverse_bucket_sizes,
682  fragments_per_device,
683  device_count_);
684  reifyImpl(columns_per_device,
685  query_info,
686  layout,
687  shard_count,
688  entry_count,
689  emitted_keys_count,
690  skip_hashtable_caching,
691  overlaps_max_table_size_bytes,
692  *overlaps_threshold_override);
693  } else {
694  double overlaps_bucket_threshold = std::numeric_limits<double>::max();
695  generateCacheKey(overlaps_max_table_size_bytes,
696  overlaps_bucket_threshold,
697  {},
698  fragments_per_device,
699  device_count_);
700  std::vector<size_t> per_device_chunk_key(device_count_);
701  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
702  inner_outer_pairs_.front().first->get_table_id() > 0) {
703  std::vector<int> alternative_table_key{
706  const auto table_keys =
707  std::unordered_set<size_t>{boost::hash_value(alternative_table_key)};
708 
709  for (int device_id = 0; device_id < device_count_; ++device_id) {
710  auto chunk_key_hash = boost::hash_value(composite_key_info_.cache_key_chunks);
711  boost::hash_combine(
712  chunk_key_hash,
713  HashJoin::collectFragmentIds(fragments_per_device[device_id]));
714  per_device_chunk_key.push_back(chunk_key_hash);
717  columns_per_device.front().join_columns.front().num_elems,
718  chunk_key_hash,
719  condition_->get_optype(),
720  overlaps_max_table_size_bytes,
721  overlaps_bucket_threshold,
722  {}};
723  hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
724  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_[device_id],
725  table_keys);
726  }
727  }
728 
729  auto cached_bucket_threshold =
730  auto_tuner_cache_->getItemFromCache(hashtable_cache_key_.front(),
733  if (cached_bucket_threshold) {
734  overlaps_bucket_threshold = cached_bucket_threshold->bucket_threshold;
735  auto inverse_bucket_sizes = cached_bucket_threshold->bucket_sizes;
737  overlaps_max_table_size_bytes, overlaps_bucket_threshold, inverse_bucket_sizes);
738  generateCacheKey(overlaps_max_table_size_bytes,
739  overlaps_bucket_threshold,
740  inverse_bucket_sizes,
741  fragments_per_device,
742  device_count_);
743 
744  if (auto hash_table =
745  hash_table_cache_->getItemFromCache(hashtable_cache_key_[device_count_],
748  std::nullopt)) {
749  // if we already have a built hash table, we can skip the scans required for
750  // computing bucket size and tuple count
751  // reset as the hash table sizes can vary a bit
752  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
753  CHECK(hash_table);
754 
755  VLOG(1) << "Using cached hash table bucket size";
756 
757  reifyImpl(columns_per_device,
758  query_info,
759  layout,
760  shard_count,
761  hash_table->getEntryCount(),
762  hash_table->getEmittedKeysCount(),
763  skip_hashtable_caching,
764  overlaps_max_table_size_bytes,
765  overlaps_bucket_threshold);
766  } else {
767  VLOG(1) << "Computing bucket size for cached bucket threshold";
768  // compute bucket size using our cached tuner value
769  BucketSizeTuner tuner(/*initial_threshold=*/overlaps_bucket_threshold,
770  /*step=*/1.0,
771  /*min_threshold=*/0.0,
773  columns_per_device,
775  total_num_tuples,
776  executor_);
777 
778  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
779 
780  auto [entry_count, emitted_keys_count] =
781  computeHashTableCounts(shard_count,
782  inverse_bucket_sizes,
783  columns_per_device,
784  overlaps_max_table_size_bytes,
785  overlaps_bucket_threshold);
786  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
787 
788  generateCacheKey(overlaps_max_table_size_bytes,
789  overlaps_bucket_threshold,
790  inverse_bucket_sizes,
791  fragments_per_device,
792  device_count_);
793 
794  reifyImpl(columns_per_device,
795  query_info,
796  layout,
797  shard_count,
798  entry_count,
799  emitted_keys_count,
800  skip_hashtable_caching,
801  overlaps_max_table_size_bytes,
802  overlaps_bucket_threshold);
803  }
804  } else {
805  // compute bucket size using the auto tuner
806  BucketSizeTuner tuner(
807  /*initial_threshold=*/overlaps_bucket_threshold,
808  /*step=*/2.0,
809  /*min_threshold=*/1e-7,
811  columns_per_device,
813  total_num_tuples,
814  executor_);
815 
816  VLOG(1) << "Running overlaps join size auto tune with parameters: " << tuner;
817 
818  // manages the tuning state machine
819  TuningState tuning_state(overlaps_max_table_size_bytes,
820  overlaps_target_entries_per_bin);
821  while (tuner.tuneOneStep(tuning_state.tuning_direction)) {
822  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
823 
824  const auto [crt_entry_count, crt_emitted_keys_count] =
825  computeHashTableCounts(shard_count,
826  inverse_bucket_sizes,
827  columns_per_device,
828  tuning_state.overlaps_max_table_size_bytes,
829  tuning_state.chosen_overlaps_threshold);
830  const size_t hash_table_size = calculateHashTableSize(
831  inverse_bucket_sizes.size(), crt_emitted_keys_count, crt_entry_count);
832  HashTableProps crt_props(crt_entry_count,
833  crt_emitted_keys_count,
834  hash_table_size,
835  inverse_bucket_sizes);
836  VLOG(1) << "Tuner output: " << tuner << " with properties " << crt_props;
837 
838  const auto should_continue = tuning_state(crt_props, tuner.getMinBucketSize());
840  tuning_state.crt_props.bucket_sizes, columns_per_device, device_count_);
841  if (!should_continue) {
842  break;
843  }
844  }
845 
846  const auto& crt_props = tuning_state.crt_props;
847  // sanity check that the hash table size has not changed. this is a fairly
848  // inexpensive check to ensure the above algorithm is consistent
849  const size_t hash_table_size =
851  crt_props.emitted_keys_count,
852  crt_props.entry_count);
853  CHECK_EQ(crt_props.hash_table_size, hash_table_size);
854 
856  hash_table_size > overlaps_max_table_size_bytes) {
857  VLOG(1) << "Could not find suitable overlaps join parameters to create hash "
858  "table under max allowed size ("
859  << overlaps_max_table_size_bytes << ") bytes.";
860  throw OverlapsHashTableTooBig(overlaps_max_table_size_bytes);
861  }
862 
863  VLOG(1) << "Final tuner output: " << tuner << " with properties " << crt_props;
865  VLOG(1) << "Final bucket sizes: ";
866  for (size_t dim = 0; dim < inverse_bucket_sizes_for_dimension_.size(); dim++) {
867  VLOG(1) << "dim[" << dim
868  << "]: " << 1.0 / inverse_bucket_sizes_for_dimension_[dim];
869  }
870  CHECK_GE(tuning_state.chosen_overlaps_threshold, double(0));
871  generateCacheKey(tuning_state.overlaps_max_table_size_bytes,
872  tuning_state.chosen_overlaps_threshold,
873  {},
874  fragments_per_device,
875  device_count_);
876  const auto candidate_auto_tuner_cache_key = hashtable_cache_key_.front();
877  if (skip_hashtable_caching) {
878  VLOG(1) << "Skip to add tuned parameters to auto tuner";
879  } else {
880  AutoTunerMetaInfo meta_info{tuning_state.overlaps_max_table_size_bytes,
881  tuning_state.chosen_overlaps_threshold,
883  auto_tuner_cache_->putItemToCache(candidate_auto_tuner_cache_key,
884  meta_info,
887  0,
888  0);
889  }
890  overlaps_bucket_threshold = tuning_state.chosen_overlaps_threshold;
891  reifyImpl(columns_per_device,
892  query_info,
893  layout,
894  shard_count,
895  crt_props.entry_count,
896  crt_props.emitted_keys_count,
897  skip_hashtable_caching,
898  overlaps_max_table_size_bytes,
899  overlaps_bucket_threshold);
900  }
901  }
902 }
903 
904 size_t OverlapsJoinHashTable::calculateHashTableSize(size_t number_of_dimensions,
905  size_t emitted_keys_count,
906  size_t entry_count) const {
907  const auto key_component_width = getKeyComponentWidth();
908  const auto key_component_count = number_of_dimensions;
909  const auto entry_size = key_component_count * key_component_width;
910  const auto keys_for_all_rows = emitted_keys_count;
911  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
912  const size_t hash_table_size =
913  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
914  return hash_table_size;
915 }
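// Illustrative sketch (not part of the original file): a worked example of the formula
// above with hypothetical counts. With 8-byte key components and 2 dimensions, each entry
// occupies 16 bytes of key space, and the one-to-many section needs
// (2 * entry_count + emitted_keys_count) int32 slots (offset/count buffers plus the
// row-id payload):
//   entry_count         = 1000
//   emitted_keys_count  = 5000
//   key section         = 16 * 1000              = 16000 bytes
//   one-to-many section = (2 * 1000 + 5000) * 4  = 28000 bytes
//   total                                        = 44000 bytes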
916 
918  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
919  const int device_id,
920  DeviceAllocator* dev_buff_owner) {
921  const auto& catalog = *executor_->getCatalog();
922  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
923 
924  std::vector<JoinColumn> join_columns;
925  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
926  std::vector<JoinColumnTypeInfo> join_column_types;
927  std::vector<std::shared_ptr<void>> malloc_owner;
928  for (const auto& inner_outer_pair : inner_outer_pairs_) {
929  const auto inner_col = inner_outer_pair.first;
930  const auto inner_cd = get_column_descriptor_maybe(
931  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
932  if (inner_cd && inner_cd->isVirtualCol) {
934  }
935  join_columns.emplace_back(fetchJoinColumn(inner_col,
936  fragments,
937  effective_memory_level,
938  device_id,
939  chunks_owner,
940  dev_buff_owner,
941  malloc_owner,
942  executor_,
943  &column_cache_));
944  const auto& ti = inner_col->get_type_info();
945  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
946  0,
947  0,
948  inline_int_null_value<int64_t>(),
949  false,
950  0,
952  CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
953  }
954  return {join_columns, join_column_types, chunks_owner, {}, malloc_owner};
955 }
956 
958  const size_t shard_count,
959  const std::vector<double>& inverse_bucket_sizes_for_dimension,
960  std::vector<ColumnsForDevice>& columns_per_device,
961  const size_t chosen_max_hashtable_size,
962  const double chosen_bucket_threshold) {
963  CHECK(!inverse_bucket_sizes_for_dimension.empty());
964  const auto [tuple_count, emitted_keys_count] =
965  approximateTupleCount(inverse_bucket_sizes_for_dimension,
966  columns_per_device,
967  chosen_max_hashtable_size,
968  chosen_bucket_threshold);
969  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
970 
971  return std::make_pair(
972  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
973  emitted_keys_count);
974 }
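// Illustrative sketch (not part of the original file): the entry count is twice the
// approximate distinct-bucket count (with a floor of one) before get_entries_per_device
// adjusts it for shard and device distribution. For a hypothetical tuple_count of 1500:
//   entry_count = 2 * max(1500, 1) = 3000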
975 
977  const std::vector<double>& inverse_bucket_sizes_for_dimension,
978  std::vector<ColumnsForDevice>& columns_per_device,
979  const size_t chosen_max_hashtable_size,
980  const double chosen_bucket_threshold) {
981  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
982  CountDistinctDescriptor count_distinct_desc{
984  0,
985  11,
986  true,
987  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
990  1};
991  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
992 
993  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
994  if (columns_per_device.front().join_columns.front().num_elems == 0) {
995  return std::make_pair(0, 0);
996  }
997 
998  // TODO: state management in here should be revisited, but this should be safe enough
999  // for now
1000  // re-compute bucket counts per device based on global bucket size
1001  for (size_t device_id = 0; device_id < columns_per_device.size(); ++device_id) {
1002  auto& columns_for_device = columns_per_device[device_id];
1003  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
1005  }
1006 
1007  // Number of keys must match dimension of buckets
1008  CHECK_EQ(columns_per_device.front().join_columns.size(),
1009  columns_per_device.front().join_buckets.size());
1010  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1011  // Note that this path assumes each device has the same hash table (for GPU hash
1012  // join w/ hash table built on CPU)
1013  const auto cached_count_info =
1017  if (cached_count_info) {
1018  VLOG(1) << "Using a cached tuple count: " << cached_count_info->first
1019  << ", emitted keys count: " << cached_count_info->second;
1020  return *cached_count_info;
1021  }
1022  int thread_count = cpu_threads();
1023  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
1024  auto hll_result = &hll_buffer_all_cpus[0];
1025 
1026  std::vector<int32_t> num_keys_for_row;
1027  // TODO(adb): support multi-column overlaps join
1028  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
1029 
1031  num_keys_for_row,
1032  count_distinct_desc.bitmap_sz_bits,
1033  padded_size_bytes,
1034  columns_per_device.front().join_columns,
1035  columns_per_device.front().join_column_types,
1036  columns_per_device.front().join_buckets,
1037  thread_count);
1038  for (int i = 1; i < thread_count; ++i) {
1039  hll_unify(hll_result,
1040  hll_result + i * padded_size_bytes,
1041  1 << count_distinct_desc.bitmap_sz_bits);
1042  }
1043  return std::make_pair(
1044  hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
1045  static_cast<size_t>(num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0));
1046  }
1047 #ifdef HAVE_CUDA
1048  auto data_mgr = executor_->getDataMgr();
1049  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
1050  for (auto& host_hll_buffer : host_hll_buffers) {
1051  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
1052  }
1053  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
1054  std::vector<std::future<void>> approximate_distinct_device_threads;
1055  for (int device_id = 0; device_id < device_count_; ++device_id) {
1056  approximate_distinct_device_threads.emplace_back(std::async(
1058  [device_id,
1059  &columns_per_device,
1060  &count_distinct_desc,
1061  data_mgr,
1062  &host_hll_buffers,
1063  &emitted_keys_count_device_threads] {
1064  auto allocator = std::make_unique<CudaAllocator>(
1065  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1066  auto device_hll_buffer =
1067  allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
1068  data_mgr->getCudaMgr()->zeroDeviceMem(
1069  device_hll_buffer,
1070  count_distinct_desc.bitmapPaddedSizeBytes(),
1071  device_id,
1073  const auto& columns_for_device = columns_per_device[device_id];
1074  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
1075  columns_for_device.join_columns, *allocator);
1076 
1077  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
1078  const auto& inverse_bucket_sizes_for_dimension =
1079  columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
1080  auto inverse_bucket_sizes_gpu = allocator->alloc(
1081  inverse_bucket_sizes_for_dimension.size() * sizeof(double));
1082  allocator->copyToDevice(
1083  inverse_bucket_sizes_gpu,
1084  inverse_bucket_sizes_for_dimension.data(),
1085  inverse_bucket_sizes_for_dimension.size() * sizeof(double));
1086  const size_t row_counts_buffer_sz =
1087  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
1088  auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
1089  data_mgr->getCudaMgr()->zeroDeviceMem(
1090  row_counts_buffer,
1091  row_counts_buffer_sz,
1092  device_id,
1094  const auto key_handler =
1095  OverlapsKeyHandler(inverse_bucket_sizes_for_dimension.size(),
1096  join_columns_gpu,
1097  reinterpret_cast<double*>(inverse_bucket_sizes_gpu));
1098  const auto key_handler_gpu =
1099  transfer_flat_object_to_gpu(key_handler, *allocator);
1101  reinterpret_cast<uint8_t*>(device_hll_buffer),
1102  count_distinct_desc.bitmap_sz_bits,
1103  reinterpret_cast<int32_t*>(row_counts_buffer),
1104  key_handler_gpu,
1105  columns_for_device.join_columns[0].num_elems);
1106 
1107  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
1108  allocator->copyFromDevice(
1109  &host_emitted_keys_count,
1110  row_counts_buffer +
1111  (columns_per_device.front().join_columns[0].num_elems - 1) *
1112  sizeof(int32_t),
1113  sizeof(int32_t));
1114 
1115  auto& host_hll_buffer = host_hll_buffers[device_id];
1116  allocator->copyFromDevice(&host_hll_buffer[0],
1117  device_hll_buffer,
1118  count_distinct_desc.bitmapPaddedSizeBytes());
1119  }));
1120  }
1121  for (auto& child : approximate_distinct_device_threads) {
1122  child.get();
1123  }
1124  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
1125  auto& result_hll_buffer = host_hll_buffers.front();
1126  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
1127  for (int device_id = 1; device_id < device_count_; ++device_id) {
1128  auto& host_hll_buffer = host_hll_buffers[device_id];
1129  hll_unify(hll_result,
1130  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
1131  1 << count_distinct_desc.bitmap_sz_bits);
1132  }
1133  const size_t emitted_keys_count =
1134  std::accumulate(emitted_keys_count_device_threads.begin(),
1135  emitted_keys_count_device_threads.end(),
1136  0);
1137  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
1138  emitted_keys_count);
1139 #else
1140  UNREACHABLE();
1141  return {0, 0};
1142 #endif // HAVE_CUDA
1143 }
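// Illustrative sketch (not part of the original file): the per-thread and per-device HLL
// buffers above are merged by taking the element-wise maximum of their registers, which
// is what makes the estimate combinable across threads and GPUs. A minimal, generic
// version of that union step (a stand-in, not the project's hll_unify) looks like:
[[maybe_unused]] static void hll_union_sketch(int8_t* lhs_registers,
                                              const int8_t* rhs_registers,
                                              const size_t num_registers) {
  for (size_t i = 0; i < num_registers; ++i) {
    // Each register stores the maximum observed rank for its bucket; the union of two
    // sketches keeps the larger rank per bucket.
    lhs_registers[i] = std::max(lhs_registers[i], rhs_registers[i]);
  }
}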
1144 
1146  const std::vector<double>& inverse_bucket_sizes,
1147  std::vector<ColumnsForDevice>& columns_per_device,
1148  const size_t device_count) {
1149  // set global bucket size
1150  inverse_bucket_sizes_for_dimension_ = inverse_bucket_sizes;
1151 
1152  // re-compute bucket counts per device based on global bucket size
1153  CHECK_EQ(columns_per_device.size(), static_cast<size_t>(device_count));
1154  for (size_t device_id = 0; device_id < device_count; ++device_id) {
1155  auto& columns_for_device = columns_per_device[device_id];
1156  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension_,
1158  }
1159 }
1160 
1162  return 8;
1163 }
1164 
1168 }
1169 
1170 void OverlapsJoinHashTable::reify(const HashType preferred_layout) {
1171  auto timer = DEBUG_TIMER(__func__);
1172  CHECK_LT(0, device_count_);
1174 
1175  CHECK(condition_->is_overlaps_oper());
1176  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
1177  HashType layout;
1178  if (inner_outer_pairs_[0].second->get_type_info().is_fixlen_array() &&
1179  inner_outer_pairs_[0].second->get_type_info().get_size() == 32) {
1180  // bounds array
1181  layout = HashType::ManyToMany;
1182  } else {
1183  layout = HashType::OneToMany;
1184  }
1185  try {
1186  reifyWithLayout(layout);
1187  return;
1188  } catch (const std::exception& e) {
1189  VLOG(1) << "Caught exception while building overlaps baseline hash table: "
1190  << e.what();
1191  throw;
1192  }
1193 }
1194 
1195 void OverlapsJoinHashTable::reifyImpl(std::vector<ColumnsForDevice>& columns_per_device,
1196  const Fragmenter_Namespace::TableInfo& query_info,
1197  const HashType layout,
1198  const size_t shard_count,
1199  const size_t entry_count,
1200  const size_t emitted_keys_count,
1201  const bool skip_hashtable_caching,
1202  const size_t chosen_max_hashtable_size,
1203  const double chosen_bucket_threshold) {
1204  std::vector<std::future<void>> init_threads;
1205  chosen_overlaps_bucket_threshold_ = chosen_bucket_threshold;
1206  chosen_overlaps_max_table_size_bytes_ = chosen_max_hashtable_size;
1210 
1211  for (int device_id = 0; device_id < device_count_; ++device_id) {
1212  const auto fragments =
1213  shard_count
1214  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
1215  : query_info.fragments;
1216  init_threads.push_back(std::async(std::launch::async,
1218  this,
1219  columns_per_device[device_id],
1220  layout,
1221  entry_count,
1222  emitted_keys_count,
1223  skip_hashtable_caching,
1224  device_id,
1225  logger::thread_id()));
1226  }
1227  for (auto& init_thread : init_threads) {
1228  init_thread.wait();
1229  }
1230  for (auto& init_thread : init_threads) {
1231  init_thread.get();
1232  }
1233 }
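// Illustrative sketch (not part of the original file): reifyImpl() wait()s on every
// future before get()ing any of them, so all per-device builds run to completion before
// the first stored exception is re-thrown. A self-contained version of that pattern:
[[maybe_unused]] static void wait_then_get_pattern_sketch() {
  std::vector<std::future<void>> tasks;
  for (int device_id = 0; device_id < 2; ++device_id) {
    tasks.push_back(std::async(std::launch::async, [device_id] {
      (void)device_id;  // per-device work would go here
    }));
  }
  for (auto& task : tasks) {
    task.wait();  // let every task finish, even if another one failed
  }
  for (auto& task : tasks) {
    task.get();  // now propagate the first stored exception, if any
  }
}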
1234 
1236  const HashType layout,
1237  const size_t entry_count,
1238  const size_t emitted_keys_count,
1239  const bool skip_hashtable_caching,
1240  const int device_id,
1241  const logger::ThreadId parent_thread_id) {
1242  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
1243  CHECK_EQ(getKeyComponentWidth(), size_t(8));
1245  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
1246 
1247  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1248  VLOG(1) << "Building overlaps join hash table on CPU.";
1249  auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
1250  columns_for_device.join_column_types,
1251  columns_for_device.join_buckets,
1252  layout,
1253  entry_count,
1254  emitted_keys_count,
1255  skip_hashtable_caching);
1256  CHECK(hash_table);
1257 
1258 #ifdef HAVE_CUDA
1260  auto gpu_hash_table = copyCpuHashTableToGpu(
1261  hash_table, layout, entry_count, emitted_keys_count, device_id);
1262  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1263  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
1264  } else {
1265 #else
1266  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1267 #endif
1268  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
1269  hash_tables_for_device_[0] = hash_table;
1270 #ifdef HAVE_CUDA
1271  }
1272 #endif
1273  } else {
1274 #ifdef HAVE_CUDA
1275  auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
1276  columns_for_device.join_column_types,
1277  columns_for_device.join_buckets,
1278  layout,
1279  entry_count,
1280  emitted_keys_count,
1281  device_id);
1282  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1283  hash_tables_for_device_[device_id] = std::move(hash_table);
1284 #else
1285  UNREACHABLE();
1286 #endif
1287  }
1288 }
1289 
1290 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::initHashTableOnCpu(
1291  const std::vector<JoinColumn>& join_columns,
1292  const std::vector<JoinColumnTypeInfo>& join_column_types,
1293  const std::vector<JoinBucketInfo>& join_bucket_info,
1294  const HashType layout,
1295  const size_t entry_count,
1296  const size_t emitted_keys_count,
1297  const bool skip_hashtable_caching) {
1298  auto timer = DEBUG_TIMER(__func__);
1299  decltype(std::chrono::steady_clock::now()) ts1, ts2;
1300  ts1 = std::chrono::steady_clock::now();
1301  CHECK(!join_columns.empty());
1302  CHECK(!join_bucket_info.empty());
1303  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1304  if (auto generic_hash_table =
1308  if (auto hash_table =
1309  std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
1310  VLOG(1) << "Using cached CPU hash table for initialization.";
1311  // See if a hash table of a different layout was returned.
1312  // If it was OneToMany, we can reuse it on ManyToMany.
1313  if (layout == HashType::ManyToMany &&
1314  hash_table->getLayout() == HashType::OneToMany) {
1315  // use the cached hash table
1317  return hash_table;
1318  }
1319  if (layout == hash_table->getLayout()) {
1320  return hash_table;
1321  }
1322  }
1323  }
1325  const auto key_component_count =
1326  join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();
1327 
1328  const auto key_handler =
1329  OverlapsKeyHandler(key_component_count,
1330  &join_columns[0],
1331  join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());
1334  dummy_str_proxy_translation_maps_ptrs_and_offsets;
1335  const auto err =
1336  builder.initHashTableOnCpu(&key_handler,
1338  join_columns,
1339  join_column_types,
1340  join_bucket_info,
1341  dummy_str_proxy_translation_maps_ptrs_and_offsets,
1342  entry_count,
1343  emitted_keys_count,
1344  layout,
1345  join_type_,
1348  ts2 = std::chrono::steady_clock::now();
1349  if (err) {
1350  throw HashJoinFail(
1351  std::string("Unrecognized error when initializing CPU overlaps hash table (") +
1352  std::to_string(err) + std::string(")"));
1353  }
1354  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
1355  if (skip_hashtable_caching) {
1356  VLOG(1) << "Skip to cache overlaps join hashtable";
1357  } else {
1358  auto hashtable_build_time =
1359  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
1362  hash_table,
1364  hashtable_build_time);
1365  }
1366  return hash_table;
1367 }
1368 
1369 #ifdef HAVE_CUDA
1370 
1371 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::initHashTableOnGpu(
1372  const std::vector<JoinColumn>& join_columns,
1373  const std::vector<JoinColumnTypeInfo>& join_column_types,
1374  const std::vector<JoinBucketInfo>& join_bucket_info,
1375  const HashType layout,
1376  const size_t entry_count,
1377  const size_t emitted_keys_count,
1378  const size_t device_id) {
1380 
1381  VLOG(1) << "Building overlaps join hash table on GPU.";
1382 
1384  auto data_mgr = executor_->getDataMgr();
1385  CudaAllocator allocator(
1386  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1387  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
1388  CHECK_EQ(join_columns.size(), 1u);
1389  CHECK(!join_bucket_info.empty());
1390  auto& inverse_bucket_sizes_for_dimension =
1391  join_bucket_info[0].inverse_bucket_sizes_for_dimension;
1392  auto inverse_bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
1393  inverse_bucket_sizes_for_dimension, allocator);
1394  const auto key_handler = OverlapsKeyHandler(inverse_bucket_sizes_for_dimension.size(),
1395  join_columns_gpu,
1396  inverse_bucket_sizes_gpu);
1397 
1398  const auto err = builder.initHashTableOnGpu(&key_handler,
1399  join_columns,
1400  layout,
1401  join_type_,
1404  entry_count,
1405  emitted_keys_count,
1406  device_id,
1407  executor_);
1408  if (err) {
1409  throw HashJoinFail(
1410  std::string("Unrecognized error when initializing GPU overlaps hash table (") +
1411  std::to_string(err) + std::string(")"));
1412  }
1413  return builder.getHashTable();
1414 }
1415 
1416 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::copyCpuHashTableToGpu(
1417  std::shared_ptr<BaselineHashTable>& cpu_hash_table,
1418  const HashType layout,
1419  const size_t entry_count,
1420  const size_t emitted_keys_count,
1421  const size_t device_id) {
1423 
1424  auto data_mgr = executor_->getDataMgr();
1425 
1426  // copy hash table to GPU
1427  BaselineJoinHashTableBuilder gpu_builder;
1428  gpu_builder.allocateDeviceMemory(layout,
1431  entry_count,
1432  emitted_keys_count,
1433  device_id,
1434  executor_);
1435  std::shared_ptr<BaselineHashTable> gpu_hash_table = gpu_builder.getHashTable();
1436  CHECK(gpu_hash_table);
1437  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
1438  CHECK(gpu_buffer_ptr);
1439 
1440  CHECK_LE(cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
1441  gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));
1442  auto device_allocator = std::make_unique<CudaAllocator>(
1443  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1444  device_allocator->copyToDevice(
1445  gpu_buffer_ptr,
1446  cpu_hash_table->getCpuBuffer(),
1447  cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU));
1448  return gpu_hash_table;
1449 }
1450 
1451 #endif // HAVE_CUDA
1452 
1453 #define LL_CONTEXT executor_->cgen_state_->context_
1454 #define LL_BUILDER executor_->cgen_state_->ir_builder_
1455 #define LL_INT(v) executor_->cgen_state_->llInt(v)
1456 #define LL_FP(v) executor_->cgen_state_->llFp(v)
1457 #define ROW_FUNC executor_->cgen_state_->row_func_
1458 
1460  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1461  const auto key_component_width = getKeyComponentWidth();
1462  CHECK(key_component_width == 4 || key_component_width == 8);
1463  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
1464  llvm::Value* key_buff_lv{nullptr};
1465  switch (key_component_width) {
1466  case 4:
1467  key_buff_lv =
1468  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
1469  break;
1470  case 8:
1471  key_buff_lv =
1472  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
1473  break;
1474  default:
1475  CHECK(false);
1476  }
1477 
1478  const auto& inner_outer_pair = inner_outer_pairs_[0];
1479  const auto outer_geo = inner_outer_pair.second;
1480  const auto outer_geo_ti = outer_geo->get_type_info();
1481 
1482  llvm::Value* arr_ptr = nullptr;
1483  CodeGenerator code_generator(executor_);
1484  CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
1485 
1486  if (outer_geo_ti.is_geometry()) {
1487  // TODO(adb): for points we will use the coords array, but for other geometries we
1488  // will need to use the bounding box. For now only support points.
1489  CHECK_EQ(outer_geo_ti.get_type(), kPOINT);
1490 
1491  if (const auto outer_geo_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_geo)) {
1492  const auto outer_geo_col_lvs = code_generator.codegen(outer_geo_col, true, co);
1493  CHECK_EQ(outer_geo_col_lvs.size(), size_t(1));
1494  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
1495  outer_geo_col->get_table_id(), outer_geo_col->get_column_id() + 1);
1496  CHECK(coords_cd);
1497 
1498  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1499  "array_buff",
1500  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1501  {outer_geo_col_lvs.front(), code_generator.posArg(outer_geo_col)});
1502  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
1503  << "Only TINYINT coordinates columns are supported in geo overlaps hash "
1504  "join.";
1505  arr_ptr = code_generator.castArrayPointer(array_ptr,
1506  coords_cd->columnType.get_elem_type());
1507  } else if (const auto outer_geo_function_operator =
1508  dynamic_cast<const Analyzer::GeoOperator*>(outer_geo)) {
1509  // Process points dynamically constructed by geo function operators
1510  const auto outer_geo_function_operator_lvs =
1511  code_generator.codegen(outer_geo_function_operator, true, co);
1512  CHECK_EQ(outer_geo_function_operator_lvs.size(), size_t(2));
1513  arr_ptr = outer_geo_function_operator_lvs.front();
1514  } else if (const auto outer_geo_expr =
1515  dynamic_cast<const Analyzer::GeoExpr*>(outer_geo)) {
1516  UNREACHABLE() << outer_geo_expr->toString();
1517  }
1518  } else if (outer_geo_ti.is_fixlen_array()) {
1519  // Process dynamically constructed points
1520  const auto outer_geo_cast_coord_array =
1521  dynamic_cast<const Analyzer::UOper*>(outer_geo);
1522  CHECK_EQ(outer_geo_cast_coord_array->get_optype(), kCAST);
1523  const auto outer_geo_coord_array = dynamic_cast<const Analyzer::ArrayExpr*>(
1524  outer_geo_cast_coord_array->get_operand());
1525  CHECK(outer_geo_coord_array);
1526  CHECK(outer_geo_coord_array->isLocalAlloc());
1527  CHECK_EQ(outer_geo_coord_array->getElementCount(), 2);
1528  auto elem_size = (outer_geo_ti.get_compression() == kENCODING_GEOINT)
1529  ? sizeof(int32_t)
1530  : sizeof(double);
1531  CHECK_EQ(outer_geo_ti.get_size(), int(2 * elem_size));
1532  const auto outer_geo_constructed_lvs = code_generator.codegen(outer_geo, true, co);
1533  // CHECK_EQ(outer_geo_constructed_lvs.size(), size_t(2)); // Pointer and size
1534  const auto array_ptr = outer_geo_constructed_lvs.front(); // Just need the pointer
1535  arr_ptr = LL_BUILDER.CreateGEP(
1536  array_ptr->getType()->getScalarType()->getPointerElementType(),
1537  array_ptr,
1538  LL_INT(0));
1539  arr_ptr = code_generator.castArrayPointer(array_ptr, SQLTypeInfo(kTINYINT, true));
1540  }
1541  if (!arr_ptr) {
1542  LOG(FATAL) << "Overlaps key currently only supported for geospatial columns and "
1543  "constructed points.";
1544  }
1545 
1546  for (size_t i = 0; i < 2; i++) {
1547  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
1548  key_buff_lv->getType()->getScalarType()->getPointerElementType(),
1549  key_buff_lv,
1550  LL_INT(i));
1551 
1552  // Note that get_bucket_key_for_range_compressed will need to be specialized for
1553  // future compression schemes
1554  auto bucket_key =
1555  outer_geo_ti.get_compression() == kENCODING_GEOINT
1556  ? executor_->cgen_state_->emitExternalCall(
1557  "get_bucket_key_for_range_compressed",
1558  get_int_type(64, LL_CONTEXT),
1559  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])})
1560  : executor_->cgen_state_->emitExternalCall(
1561  "get_bucket_key_for_range_double",
1562  get_int_type(64, LL_CONTEXT),
1563  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])});
1564  const auto col_lv = LL_BUILDER.CreateSExt(
1565  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
1566  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
1567  }
1568  return key_buff_lv;
1569 }
1570 
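// codegenManyKey handles the ManyToMany (bounds vs. constructed point) layout: rather
// than producing a single composite bucket key, it emits a call to
// get_num_buckets_for_bounds to count how many grid buckets the outer bounds overlap
// and returns {num_keys_lv, array_ptr}, which codegenMatchingSet() below consumes to
// probe each of the overlapped buckets.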
1571 std::vector<llvm::Value*> OverlapsJoinHashTable::codegenManyKey(
1572  const CompilationOptions& co) {
1573  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1574  const auto key_component_width = getKeyComponentWidth();
1575  CHECK(key_component_width == 4 || key_component_width == 8);
1576  auto hash_table = getHashTableForDevice(size_t(0));
1577  CHECK(hash_table);
1579 
1580  VLOG(1) << "Performing codegen for ManyToMany";
1581  const auto& inner_outer_pair = inner_outer_pairs_[0];
1582  const auto outer_col = inner_outer_pair.second;
1583 
1584  CodeGenerator code_generator(executor_);
1585  const auto col_lvs = code_generator.codegen(outer_col, true, co);
1586  CHECK_EQ(col_lvs.size(), size_t(1));
1587 
1588  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
1589  CHECK(outer_col_var);
1590  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
1591  outer_col_var->get_table_id(), outer_col_var->get_column_id());
1592  CHECK(coords_cd);
1593 
1594  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1595  "array_buff",
1596  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1597  {col_lvs.front(), code_generator.posArg(outer_col)});
1598 
1599  // TODO(jclay): this seems to cast to double, and causes the GPU build to fail.
1600  // const auto arr_ptr =
1601  // code_generator.castArrayPointer(array_ptr,
1602  // coords_cd->columnType.get_elem_type());
1603  array_ptr->setName("array_ptr");
1604 
1605  auto num_keys_lv = executor_->cgen_state_->emitExternalCall(
1606  "get_num_buckets_for_bounds",
1607  get_int_type(32, LL_CONTEXT),
1608  {array_ptr,
1609  LL_INT(0),
1610  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1611  LL_FP(inverse_bucket_sizes_for_dimension_[1])});
1612  num_keys_lv->setName("num_keys_lv");
1613 
1614  return {num_keys_lv, array_ptr};
1615 }
1616 
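// codegenMatchingSet has two code paths: for ManyToMany layouts it materializes the
// candidate row ids of all buckets overlapped by the outer bounds into a small
// stack-allocated array via get_candidate_rows, while the baseline (non-ManyToMany)
// path performs a composite key dictionary lookup followed by the usual offset/count
// buffer walk shared with BaselineJoinHashTable.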
1617 HashJoinMatchingSet OverlapsJoinHashTable::codegenMatchingSet(
1618  const CompilationOptions& co,
1619  const size_t index) {
1620  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1621  if (getHashType() == HashType::ManyToMany) {
1622  VLOG(1) << "Building codegenMatchingSet for ManyToMany";
1623  const auto key_component_width = getKeyComponentWidth();
1624  CHECK(key_component_width == 4 || key_component_width == 8);
1625  auto many_to_many_args = codegenManyKey(co);
1626  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1627  const auto composite_dict_ptr_type =
1628  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1629  const auto composite_key_dict =
1630  hash_ptr->getType()->isPointerTy()
1631  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1632  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1633  const auto key_component_count = getKeyComponentCount();
1634 
1635  auto one_to_many_ptr = hash_ptr;
1636 
1637  if (one_to_many_ptr->getType()->isPointerTy()) {
1638  one_to_many_ptr =
1639  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1640  } else {
1641  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1642  }
1643 
1644  const auto composite_key_dict_size = offsetBufferOff();
1645  one_to_many_ptr =
1646  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1647 
1648  // NOTE(jclay): A fixed array of size 200 is allocated on the stack.
1649  // This is likely the largest size that is safe to use across
1650  // all supported GPU architectures.
1651  const int max_array_size = 200;
1652  const auto arr_type = get_int_array_type(32, max_array_size, LL_CONTEXT);
1653  const auto out_arr_lv = LL_BUILDER.CreateAlloca(arr_type);
1654  out_arr_lv->setName("out_arr");
1655 
1656  const auto casted_out_arr_lv =
1657  LL_BUILDER.CreatePointerCast(out_arr_lv, arr_type->getPointerTo());
1658 
1659  const auto element_ptr = LL_BUILDER.CreateGEP(arr_type, casted_out_arr_lv, LL_INT(0));
1660 
1661  auto rowid_ptr_i32 =
1662  LL_BUILDER.CreatePointerCast(element_ptr, llvm::Type::getInt32PtrTy(LL_CONTEXT));
1663 
1664  const auto candidate_count_lv = executor_->cgen_state_->emitExternalCall(
1665  "get_candidate_rows",
1666  llvm::Type::getInt64Ty(LL_CONTEXT),
1667  {
1668  rowid_ptr_i32,
1669  LL_INT(max_array_size),
1670  many_to_many_args[1],
1671  LL_INT(0),
1672  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1673  LL_FP(inverse_bucket_sizes_for_dimension_[1]),
1674  many_to_many_args[0],
1675  LL_INT(key_component_count), // key_component_count
1676  composite_key_dict, // ptr to hash table
1677  LL_INT(getEntryCount()), // entry_count
1678  LL_INT(composite_key_dict_size), // offset_buffer_ptr_offset
1679  LL_INT(getEntryCount() * sizeof(int32_t)) // sub_buff_size
1680  });
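  // get_candidate_rows is assumed to walk the offset/count buffers of every bucket
  // overlapped by the bounds, append the matching row ids into rowid_ptr_i32 (capped
  // at max_array_size), and return the number of candidates written; the triple
  // returned below is then consumed like an ordinary one-to-many matching set with a
  // single slot.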
1681 
1682  const auto slot_lv = LL_INT(int64_t(0));
1683 
1684  return {rowid_ptr_i32, candidate_count_lv, slot_lv};
1685  } else {
1686  VLOG(1) << "Building codegenMatchingSet for Baseline";
1687  // TODO: duplicated w/ BaselineJoinHashTable -- push into the hash table builder?
1688  const auto key_component_width = getKeyComponentWidth();
1689  CHECK(key_component_width == 4 || key_component_width == 8);
1690  auto key_buff_lv = codegenKey(co);
1692  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1693  const auto composite_dict_ptr_type =
1694  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1695  const auto composite_key_dict =
1696  hash_ptr->getType()->isPointerTy()
1697  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1698  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1699  const auto key_component_count = getKeyComponentCount();
1700  const auto key = executor_->cgen_state_->emitExternalCall(
1701  "get_composite_key_index_" + std::to_string(key_component_width * 8),
1702  get_int_type(64, LL_CONTEXT),
1703  {key_buff_lv,
1704  LL_INT(key_component_count),
1705  composite_key_dict,
1706  LL_INT(getEntryCount())});
1707  auto one_to_many_ptr = hash_ptr;
1708  if (one_to_many_ptr->getType()->isPointerTy()) {
1709  one_to_many_ptr =
1710  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1711  } else {
1712  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1713  }
1714  const auto composite_key_dict_size = offsetBufferOff();
1715  one_to_many_ptr =
1716  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1717  return HashJoin::codegenMatchingSet(
1718  std::vector<llvm::Value*>{
1719  one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(getEntryCount() - 1)},
1720  false,
1721  false,
1722  false,
1723  getComponentBufferSize(),
1724  executor_);
1725  }
1726  UNREACHABLE();
1727  return HashJoinMatchingSet{};
1728 }
1729 
1730 std::string OverlapsJoinHashTable::toString(const ExecutorDeviceType device_type,
1731  const int device_id,
1732  bool raw) const {
1733  auto buffer = getJoinHashBuffer(device_type, device_id);
1734  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1735  auto hash_table = hash_tables_for_device_[device_id];
1736  CHECK(hash_table);
1737  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1738 #ifdef HAVE_CUDA
1739  std::unique_ptr<int8_t[]> buffer_copy;
1740  if (device_type == ExecutorDeviceType::GPU) {
1741  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1742  CHECK(executor_);
1743  auto data_mgr = executor_->getDataMgr();
1744  auto device_allocator = std::make_unique<CudaAllocator>(
1745  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1746 
1747  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1748  }
1749  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1750 #else
1751  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1752 #endif // HAVE_CUDA
1753  auto ptr2 = ptr1 + offsetBufferOff();
1754  auto ptr3 = ptr1 + countBufferOff();
1755  auto ptr4 = ptr1 + payloadBufferOff();
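  // ptr1..ptr4 partition the flat hash table buffer: the composite key dictionary at
  // the front, followed by the offset buffer, the count buffer, and the row-id payload
  // buffer, at the byte offsets reported by offsetBufferOff(), countBufferOff() and
  // payloadBufferOff().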
1756  CHECK(hash_table);
1757  const auto layout = getHashType();
1758  return HashTable::toString(
1759  "geo",
1760  getHashTypeString(layout),
1761  getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1762  getKeyComponentWidth(),
1763  hash_table->getEntryCount(),
1764  ptr1,
1765  ptr2,
1766  ptr3,
1767  ptr4,
1768  buffer_size,
1769  raw);
1770 }
1771 
1772 std::set<DecodedJoinHashBufferEntry> OverlapsJoinHashTable::toSet(
1773  const ExecutorDeviceType device_type,
1774  const int device_id) const {
1775  auto buffer = getJoinHashBuffer(device_type, device_id);
1776  auto hash_table = getHashTableForDevice(device_id);
1777  CHECK(hash_table);
1778  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1779 #ifdef HAVE_CUDA
1780  std::unique_ptr<int8_t[]> buffer_copy;
1781  if (device_type == ExecutorDeviceType::GPU) {
1782  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1783  CHECK(executor_);
1784  auto data_mgr = executor_->getDataMgr();
1785  auto allocator = std::make_unique<CudaAllocator>(
1786  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1787 
1788  allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1789  }
1790  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1791 #else
1792  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1793 #endif // HAVE_CUDA
1794  auto ptr2 = ptr1 + offsetBufferOff();
1795  auto ptr3 = ptr1 + countBufferOff();
1796  auto ptr4 = ptr1 + payloadBufferOff();
1797  const auto layout = getHashType();
1798  return HashTable::toSet(getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1799  getKeyComponentWidth(),
1800  hash_table->getEntryCount(),
1801  ptr1,
1802  ptr2,
1803  ptr3,
1804  ptr4,
1805  buffer_size);
1806 }
1807 
1808 Data_Namespace::MemoryLevel OverlapsJoinHashTable::getEffectiveMemoryLevel(
1809  const std::vector<InnerOuter>& inner_outer_pairs) const {
1810  if (query_hint_.isHintRegistered(QueryHint::kOverlapsAllowGpuBuild) &&
1811  query_hint_.overlaps_allow_gpu_build &&
1812  this->executor_->getDataMgr()->gpusPresent() &&
1813  memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
1814  return Data_Namespace::MemoryLevel::GPU_LEVEL;
1815  }
1816  // otherwise, try to build on CPU
1817  return Data_Namespace::MemoryLevel::CPU_LEVEL;
1818 }
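// A GPU-side build is only attempted when the overlaps_allow_gpu_build query hint is
// registered and GPUs are actually present; otherwise the overlaps hash table is built
// at CPU memory level. Assuming the usual hint comment syntax, a query opting in might
// look like (illustrative only):
//
//   SELECT /*+ overlaps_allow_gpu_build */ COUNT(*)
//   FROM polys, points
//   WHERE ST_Contains(polys.geom, points.location);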
1819 
1820 int OverlapsJoinHashTable::getInnerTableId() const noexcept {
1821  try {
1822  return HashJoin::getInnerTableId(inner_outer_pairs_);
1823  } catch (...) {
1824  CHECK(false);
1825  }
1826  return 0;
1827 }
1828 
1829 std::shared_ptr<HashTable> OverlapsJoinHashTable::initHashTableOnCpuFromCache(
1830  QueryPlanHash key,
1831  CacheItemType item_type,
1832  DeviceIdentifier device_identifier) {
1833  auto timer = DEBUG_TIMER(__func__);
1834  VLOG(1) << "Checking CPU hash table cache.";
1835  CHECK(hash_table_cache_);
1836  HashtableCacheMetaInfo meta_info;
1837  meta_info.overlaps_meta_info = getOverlapsHashTableMetaInfo();
1838  auto cached_hashtable =
1839  hash_table_cache_->getItemFromCache(key, item_type, device_identifier, meta_info);
1840  if (cached_hashtable) {
1841  return cached_hashtable;
1842  }
1843  return nullptr;
1844 }
1845 
1846 std::optional<std::pair<size_t, size_t>>
1847 OverlapsJoinHashTable::getApproximateTupleCountFromCache(
1848  QueryPlanHash key,
1849  CacheItemType item_type,
1850  DeviceIdentifier device_identifier) {
1851  CHECK(hash_table_cache_);
1852  HashtableCacheMetaInfo metaInfo;
1853  metaInfo.overlaps_meta_info = getOverlapsHashTableMetaInfo();
1854  auto cached_hashtable =
1855  hash_table_cache_->getItemFromCache(key, item_type, device_identifier, metaInfo);
1856  if (cached_hashtable) {
1857  return std::make_pair(cached_hashtable->getEntryCount() / 2,
1858  cached_hashtable->getEmittedKeysCount());
1859  }
1860  return std::nullopt;
1861 }
1862 
1863 void OverlapsJoinHashTable::putHashTableOnCpuToCache(
1864  QueryPlanHash key,
1865  CacheItemType item_type,
1866  std::shared_ptr<HashTable> hashtable_ptr,
1867  DeviceIdentifier device_identifier,
1868  size_t hashtable_building_time) {
1869  CHECK(hash_table_cache_);
1870  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
1871  HashtableCacheMetaInfo meta_info;
1872  meta_info.overlaps_meta_info = getOverlapsHashTableMetaInfo();
1873  meta_info.registered_query_hint = query_hint_;
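  // Only CPU-resident hash tables are cached (hence the check that no GPU buffer is
  // attached); the entry is keyed by the query plan hash and stores the table's CPU
  // buffer size and build time alongside the overlaps-specific meta info and any
  // registered query hints.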
1874  hash_table_cache_->putItemToCache(
1875  key,
1876  hashtable_ptr,
1877  item_type,
1878  device_identifier,
1879  hashtable_ptr->getHashTableBufferSize(ExecutorDeviceType::CPU),
1880  hashtable_building_time,
1881  meta_info);
1882 }
1883 
1884 bool OverlapsJoinHashTable::isBitwiseEq() const {
1885  return condition_->get_optype() == kBW_EQ;
1886 }