OmniSciDB  c1a53651b2
OverlapsJoinHashTable.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
21 #include "QueryEngine/Execute.h"
29 
30 std::unique_ptr<HashtableRecycler> OverlapsJoinHashTable::hash_table_cache_ =
31  std::make_unique<HashtableRecycler>(CacheItemType::OVERLAPS_HT,
33 std::unique_ptr<OverlapsTuningParamRecycler> OverlapsJoinHashTable::auto_tuner_cache_ =
34  std::make_unique<OverlapsTuningParamRecycler>();
35 
37 std::shared_ptr<OverlapsJoinHashTable> OverlapsJoinHashTable::getInstance(
38  const std::shared_ptr<Analyzer::BinOper> condition,
39  const std::vector<InputTableInfo>& query_infos,
40  const Data_Namespace::MemoryLevel memory_level,
41  const JoinType join_type,
42  const int device_count,
43  ColumnCacheMap& column_cache,
44  Executor* executor,
45  const HashTableBuildDagMap& hashtable_build_dag_map,
46  const RegisteredQueryHint& query_hints,
47  const TableIdToNodeMap& table_id_to_node_map) {
48  decltype(std::chrono::steady_clock::now()) ts1, ts2;
49 
50  std::vector<InnerOuter> inner_outer_pairs;
51 
52  if (const auto range_expr =
53  dynamic_cast<const Analyzer::RangeOper*>(condition->get_right_operand())) {
54  return RangeJoinHashTable::getInstance(condition,
55  range_expr,
56  query_infos,
57  memory_level,
58  join_type,
59  device_count,
60  column_cache,
61  executor,
62  hashtable_build_dag_map,
63  query_hints,
64  table_id_to_node_map);
65  } else {
66  inner_outer_pairs =
67  HashJoin::normalizeColumnPairs(condition.get(), executor->getTemporaryTables())
68  .first;
69  }
70  CHECK(!inner_outer_pairs.empty());
71 
72  const auto getHashTableType =
73  [](const std::shared_ptr<Analyzer::BinOper> condition,
74  const std::vector<InnerOuter>& inner_outer_pairs) -> HashType {
75  HashType layout = HashType::OneToMany;
76  if (condition->is_overlaps_oper()) {
77  CHECK_EQ(inner_outer_pairs.size(), size_t(1));
78  if (inner_outer_pairs[0].first->get_type_info().is_array() &&
79  inner_outer_pairs[0].second->get_type_info().is_array() &&
80  // Bounds vs constructed points, former should yield ManyToMany
81  inner_outer_pairs[0].second->get_type_info().get_size() == 32) {
82  layout = HashType::ManyToMany;
83  }
84  }
85  return layout;
86  };
87 
88  const auto layout = getHashTableType(condition, inner_outer_pairs);
89 
90  if (VLOGGING(1)) {
91  VLOG(1) << "Building geo hash table " << getHashTypeString(layout)
92  << " for qual: " << condition->toString();
93  ts1 = std::chrono::steady_clock::now();
94  }
95 
96  const auto qi_0 = query_infos[0].info.getNumTuplesUpperBound();
97  const auto qi_1 = query_infos[1].info.getNumTuplesUpperBound();
98 
99  VLOG(1) << "table_key = " << query_infos[0].table_key << " has " << qi_0 << " tuples.";
100  VLOG(1) << "table_key = " << query_infos[1].table_key << " has " << qi_1 << " tuples.";
101 
102  const auto& query_info =
103  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
104  .info;
105  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
106  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
107  throw TooManyHashEntries();
108  }
109 
110  auto join_hash_table = std::make_shared<OverlapsJoinHashTable>(condition,
111  join_type,
112  query_infos,
113  memory_level,
114  column_cache,
115  executor,
116  inner_outer_pairs,
117  device_count,
118  query_hints,
119  hashtable_build_dag_map,
120  table_id_to_node_map);
121  try {
122  join_hash_table->reify(layout);
123  } catch (const HashJoinFail& e) {
124  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
125  "involved in overlaps join | ") +
126  e.what());
127  } catch (const ColumnarConversionNotSupported& e) {
128  throw HashJoinFail(std::string("Could not build hash tables for overlaps join | "
129  "Inner table too big. Attempt manual table reordering "
130  "or create a single fragment inner table. | ") +
131  e.what());
132  } catch (const JoinHashTableTooBig& e) {
133  throw e;
134  } catch (const std::exception& e) {
135  throw HashJoinFail(std::string("Failed to build hash tables for overlaps join | ") +
136  e.what());
137  }
138  if (VLOGGING(1)) {
139  ts2 = std::chrono::steady_clock::now();
140  VLOG(1) << "Built geo hash table " << getHashTypeString(layout) << " in "
141  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
142  << " ms";
143  }
144  return join_hash_table;
145 }
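// Illustrative note (not part of the original source): getInstance() is the dispatch
// point for geo hash joins. A qual whose right operand lowers to an Analyzer::RangeOper
// (e.g. a distance-based join) is routed to RangeJoinHashTable::getInstance() above,
// while a bucketizable overlaps qual, for example a point-in-polygon predicate such as
//
//   ST_Contains(polys.geom, ST_Point(pts.x, pts.y))
//
// assuming the planner rewrites it into an overlaps operator, falls through to the
// OverlapsJoinHashTable build path (reify) below.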
146 
147 namespace {
148 
149 std::vector<double> correct_uninitialized_bucket_sizes_to_thresholds(
150  const std::vector<double>& bucket_sizes,
151  const std::vector<double>& bucket_thresholds,
152  const double initial_value) {
153  std::vector<double> corrected_bucket_sizes(bucket_sizes);
154  for (size_t i = 0; i != bucket_sizes.size(); ++i) {
155  if (bucket_sizes[i] == initial_value) {
156  corrected_bucket_sizes[i] = bucket_thresholds[i];
157  }
158  }
159  return corrected_bucket_sizes;
160 }
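// Illustrative example (not part of the original source): with
//   bucket_sizes      = {0.0, 0.15}   // 0.0 == initial_value, i.e. never updated
//   bucket_thresholds = {0.10, 0.20}
//   initial_value     = 0.0
// the uninitialized first dimension falls back to its threshold and the function
// returns {0.10, 0.15}.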
161 
162 std::vector<double> compute_bucket_sizes(
163  const std::vector<double>& bucket_thresholds,
164  const Data_Namespace::MemoryLevel effective_memory_level,
165  const JoinColumn& join_column,
166  const JoinColumnTypeInfo& join_column_type,
167  const std::vector<InnerOuter>& inner_outer_pairs,
168  const Executor* executor) {
169  // No coalesced keys for overlaps joins yet
170  CHECK_EQ(inner_outer_pairs.size(), 1u);
171 
172  const auto col = inner_outer_pairs[0].first;
173  CHECK(col);
174  const auto col_ti = col->get_type_info();
175  CHECK(col_ti.is_array());
176 
177  // TODO: Compute the number of dimensions for this overlaps key
178  const size_t num_dims{2};
179  const double initial_bin_value{0.0};
180  std::vector<double> bucket_sizes(num_dims, initial_bin_value);
181  CHECK_EQ(bucket_thresholds.size(), num_dims);
182 
183  VLOG(1)
184  << "Computing x and y bucket sizes for overlaps hash join with maximum bucket size "
185  << std::to_string(bucket_thresholds[0]) << ", "
186  << std::to_string(bucket_thresholds[1]);
187 
188  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
189  const int thread_count = cpu_threads();
190  compute_bucket_sizes_on_cpu(
191  bucket_sizes, join_column, join_column_type, bucket_thresholds, thread_count);
192  }
193 #ifdef HAVE_CUDA
194  else {
195  // Note that we compute the bucket sizes using only a single GPU
196  const int device_id = 0;
197  auto data_mgr = executor->getDataMgr();
198  CudaAllocator allocator(
199  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
200  auto device_bucket_sizes_gpu =
201  transfer_vector_of_flat_objects_to_gpu(bucket_sizes, allocator);
202  auto join_column_gpu = transfer_flat_object_to_gpu(join_column, allocator);
203  auto join_column_type_gpu = transfer_flat_object_to_gpu(join_column_type, allocator);
204  auto device_bucket_thresholds_gpu =
205  transfer_vector_of_flat_objects_to_gpu(bucket_thresholds, allocator);
206 
207  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
208  join_column_gpu,
209  join_column_type_gpu,
210  device_bucket_thresholds_gpu);
211  allocator.copyFromDevice(reinterpret_cast<int8_t*>(bucket_sizes.data()),
212  reinterpret_cast<int8_t*>(device_bucket_sizes_gpu),
213  bucket_sizes.size() * sizeof(double));
214  }
215 #endif
216  const auto corrected_bucket_sizes = correct_uninitialized_bucket_sizes_to_thresholds(
217  bucket_sizes, bucket_thresholds, initial_bin_value);
218 
219  VLOG(1) << "Computed x and y bucket sizes for overlaps hash join: ("
220  << corrected_bucket_sizes[0] << ", " << corrected_bucket_sizes[1] << ")";
221 
222  return corrected_bucket_sizes;
223 }
224 
225 struct HashTableProps {
226  HashTableProps(const size_t entry_count,
227  const size_t emitted_keys_count,
228  const size_t hash_table_size,
229  const std::vector<double>& bucket_sizes)
230  : entry_count(entry_count)
231  , emitted_keys_count(emitted_keys_count)
232  , keys_per_bin(entry_count == 0 ? std::numeric_limits<double>::max()
233  : emitted_keys_count / (entry_count / 2.0))
234  , hash_table_size(hash_table_size)
235  , bucket_sizes(bucket_sizes) {}
236 
237  static HashTableProps invalid() { return HashTableProps(0, 0, 0, {}); }
238 
239  size_t entry_count;
240  size_t emitted_keys_count;
241  double keys_per_bin;
242  size_t hash_table_size;
243  std::vector<double> bucket_sizes;
244 };
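// Worked example (illustrative only): HashTableProps props(/*entry_count=*/2'000'000,
// /*emitted_keys_count=*/5'000'000, /*hash_table_size=*/68'000'000, {0.1, 0.1}) yields
// keys_per_bin = 5'000'000 / (2'000'000 / 2.0) = 5.0, i.e. five emitted keys per
// logical bin. An entry_count of 0 maps keys_per_bin to
// std::numeric_limits<double>::max(), so an empty table never appears to be
// "under threshold" during tuning.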
245 
246 std::ostream& operator<<(std::ostream& os, const HashTableProps& props) {
247  os << " entry_count: " << props.entry_count << ", emitted_keys "
248  << props.emitted_keys_count << ", hash table size " << props.hash_table_size
249  << ", keys per bin " << props.keys_per_bin;
250  return os;
251 }
252 
253 struct TuningState {
254  TuningState(const size_t overlaps_max_table_size_bytes,
255  const double overlaps_target_entries_per_bin)
256  : crt_props(HashTableProps::invalid())
257  , prev_props(HashTableProps::invalid())
258  , chosen_overlaps_threshold(-1)
259  , crt_step(0)
260  , crt_reverse_search_iteration(0)
261  , overlaps_max_table_size_bytes(overlaps_max_table_size_bytes)
262  , overlaps_target_entries_per_bin(overlaps_target_entries_per_bin) {}
263 
264  // current and previous props, allows for easy backtracking
265  HashTableProps crt_props;
266  HashTableProps prev_props;
267 
268  // value we are tuning for
269  double chosen_overlaps_threshold;
270  enum class TuningDirection { SMALLER, LARGER };
271  TuningDirection tuning_direction{TuningDirection::SMALLER};
272 
273  // various constants / state
274  size_t crt_step; // 1 indexed
275  size_t crt_reverse_search_iteration; // 1 indexed
276  const size_t overlaps_max_table_size_bytes;
277  const double overlaps_target_entries_per_bin;
278  const size_t max_reverse_search_iterations{8};
279 
284  bool operator()(const HashTableProps& new_props, const double new_overlaps_threshold) {
285  prev_props = crt_props;
286  crt_props = new_props;
287  crt_step++;
288 
289  if (hashTableTooBig() || keysPerBinIncreasing()) {
290  if (hashTableTooBig()) {
291  VLOG(1) << "Reached hash table size limit: " << overlaps_max_table_size_bytes
292  << " with " << crt_props.hash_table_size << " byte hash table, "
293  << crt_props.keys_per_bin << " keys per bin.";
294  } else if (keysPerBinIncreasing()) {
295  VLOG(1) << "Keys per bin increasing from " << prev_props.keys_per_bin << " to "
296  << crt_props.keys_per_bin;
297  CHECK(previousIterationValid());
298  }
299  if (previousIterationValid()) {
300  VLOG(1) << "Using previous threshold value " << chosen_overlaps_threshold;
301  crt_props = prev_props;
302  return false;
303  } else {
304  CHECK(hashTableTooBig());
305  crt_reverse_search_iteration++;
306  chosen_overlaps_threshold = new_overlaps_threshold;
307 
308  if (crt_reverse_search_iteration == max_reverse_search_iterations) {
309  VLOG(1) << "Hit maximum number (" << max_reverse_search_iterations
310  << ") of reverse tuning iterations. Aborting tuning";
311  // use the crt props, but don't bother trying to tune any further
312  return false;
313  }
314 
315  if (crt_reverse_search_iteration > 1 &&
316  crt_props.hash_table_size == prev_props.hash_table_size) {
317  // hash table size is not changing, bail
318  VLOG(1) << "Hash table size not decreasing (" << crt_props.hash_table_size
319  << " bytes) and still above maximum allowed size ("
320  << overlaps_max_table_size_bytes << " bytes). Aborting tuning";
321  return false;
322  }
323 
324  // if the hash table is too big on the very first step, change direction towards
325  // larger bins to see if a slightly smaller hash table will fit
326  if (crt_step == 1 && crt_reverse_search_iteration == 1) {
327  VLOG(1)
328  << "First iteration of overlaps tuning led to hash table size over "
329  "limit. Reversing search to try larger bin sizes (previous threshold: "
330  << chosen_overlaps_threshold << ")";
331  // Need to change direction of tuning to tune "up" towards larger bins
332  tuning_direction = TuningDirection::LARGER;
333  }
334  return true;
335  }
336  UNREACHABLE();
337  }
338 
339  chosen_overlaps_threshold = new_overlaps_threshold;
340 
341  if (keysPerBinUnderThreshold()) {
342  VLOG(1) << "Hash table reached size " << crt_props.hash_table_size
343  << " with keys per bin " << crt_props.keys_per_bin << " under threshold "
344  << overlaps_target_entries_per_bin << ". Terminating bucket size loop.";
345  return false;
346  }
347 
348  if (crt_reverse_search_iteration > 0) {
349  // We always take the first tuning iteration that succeeds when reversing
350  // direction, since reaching this point means we have not yet had a successful
351  // iteration and we are "backtracking" our search by making bin sizes larger
352  VLOG(1) << "On reverse (larger tuning direction) search found workable "
353  << " hash table size of " << crt_props.hash_table_size
354  << " with keys per bin " << crt_props.keys_per_bin
355  << ". Terminating bucket size loop.";
356  return false;
357  }
358 
359  return true;
360  }
361 
362  bool hashTableTooBig() const {
363  return crt_props.hash_table_size > overlaps_max_table_size_bytes;
364  }
365 
366  bool keysPerBinIncreasing() const {
367  return crt_props.keys_per_bin > prev_props.keys_per_bin;
368  }
369 
370  bool previousIterationValid() const {
371  return tuning_direction == TuningDirection::SMALLER && crt_step > 1;
372  }
373 
374  bool keysPerBinUnderThreshold() const {
375  return crt_props.keys_per_bin < overlaps_target_entries_per_bin;
376  }
377 };
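// Minimal usage sketch (illustrative; `build_props` is a hypothetical helper that
// materializes HashTableProps for the tuner's current bucket sizes). It mirrors the
// auto-tuning loop in reifyWithLayout() further below:
//
//   TuningState tuning_state(g_overlaps_max_table_size_bytes,
//                            g_overlaps_target_entries_per_bin);
//   while (tuner.tuneOneStep(tuning_state.tuning_direction)) {
//     const HashTableProps props = build_props(tuner.getInverseBucketSizes());
//     if (!tuning_state(props, tuner.getMinBucketSize())) {
//       break;  // a usable configuration was found, or tuning was aborted
//     }
//   }
//   // tuning_state.crt_props and tuning_state.chosen_overlaps_threshold now hold the
//   // selected hash table properties and bucket threshold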
378 
379 class BucketSizeTuner {
380  public:
381  BucketSizeTuner(const double bucket_threshold,
382  const double step,
383  const double min_threshold,
384  const Data_Namespace::MemoryLevel effective_memory_level,
385  const std::vector<ColumnsForDevice>& columns_per_device,
386  const std::vector<InnerOuter>& inner_outer_pairs,
387  const size_t table_tuple_count,
388  const Executor* executor)
389  : num_dims_(2) // Todo: allow varying number of dims
390  , bucket_thresholds_(/*count=*/num_dims_, /*value=*/bucket_threshold)
391  , step_(step)
392  , min_threshold_(min_threshold)
393  , effective_memory_level_(effective_memory_level)
394  , columns_per_device_(columns_per_device)
395  , inner_outer_pairs_(inner_outer_pairs)
396  , table_tuple_count_(table_tuple_count)
397  , executor_(executor) {
398  CHECK(!columns_per_device_.empty());
399  }
400 
401  bool tuneOneStep() { return tuneOneStep(TuningState::TuningDirection::SMALLER, step_); }
402 
403  bool tuneOneStep(const TuningState::TuningDirection tuning_direction) {
404  return tuneOneStep(tuning_direction, step_);
405  }
406 
407  bool tuneOneStep(const TuningState::TuningDirection tuning_direction,
408  const double step_overide) {
409  if (table_tuple_count_ == 0) {
410  return false;
411  }
412  if (tuning_direction == TuningState::TuningDirection::SMALLER) {
413  return tuneSmallerOneStep(step_overide);
414  }
415  return tuneLargerOneStep(step_overide);
416  }
417 
418  auto getMinBucketSize() const {
419  return *std::min_element(bucket_thresholds_.begin(), bucket_thresholds_.end());
420  }
421 
428  std::vector<double> getInverseBucketSizes() {
429  if (num_steps_ == 0) {
430  CHECK_EQ(current_bucket_sizes_.size(), static_cast<size_t>(0));
431  current_bucket_sizes_ = computeBucketSizes();
432  }
433  CHECK_EQ(current_bucket_sizes_.size(), num_dims_);
434  std::vector<double> inverse_bucket_sizes;
435  for (const auto s : current_bucket_sizes_) {
436  inverse_bucket_sizes.emplace_back(1.0 / s);
437  }
438  return inverse_bucket_sizes;
439  }
440 
441  private:
442  bool bucketThresholdsBelowMinThreshold() const {
443  for (const auto& t : bucket_thresholds_) {
444  if (t < min_threshold_) {
445  return true;
446  }
447  }
448  return false;
449  }
450 
451  std::vector<double> computeBucketSizes() const {
452  if (table_tuple_count_ == 0) {
453  return std::vector<double>(/*count=*/num_dims_, /*val=*/0);
454  }
455  return compute_bucket_sizes(bucket_thresholds_,
456  effective_memory_level_,
457  columns_per_device_.front().join_columns[0],
458  columns_per_device_.front().join_column_types[0],
459  inner_outer_pairs_,
460  executor_);
461  }
462 
463  bool tuneSmallerOneStep(const double step_overide) {
464  if (!current_bucket_sizes_.empty()) {
465  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
466  bucket_thresholds_ = current_bucket_sizes_;
467  for (auto& t : bucket_thresholds_) {
468  t /= step_overide;
469  }
470  }
471  if (bucketThresholdsBelowMinThreshold()) {
472  VLOG(1) << "Aborting overlaps tuning as at least one bucket size is below min "
473  "threshold";
474  return false;
475  }
476  const auto next_bucket_sizes = computeBucketSizes();
477  if (next_bucket_sizes == current_bucket_sizes_) {
478  VLOG(1) << "Aborting overlaps tuning as bucket size is no longer changing.";
479  return false;
480  }
481 
482  current_bucket_sizes_ = next_bucket_sizes;
483  num_steps_++;
484  return true;
485  }
486 
487  bool tuneLargerOneStep(const double step_overide) {
488  if (!current_bucket_sizes_.empty()) {
489  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
490  bucket_thresholds_ = current_bucket_sizes_;
491  }
492  // If current_bucket_sizes was empty, we will start from our initial threshold
493  for (auto& t : bucket_thresholds_) {
494  t *= step_overide;
495  }
496  // When tuning up, do not dynamically compute bucket_sizes: compute_bucket_sizes as
497  // written picks the largest bin size below the threshold, meaning our bucket_size
498  // would never increase beyond the size of the largest polygon. This could mean that
499  // we can never make the bucket sizes large enough to get our hash table below the
500  // maximum size. Possible todo: enable a templated version of compute_bucket_sizes
501  // that optionally finds the smallest extent above the threshold, mirroring the
502  // default behavior of finding the largest extent below it, and use that variant here
503  current_bucket_sizes_ = bucket_thresholds_;
504  num_steps_++;
505  return true;
506  }
507 
508  size_t num_dims_;
509  std::vector<double> bucket_thresholds_;
510  size_t num_steps_{0};
511  const double step_;
512  const double min_threshold_;
513  const Data_Namespace::MemoryLevel effective_memory_level_;
514  const std::vector<ColumnsForDevice>& columns_per_device_;
515  const std::vector<InnerOuter>& inner_outer_pairs_;
516  const size_t table_tuple_count_;
517  const Executor* executor_;
518 
519  std::vector<double> current_bucket_sizes_;
520 
521  friend std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner);
522 };
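// Note (illustrative): the tuner reports *inverse* bucket sizes, e.g. a computed bucket
// extent of 0.25 in the coordinate units of the bounds array is returned as
// 1.0 / 0.25 = 4.0, which is the multiplier expected by the runtime bucket-key helpers
// (get_bucket_key_for_range_double / get_bucket_key_for_range_compressed) used during
// codegen further below.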
523 
524 std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner) {
525  os << "Step Num: " << tuner.num_steps_ << ", Threshold: " << std::fixed << "("
526  << tuner.bucket_thresholds_[0] << ", " << tuner.bucket_thresholds_[1] << ")"
527  << ", Step Size: " << std::fixed << tuner.step_ << ", Min: " << std::fixed
528  << tuner.min_threshold_;
529  return os;
530 }
531 
532 } // namespace
533 
534 void OverlapsJoinHashTable::reifyWithLayout(const HashType layout) {
535  auto timer = DEBUG_TIMER(__func__);
537  const auto& query_info =
538  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs_), query_infos_)
539  .info;
540  auto [db_id, table_id] = HashJoin::getInnerTableId(inner_outer_pairs_);
541  VLOG(1) << "Reify with layout " << getHashTypeString(layout) << "for db_id: " << db_id
542  << ", table_id: " << table_id;
543  if (query_info.fragments.empty()) {
544  return;
545  }
546 
547  auto overlaps_max_table_size_bytes = g_overlaps_max_table_size_bytes;
548  std::optional<double> overlaps_threshold_override;
549  double overlaps_target_entries_per_bin = g_overlaps_target_entries_per_bin;
550  auto skip_hashtable_caching = false;
552  VLOG(1) << "Setting overlaps bucket threshold "
553  "\'overlaps_hashjoin_bucket_threshold\' via "
554  "query hint: "
555  << query_hints_.overlaps_bucket_threshold;
556  overlaps_threshold_override = query_hints_.overlaps_bucket_threshold;
557  }
559  std::ostringstream oss;
560  oss << "User requests to change a threshold \'overlaps_max_table_size_bytes\' via "
561  "query hint";
562  if (!overlaps_threshold_override.has_value()) {
563  oss << ": " << overlaps_max_table_size_bytes << " -> "
564  << query_hints_.overlaps_max_size;
565  overlaps_max_table_size_bytes = query_hints_.overlaps_max_size;
566  } else {
567  oss << ", but is skipped since the query hint also changes the threshold "
568  "\'overlaps_hashjoin_bucket_threshold\'";
569  }
570  VLOG(1) << oss.str();
571  }
573  VLOG(1) << "User requests to skip caching overlaps join hashtable and its tuned "
574  "parameters for this query";
575  skip_hashtable_caching = true;
576  }
578  VLOG(1) << "User requests to change a threshold \'overlaps_keys_per_bin\' via query "
579  "hint: "
580  << overlaps_target_entries_per_bin << " -> "
581  << query_hints_.overlaps_keys_per_bin;
582  overlaps_target_entries_per_bin = query_hints_.overlaps_keys_per_bin;
583  }
584 
585  auto data_mgr = executor_->getDataMgr();
586  // We prioritize the CPU when building an overlaps join hash table, but if a GPU is
587  // present and the corresponding query hint is given, we selectively allow the GPU to
588  // build it. However, if the user forces CPU as the execution device type, we must not
589  // use the GPU for the build even when one is available.
590  auto allow_gpu_hashtable_build =
593  if (allow_gpu_hashtable_build) {
594  if (data_mgr->gpusPresent() &&
596  VLOG(1) << "A user requests to build a GPU hash table for this overlaps join operator";
597  } else {
598  allow_gpu_hashtable_build = false;
599  VLOG(1) << "A user requests to build a GPU hash table for this overlaps join "
600  "operator, but it is skipped since either no GPU is present or CPU "
601  "execution mode is set";
602  }
603  }
604 
605  std::vector<ColumnsForDevice> columns_per_device;
606  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
608  allow_gpu_hashtable_build) {
609  for (int device_id = 0; device_id < device_count_; ++device_id) {
610  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
611  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
612  }
613  }
614 
615  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
616  const auto shard_count = shardCount();
617  size_t total_num_tuples = 0;
618  for (int device_id = 0; device_id < device_count_; ++device_id) {
619  fragments_per_device.emplace_back(
620  shard_count
621  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
622  : query_info.fragments);
623  const size_t crt_num_tuples =
624  std::accumulate(fragments_per_device.back().begin(),
625  fragments_per_device.back().end(),
626  size_t(0),
627  [](const auto& sum, const auto& fragment) {
628  return sum + fragment.getNumTuples();
629  });
630  total_num_tuples += crt_num_tuples;
631  const auto columns_for_device =
632  fetchColumnsForDevice(fragments_per_device.back(),
633  device_id,
635  allow_gpu_hashtable_build
636  ? dev_buff_owners[device_id].get()
637  : nullptr);
638  columns_per_device.push_back(columns_for_device);
639  }
640 
641  // try to extract cache key for hash table and its relevant info
642  auto hashtable_access_path_info =
644  {},
645  condition_->get_optype(),
646  join_type_,
649  shard_count,
650  fragments_per_device,
651  executor_);
652  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
653  hashtable_cache_meta_info_ = hashtable_access_path_info.meta_info;
654  table_keys_ = hashtable_access_path_info.table_keys;
655 
656  auto get_inner_table_key = [this]() {
657  auto col_var = inner_outer_pairs_.front().first;
658  return col_var->getTableKey();
659  };
660 
661  if (table_keys_.empty()) {
662  const auto& table_key = get_inner_table_key();
665  }
666  CHECK(!table_keys_.empty());
667 
668  if (overlaps_threshold_override) {
669  // compute bucket sizes based on the user provided threshold
670  BucketSizeTuner tuner(/*initial_threshold=*/*overlaps_threshold_override,
671  /*step=*/1.0,
672  /*min_threshold=*/0.0,
674  columns_per_device,
676  total_num_tuples,
677  executor_);
678  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
679 
680  auto [entry_count, emitted_keys_count] =
681  computeHashTableCounts(shard_count,
682  inverse_bucket_sizes,
683  columns_per_device,
684  overlaps_max_table_size_bytes,
685  *overlaps_threshold_override);
686  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
687  // reifyImpl will check the hash table cache for an appropriate hash table with those
688  // bucket sizes (or within tolerances); if such a hash table exists it is used,
689  // otherwise a new one is built
690  generateCacheKey(overlaps_max_table_size_bytes,
691  *overlaps_threshold_override,
692  inverse_bucket_sizes,
693  fragments_per_device,
694  device_count_);
695  reifyImpl(columns_per_device,
696  query_info,
697  layout,
698  shard_count,
699  entry_count,
700  emitted_keys_count,
701  skip_hashtable_caching,
702  overlaps_max_table_size_bytes,
703  *overlaps_threshold_override);
704  } else {
705  double overlaps_bucket_threshold = std::numeric_limits<double>::max();
706  generateCacheKey(overlaps_max_table_size_bytes,
707  overlaps_bucket_threshold,
708  {},
709  fragments_per_device,
710  device_count_);
711  std::vector<size_t> per_device_chunk_key;
712  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
713  get_inner_table_key().table_id > 0) {
714  for (int device_id = 0; device_id < device_count_; ++device_id) {
715  auto chunk_key_hash = boost::hash_value(composite_key_info_.cache_key_chunks);
716  boost::hash_combine(
717  chunk_key_hash,
718  HashJoin::collectFragmentIds(fragments_per_device[device_id]));
719  per_device_chunk_key.push_back(chunk_key_hash);
722  columns_per_device.front().join_columns.front().num_elems,
723  chunk_key_hash,
724  condition_->get_optype(),
725  overlaps_max_table_size_bytes,
726  overlaps_bucket_threshold,
727  {}};
728  hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
729  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_[device_id],
730  table_keys_);
731  }
732  }
733 
734  auto cached_bucket_threshold =
735  auto_tuner_cache_->getItemFromCache(hashtable_cache_key_.front(),
738  if (cached_bucket_threshold) {
739  overlaps_bucket_threshold = cached_bucket_threshold->bucket_threshold;
740  auto inverse_bucket_sizes = cached_bucket_threshold->bucket_sizes;
742  overlaps_max_table_size_bytes, overlaps_bucket_threshold, inverse_bucket_sizes);
743  generateCacheKey(overlaps_max_table_size_bytes,
744  overlaps_bucket_threshold,
745  inverse_bucket_sizes,
746  fragments_per_device,
747  device_count_);
748 
749  if (auto hash_table =
750  hash_table_cache_->getItemFromCache(hashtable_cache_key_[device_count_],
753  std::nullopt)) {
754  // if we already have a built hash table, we can skip the scans required for
755  // computing bucket size and tuple count
756  // reset as the hash table sizes can vary a bit
757  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
758  CHECK(hash_table);
759 
760  VLOG(1) << "Using cached hash table bucket size";
761 
762  reifyImpl(columns_per_device,
763  query_info,
764  layout,
765  shard_count,
766  hash_table->getEntryCount(),
767  hash_table->getEmittedKeysCount(),
768  skip_hashtable_caching,
769  overlaps_max_table_size_bytes,
770  overlaps_bucket_threshold);
771  } else {
772  VLOG(1) << "Computing bucket size for cached bucket threshold";
773  // compute bucket size using our cached tuner value
774  BucketSizeTuner tuner(/*initial_threshold=*/overlaps_bucket_threshold,
775  /*step=*/1.0,
776  /*min_threshold=*/0.0,
778  columns_per_device,
780  total_num_tuples,
781  executor_);
782 
783  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
784 
785  auto [entry_count, emitted_keys_count] =
786  computeHashTableCounts(shard_count,
787  inverse_bucket_sizes,
788  columns_per_device,
789  overlaps_max_table_size_bytes,
790  overlaps_bucket_threshold);
791  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
792 
793  generateCacheKey(overlaps_max_table_size_bytes,
794  overlaps_bucket_threshold,
795  inverse_bucket_sizes,
796  fragments_per_device,
797  device_count_);
798 
799  reifyImpl(columns_per_device,
800  query_info,
801  layout,
802  shard_count,
803  entry_count,
804  emitted_keys_count,
805  skip_hashtable_caching,
806  overlaps_max_table_size_bytes,
807  overlaps_bucket_threshold);
808  }
809  } else {
810  // compute bucket size using the auto tuner
811  BucketSizeTuner tuner(
812  /*initial_threshold=*/overlaps_bucket_threshold,
813  /*step=*/2.0,
814  /*min_threshold=*/1e-7,
816  columns_per_device,
818  total_num_tuples,
819  executor_);
820 
821  VLOG(1) << "Running overlaps join size auto tune with parameters: " << tuner;
822 
823  // manages the tuning state machine
824  TuningState tuning_state(overlaps_max_table_size_bytes,
825  overlaps_target_entries_per_bin);
826  while (tuner.tuneOneStep(tuning_state.tuning_direction)) {
827  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
828 
829  const auto [crt_entry_count, crt_emitted_keys_count] =
830  computeHashTableCounts(shard_count,
831  inverse_bucket_sizes,
832  columns_per_device,
833  tuning_state.overlaps_max_table_size_bytes,
834  tuning_state.chosen_overlaps_threshold);
835  const size_t hash_table_size = calculateHashTableSize(
836  inverse_bucket_sizes.size(), crt_emitted_keys_count, crt_entry_count);
837  HashTableProps crt_props(crt_entry_count,
838  crt_emitted_keys_count,
839  hash_table_size,
840  inverse_bucket_sizes);
841  VLOG(1) << "Tuner output: " << tuner << " with properties " << crt_props;
842 
843  const auto should_continue = tuning_state(crt_props, tuner.getMinBucketSize());
844  setInverseBucketSizeInfo(
845  tuning_state.crt_props.bucket_sizes, columns_per_device, device_count_);
846  if (!should_continue) {
847  break;
848  }
849  }
850 
851  const auto& crt_props = tuning_state.crt_props;
852  // sanity check that the hash table size has not changed. this is a fairly
853  // inexpensive check to ensure the above algorithm is consistent
854  const size_t hash_table_size =
855  calculateHashTableSize(inverse_bucket_sizes_for_dimension_.size(),
856  crt_props.emitted_keys_count,
857  crt_props.entry_count);
858  CHECK_EQ(crt_props.hash_table_size, hash_table_size);
859 
861  hash_table_size > overlaps_max_table_size_bytes) {
862  VLOG(1) << "Could not find suitable overlaps join parameters to create hash "
863  "table under max allowed size ("
864  << overlaps_max_table_size_bytes << ") bytes.";
865  throw OverlapsHashTableTooBig(overlaps_max_table_size_bytes);
866  }
867 
868  VLOG(1) << "Final tuner output: " << tuner << " with properties " << crt_props;
870  VLOG(1) << "Final bucket sizes: ";
871  for (size_t dim = 0; dim < inverse_bucket_sizes_for_dimension_.size(); dim++) {
872  VLOG(1) << "dim[" << dim
873  << "]: " << 1.0 / inverse_bucket_sizes_for_dimension_[dim];
874  }
875  CHECK_GE(tuning_state.chosen_overlaps_threshold, double(0));
876  generateCacheKey(tuning_state.overlaps_max_table_size_bytes,
877  tuning_state.chosen_overlaps_threshold,
878  {},
879  fragments_per_device,
880  device_count_);
881  const auto candidate_auto_tuner_cache_key = hashtable_cache_key_.front();
882  if (skip_hashtable_caching) {
883  VLOG(1) << "Skipping addition of tuned parameters to the auto tuner";
884  } else {
885  AutoTunerMetaInfo meta_info{tuning_state.overlaps_max_table_size_bytes,
886  tuning_state.chosen_overlaps_threshold,
888  auto_tuner_cache_->putItemToCache(candidate_auto_tuner_cache_key,
889  meta_info,
892  0,
893  0);
894  }
895  overlaps_bucket_threshold = tuning_state.chosen_overlaps_threshold;
896  reifyImpl(columns_per_device,
897  query_info,
898  layout,
899  shard_count,
900  crt_props.entry_count,
901  crt_props.emitted_keys_count,
902  skip_hashtable_caching,
903  overlaps_max_table_size_bytes,
904  overlaps_bucket_threshold);
905  }
906  }
907 }
908 
909 size_t OverlapsJoinHashTable::calculateHashTableSize(size_t number_of_dimensions,
910  size_t emitted_keys_count,
911  size_t entry_count) const {
912  const auto key_component_width = getKeyComponentWidth();
913  const auto key_component_count = number_of_dimensions;
914  const auto entry_size = key_component_count * key_component_width;
915  const auto keys_for_all_rows = emitted_keys_count;
916  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
917  const size_t hash_table_size =
918  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
919  return hash_table_size;
920 }
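// Worked example (illustrative only): with the 8-byte key components returned by
// getKeyComponentWidth(), two dimensions, entry_count = 1'000'000 and
// emitted_keys_count = 4'000'000:
//   entry_size               = 2 * 8                          = 16 bytes
//   one_to_many_hash_entries = 2 * 1'000'000 + 4'000'000      = 6'000'000
//   hash_table_size          = 16 * 1'000'000 + 6'000'000 * 4 = 40'000'000 bytes (~38 MiB)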
921 
922 ColumnsForDevice OverlapsJoinHashTable::fetchColumnsForDevice(
923  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
924  const int device_id,
925  DeviceAllocator* dev_buff_owner) {
926  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
927 
928  std::vector<JoinColumn> join_columns;
929  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
930  std::vector<JoinColumnTypeInfo> join_column_types;
931  std::vector<std::shared_ptr<void>> malloc_owner;
932  for (const auto& inner_outer_pair : inner_outer_pairs_) {
933  const auto inner_col = inner_outer_pair.first;
934  const auto inner_cd = get_column_descriptor_maybe(inner_col->getColumnKey());
935  if (inner_cd && inner_cd->isVirtualCol) {
936  throw FailedToJoinOnVirtualColumn();
937  }
938  join_columns.emplace_back(fetchJoinColumn(inner_col,
939  fragments,
940  effective_memory_level,
941  device_id,
942  chunks_owner,
943  dev_buff_owner,
944  malloc_owner,
945  executor_,
946  &column_cache_));
947  const auto& ti = inner_col->get_type_info();
948  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
949  0,
950  0,
951  inline_int_null_value<int64_t>(),
952  false,
953  0,
955  CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
956  }
957  return {join_columns, join_column_types, chunks_owner, {}, malloc_owner};
958 }
959 
960 std::pair<size_t, size_t> OverlapsJoinHashTable::computeHashTableCounts(
961  const size_t shard_count,
962  const std::vector<double>& inverse_bucket_sizes_for_dimension,
963  std::vector<ColumnsForDevice>& columns_per_device,
964  const size_t chosen_max_hashtable_size,
965  const double chosen_bucket_threshold) {
966  CHECK(!inverse_bucket_sizes_for_dimension.empty());
967  const auto [tuple_count, emitted_keys_count] =
968  approximateTupleCount(inverse_bucket_sizes_for_dimension,
969  columns_per_device,
970  chosen_max_hashtable_size,
971  chosen_bucket_threshold);
972  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
973 
974  return std::make_pair(
975  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
976  emitted_keys_count);
977 }
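// Example (illustrative only): if approximateTupleCount() estimates 500'000 distinct
// bucketized keys, entry_count becomes 2 * 500'000 = 1'000'000 slots before
// get_entries_per_device() divides them across shards/devices; an empty input still
// yields 2 * std::max(size_t(0), size_t(1)) = 2 slots, so downstream sizing never sees
// a zero-entry table.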
978 
979 std::pair<size_t, size_t> OverlapsJoinHashTable::approximateTupleCount(
980  const std::vector<double>& inverse_bucket_sizes_for_dimension,
981  std::vector<ColumnsForDevice>& columns_per_device,
982  const size_t chosen_max_hashtable_size,
983  const double chosen_bucket_threshold) {
984  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
985  CountDistinctDescriptor count_distinct_desc{
987  0,
988  11,
989  true,
990  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
991  ? ExecutorDeviceType::GPU
992  : ExecutorDeviceType::CPU,
993  1};
994  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
995 
996  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
997  if (columns_per_device.front().join_columns.front().num_elems == 0) {
998  return std::make_pair(0, 0);
999  }
1000 
1001  // TODO: state management in here should be revisited, but this should be safe enough
1002  // for now
1003  // re-compute bucket counts per device based on global bucket size
1004  for (size_t device_id = 0; device_id < columns_per_device.size(); ++device_id) {
1005  auto& columns_for_device = columns_per_device[device_id];
1006  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
1007  inner_outer_pairs_);
1008  }
1009 
1010  // Number of keys must match dimension of buckets
1011  CHECK_EQ(columns_per_device.front().join_columns.size(),
1012  columns_per_device.front().join_buckets.size());
1013  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1014  // Note that this path assumes each device has the same hash table (for GPU hash
1015  // join w/ hash table built on CPU)
1016  const auto cached_count_info =
1020  if (cached_count_info) {
1021  VLOG(1) << "Using a cached tuple count: " << cached_count_info->first
1022  << ", emitted keys count: " << cached_count_info->second;
1023  return *cached_count_info;
1024  }
1025  int thread_count = cpu_threads();
1026  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
1027  auto hll_result = &hll_buffer_all_cpus[0];
1028 
1029  std::vector<int32_t> num_keys_for_row;
1030  // TODO(adb): support multi-column overlaps join
1031  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
1032 
1033  approximate_distinct_tuples_overlaps(hll_result,
1034  num_keys_for_row,
1035  count_distinct_desc.bitmap_sz_bits,
1036  padded_size_bytes,
1037  columns_per_device.front().join_columns,
1038  columns_per_device.front().join_column_types,
1039  columns_per_device.front().join_buckets,
1040  thread_count);
1041  for (int i = 1; i < thread_count; ++i) {
1042  hll_unify(hll_result,
1043  hll_result + i * padded_size_bytes,
1044  1 << count_distinct_desc.bitmap_sz_bits);
1045  }
1046  return std::make_pair(
1047  hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
1048  static_cast<size_t>(num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0));
1049  }
1050 #ifdef HAVE_CUDA
1051  auto data_mgr = executor_->getDataMgr();
1052  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
1053  for (auto& host_hll_buffer : host_hll_buffers) {
1054  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
1055  }
1056  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
1057  std::vector<std::future<void>> approximate_distinct_device_threads;
1058  for (int device_id = 0; device_id < device_count_; ++device_id) {
1059  approximate_distinct_device_threads.emplace_back(std::async(
1060  std::launch::async,
1061  [device_id,
1062  &columns_per_device,
1063  &count_distinct_desc,
1064  data_mgr,
1065  &host_hll_buffers,
1066  &emitted_keys_count_device_threads] {
1067  auto allocator = std::make_unique<CudaAllocator>(
1068  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1069  auto device_hll_buffer =
1070  allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
1071  data_mgr->getCudaMgr()->zeroDeviceMem(
1072  device_hll_buffer,
1073  count_distinct_desc.bitmapPaddedSizeBytes(),
1074  device_id,
1075  getQueryEngineCudaStreamForDevice(device_id));
1076  const auto& columns_for_device = columns_per_device[device_id];
1077  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
1078  columns_for_device.join_columns, *allocator);
1079 
1080  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
1081  const auto& inverse_bucket_sizes_for_dimension =
1082  columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
1083  auto inverse_bucket_sizes_gpu = allocator->alloc(
1084  inverse_bucket_sizes_for_dimension.size() * sizeof(double));
1085  allocator->copyToDevice(
1086  inverse_bucket_sizes_gpu,
1087  inverse_bucket_sizes_for_dimension.data(),
1088  inverse_bucket_sizes_for_dimension.size() * sizeof(double));
1089  const size_t row_counts_buffer_sz =
1090  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
1091  auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
1092  data_mgr->getCudaMgr()->zeroDeviceMem(
1093  row_counts_buffer,
1094  row_counts_buffer_sz,
1095  device_id,
1096  getQueryEngineCudaStreamForDevice(device_id));
1097  const auto key_handler =
1098  OverlapsKeyHandler(inverse_bucket_sizes_for_dimension.size(),
1099  join_columns_gpu,
1100  reinterpret_cast<double*>(inverse_bucket_sizes_gpu));
1101  const auto key_handler_gpu =
1102  transfer_flat_object_to_gpu(key_handler, *allocator);
1104  reinterpret_cast<uint8_t*>(device_hll_buffer),
1105  count_distinct_desc.bitmap_sz_bits,
1106  reinterpret_cast<int32_t*>(row_counts_buffer),
1107  key_handler_gpu,
1108  columns_for_device.join_columns[0].num_elems);
1109 
1110  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
1111  allocator->copyFromDevice(
1112  &host_emitted_keys_count,
1113  row_counts_buffer +
1114  (columns_per_device.front().join_columns[0].num_elems - 1) *
1115  sizeof(int32_t),
1116  sizeof(int32_t));
1117 
1118  auto& host_hll_buffer = host_hll_buffers[device_id];
1119  allocator->copyFromDevice(&host_hll_buffer[0],
1120  device_hll_buffer,
1121  count_distinct_desc.bitmapPaddedSizeBytes());
1122  }));
1123  }
1124  for (auto& child : approximate_distinct_device_threads) {
1125  child.get();
1126  }
1127  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
1128  auto& result_hll_buffer = host_hll_buffers.front();
1129  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
1130  for (int device_id = 1; device_id < device_count_; ++device_id) {
1131  auto& host_hll_buffer = host_hll_buffers[device_id];
1132  hll_unify(hll_result,
1133  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
1134  1 << count_distinct_desc.bitmap_sz_bits);
1135  }
1136  const size_t emitted_keys_count =
1137  std::accumulate(emitted_keys_count_device_threads.begin(),
1138  emitted_keys_count_device_threads.end(),
1139  0);
1140  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
1141  emitted_keys_count);
1142 #else
1143  UNREACHABLE();
1144  return {0, 0};
1145 #endif // HAVE_CUDA
1146 }
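// Note (illustrative): both paths above estimate the number of distinct bucketized keys
// with HyperLogLog: per-thread (CPU) or per-device (GPU) registers are merged via
// hll_unify() and read out with hll_size(). The emitted keys count is taken from the
// last element of the running per-row key count (CPU path) or from the accumulated
// per-device partial counts (GPU path).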
1147 
1148 void OverlapsJoinHashTable::setInverseBucketSizeInfo(
1149  const std::vector<double>& inverse_bucket_sizes,
1150  std::vector<ColumnsForDevice>& columns_per_device,
1151  const size_t device_count) {
1152  // set global bucket size
1153  inverse_bucket_sizes_for_dimension_ = inverse_bucket_sizes;
1154 
1155  // re-compute bucket counts per device based on global bucket size
1156  CHECK_EQ(columns_per_device.size(), static_cast<size_t>(device_count));
1157  for (size_t device_id = 0; device_id < device_count; ++device_id) {
1158  auto& columns_for_device = columns_per_device[device_id];
1159  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension_,
1160  inner_outer_pairs_);
1161  }
1162 }
1163 
1164 size_t OverlapsJoinHashTable::getKeyComponentWidth() const {
1165  return 8;
1166 }
1167 
1168 size_t OverlapsJoinHashTable::getKeyComponentCount() const {
1169  CHECK(!inverse_bucket_sizes_for_dimension_.empty());
1170  return inverse_bucket_sizes_for_dimension_.size();
1171 }
1172 
1173 void OverlapsJoinHashTable::reify(const HashType preferred_layout) {
1174  auto timer = DEBUG_TIMER(__func__);
1175  CHECK_LT(0, device_count_);
1177 
1178  CHECK(condition_->is_overlaps_oper());
1179  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
1180  HashType layout;
1181  if (inner_outer_pairs_[0].second->get_type_info().is_fixlen_array() &&
1182  inner_outer_pairs_[0].second->get_type_info().get_size() == 32) {
1183  // bounds array
1184  layout = HashType::ManyToMany;
1185  } else {
1186  layout = HashType::OneToMany;
1187  }
1188  try {
1189  reifyWithLayout(layout);
1190  return;
1191  } catch (const JoinHashTableTooBig& e) {
1192  throw e;
1193  } catch (const std::exception& e) {
1194  VLOG(1) << "Caught exception while building overlaps baseline hash table: "
1195  << e.what();
1196  throw;
1197  }
1198 }
1199 
1200 void OverlapsJoinHashTable::reifyImpl(std::vector<ColumnsForDevice>& columns_per_device,
1201  const Fragmenter_Namespace::TableInfo& query_info,
1202  const HashType layout,
1203  const size_t shard_count,
1204  const size_t entry_count,
1205  const size_t emitted_keys_count,
1206  const bool skip_hashtable_caching,
1207  const size_t chosen_max_hashtable_size,
1208  const double chosen_bucket_threshold) {
1209  std::vector<std::future<void>> init_threads;
1210  chosen_overlaps_bucket_threshold_ = chosen_bucket_threshold;
1211  chosen_overlaps_max_table_size_bytes_ = chosen_max_hashtable_size;
1215 
1216  for (int device_id = 0; device_id < device_count_; ++device_id) {
1217  const auto fragments =
1218  shard_count
1219  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
1220  : query_info.fragments;
1221  init_threads.push_back(std::async(std::launch::async,
1222  &OverlapsJoinHashTable::reifyForDevice,
1223  this,
1224  columns_per_device[device_id],
1225  layout,
1226  entry_count,
1227  emitted_keys_count,
1228  skip_hashtable_caching,
1229  device_id,
1231  }
1232  for (auto& init_thread : init_threads) {
1233  init_thread.wait();
1234  }
1235  for (auto& init_thread : init_threads) {
1236  init_thread.get();
1237  }
1238 }
1239 
1240 void OverlapsJoinHashTable::reifyForDevice(
1241  const ColumnsForDevice& columns_for_device,
1242  const HashType layout,
1243  const size_t entry_count,
1244  const size_t emitted_keys_count,
1245  const bool skip_hashtable_caching,
1246  const int device_id,
1247  const logger::ThreadLocalIds parent_thread_local_ids) {
1248  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1249  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
1250  CHECK_EQ(getKeyComponentWidth(), size_t(8));
1252  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
1253 
1254  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1255  VLOG(1) << "Building overlaps join hash table on CPU.";
1256  auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
1257  columns_for_device.join_column_types,
1258  columns_for_device.join_buckets,
1259  layout,
1260  entry_count,
1261  emitted_keys_count,
1262  skip_hashtable_caching);
1263  CHECK(hash_table);
1264 
1265 #ifdef HAVE_CUDA
1266  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
1267  auto gpu_hash_table = copyCpuHashTableToGpu(
1268  hash_table, layout, entry_count, emitted_keys_count, device_id);
1269  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1270  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
1271  } else {
1272 #else
1273  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1274 #endif
1275  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
1276  hash_tables_for_device_[0] = hash_table;
1277 #ifdef HAVE_CUDA
1278  }
1279 #endif
1280  } else {
1281 #ifdef HAVE_CUDA
1282  auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
1283  columns_for_device.join_column_types,
1284  columns_for_device.join_buckets,
1285  layout,
1286  entry_count,
1287  emitted_keys_count,
1288  device_id);
1289  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1290  hash_tables_for_device_[device_id] = std::move(hash_table);
1291 #else
1292  UNREACHABLE();
1293 #endif
1294  }
1295 }
1296 
1297 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::initHashTableOnCpu(
1298  const std::vector<JoinColumn>& join_columns,
1299  const std::vector<JoinColumnTypeInfo>& join_column_types,
1300  const std::vector<JoinBucketInfo>& join_bucket_info,
1301  const HashType layout,
1302  const size_t entry_count,
1303  const size_t emitted_keys_count,
1304  const bool skip_hashtable_caching) {
1305  auto timer = DEBUG_TIMER(__func__);
1306  decltype(std::chrono::steady_clock::now()) ts1, ts2;
1307  ts1 = std::chrono::steady_clock::now();
1308  CHECK(!join_columns.empty());
1309  CHECK(!join_bucket_info.empty());
1310  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1311  if (auto generic_hash_table =
1315  if (auto hash_table =
1316  std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
1317  VLOG(1) << "Using cached CPU hash table for initialization.";
1318  // See if a hash table of a different layout was returned.
1319  // If it was OneToMany, we can reuse it on ManyToMany.
1320  if (layout == HashType::ManyToMany &&
1321  hash_table->getLayout() == HashType::OneToMany) {
1322  // use the cached hash table
1324  return hash_table;
1325  }
1326  if (layout == hash_table->getLayout()) {
1327  return hash_table;
1328  }
1329  }
1330  }
1332  const auto key_component_count =
1333  join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();
1334 
1335  const auto key_handler =
1336  OverlapsKeyHandler(key_component_count,
1337  &join_columns[0],
1338  join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());
1341  dummy_str_proxy_translation_maps_ptrs_and_offsets;
1342  const auto err =
1343  builder.initHashTableOnCpu(&key_handler,
1345  join_columns,
1346  join_column_types,
1347  join_bucket_info,
1348  dummy_str_proxy_translation_maps_ptrs_and_offsets,
1349  entry_count,
1350  emitted_keys_count,
1351  layout,
1352  join_type_,
1355  query_hints_);
1356  ts2 = std::chrono::steady_clock::now();
1357  if (err) {
1358  throw HashJoinFail(
1359  std::string("Unrecognized error when initializing CPU overlaps hash table (") +
1360  std::to_string(err) + std::string(")"));
1361  }
1362  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
1363  if (skip_hashtable_caching) {
1364  VLOG(1) << "Skipping caching of the overlaps join hash table";
1365  } else {
1366  auto hashtable_build_time =
1367  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
1370  hash_table,
1372  hashtable_build_time);
1373  }
1374  return hash_table;
1375 }
1376 
1377 #ifdef HAVE_CUDA
1378 
1379 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::initHashTableOnGpu(
1380  const std::vector<JoinColumn>& join_columns,
1381  const std::vector<JoinColumnTypeInfo>& join_column_types,
1382  const std::vector<JoinBucketInfo>& join_bucket_info,
1383  const HashType layout,
1384  const size_t entry_count,
1385  const size_t emitted_keys_count,
1386  const size_t device_id) {
1388 
1389  VLOG(1) << "Building overlaps join hash table on GPU.";
1390 
1391  BaselineJoinHashTableBuilder builder;
1392  auto data_mgr = executor_->getDataMgr();
1393  CudaAllocator allocator(
1394  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1395  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
1396  CHECK_EQ(join_columns.size(), 1u);
1397  CHECK(!join_bucket_info.empty());
1398  auto& inverse_bucket_sizes_for_dimension =
1399  join_bucket_info[0].inverse_bucket_sizes_for_dimension;
1400  auto inverse_bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
1401  inverse_bucket_sizes_for_dimension, allocator);
1402  const auto key_handler = OverlapsKeyHandler(inverse_bucket_sizes_for_dimension.size(),
1403  join_columns_gpu,
1404  inverse_bucket_sizes_gpu);
1405 
1406  const auto err = builder.initHashTableOnGpu(&key_handler,
1407  join_columns,
1408  layout,
1409  join_type_,
1412  entry_count,
1413  emitted_keys_count,
1414  device_id,
1415  executor_,
1416  query_hints_);
1417  if (err) {
1418  throw HashJoinFail(
1419  std::string("Unrecognized error when initializing GPU overlaps hash table (") +
1420  std::to_string(err) + std::string(")"));
1421  }
1422  return builder.getHashTable();
1423 }
1424 
1425 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::copyCpuHashTableToGpu(
1426  std::shared_ptr<BaselineHashTable>& cpu_hash_table,
1427  const HashType layout,
1428  const size_t entry_count,
1429  const size_t emitted_keys_count,
1430  const size_t device_id) {
1432 
1433  auto data_mgr = executor_->getDataMgr();
1434 
1435  // copy hash table to GPU
1436  BaselineJoinHashTableBuilder gpu_builder;
1437  gpu_builder.allocateDeviceMemory(layout,
1440  entry_count,
1441  emitted_keys_count,
1442  device_id,
1443  executor_,
1444  query_hints_);
1445  std::shared_ptr<BaselineHashTable> gpu_hash_table = gpu_builder.getHashTable();
1446  CHECK(gpu_hash_table);
1447  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
1448  CHECK(gpu_buffer_ptr);
1449 
1450  CHECK_LE(cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
1451  gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));
1452  auto device_allocator = std::make_unique<CudaAllocator>(
1453  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1454  device_allocator->copyToDevice(
1455  gpu_buffer_ptr,
1456  cpu_hash_table->getCpuBuffer(),
1457  cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU));
1458  return gpu_hash_table;
1459 }
1460 
1461 #endif // HAVE_CUDA
1462 
1463 #define LL_CONTEXT executor_->cgen_state_->context_
1464 #define LL_BUILDER executor_->cgen_state_->ir_builder_
1465 #define LL_INT(v) executor_->cgen_state_->llInt(v)
1466 #define LL_FP(v) executor_->cgen_state_->llFp(v)
1467 #define ROW_FUNC executor_->cgen_state_->row_func_
1468 
1469 llvm::Value* OverlapsJoinHashTable::codegenKey(const CompilationOptions& co) {
1470  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1471  const auto key_component_width = getKeyComponentWidth();
1472  CHECK(key_component_width == 4 || key_component_width == 8);
1473  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
1474  llvm::Value* key_buff_lv{nullptr};
1475  switch (key_component_width) {
1476  case 4:
1477  key_buff_lv =
1478  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
1479  break;
1480  case 8:
1481  key_buff_lv =
1482  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
1483  break;
1484  default:
1485  CHECK(false);
1486  }
1487 
1488  const auto& inner_outer_pair = inner_outer_pairs_[0];
1489  const auto outer_geo = inner_outer_pair.second;
1490  const auto outer_geo_ti = outer_geo->get_type_info();
1491 
1492  llvm::Value* arr_ptr = nullptr;
1493  CodeGenerator code_generator(executor_);
1494  CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
1495 
1496  if (outer_geo_ti.is_geometry()) {
1497  // TODO(adb): for points we will use the coords array, but for other geometries we
1498  // will need to use the bounding box. For now only support points.
1499  CHECK_EQ(outer_geo_ti.get_type(), kPOINT);
1500 
1501  if (const auto outer_geo_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_geo)) {
1502  const auto outer_geo_col_lvs = code_generator.codegen(outer_geo_col, true, co);
1503  CHECK_EQ(outer_geo_col_lvs.size(), size_t(1));
1504  auto column_key = outer_geo_col->getColumnKey();
1505  column_key.column_id = column_key.column_id + 1;
1506  const auto coords_cd = Catalog_Namespace::get_metadata_for_column(column_key);
1507  CHECK(coords_cd);
1508 
1509  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1510  "array_buff",
1511  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1512  {outer_geo_col_lvs.front(), code_generator.posArg(outer_geo_col)});
1513  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
1514  << "Only TINYINT coordinates columns are supported in geo overlaps hash "
1515  "join.";
1516  arr_ptr = code_generator.castArrayPointer(array_ptr,
1517  coords_cd->columnType.get_elem_type());
1518  } else if (const auto outer_geo_function_operator =
1519  dynamic_cast<const Analyzer::GeoOperator*>(outer_geo)) {
1520  // Process points dynamically constructed by geo function operators
1521  const auto outer_geo_function_operator_lvs =
1522  code_generator.codegen(outer_geo_function_operator, true, co);
1523  CHECK_EQ(outer_geo_function_operator_lvs.size(), size_t(2));
1524  arr_ptr = outer_geo_function_operator_lvs.front();
1525  } else if (const auto outer_geo_expr =
1526  dynamic_cast<const Analyzer::GeoExpr*>(outer_geo)) {
1527  UNREACHABLE() << outer_geo_expr->toString();
1528  }
1529  } else if (outer_geo_ti.is_fixlen_array()) {
1530  // Process dynamically constructed points
1531  const auto outer_geo_cast_coord_array =
1532  dynamic_cast<const Analyzer::UOper*>(outer_geo);
1533  CHECK_EQ(outer_geo_cast_coord_array->get_optype(), kCAST);
1534  const auto outer_geo_coord_array = dynamic_cast<const Analyzer::ArrayExpr*>(
1535  outer_geo_cast_coord_array->get_operand());
1536  CHECK(outer_geo_coord_array);
1537  CHECK(outer_geo_coord_array->isLocalAlloc());
1538  CHECK_EQ(outer_geo_coord_array->getElementCount(), 2);
1539  auto elem_size = (outer_geo_ti.get_compression() == kENCODING_GEOINT)
1540  ? sizeof(int32_t)
1541  : sizeof(double);
1542  CHECK_EQ(outer_geo_ti.get_size(), int(2 * elem_size));
1543  const auto outer_geo_constructed_lvs = code_generator.codegen(outer_geo, true, co);
1544  // CHECK_EQ(outer_geo_constructed_lvs.size(), size_t(2)); // Pointer and size
1545  const auto array_ptr = outer_geo_constructed_lvs.front(); // Just need the pointer
1546  arr_ptr = LL_BUILDER.CreateGEP(
1547  array_ptr->getType()->getScalarType()->getPointerElementType(),
1548  array_ptr,
1549  LL_INT(0));
1550  arr_ptr = code_generator.castArrayPointer(array_ptr, SQLTypeInfo(kTINYINT, true));
1551  }
1552  if (!arr_ptr) {
1553  LOG(FATAL) << "Overlaps key currently only supported for geospatial columns and "
1554  "constructed points.";
1555  }
1556 
1557  for (size_t i = 0; i < 2; i++) {
1558  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
1559  key_buff_lv->getType()->getScalarType()->getPointerElementType(),
1560  key_buff_lv,
1561  LL_INT(i));
1562 
1563  // Note that get_bucket_key_for_range_compressed will need to be specialized for
1564  // future compression schemes
1565  auto bucket_key =
1566  outer_geo_ti.get_compression() == kENCODING_GEOINT
1567  ? executor_->cgen_state_->emitExternalCall(
1568  "get_bucket_key_for_range_compressed",
1569  get_int_type(64, LL_CONTEXT),
1570  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])})
1571  : executor_->cgen_state_->emitExternalCall(
1572  "get_bucket_key_for_range_double",
1573  get_int_type(64, LL_CONTEXT),
1574  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])});
1575  const auto col_lv = LL_BUILDER.CreateSExt(
1576  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
1577  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
1578  }
1579  return key_buff_lv;
1580 }
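The loop above derives one bucket key per dimension by scaling the point's coordinate by that dimension's inverse bucket size and flooring the result; the compressed (kENCODING_GEOINT) path is expected to decode to double before applying the same math. A minimal sketch of that bucketing step, using a hypothetical helper rather than the get_bucket_key_for_range_* runtime functions called above:

#include <cmath>
#include <cstddef>
#include <cstdint>

// Illustrative helper (assumed, not part of OverlapsJoinHashTable): maps one
// coordinate of an uncompressed point to its bucket index along that dimension.
inline int64_t bucket_key_for_dimension(const double* coords,
                                        const size_t dim,
                                        const double inverse_bucket_size) {
  // A bucket of width w has inverse_bucket_size == 1.0 / w, so the key is
  // simply floor(coordinate / w).
  return static_cast<int64_t>(std::floor(coords[dim] * inverse_bucket_size));
}

Each resulting key is then sign-extended to the configured key component width and stored into its slot of key_buff_lv, as the generated IR above shows.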
1581 
1582 std::vector<llvm::Value*> OverlapsJoinHashTable::codegenManyKey(
1583  const CompilationOptions& co) {
1584  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1585  const auto key_component_width = getKeyComponentWidth();
1586  CHECK(key_component_width == 4 || key_component_width == 8);
1587  auto hash_table = getHashTableForDevice(size_t(0));
1588  CHECK(hash_table);
1590 
1591  VLOG(1) << "Performing codegen for ManyToMany";
1592  const auto& inner_outer_pair = inner_outer_pairs_[0];
1593  const auto outer_col = inner_outer_pair.second;
1594 
1595  CodeGenerator code_generator(executor_);
1596  const auto col_lvs = code_generator.codegen(outer_col, true, co);
1597  CHECK_EQ(col_lvs.size(), size_t(1));
1598 
1599  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
1600  CHECK(outer_col_var);
1601  const auto coords_cd =
1602  Catalog_Namespace::get_metadata_for_column(outer_col_var->getColumnKey());
1603  CHECK(coords_cd);
1604 
1605  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1606  "array_buff",
1607  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1608  {col_lvs.front(), code_generator.posArg(outer_col)});
1609 
1610  // TODO(jclay): this seems to cast to double, and causes the GPU build to fail.
1611  // const auto arr_ptr =
1612  // code_generator.castArrayPointer(array_ptr,
1613  // coords_cd->columnType.get_elem_type());
1614  array_ptr->setName("array_ptr");
1615 
1616  auto num_keys_lv = executor_->cgen_state_->emitExternalCall(
1617  "get_num_buckets_for_bounds",
1618  get_int_type(32, LL_CONTEXT),
1619  {array_ptr,
1620  LL_INT(0),
1621  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1622  LL_FP(inverse_bucket_sizes_for_dimension_[1])});
1623  num_keys_lv->setName("num_keys_lv");
1624 
1625  return {num_keys_lv, array_ptr};
1626 }
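codegenManyKey hands back the raw bounds buffer plus the number of bucket keys those bounds span, as counted by get_num_buckets_for_bounds. Conceptually that count is the product of the bucket spans along each dimension; a hedged sketch, assuming a {min_x, min_y, max_x, max_y} bounds layout:

#include <cmath>
#include <cstdint>

// Illustrative sketch only; the real counting happens inside the
// get_num_buckets_for_bounds runtime function. The bounds layout
// {min_x, min_y, max_x, max_y} is an assumption made for this sketch.
inline int32_t num_buckets_for_bounds_sketch(const double* bounds,
                                             const double inverse_bucket_size_x,
                                             const double inverse_bucket_size_y) {
  const double buckets_x = std::floor(bounds[2] * inverse_bucket_size_x) -
                           std::floor(bounds[0] * inverse_bucket_size_x) + 1;
  const double buckets_y = std::floor(bounds[3] * inverse_bucket_size_y) -
                           std::floor(bounds[1] * inverse_bucket_size_y) + 1;
  return static_cast<int32_t>(buckets_x * buckets_y);
}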
1627 
1628 HashJoinMatchingSet OverlapsJoinHashTable::codegenMatchingSet(
1629  const CompilationOptions& co,
1630  const size_t index) {
1631  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1632  if (getHashType() == HashType::ManyToMany) {
1633  VLOG(1) << "Building codegenMatchingSet for ManyToMany";
1634  const auto key_component_width = getKeyComponentWidth();
1635  CHECK(key_component_width == 4 || key_component_width == 8);
1636  auto many_to_many_args = codegenManyKey(co);
1637  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1638  const auto composite_dict_ptr_type =
1639  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1640  const auto composite_key_dict =
1641  hash_ptr->getType()->isPointerTy()
1642  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1643  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1644  const auto key_component_count = getKeyComponentCount();
1645 
1646  auto one_to_many_ptr = hash_ptr;
1647 
1648  if (one_to_many_ptr->getType()->isPointerTy()) {
1649  one_to_many_ptr =
1650  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1651  } else {
1652  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1653  }
1654 
1655  const auto composite_key_dict_size = offsetBufferOff();
1656  one_to_many_ptr =
1657  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1658 
1659  // NOTE(jclay): A fixed array of size 200 is allocated on the stack.
1660  // This is likely the maximum size that is safe to use across
1661  // all supported GPU architectures.
1662  const int max_array_size = 200;
1663  const auto arr_type = get_int_array_type(32, max_array_size, LL_CONTEXT);
1664  const auto out_arr_lv = LL_BUILDER.CreateAlloca(arr_type);
1665  out_arr_lv->setName("out_arr");
1666 
1667  const auto casted_out_arr_lv =
1668  LL_BUILDER.CreatePointerCast(out_arr_lv, arr_type->getPointerTo());
1669 
1670  const auto element_ptr = LL_BUILDER.CreateGEP(arr_type, casted_out_arr_lv, LL_INT(0));
1671 
1672  auto rowid_ptr_i32 =
1673  LL_BUILDER.CreatePointerCast(element_ptr, llvm::Type::getInt32PtrTy(LL_CONTEXT));
1674 
1675  const auto candidate_count_lv = executor_->cgen_state_->emitExternalCall(
1676  "get_candidate_rows",
1677  llvm::Type::getInt64Ty(LL_CONTEXT),
1678  {
1679  rowid_ptr_i32,
1680  LL_INT(max_array_size),
1681  many_to_many_args[1],
1682  LL_INT(0),
1683  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1684  LL_FP(inverse_bucket_sizes_for_dimension_[1]),
1685  many_to_many_args[0],
1686  LL_INT(key_component_count), // key_component_count
1687  composite_key_dict, // ptr to hash table
1688  LL_INT(getEntryCount()), // entry_count
1689  LL_INT(composite_key_dict_size), // offset_buffer_ptr_offset
1690  LL_INT(getEntryCount() * sizeof(int32_t)) // sub_buff_size
1691  });
1692 
1693  const auto slot_lv = LL_INT(int64_t(0));
1694 
1695  return {rowid_ptr_i32, candidate_count_lv, slot_lv};
1696  } else {
1697  VLOG(1) << "Building codegenMatchingSet for Baseline";
1698  // TODO: duplicated w/ BaselineJoinHashTable -- push into the hash table builder?
1699  const auto key_component_width = getKeyComponentWidth();
1700  CHECK(key_component_width == 4 || key_component_width == 8);
1701  auto key_buff_lv = codegenKey(co);
1703  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1704  const auto composite_dict_ptr_type =
1705  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1706  const auto composite_key_dict =
1707  hash_ptr->getType()->isPointerTy()
1708  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1709  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1710  const auto key_component_count = getKeyComponentCount();
1711  const auto key = executor_->cgen_state_->emitExternalCall(
1712  "get_composite_key_index_" + std::to_string(key_component_width * 8),
1713  get_int_type(64, LL_CONTEXT),
1714  {key_buff_lv,
1715  LL_INT(key_component_count),
1716  composite_key_dict,
1717  LL_INT(getEntryCount())});
1718  auto one_to_many_ptr = hash_ptr;
1719  if (one_to_many_ptr->getType()->isPointerTy()) {
1720  one_to_many_ptr =
1721  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1722  } else {
1723  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1724  }
1725  const auto composite_key_dict_size = offsetBufferOff();
1726  one_to_many_ptr =
1727  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1728  return HashJoin::codegenMatchingSet(
1729  std::vector<llvm::Value*>{
1730  one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(getEntryCount() - 1)},
1731  false,
1732  false,
1733  false,
1734  getComponentBufferSize(),
1735  executor_);
1736  }
1737  UNREACHABLE();
1738  return HashJoinMatchingSet{};
1739 }
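For the ManyToMany case the matching set comes back as {row-id pointer, candidate count, slot}, where get_candidate_rows has written at most max_array_size inner row ids into the stack buffer. A hypothetical consumer of that triple, shown as plain C++ rather than the IR the engine actually emits:

#include <cstdint>

// Hypothetical consumer of the matching set produced above: walk the candidate
// row ids that get_candidate_rows wrote into the stack-allocated buffer. In the
// engine this loop is emitted as LLVM IR, not written by hand.
inline void visit_overlaps_candidates(const int32_t* rowid_buffer,
                                      const int64_t candidate_count,
                                      void (*visit)(int32_t row_id)) {
  for (int64_t i = 0; i < candidate_count; ++i) {
    visit(rowid_buffer[i]);  // each entry is an inner-table row id to probe
  }
}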
1740 
1741 std::string OverlapsJoinHashTable::toString(const ExecutorDeviceType device_type,
1742  const int device_id,
1743  bool raw) const {
1744  auto buffer = getJoinHashBuffer(device_type, device_id);
1745  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1746  auto hash_table = hash_tables_for_device_[device_id];
1747  CHECK(hash_table);
1748  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1749 #ifdef HAVE_CUDA
1750  std::unique_ptr<int8_t[]> buffer_copy;
1751  if (device_type == ExecutorDeviceType::GPU) {
1752  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1753  CHECK(executor_);
1754  auto data_mgr = executor_->getDataMgr();
1755  auto device_allocator = std::make_unique<CudaAllocator>(
1756  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1757 
1758  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1759  }
1760  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1761 #else
1762  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1763 #endif // HAVE_CUDA
1764  auto ptr2 = ptr1 + offsetBufferOff();
1765  auto ptr3 = ptr1 + countBufferOff();
1766  auto ptr4 = ptr1 + payloadBufferOff();
1767  CHECK(hash_table);
1768  const auto layout = getHashType();
1769  return HashTable::toString(
1770  "geo",
1771  getHashTypeString(layout),
1772  getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1773  getKeyComponentWidth(),
1774  hash_table->getEntryCount(),
1775  ptr1,
1776  ptr2,
1777  ptr3,
1778  ptr4,
1779  buffer_size,
1780  raw);
1781 }
1782 
1783 std::set<DecodedJoinHashBufferEntry> OverlapsJoinHashTable::toSet(
1784  const ExecutorDeviceType device_type,
1785  const int device_id) const {
1786  auto buffer = getJoinHashBuffer(device_type, device_id);
1787  auto hash_table = getHashTableForDevice(device_id);
1788  CHECK(hash_table);
1789  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1790 #ifdef HAVE_CUDA
1791  std::unique_ptr<int8_t[]> buffer_copy;
1792  if (device_type == ExecutorDeviceType::GPU) {
1793  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1794  CHECK(executor_);
1795  auto data_mgr = executor_->getDataMgr();
1796  auto allocator = std::make_unique<CudaAllocator>(
1797  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1798 
1799  allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1800  }
1801  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1802 #else
1803  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1804 #endif // HAVE_CUDA
1805  auto ptr2 = ptr1 + offsetBufferOff();
1806  auto ptr3 = ptr1 + countBufferOff();
1807  auto ptr4 = ptr1 + payloadBufferOff();
1808  const auto layout = getHashType();
1809  return HashTable::toSet(getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1810  getKeyComponentWidth(),
1811  hash_table->getEntryCount(),
1812  ptr1,
1813  ptr2,
1814  ptr3,
1815  ptr4,
1816  buffer_size);
1817 }
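Both toString and toSet decode the same buffer layout: the composite key dictionary comes first, followed by the offset, count, and payload sub-buffers, which is why ptr2, ptr3, and ptr4 are derived from offsetBufferOff(), countBufferOff(), and payloadBufferOff(). A rough sketch of that arithmetic, assuming one int32_t per hash entry in the offset and count sub-buffers (as the sub_buff_size computation in codegenMatchingSet suggests):

#include <cstddef>
#include <cstdint>

// Rough sketch of the buffer arithmetic; these are stand-ins, not the actual
// member functions of the hash table classes.
struct OneToManyLayoutSketch {
  size_t key_dict_size;  // bytes occupied by the composite key dictionary
  size_t entry_count;    // number of hash table entries

  size_t offset_buffer_off() const { return key_dict_size; }
  size_t count_buffer_off() const {
    return offset_buffer_off() + entry_count * sizeof(int32_t);
  }
  size_t payload_buffer_off() const {
    return count_buffer_off() + entry_count * sizeof(int32_t);
  }
};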
1818 
1819 Data_Namespace::MemoryLevel OverlapsJoinHashTable::getEffectiveMemoryLevel(
1820  const std::vector<InnerOuter>& inner_outer_pairs) const {
1821  if (query_hints_.isHintRegistered(QueryHint::kOverlapsAllowGpuBuild) &&
1822  query_hints_.overlaps_allow_gpu_build &&
1823  this->executor_->getDataMgr()->gpusPresent() &&
1824  memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
1825  return Data_Namespace::MemoryLevel::GPU_LEVEL;
1826  }
1827  // otherwise, try to build on CPU
1828  return Data_Namespace::MemoryLevel::CPU_LEVEL;
1829 }
1830 
1831 shared::TableKey OverlapsJoinHashTable::getInnerTableId() const noexcept {
1832  try {
1833  return HashJoin::getInnerTableId(inner_outer_pairs_);
1834  } catch (...) {
1835  CHECK(false);
1836  }
1837  return {};
1838 }
1839 
1840 std::shared_ptr<HashTable> OverlapsJoinHashTable::initHashTableOnCpuFromCache(
1841  QueryPlanHash key,
1842  CacheItemType item_type,
1843  DeviceIdentifier device_identifier) {
1844  auto timer = DEBUG_TIMER(__func__);
1845  VLOG(1) << "Checking CPU hash table cache.";
1846  CHECK(hash_table_cache_);
1847  HashtableCacheMetaInfo meta_info;
1848  meta_info.overlaps_meta_info = getOverlapsHashTableMetaInfo();
1849  auto cached_hashtable =
1850  hash_table_cache_->getItemFromCache(key, item_type, device_identifier, meta_info);
1851  if (cached_hashtable) {
1852  return cached_hashtable;
1853  }
1854  return nullptr;
1855 }
1856 
1857 std::optional<std::pair<size_t, size_t>>
1858 OverlapsJoinHashTable::getApproximateTupleCountFromCache(
1859  QueryPlanHash key,
1860  CacheItemType item_type,
1861  DeviceIdentifier device_identifier) {
1862  CHECK(hash_table_cache_);
1863  HashtableCacheMetaInfo metaInfo;
1864  metaInfo.overlaps_meta_info = getOverlapsHashTableMetaInfo();
1865  auto cached_hashtable =
1866  hash_table_cache_->getItemFromCache(key, item_type, device_identifier, metaInfo);
1867  if (cached_hashtable) {
1868  return std::make_pair(cached_hashtable->getEntryCount() / 2,
1869  cached_hashtable->getEmittedKeysCount());
1870  }
1871  return std::nullopt;
1872 }
1873 
1874 void OverlapsJoinHashTable::putHashTableOnCpuToCache(
1875  QueryPlanHash key,
1876  CacheItemType item_type,
1877  std::shared_ptr<HashTable> hashtable_ptr,
1878  DeviceIdentifier device_identifier,
1879  size_t hashtable_building_time) {
1880  CHECK(hash_table_cache_);
1881  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
1882  HashtableCacheMetaInfo meta_info;
1883  meta_info.overlaps_meta_info = getOverlapsHashTableMetaInfo();
1884  meta_info.registered_query_hint = query_hints_;
1885  hash_table_cache_->putItemToCache(
1886  key,
1887  hashtable_ptr,
1888  item_type,
1889  device_identifier,
1890  hashtable_ptr->getHashTableBufferSize(ExecutorDeviceType::CPU),
1891  hashtable_building_time,
1892  meta_info);
1893 }
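Together with initHashTableOnCpuFromCache, this helper supports a simple probe-then-build cycle keyed by the query-plan hash: look the table up first, and only on a miss build it and store it along with its build time. A hedged sketch of that pattern with placeholder callables (lookup_fn, build_fn, and store_fn are stand-ins, not real APIs):

#include <chrono>
#include <memory>

class HashTable;  // forward declaration so the sketch stands alone

// Placeholder sketch of the probe-then-build pattern these helpers support.
template <typename Lookup, typename Build, typename Store>
std::shared_ptr<HashTable> get_or_build_cached(Lookup lookup_fn,
                                               Build build_fn,
                                               Store store_fn) {
  if (auto cached = lookup_fn()) {  // e.g. a call like initHashTableOnCpuFromCache(...)
    return cached;
  }
  const auto start = std::chrono::steady_clock::now();
  auto built = build_fn();  // e.g. CPU hash table construction
  const auto build_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                            std::chrono::steady_clock::now() - start)
                            .count();
  store_fn(built, build_ms);  // e.g. a call like putHashTableOnCpuToCache(...)
  return built;
}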
1894 
1895 bool OverlapsJoinHashTable::isBitwiseEq() const {
1896  return condition_->get_optype() == kBW_EQ;
1897 }