OmniSciDB  471d68cefb
OverlapsJoinHashTable.cpp
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
21 #include "QueryEngine/Execute.h"
29 
30 std::unique_ptr<HashtableRecycler> OverlapsJoinHashTable::hash_table_cache_ =
31  std::make_unique<HashtableRecycler>(CacheItemType::OVERLAPS_HT,
33 std::unique_ptr<OverlapsTuningParamRecycler> OverlapsJoinHashTable::auto_tuner_cache_ =
34  std::make_unique<OverlapsTuningParamRecycler>();
35 
37 std::shared_ptr<OverlapsJoinHashTable> OverlapsJoinHashTable::getInstance(
38  const std::shared_ptr<Analyzer::BinOper> condition,
39  const std::vector<InputTableInfo>& query_infos,
40  const Data_Namespace::MemoryLevel memory_level,
41  const JoinType join_type,
42  const int device_count,
43  ColumnCacheMap& column_cache,
44  Executor* executor,
45  const HashTableBuildDagMap& hashtable_build_dag_map,
46  const RegisteredQueryHint& query_hint,
47  const TableIdToNodeMap& table_id_to_node_map) {
48  decltype(std::chrono::steady_clock::now()) ts1, ts2;
49 
50  std::vector<InnerOuter> inner_outer_pairs;
51 
52  if (const auto range_expr =
53  dynamic_cast<const Analyzer::RangeOper*>(condition->get_right_operand())) {
54  return RangeJoinHashTable::getInstance(condition,
55  range_expr,
56  query_infos,
57  memory_level,
58  join_type,
59  device_count,
60  column_cache,
61  executor,
62  hashtable_build_dag_map,
63  query_hint,
64  table_id_to_node_map);
65  } else {
66  inner_outer_pairs = HashJoin::normalizeColumnPairs(
67  condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
68  }
69  CHECK(!inner_outer_pairs.empty());
70 
71  const auto getHashTableType =
72  [](const std::shared_ptr<Analyzer::BinOper> condition,
73  const std::vector<InnerOuter>& inner_outer_pairs) -> HashType {
74  HashType layout = HashType::OneToMany;
75  if (condition->is_overlaps_oper()) {
76  CHECK_EQ(inner_outer_pairs.size(), size_t(1));
77  if (inner_outer_pairs[0].first->get_type_info().is_array() &&
78  inner_outer_pairs[0].second->get_type_info().is_array() &&
79  // Bounds vs constructed points, former should yield ManyToMany
80  inner_outer_pairs[0].second->get_type_info().get_size() == 32) {
81  layout = HashType::ManyToMany;
82  }
83  }
84  return layout;
85  };
86 
87  const auto layout = getHashTableType(condition, inner_outer_pairs);
88 
89  if (VLOGGING(1)) {
90  VLOG(1) << "Building geo hash table " << getHashTypeString(layout)
91  << " for qual: " << condition->toString();
92  ts1 = std::chrono::steady_clock::now();
93  }
94 
95  const auto qi_0 = query_infos[0].info.getNumTuplesUpperBound();
96  const auto qi_1 = query_infos[1].info.getNumTuplesUpperBound();
97 
98  VLOG(1) << "table_id = " << query_infos[0].table_id << " has " << qi_0 << " tuples.";
99  VLOG(1) << "table_id = " << query_infos[1].table_id << " has " << qi_1 << " tuples.";
100 
101  const auto& query_info =
102  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
103  .info;
104  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
105  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
106  throw TooManyHashEntries();
107  }
108 
109  auto hashtable_cache_key_string =
111  condition->get_optype(),
112  join_type,
113  hashtable_build_dag_map,
114  executor);
115 
116  auto join_hash_table =
117  std::make_shared<OverlapsJoinHashTable>(condition,
118  join_type,
119  query_infos,
120  memory_level,
121  column_cache,
122  executor,
123  inner_outer_pairs,
124  device_count,
125  hashtable_cache_key_string.first,
126  hashtable_cache_key_string.second,
127  table_id_to_node_map);
128  if (query_hint.isAnyQueryHintDelivered()) {
129  join_hash_table->registerQueryHint(query_hint);
130  }
131  try {
132  join_hash_table->reify(layout);
133  } catch (const HashJoinFail& e) {
134  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
135  "involved in overlaps join | ") +
136  e.what());
137  } catch (const ColumnarConversionNotSupported& e) {
138  throw HashJoinFail(std::string("Could not build hash tables for overlaps join | "
139  "Inner table too big. Attempt manual table reordering "
140  "or create a single fragment inner table. | ") +
141  e.what());
142  } catch (const std::exception& e) {
143  throw HashJoinFail(std::string("Failed to build hash tables for overlaps join | ") +
144  e.what());
145  }
146  if (VLOGGING(1)) {
147  ts2 = std::chrono::steady_clock::now();
148  VLOG(1) << "Built geo hash table " << getHashTypeString(layout) << " in "
149  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
150  << " ms";
151  }
152  return join_hash_table;
153 }
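// Note on the layout selection above: the outer operand's 32-byte fixed-length array
// corresponds to a bounds array of four doubles (xmin, ymin, xmax, ymax), i.e.
// 4 * sizeof(double) = 32, so probing with bounds yields a ManyToMany layout, while
// smaller arrays (e.g. constructed points) keep the default OneToMany layout.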
154 
155 namespace {
156 
157 std::vector<double> correct_uninitialized_bucket_sizes_to_thresholds(
158  const std::vector<double>& bucket_sizes,
159  const std::vector<double>& bucket_thresholds,
160  const double initial_value) {
161  std::vector<double> corrected_bucket_sizes(bucket_sizes);
162  for (size_t i = 0; i != bucket_sizes.size(); ++i) {
163  if (bucket_sizes[i] == initial_value) {
164  corrected_bucket_sizes[i] = bucket_thresholds[i];
165  }
166  }
167  return corrected_bucket_sizes;
168 }
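// Worked example for the helper above (illustrative values): given
// bucket_sizes = {0.0, 2.5}, bucket_thresholds = {1.0, 3.0} and initial_value = 0.0,
// the first dimension was never updated by the bucket-size computation, so it is
// clamped to its threshold and the function returns {1.0, 2.5}.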
169 
170 std::vector<double> compute_bucket_sizes(
171  const std::vector<double>& bucket_thresholds,
172  const Data_Namespace::MemoryLevel effective_memory_level,
173  const JoinColumn& join_column,
174  const JoinColumnTypeInfo& join_column_type,
175  const std::vector<InnerOuter>& inner_outer_pairs,
176  const Executor* executor) {
177  // No coalesced keys for overlaps joins yet
178  CHECK_EQ(inner_outer_pairs.size(), 1u);
179 
180  const auto col = inner_outer_pairs[0].first;
181  CHECK(col);
182  const auto col_ti = col->get_type_info();
183  CHECK(col_ti.is_array());
184 
185  // TODO: Compute the number of dimensions for this overlaps key
186  const size_t num_dims{2};
187  const double initial_bin_value{0.0};
188  std::vector<double> bucket_sizes(num_dims, initial_bin_value);
189  CHECK_EQ(bucket_thresholds.size(), num_dims);
190 
191  VLOG(1)
192  << "Computing x and y bucket sizes for overlaps hash join with maximum bucket size "
193  << std::to_string(bucket_thresholds[0]) << ", "
194  << std::to_string(bucket_thresholds[1]);
195 
196  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
197  const int thread_count = cpu_threads();
198  compute_bucket_sizes_on_cpu(
199  bucket_sizes, join_column, join_column_type, bucket_thresholds, thread_count);
200  }
201 #ifdef HAVE_CUDA
202  else {
203  // Note that we compute the bucket sizes using only a single GPU
204  const int device_id = 0;
205  auto data_mgr = executor->getDataMgr();
206  CudaAllocator allocator(data_mgr, device_id);
207  auto device_bucket_sizes_gpu =
208  transfer_vector_of_flat_objects_to_gpu(bucket_sizes, allocator);
209  auto join_column_gpu = transfer_flat_object_to_gpu(join_column, allocator);
210  auto join_column_type_gpu = transfer_flat_object_to_gpu(join_column_type, allocator);
211  auto device_bucket_thresholds_gpu =
212  transfer_vector_of_flat_objects_to_gpu(bucket_thresholds, allocator);
213 
214  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
215  join_column_gpu,
216  join_column_type_gpu,
217  device_bucket_thresholds_gpu);
218  allocator.copyFromDevice(reinterpret_cast<int8_t*>(bucket_sizes.data()),
219  reinterpret_cast<int8_t*>(device_bucket_sizes_gpu),
220  bucket_sizes.size() * sizeof(double));
221  }
222 #endif
223  const auto corrected_bucket_sizes = correct_uninitialized_bucket_sizes_to_thresholds(
224  bucket_sizes, bucket_thresholds, initial_bin_value);
225 
226  VLOG(1) << "Computed x and y bucket sizes for overlaps hash join: ("
227  << corrected_bucket_sizes[0] << ", " << corrected_bucket_sizes[1] << ")";
228 
229  return corrected_bucket_sizes;
230 }
231 
232 struct HashTableProps {
233  HashTableProps(const size_t entry_count,
234  const size_t emitted_keys_count,
235  const size_t hash_table_size,
236  const std::vector<double>& bucket_sizes)
237  : entry_count(entry_count)
238  , emitted_keys_count(emitted_keys_count)
239  , keys_per_bin(entry_count == 0 ? std::numeric_limits<double>::max()
240  : emitted_keys_count / (entry_count / 2.0))
241  , hash_table_size(hash_table_size)
242  , bucket_sizes(bucket_sizes) {}
243 
244  static HashTableProps invalid() { return HashTableProps(0, 0, 0, {}); }
245 
246  size_t entry_count;
247  size_t emitted_keys_count;
248  double keys_per_bin;
249  size_t hash_table_size;
250  std::vector<double> bucket_sizes;
251 };
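// Worked example for keys_per_bin (illustrative values): entry_count = 2000 key slots
// corresponds to 1000 bins (each bin occupies two slots), so emitted_keys_count = 5000
// gives keys_per_bin = 5000 / (2000 / 2.0) = 5.0. An entry_count of 0 maps to
// std::numeric_limits<double>::max(), so an empty table never looks like it is under
// the target keys-per-bin threshold.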
252 
253 std::ostream& operator<<(std::ostream& os, const HashTableProps& props) {
254  os << " entry_count: " << props.entry_count << ", emitted_keys "
255  << props.emitted_keys_count << ", hash table size " << props.hash_table_size
256  << ", keys per bin " << props.keys_per_bin;
257  return os;
258 }
259 
260 struct TuningState {
261  TuningState(const size_t overlaps_max_table_size_bytes,
262  const double overlaps_target_entries_per_bin)
263  : crt_props(HashTableProps::invalid())
264  , prev_props(HashTableProps::invalid())
265  , chosen_overlaps_threshold(-1)
266  , crt_step(0)
267  , crt_reverse_search_iteration(0)
268  , overlaps_max_table_size_bytes(overlaps_max_table_size_bytes)
269  , overlaps_target_entries_per_bin(overlaps_target_entries_per_bin) {}
270 
271  // current and previous props, allows for easy backtracking
274 
275  // value we are tuning for
276  double chosen_overlaps_threshold;
277  enum class TuningDirection { SMALLER, LARGER };
278  TuningDirection tuning_direction{TuningDirection::SMALLER};
279 
280  // various constants / state
281  size_t crt_step; // 1 indexed
282  size_t crt_reverse_search_iteration; // 1 indexed
283  const size_t overlaps_max_table_size_bytes;
284  const double overlaps_target_entries_per_bin;
285  const size_t max_reverse_search_iterations{8};
286 
291  bool operator()(const HashTableProps& new_props, const bool new_overlaps_threshold) {
292  prev_props = crt_props;
293  crt_props = new_props;
294  crt_step++;
295 
296  if (hashTableTooBig() || keysPerBinIncreasing()) {
297  if (hashTableTooBig()) {
298  VLOG(1) << "Reached hash table size limit: " << overlaps_max_table_size_bytes
299  << " with " << crt_props.hash_table_size << " byte hash table, "
300  << crt_props.keys_per_bin << " keys per bin.";
301  } else if (keysPerBinIncreasing()) {
302  VLOG(1) << "Keys per bin increasing from " << prev_props.keys_per_bin << " to "
303  << crt_props.keys_per_bin;
304  CHECK(previousIterationValid());
305  }
306  if (previousIterationValid()) {
307  VLOG(1) << "Using previous threshold value " << chosen_overlaps_threshold;
308  crt_props = prev_props;
309  return false;
310  } else {
311  CHECK(hashTableTooBig());
312  crt_reverse_search_iteration++;
313  chosen_overlaps_threshold = new_overlaps_threshold;
314 
315  if (crt_reverse_search_iteration == max_reverse_search_iterations) {
316  VLOG(1) << "Hit maximum number (" << max_reverse_search_iterations
317  << ") of reverse tuning iterations. Aborting tuning";
318  // use the crt props, but don't bother trying to tune any further
319  return false;
320  }
321 
322  if (crt_reverse_search_iteration > 1 &&
323  crt_props.hash_table_size == prev_props.hash_table_size) {
324  // hash table size is not changing, bail
325  VLOG(1) << "Hash table size not decreasing (" << crt_props.hash_table_size
326  << " bytes) and still above maximum allowed size ("
327  << overlaps_max_table_size_bytes << " bytes). Aborting tuning";
328  return false;
329  }
330 
331  // if the hash table is too big on the very first step, change direction towards
332  // larger bins to see if a slightly smaller hash table will fit
333  if (crt_step == 1 && crt_reverse_search_iteration == 1) {
334  VLOG(1)
335  << "First iteration of overlaps tuning led to hash table size over "
336  "limit. Reversing search to try larger bin sizes (previous threshold: "
337  << chosen_overlaps_threshold << ")";
338  // Need to change direction of tuning to tune "up" towards larger bins
339  tuning_direction = TuningDirection::LARGER;
340  }
341  return true;
342  }
343  UNREACHABLE();
344  }
345 
346  chosen_overlaps_threshold = new_overlaps_threshold;
347 
348  if (keysPerBinUnderThreshold()) {
349  VLOG(1) << "Hash table reached size " << crt_props.hash_table_size
350  << " with keys per bin " << crt_props.keys_per_bin << " under threshold "
351  << overlaps_target_entries_per_bin << ". Terminating bucket size loop.";
352  return false;
353  }
354 
355  if (crt_reverse_search_iteration > 0) {
356  // We always take the first tuning iteration that succeeds when reversing
357  // direction, as if we're here we haven't had a successful iteration and we're
358  // "backtracking" our search by making bin sizes larger
359  VLOG(1) << "On reverse (larger tuning direction) search found workable "
360  << "hash table size of " << crt_props.hash_table_size
361  << " with keys per bin " << crt_props.keys_per_bin
362  << ". Terminating bucket size loop.";
363  return false;
364  }
365 
366  return true;
367  }
368 
369  bool hashTableTooBig() const {
370  return crt_props.hash_table_size > overlaps_max_table_size_bytes;
371  }
372 
373  bool keysPerBinIncreasing() const {
374  return crt_props.keys_per_bin > prev_props.keys_per_bin;
375  }
376 
377  bool previousIterationValid() const {
378  return tuning_direction == TuningDirection::SMALLER && crt_step > 1;
379  }
380 
381  bool keysPerBinUnderThreshold() const {
382  return crt_props.keys_per_bin < overlaps_target_entries_per_bin;
383  }
384 };
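// Summary of the tuning state machine above: tuning starts in the SMALLER direction,
// shrinking bucket thresholds each step. It stops when keys_per_bin drops under
// overlaps_target_entries_per_bin, and backtracks to the previous step's properties if
// keys_per_bin starts increasing. If the hash table already exceeds
// overlaps_max_table_size_bytes on the very first step, the direction flips to LARGER
// and the first fitting size is accepted, giving up after
// max_reverse_search_iterations (8) reverse steps.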
385 
386 class BucketSizeTuner {
387  public:
388  BucketSizeTuner(const double bucket_threshold,
389  const double step,
390  const double min_threshold,
391  const Data_Namespace::MemoryLevel effective_memory_level,
392  const std::vector<ColumnsForDevice>& columns_per_device,
393  const std::vector<InnerOuter>& inner_outer_pairs,
394  const size_t table_tuple_count,
395  const Executor* executor)
396  : num_dims_(2) // Todo: allow varying number of dims
397  , bucket_thresholds_(/*count=*/num_dims_, /*value=*/bucket_threshold)
398  , step_(step)
399  , min_threshold_(min_threshold)
400  , effective_memory_level_(effective_memory_level)
401  , columns_per_device_(columns_per_device)
402  , inner_outer_pairs_(inner_outer_pairs)
403  , table_tuple_count_(table_tuple_count)
404  , executor_(executor) {
405  CHECK(!columns_per_device_.empty());
406  }
407 
408  bool tuneOneStep() { return tuneOneStep(TuningState::TuningDirection::SMALLER, step_); }
409 
410  bool tuneOneStep(const TuningState::TuningDirection tuning_direction) {
411  return tuneOneStep(tuning_direction, step_);
412  }
413 
414  bool tuneOneStep(const TuningState::TuningDirection tuning_direction,
415  const double step_overide) {
416  if (table_tuple_count_ == 0) {
417  return false;
418  }
419  if (tuning_direction == TuningState::TuningDirection::SMALLER) {
420  return tuneSmallerOneStep(step_overide);
421  }
422  return tuneLargerOneStep(step_overide);
423  }
424 
425  auto getMinBucketSize() const {
426  return *std::min_element(bucket_thresholds_.begin(), bucket_thresholds_.end());
427  }
428 
435  std::vector<double> getInverseBucketSizes() {
436  if (num_steps_ == 0) {
437  CHECK_EQ(current_bucket_sizes_.size(), static_cast<size_t>(0));
438  current_bucket_sizes_ = computeBucketSizes();
439  }
440  CHECK_EQ(current_bucket_sizes_.size(), num_dims_);
441  std::vector<double> inverse_bucket_sizes;
442  for (const auto s : current_bucket_sizes_) {
443  inverse_bucket_sizes.emplace_back(1.0 / s);
444  }
445  return inverse_bucket_sizes;
446  }
447 
448  private:
449  bool bucketThresholdsBelowMinThreshold() const {
450  for (const auto& t : bucket_thresholds_) {
451  if (t < min_threshold_) {
452  return true;
453  }
454  }
455  return false;
456  }
457 
458  std::vector<double> computeBucketSizes() const {
459  if (table_tuple_count_ == 0) {
460  return std::vector<double>(/*count=*/num_dims_, /*val=*/0);
461  }
462  return compute_bucket_sizes(bucket_thresholds_,
463  effective_memory_level_,
464  columns_per_device_.front().join_columns[0],
465  columns_per_device_.front().join_column_types[0],
466  inner_outer_pairs_,
467  executor_);
468  }
469 
470  bool tuneSmallerOneStep(const double step_overide) {
471  if (!current_bucket_sizes_.empty()) {
472  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
473  bucket_thresholds_ = current_bucket_sizes_;
474  for (auto& t : bucket_thresholds_) {
475  t /= step_overide;
476  }
477  }
478  if (bucketThresholdsBelowMinThreshold()) {
479  VLOG(1) << "Aborting overlaps tuning as at least one bucket size is below min "
480  "threshold";
481  return false;
482  }
483  const auto next_bucket_sizes = computeBucketSizes();
484  if (next_bucket_sizes == current_bucket_sizes_) {
485  VLOG(1) << "Aborting overlaps tuning as bucket size is no longer changing.";
486  return false;
487  }
488 
489  current_bucket_sizes_ = next_bucket_sizes;
490  num_steps_++;
491  return true;
492  }
493 
494  bool tuneLargerOneStep(const double step_overide) {
495  if (!current_bucket_sizes_.empty()) {
496  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
497  bucket_thresholds_ = current_bucket_sizes_;
498  }
499  // If current_bucket_sizes was empty, we will start from our initial threshold
500  for (auto& t : bucket_thresholds_) {
501  t *= step_overide;
502  }
503  // When tuning up, do not dynamically compute bucket_sizes, as compute_bucket_sizes as
504  // written will pick the largest bin size below the threshold, meaning our bucket_size
505  // will never increase beyond the size of the largest polygon. This could mean that we
506  // can never make the bucket sizes large enough to get our hash table below the
507  // maximum size. Possible todo: enable a templated version of compute_bucket_sizes that
508  // allows for optionally finding the smallest extent above the threshold, to mirror the
509  // default behavior of finding the largest extent below the threshold, and use the former variant here.
510  current_bucket_sizes_ = bucket_thresholds_;
511  num_steps_++;
512  return true;
513  }
514 
515  size_t num_dims_;
516  std::vector<double> bucket_thresholds_;
517  size_t num_steps_{0};
518  const double step_;
519  const double min_threshold_;
520  const Data_Namespace::MemoryLevel effective_memory_level_;
521  const std::vector<ColumnsForDevice>& columns_per_device_;
522  const std::vector<InnerOuter>& inner_outer_pairs_;
523  const size_t table_tuple_count_;
524  const Executor* executor_;
525 
526  std::vector<double> current_bucket_sizes_;
527 
528  friend std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner);
529 };
530 
531 std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner) {
532  os << "Step Num: " << tuner.num_steps_ << ", Threshold: " << std::fixed << "("
533  << tuner.bucket_thresholds_[0] << ", " << tuner.bucket_thresholds_[1] << ")"
534  << ", Step Size: " << std::fixed << tuner.step_ << ", Min: " << std::fixed
535  << tuner.min_threshold_;
536  return os;
537 }
538 
539 } // namespace
540 
541 void OverlapsJoinHashTable::reifyWithLayout(const HashType layout) {
542  auto timer = DEBUG_TIMER(__func__);
544  const auto& query_info =
545  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs_), query_infos_)
546  .info;
547  VLOG(1) << "Reify with layout " << getHashTypeString(layout)
548  << " for table_id: " << HashJoin::getInnerTableId(inner_outer_pairs_);
549  if (query_info.fragments.empty()) {
550  return;
551  }
552 
553  auto overlaps_max_table_size_bytes = g_overlaps_max_table_size_bytes;
554  std::optional<double> overlaps_threshold_override;
555  double overlaps_target_entries_per_bin = g_overlaps_target_entries_per_bin;
556  auto query_hint = getRegisteredQueryHint();
557  auto skip_hashtable_caching = false;
558  if (query_hint.isHintRegistered(QueryHint::kOverlapsBucketThreshold)) {
559  VLOG(1) << "Setting overlaps bucket threshold "
560  "\'overlaps_hashjoin_bucket_threshold\' via "
561  "query hint: "
562  << query_hint.overlaps_bucket_threshold;
563  overlaps_threshold_override = query_hint.overlaps_bucket_threshold;
564  }
565  if (query_hint.isHintRegistered(QueryHint::kOverlapsMaxSize)) {
566  std::ostringstream oss;
567  oss << "User requests to change a threshold \'overlaps_max_table_size_bytes\' via "
568  "query hint";
569  if (!overlaps_threshold_override.has_value()) {
570  oss << ": " << overlaps_max_table_size_bytes << " -> "
571  << query_hint.overlaps_max_size;
572  overlaps_max_table_size_bytes = query_hint.overlaps_max_size;
573  } else {
574  oss << ", but is skipped since the query hint also changes the threshold "
575  "\'overlaps_hashjoin_bucket_threshold\'";
576  }
577  VLOG(1) << oss.str();
578  }
579  if (query_hint.isHintRegistered(QueryHint::kOverlapsNoCache)) {
580  VLOG(1) << "User requests to skip caching overlaps join hashtable and its tuned "
581  "parameters for this query";
582  skip_hashtable_caching = true;
583  }
584  if (query_hint.isHintRegistered(QueryHint::kOverlapsKeysPerBin)) {
585  VLOG(1) << "User requests to change a threshold \'overlaps_keys_per_bin\' via query "
586  "hint: "
587  << overlaps_target_entries_per_bin << " -> "
588  << query_hint.overlaps_keys_per_bin;
589  overlaps_target_entries_per_bin = query_hint.overlaps_keys_per_bin;
590  }
591 
592  std::vector<ColumnsForDevice> columns_per_device;
593  auto data_mgr = executor_->getDataMgr();
594  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
595  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
596  for (int device_id = 0; device_id < device_count_; ++device_id) {
597  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(data_mgr, device_id));
598  }
599  }
600  const auto shard_count = shardCount();
601  size_t total_num_tuples = 0;
602  for (int device_id = 0; device_id < device_count_; ++device_id) {
603  const auto fragments =
604  shard_count
605  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
606  : query_info.fragments;
607  const size_t crt_num_tuples =
608  std::accumulate(fragments.begin(),
609  fragments.end(),
610  size_t(0),
611  [](const auto& sum, const auto& fragment) {
612  return sum + fragment.getNumTuples();
613  });
614  total_num_tuples += crt_num_tuples;
615  const auto columns_for_device =
616  fetchColumnsForDevice(fragments,
617  device_id,
618  memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
619  ? dev_buff_owners[device_id].get()
620  : nullptr);
621  columns_per_device.push_back(columns_for_device);
622  }
623 
624  if (overlaps_threshold_override) {
625  // compute bucket sizes based on the user provided threshold
626  BucketSizeTuner tuner(/*initial_threshold=*/*overlaps_threshold_override,
627  /*step=*/1.0,
628  /*min_threshold=*/0.0,
630  columns_per_device,
632  total_num_tuples,
633  executor_);
634  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
635 
636  auto [entry_count, emitted_keys_count] =
637  computeHashTableCounts(shard_count,
638  inverse_bucket_sizes,
639  columns_per_device,
640  overlaps_max_table_size_bytes,
641  *overlaps_threshold_override);
642  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
643  // reifyImpl will check the hash table cache for an appropriate hash table w/ those
644  // bucket sizes (or within tolerances); if a hash table exists, use it, otherwise
645  // build one
646  generateCacheKey(overlaps_max_table_size_bytes, *overlaps_threshold_override);
647  reifyImpl(columns_per_device,
648  query_info,
649  layout,
650  shard_count,
651  entry_count,
652  emitted_keys_count,
653  skip_hashtable_caching,
654  overlaps_max_table_size_bytes,
655  *overlaps_threshold_override);
656  } else {
657  double overlaps_bucket_threshold = std::numeric_limits<double>::max();
658  generateCacheKey(overlaps_max_table_size_bytes, overlaps_bucket_threshold);
659  auto candidate_auto_tuner_cache_key = getCacheKey();
660  if ((query_plan_dag_.compare(EMPTY_QUERY_PLAN) == 0 ||
662  inner_outer_pairs_.front().first->get_table_id() > 0) {
665  columns_per_device.front().join_columns.front().num_elems,
667  condition_->get_optype(),
668  overlaps_max_table_size_bytes,
669  overlaps_bucket_threshold};
670  candidate_auto_tuner_cache_key = getAlternativeCacheKey(cache_key);
671  VLOG(2) << "Use alternative auto tuner cache key due to unavailable query plan dag "
672  "extraction";
673  }
674  auto cached_bucket_threshold =
675  auto_tuner_cache_->getItemFromCache(candidate_auto_tuner_cache_key,
678  if (cached_bucket_threshold) {
679  overlaps_bucket_threshold = cached_bucket_threshold->bucket_threshold;
680  auto inverse_bucket_sizes = cached_bucket_threshold->bucket_sizes;
682  overlaps_max_table_size_bytes, overlaps_bucket_threshold, inverse_bucket_sizes);
683  generateCacheKey(overlaps_max_table_size_bytes, overlaps_bucket_threshold);
684  if ((query_plan_dag_.compare(EMPTY_QUERY_PLAN) == 0 ||
686  inner_outer_pairs_.front().first->get_table_id() > 0) {
689  columns_per_device.front().join_columns.front().num_elems,
691  condition_->get_optype(),
692  overlaps_max_table_size_bytes,
693  overlaps_bucket_threshold,
694  inverse_bucket_sizes};
696  VLOG(2) << "Use alternative hashtable cache key due to unavailable query plan "
697  "dag extraction";
698  }
699  if (auto hash_table =
700  hash_table_cache_->getItemFromCache(hashtable_cache_key_,
703  std::nullopt)) {
704  // if we already have a built hash table, we can skip the scans required for
705  // computing bucket size and tuple count
706  // reset as the hash table sizes can vary a bit
707  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
708  CHECK(hash_table);
709 
710  VLOG(1) << "Using cached hash table bucket size";
711 
712  reifyImpl(columns_per_device,
713  query_info,
714  layout,
715  shard_count,
716  hash_table->getEntryCount(),
717  hash_table->getEmittedKeysCount(),
718  skip_hashtable_caching,
719  overlaps_max_table_size_bytes,
720  overlaps_bucket_threshold);
721  } else {
722  VLOG(1) << "Computing bucket size for cached bucket threshold";
723  // compute bucket size using our cached tuner value
724  BucketSizeTuner tuner(/*initial_threshold=*/overlaps_bucket_threshold,
725  /*step=*/1.0,
726  /*min_threshold=*/0.0,
728  columns_per_device,
730  total_num_tuples,
731  executor_);
732 
733  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
734 
735  auto [entry_count, emitted_keys_count] =
736  computeHashTableCounts(shard_count,
737  inverse_bucket_sizes,
738  columns_per_device,
739  overlaps_max_table_size_bytes,
740  overlaps_bucket_threshold);
741  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
742 
743  reifyImpl(columns_per_device,
744  query_info,
745  layout,
746  shard_count,
747  entry_count,
748  emitted_keys_count,
749  skip_hashtable_caching,
750  overlaps_max_table_size_bytes,
751  overlaps_bucket_threshold);
752  }
753  } else {
754  // compute bucket size using the auto tuner
755  BucketSizeTuner tuner(
756  /*initial_threshold=*/overlaps_bucket_threshold,
757  /*step=*/2.0,
758  /*min_threshold=*/1e-7,
760  columns_per_device,
762  total_num_tuples,
763  executor_);
764 
765  VLOG(1) << "Running overlaps join size auto tune with parameters: " << tuner;
766 
767  // manages the tuning state machine
768  TuningState tuning_state(overlaps_max_table_size_bytes,
769  overlaps_target_entries_per_bin);
770  while (tuner.tuneOneStep(tuning_state.tuning_direction)) {
771  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
772 
773  const auto [crt_entry_count, crt_emitted_keys_count] =
774  computeHashTableCounts(shard_count,
775  inverse_bucket_sizes,
776  columns_per_device,
777  tuning_state.overlaps_max_table_size_bytes,
778  tuning_state.chosen_overlaps_threshold);
779  const size_t hash_table_size = calculateHashTableSize(
780  inverse_bucket_sizes.size(), crt_emitted_keys_count, crt_entry_count);
781  HashTableProps crt_props(crt_entry_count,
782  crt_emitted_keys_count,
783  hash_table_size,
784  inverse_bucket_sizes);
785  VLOG(1) << "Tuner output: " << tuner << " with properties " << crt_props;
786 
787  const auto should_continue = tuning_state(crt_props, tuner.getMinBucketSize());
789  tuning_state.crt_props.bucket_sizes, columns_per_device, device_count_);
790  if (!should_continue) {
791  break;
792  }
793  }
794 
795  const auto& crt_props = tuning_state.crt_props;
796  // sanity check that the hash table size has not changed. this is a fairly
797  // inexpensive check to ensure the above algorithm is consistent
798  const size_t hash_table_size =
799  calculateHashTableSize(inverse_bucket_sizes_for_dimension_.size(),
800  crt_props.emitted_keys_count,
801  crt_props.entry_count);
802  CHECK_EQ(crt_props.hash_table_size, hash_table_size);
803 
805  hash_table_size > overlaps_max_table_size_bytes) {
806  VLOG(1) << "Could not find suitable overlaps join parameters to create hash "
807  "table under max allowed size ("
808  << overlaps_max_table_size_bytes << ") bytes.";
809  throw OverlapsHashTableTooBig(overlaps_max_table_size_bytes);
810  }
811 
812  VLOG(1) << "Final tuner output: " << tuner << " with properties " << crt_props;
814  VLOG(1) << "Final bucket sizes: ";
815  for (size_t dim = 0; dim < inverse_bucket_sizes_for_dimension_.size(); dim++) {
816  VLOG(1) << "dim[" << dim
817  << "]: " << 1.0 / inverse_bucket_sizes_for_dimension_[dim];
818  }
819  CHECK_GE(tuning_state.chosen_overlaps_threshold, double(0));
820  generateCacheKey(tuning_state.overlaps_max_table_size_bytes,
821  tuning_state.chosen_overlaps_threshold);
822  candidate_auto_tuner_cache_key = getCacheKey();
823  if (skip_hashtable_caching) {
824  VLOG(1) << "Skipping addition of tuned parameters to the auto tuner";
825  } else {
826  AutoTunerMetaInfo meta_info{tuning_state.overlaps_max_table_size_bytes,
827  tuning_state.chosen_overlaps_threshold,
829  auto_tuner_cache_->putItemToCache(candidate_auto_tuner_cache_key,
830  meta_info,
833  0,
834  0);
835  }
836  overlaps_bucket_threshold = tuning_state.chosen_overlaps_threshold;
837  reifyImpl(columns_per_device,
838  query_info,
839  layout,
840  shard_count,
841  crt_props.entry_count,
842  crt_props.emitted_keys_count,
843  skip_hashtable_caching,
844  overlaps_max_table_size_bytes,
845  overlaps_bucket_threshold);
846  }
847  }
848 }
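// reifyWithLayout above chooses one of three paths: (1) an explicit
// overlaps_hashjoin_bucket_threshold query hint runs a single tuning pass with step 1.0;
// (2) a threshold cached in auto_tuner_cache_ reuses the cached bucket sizes (and, when
// available, a fully cached hash table); (3) otherwise the auto tuner searches with
// step 2.0 and a minimum threshold of 1e-7, and the chosen parameters are cached unless
// the overlaps_no_cache hint was given.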
849 
850 size_t OverlapsJoinHashTable::calculateHashTableSize(size_t number_of_dimensions,
851  size_t emitted_keys_count,
852  size_t entry_count) const {
853  const auto key_component_width = getKeyComponentWidth();
854  const auto key_component_count = number_of_dimensions;
855  const auto entry_size = key_component_count * key_component_width;
856  const auto keys_for_all_rows = emitted_keys_count;
857  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
858  const size_t hash_table_size =
859  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
860  return hash_table_size;
861 }
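// Worked example for the size formula above (illustrative values): with
// getKeyComponentWidth() = 8 and number_of_dimensions = 2, entry_size = 16 bytes.
// For entry_count = 1,000,000 and emitted_keys_count = 3,000,000,
// one_to_many_hash_entries = 2 * 1,000,000 + 3,000,000 = 5,000,000 and
// hash_table_size = 16 * 1,000,000 + 5,000,000 * sizeof(int32_t) = 36,000,000 bytes.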
862 
863 ColumnsForDevice OverlapsJoinHashTable::fetchColumnsForDevice(
864  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
865  const int device_id,
866  DeviceAllocator* dev_buff_owner) {
867  const auto& catalog = *executor_->getCatalog();
868  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
869 
870  std::vector<JoinColumn> join_columns;
871  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
872  std::vector<JoinColumnTypeInfo> join_column_types;
873  std::vector<std::shared_ptr<void>> malloc_owner;
874  for (const auto& inner_outer_pair : inner_outer_pairs_) {
875  const auto inner_col = inner_outer_pair.first;
876  const auto inner_cd = get_column_descriptor_maybe(
877  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
878  if (inner_cd && inner_cd->isVirtualCol) {
879  throw FailedToJoinOnVirtualColumn();
880  }
881  join_columns.emplace_back(fetchJoinColumn(inner_col,
882  fragments,
883  effective_memory_level,
884  device_id,
885  chunks_owner,
886  dev_buff_owner,
887  malloc_owner,
888  executor_,
889  &column_cache_));
890  const auto& ti = inner_col->get_type_info();
891  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
892  0,
893  0,
894  inline_int_null_value<int64_t>(),
895  false,
896  0,
898  CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
899  }
900  return {join_columns, join_column_types, chunks_owner, {}, malloc_owner};
901 }
902 
903 std::pair<size_t, size_t> OverlapsJoinHashTable::computeHashTableCounts(
904  const size_t shard_count,
905  const std::vector<double>& inverse_bucket_sizes_for_dimension,
906  std::vector<ColumnsForDevice>& columns_per_device,
907  const size_t chosen_max_hashtable_size,
908  const double chosen_bucket_threshold) {
909  CHECK(!inverse_bucket_sizes_for_dimension.empty());
910  const auto [tuple_count, emitted_keys_count] =
911  approximateTupleCount(inverse_bucket_sizes_for_dimension,
912  columns_per_device,
913  chosen_max_hashtable_size,
914  chosen_bucket_threshold);
915  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
916 
917  return std::make_pair(
918  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
919  emitted_keys_count);
920 }
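// Note on the counts above: the entry count is twice the approximate number of distinct
// bucket keys (with a floor of one tuple), since each bin occupies two key slots, and
// get_entries_per_device then apportions that total across shards and devices for the
// configured memory level.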
921 
922 std::pair<size_t, size_t> OverlapsJoinHashTable::approximateTupleCount(
923  const std::vector<double>& inverse_bucket_sizes_for_dimension,
924  std::vector<ColumnsForDevice>& columns_per_device,
925  const size_t chosen_max_hashtable_size,
926  const double chosen_bucket_threshold) {
927  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
928  CountDistinctDescriptor count_distinct_desc{
929  CountDistinctImplType::Bitmap,
930  0,
931  11,
932  true,
933  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
934  ? ExecutorDeviceType::GPU
935  : ExecutorDeviceType::CPU,
936  1};
937  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
938 
939  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
940  if (columns_per_device.front().join_columns.front().num_elems == 0) {
941  return std::make_pair(0, 0);
942  }
943 
944  // TODO: state management in here should be revisited, but this should be safe enough
945  // for now
946  // re-compute bucket counts per device based on global bucket size
947  for (size_t device_id = 0; device_id < columns_per_device.size(); ++device_id) {
948  auto& columns_for_device = columns_per_device[device_id];
949  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
951  }
952 
953  // Number of keys must match dimension of buckets
954  CHECK_EQ(columns_per_device.front().join_columns.size(),
955  columns_per_device.front().join_buckets.size());
956  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
957  // Note that this path assumes each device has the same hash table (for GPU hash join
958  // w/ hash table built on CPU)
959  const auto cached_count_info =
963  if (cached_count_info) {
964  VLOG(1) << "Using a cached tuple count: " << cached_count_info->first
965  << ", emitted keys count: " << cached_count_info->second;
966  return *cached_count_info;
967  }
968  int thread_count = cpu_threads();
969  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
970  auto hll_result = &hll_buffer_all_cpus[0];
971 
972  std::vector<int32_t> num_keys_for_row;
973  // TODO(adb): support multi-column overlaps join
974  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
975 
977  num_keys_for_row,
978  count_distinct_desc.bitmap_sz_bits,
979  padded_size_bytes,
980  columns_per_device.front().join_columns,
981  columns_per_device.front().join_column_types,
982  columns_per_device.front().join_buckets,
983  thread_count);
984  for (int i = 1; i < thread_count; ++i) {
985  hll_unify(hll_result,
986  hll_result + i * padded_size_bytes,
987  1 << count_distinct_desc.bitmap_sz_bits);
988  }
989  return std::make_pair(
990  hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
991  static_cast<size_t>(num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0));
992  }
993 #ifdef HAVE_CUDA
994  auto data_mgr = executor_->getDataMgr();
995  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
996  for (auto& host_hll_buffer : host_hll_buffers) {
997  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
998  }
999  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
1000  std::vector<std::future<void>> approximate_distinct_device_threads;
1001  for (int device_id = 0; device_id < device_count_; ++device_id) {
1002  approximate_distinct_device_threads.emplace_back(std::async(
1004  [device_id,
1005  &columns_per_device,
1006  &count_distinct_desc,
1007  data_mgr,
1008  &host_hll_buffers,
1009  &emitted_keys_count_device_threads] {
1010  auto allocator = data_mgr->createGpuAllocator(device_id);
1011  auto device_hll_buffer =
1012  allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
1013  data_mgr->getCudaMgr()->zeroDeviceMem(
1014  device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
1015  const auto& columns_for_device = columns_per_device[device_id];
1016  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
1017  columns_for_device.join_columns, *allocator);
1018 
1019  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
1020  const auto& inverse_bucket_sizes_for_dimension =
1021  columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
1022  auto inverse_bucket_sizes_gpu = allocator->alloc(
1023  inverse_bucket_sizes_for_dimension.size() * sizeof(double));
1024  allocator->copyToDevice(
1025  inverse_bucket_sizes_gpu,
1026  inverse_bucket_sizes_for_dimension.data(),
1027  inverse_bucket_sizes_for_dimension.size() * sizeof(double));
1028  const size_t row_counts_buffer_sz =
1029  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
1030  auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
1031  data_mgr->getCudaMgr()->zeroDeviceMem(
1032  row_counts_buffer, row_counts_buffer_sz, device_id);
1033  const auto key_handler =
1034  OverlapsKeyHandler(inverse_bucket_sizes_for_dimension.size(),
1035  join_columns_gpu,
1036  reinterpret_cast<double*>(inverse_bucket_sizes_gpu));
1037  const auto key_handler_gpu =
1038  transfer_flat_object_to_gpu(key_handler, *allocator);
1040  reinterpret_cast<uint8_t*>(device_hll_buffer),
1041  count_distinct_desc.bitmap_sz_bits,
1042  reinterpret_cast<int32_t*>(row_counts_buffer),
1043  key_handler_gpu,
1044  columns_for_device.join_columns[0].num_elems);
1045 
1046  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
1047  allocator->copyFromDevice(
1048  &host_emitted_keys_count,
1049  row_counts_buffer +
1050  (columns_per_device.front().join_columns[0].num_elems - 1) *
1051  sizeof(int32_t),
1052  sizeof(int32_t));
1053 
1054  auto& host_hll_buffer = host_hll_buffers[device_id];
1055  allocator->copyFromDevice(&host_hll_buffer[0],
1056  device_hll_buffer,
1057  count_distinct_desc.bitmapPaddedSizeBytes());
1058  }));
1059  }
1060  for (auto& child : approximate_distinct_device_threads) {
1061  child.get();
1062  }
1063  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
1064  auto& result_hll_buffer = host_hll_buffers.front();
1065  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
1066  for (int device_id = 1; device_id < device_count_; ++device_id) {
1067  auto& host_hll_buffer = host_hll_buffers[device_id];
1068  hll_unify(hll_result,
1069  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
1070  1 << count_distinct_desc.bitmap_sz_bits);
1071  }
1072  const size_t emitted_keys_count =
1073  std::accumulate(emitted_keys_count_device_threads.begin(),
1074  emitted_keys_count_device_threads.end(),
1075  0);
1076  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
1077  emitted_keys_count);
1078 #else
1079  UNREACHABLE();
1080  return {0, 0};
1081 #endif // HAVE_CUDA
1082 }
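// approximateTupleCount above estimates the number of distinct bucket keys with
// HyperLogLog (11-bit bitmaps, merged across threads or devices via hll_unify), while
// the emitted-keys count is read from the last element of the per-row running count of
// generated keys (num_keys_for_row on CPU, row_counts_buffer on GPU).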
1083 
1084 void OverlapsJoinHashTable::setInverseBucketSizeInfo(
1085  const std::vector<double>& inverse_bucket_sizes,
1086  std::vector<ColumnsForDevice>& columns_per_device,
1087  const size_t device_count) {
1088  // set global bucket size
1089  inverse_bucket_sizes_for_dimension_ = inverse_bucket_sizes;
1090 
1091  // re-compute bucket counts per device based on global bucket size
1092  CHECK_EQ(columns_per_device.size(), size_t(device_count));
1093  for (size_t device_id = 0; device_id < device_count; ++device_id) {
1094  auto& columns_for_device = columns_per_device[device_id];
1095  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension_,
1097  }
1098 }
1099 
1100 size_t OverlapsJoinHashTable::getKeyComponentWidth() const {
1101  return 8;
1102 }
1103 
1107 }
1108 
1109 void OverlapsJoinHashTable::reify(const HashType preferred_layout) {
1110  auto timer = DEBUG_TIMER(__func__);
1111  CHECK_LT(0, device_count_);
1113 
1114  CHECK(condition_->is_overlaps_oper());
1115  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
1116  HashType layout;
1117  if (inner_outer_pairs_[0].second->get_type_info().is_fixlen_array() &&
1118  inner_outer_pairs_[0].second->get_type_info().get_size() == 32) {
1119  // bounds array
1120  layout = HashType::ManyToMany;
1121  } else {
1122  layout = HashType::OneToMany;
1123  }
1124  try {
1125  reifyWithLayout(layout);
1126  return;
1127  } catch (const std::exception& e) {
1128  VLOG(1) << "Caught exception while building overlaps baseline hash table: "
1129  << e.what();
1130  throw;
1131  }
1132 }
1133 
1134 void OverlapsJoinHashTable::reifyImpl(std::vector<ColumnsForDevice>& columns_per_device,
1135  const Fragmenter_Namespace::TableInfo& query_info,
1136  const HashType layout,
1137  const size_t shard_count,
1138  const size_t entry_count,
1139  const size_t emitted_keys_count,
1140  const bool skip_hashtable_caching,
1141  const size_t chosen_max_hashtable_size,
1142  const double chosen_bucket_threshold) {
1143  std::vector<std::future<void>> init_threads;
1144  chosen_overlaps_bucket_threshold_ = chosen_bucket_threshold;
1145  chosen_overlaps_max_table_size_bytes_ = chosen_max_hashtable_size;
1149  if ((query_plan_dag_.compare(EMPTY_QUERY_PLAN) == 0 ||
1151  inner_outer_pairs_.front().first->get_table_id() > 0) {
1152  // sometimes we cannot retrieve the query plan dag, so try the recycler cache
1153  // with the old-fashioned cache key if we deal with a hashtable of a non-temporary table
1156  columns_per_device.front().join_columns.front().num_elems,
1158  condition_->get_optype(),
1163  VLOG(2) << "Use alternative hashtable cache key due to unavailable query plan dag "
1164  "extraction";
1165  }
1166  for (int device_id = 0; device_id < device_count_; ++device_id) {
1167  const auto fragments =
1168  shard_count
1169  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
1170  : query_info.fragments;
1171  init_threads.push_back(std::async(std::launch::async,
1173  this,
1174  columns_per_device[device_id],
1175  layout,
1176  entry_count,
1177  emitted_keys_count,
1178  skip_hashtable_caching,
1179  device_id,
1180  logger::thread_id()));
1181  }
1182  for (auto& init_thread : init_threads) {
1183  init_thread.wait();
1184  }
1185  for (auto& init_thread : init_threads) {
1186  init_thread.get();
1187  }
1188 }
1189 
1190 void OverlapsJoinHashTable::reifyForDevice(const ColumnsForDevice& columns_for_device,
1191  const HashType layout,
1192  const size_t entry_count,
1193  const size_t emitted_keys_count,
1194  const bool skip_hashtable_caching,
1195  const int device_id,
1196  const logger::ThreadId parent_thread_id) {
1197  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
1198  CHECK_EQ(getKeyComponentWidth(), size_t(8));
1200  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
1201 
1202  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1203  VLOG(1) << "Building overlaps join hash table on CPU.";
1204  auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
1205  columns_for_device.join_column_types,
1206  columns_for_device.join_buckets,
1207  layout,
1208  entry_count,
1209  emitted_keys_count,
1210  skip_hashtable_caching);
1211  CHECK(hash_table);
1212 
1213 #ifdef HAVE_CUDA
1215  auto gpu_hash_table = copyCpuHashTableToGpu(
1216  std::move(hash_table), layout, entry_count, emitted_keys_count, device_id);
1217  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
1218  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
1219  } else {
1220 #else
1221  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1222 #endif
1223  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
1224  hash_tables_for_device_[0] = std::move(hash_table);
1225 #ifdef HAVE_CUDA
1226  }
1227 #endif
1228  } else {
1229 #ifdef HAVE_CUDA
1230  auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
1231  columns_for_device.join_column_types,
1232  columns_for_device.join_buckets,
1233  layout,
1234  entry_count,
1235  emitted_keys_count,
1236  device_id);
1237  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
1238  hash_tables_for_device_[device_id] = std::move(hash_table);
1239 #else
1240  UNREACHABLE();
1241 #endif
1242  }
1243 }
1244 
1245 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::initHashTableOnCpu(
1246  const std::vector<JoinColumn>& join_columns,
1247  const std::vector<JoinColumnTypeInfo>& join_column_types,
1248  const std::vector<JoinBucketInfo>& join_bucket_info,
1249  const HashType layout,
1250  const size_t entry_count,
1251  const size_t emitted_keys_count,
1252  const bool skip_hashtable_caching) {
1253  auto timer = DEBUG_TIMER(__func__);
1254  decltype(std::chrono::steady_clock::now()) ts1, ts2;
1255  ts1 = std::chrono::steady_clock::now();
1256  CHECK(!join_columns.empty());
1257  CHECK(!join_bucket_info.empty());
1258  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1259  if (auto generic_hash_table =
1263  if (auto hash_table =
1264  std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
1265  VLOG(1) << "Using cached CPU hash table for initialization.";
1266  // See if a hash table of a different layout was returned.
1267  // If it was OneToMany, we can reuse it on ManyToMany.
1268  if (layout == HashType::ManyToMany &&
1269  hash_table->getLayout() == HashType::OneToMany) {
1270  // use the cached hash table
1272  return hash_table;
1273  }
1274  if (layout == hash_table->getLayout()) {
1275  return hash_table;
1276  }
1277  }
1278  }
1280  const auto key_component_count =
1281  join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();
1282 
1283  const auto key_handler =
1284  OverlapsKeyHandler(key_component_count,
1285  &join_columns[0],
1286  join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());
1288  const auto err = builder.initHashTableOnCpu(&key_handler,
1290  join_columns,
1291  join_column_types,
1292  join_bucket_info,
1293  entry_count,
1294  emitted_keys_count,
1295  layout,
1296  join_type_,
1299  ts2 = std::chrono::steady_clock::now();
1300  if (err) {
1301  throw HashJoinFail(
1302  std::string("Unrecognized error when initializing CPU overlaps hash table (") +
1303  std::to_string(err) + std::string(")"));
1304  }
1305  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
1306  if (skip_hashtable_caching) {
1307  VLOG(1) << "Skipping caching of overlaps join hashtable";
1308  } else {
1309  auto hashtable_build_time =
1310  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
1313  hash_table,
1315  hashtable_build_time);
1316  }
1317  return hash_table;
1318 }
1319 
1320 #ifdef HAVE_CUDA
1321 
1322 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::initHashTableOnGpu(
1323  const std::vector<JoinColumn>& join_columns,
1324  const std::vector<JoinColumnTypeInfo>& join_column_types,
1325  const std::vector<JoinBucketInfo>& join_bucket_info,
1326  const HashType layout,
1327  const size_t entry_count,
1328  const size_t emitted_keys_count,
1329  const size_t device_id) {
1331 
1332  VLOG(1) << "Building overlaps join hash table on GPU.";
1333 
1335  auto data_mgr = executor_->getDataMgr();
1336  CudaAllocator allocator(data_mgr, device_id);
1337  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
1338  CHECK_EQ(join_columns.size(), 1u);
1339  CHECK(!join_bucket_info.empty());
1340  auto& inverse_bucket_sizes_for_dimension =
1341  join_bucket_info[0].inverse_bucket_sizes_for_dimension;
1342  auto inverse_bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
1343  inverse_bucket_sizes_for_dimension, allocator);
1344  const auto key_handler = OverlapsKeyHandler(inverse_bucket_sizes_for_dimension.size(),
1345  join_columns_gpu,
1346  inverse_bucket_sizes_gpu);
1347 
1348  const auto err = builder.initHashTableOnGpu(&key_handler,
1349  join_columns,
1350  layout,
1351  join_type_,
1354  entry_count,
1355  emitted_keys_count,
1356  device_id,
1357  executor_);
1358  if (err) {
1359  throw HashJoinFail(
1360  std::string("Unrecognized error when initializing GPU overlaps hash table (") +
1361  std::to_string(err) + std::string(")"));
1362  }
1363  return builder.getHashTable();
1364 }
1365 
1366 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::copyCpuHashTableToGpu(
1367  std::shared_ptr<BaselineHashTable>&& cpu_hash_table,
1368  const HashType layout,
1369  const size_t entry_count,
1370  const size_t emitted_keys_count,
1371  const size_t device_id) {
1373 
1374  auto data_mgr = executor_->getDataMgr();
1375 
1376  // copy hash table to GPU
1377  BaselineJoinHashTableBuilder gpu_builder;
1378  gpu_builder.allocateDeviceMemory(layout,
1381  entry_count,
1382  emitted_keys_count,
1383  device_id,
1384  executor_);
1385  std::shared_ptr<BaselineHashTable> gpu_hash_table = gpu_builder.getHashTable();
1386  CHECK(gpu_hash_table);
1387  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
1388  CHECK(gpu_buffer_ptr);
1389 
1390  CHECK_LE(cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
1391  gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));
1392  auto device_allocator = data_mgr->createGpuAllocator(device_id);
1393  device_allocator->copyToDevice(
1394  gpu_buffer_ptr,
1395  cpu_hash_table->getCpuBuffer(),
1396  cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU));
1397  return gpu_hash_table;
1398 }
1399 
1400 #endif // HAVE_CUDA
1401 
1402 #define LL_CONTEXT executor_->cgen_state_->context_
1403 #define LL_BUILDER executor_->cgen_state_->ir_builder_
1404 #define LL_INT(v) executor_->cgen_state_->llInt(v)
1405 #define LL_FP(v) executor_->cgen_state_->llFp(v)
1406 #define ROW_FUNC executor_->cgen_state_->row_func_
1407 
1408 llvm::Value* OverlapsJoinHashTable::codegenKey(const CompilationOptions& co) {
1409  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1410  const auto key_component_width = getKeyComponentWidth();
1411  CHECK(key_component_width == 4 || key_component_width == 8);
1412  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
1413  llvm::Value* key_buff_lv{nullptr};
1414  switch (key_component_width) {
1415  case 4:
1416  key_buff_lv =
1417  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
1418  break;
1419  case 8:
1420  key_buff_lv =
1421  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
1422  break;
1423  default:
1424  CHECK(false);
1425  }
1426 
1427  const auto& inner_outer_pair = inner_outer_pairs_[0];
1428  const auto outer_geo = inner_outer_pair.second;
1429  const auto outer_geo_ti = outer_geo->get_type_info();
1430 
1431  llvm::Value* arr_ptr = nullptr;
1432  CodeGenerator code_generator(executor_);
1433  CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
1434 
1435  if (outer_geo_ti.is_geometry()) {
1436  // TODO(adb): for points we will use the coords array, but for other geometries we
1437  // will need to use the bounding box. For now only support points.
1438  CHECK_EQ(outer_geo_ti.get_type(), kPOINT);
1439 
1440  if (const auto outer_geo_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_geo)) {
1441  const auto outer_geo_col_lvs = code_generator.codegen(outer_geo_col, true, co);
1442  CHECK_EQ(outer_geo_col_lvs.size(), size_t(1));
1443  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
1444  outer_geo_col->get_table_id(), outer_geo_col->get_column_id() + 1);
1445  CHECK(coords_cd);
1446 
1447  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1448  "array_buff",
1449  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1450  {outer_geo_col_lvs.front(), code_generator.posArg(outer_geo_col)});
1451  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
1452  << "Only TINYINT coordinate columns are supported in geo overlaps hash join.";
1453  arr_ptr = code_generator.castArrayPointer(array_ptr,
1454  coords_cd->columnType.get_elem_type());
1455  } else if (const auto outer_geo_function_operator =
1456  dynamic_cast<const Analyzer::GeoOperator*>(outer_geo)) {
1457  // Process points dynamically constructed by geo function operators
1458  const auto outer_geo_function_operator_lvs =
1459  code_generator.codegen(outer_geo_function_operator, true, co);
1460  CHECK_EQ(outer_geo_function_operator_lvs.size(), size_t(2));
1461  arr_ptr = outer_geo_function_operator_lvs.front();
1462  } else if (const auto outer_geo_expr =
1463  dynamic_cast<const Analyzer::GeoExpr*>(outer_geo)) {
1464  UNREACHABLE() << outer_geo_expr->toString();
1465  }
1466  } else if (outer_geo_ti.is_fixlen_array()) {
1467  // Process dynamically constructed points
1468  const auto outer_geo_cast_coord_array =
1469  dynamic_cast<const Analyzer::UOper*>(outer_geo);
1470  CHECK_EQ(outer_geo_cast_coord_array->get_optype(), kCAST);
1471  const auto outer_geo_coord_array = dynamic_cast<const Analyzer::ArrayExpr*>(
1472  outer_geo_cast_coord_array->get_operand());
1473  CHECK(outer_geo_coord_array);
1474  CHECK(outer_geo_coord_array->isLocalAlloc());
1475  CHECK_EQ(outer_geo_coord_array->getElementCount(), 2);
1476  auto elem_size = (outer_geo_ti.get_compression() == kENCODING_GEOINT)
1477  ? sizeof(int32_t)
1478  : sizeof(double);
1479  CHECK_EQ(outer_geo_ti.get_size(), int(2 * elem_size));
1480  const auto outer_geo_constructed_lvs = code_generator.codegen(outer_geo, true, co);
1481  // CHECK_EQ(outer_geo_constructed_lvs.size(), size_t(2)); // Pointer and size
1482  const auto array_ptr = outer_geo_constructed_lvs.front(); // Just need the pointer
1483  arr_ptr = LL_BUILDER.CreateGEP(array_ptr, LL_INT(0));
1484  arr_ptr = code_generator.castArrayPointer(array_ptr, SQLTypeInfo(kTINYINT, true));
1485  }
1486  if (!arr_ptr) {
1487  LOG(FATAL) << "Overlaps key currently only supported for geospatial columns and "
1488  "constructed points.";
1489  }
1490 
1491  for (size_t i = 0; i < 2; i++) {
1492  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));
1493 
1494  // Note that get_bucket_key_for_range_compressed will need to be specialized for
1495  // future compression schemes
1496  auto bucket_key =
1497  outer_geo_ti.get_compression() == kENCODING_GEOINT
1498  ? executor_->cgen_state_->emitExternalCall(
1499  "get_bucket_key_for_range_compressed",
1500  get_int_type(64, LL_CONTEXT),
1501  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])})
1502  : executor_->cgen_state_->emitExternalCall(
1503  "get_bucket_key_for_range_double",
1504  get_int_type(64, LL_CONTEXT),
1505  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])});
1506  const auto col_lv = LL_BUILDER.CreateSExt(
1507  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
1508  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
1509  }
1510  return key_buff_lv;
1511 }
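// The key generated above holds one bucket index per dimension: each call to
// get_bucket_key_for_range_{compressed,double} effectively computes
// floor(coordinate * inverse_bucket_size). For example (illustrative values), with a
// chosen bucket size of 2.0 (inverse 0.5), an x coordinate of 10.3 lands in bucket 5.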
1512 
1513 std::vector<llvm::Value*> OverlapsJoinHashTable::codegenManyKey(
1514  const CompilationOptions& co) {
1515  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1516  const auto key_component_width = getKeyComponentWidth();
1517  CHECK(key_component_width == 4 || key_component_width == 8);
1518  auto hash_table = getHashTableForDevice(size_t(0));
1519  CHECK(hash_table);
1521 
1522  VLOG(1) << "Performing codegen for ManyToMany";
1523  const auto& inner_outer_pair = inner_outer_pairs_[0];
1524  const auto outer_col = inner_outer_pair.second;
1525 
1526  CodeGenerator code_generator(executor_);
1527  const auto col_lvs = code_generator.codegen(outer_col, true, co);
1528  CHECK_EQ(col_lvs.size(), size_t(1));
1529 
1530  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
1531  CHECK(outer_col_var);
1532  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
1533  outer_col_var->get_table_id(), outer_col_var->get_column_id());
1534  CHECK(coords_cd);
1535 
1536  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1537  "array_buff",
1538  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1539  {col_lvs.front(), code_generator.posArg(outer_col)});
1540 
1541  // TODO(jclay): this seems to cast to double, and causes the GPU build to fail.
1542  // const auto arr_ptr =
1543  // code_generator.castArrayPointer(array_ptr,
1544  // coords_cd->columnType.get_elem_type());
1545  array_ptr->setName("array_ptr");
1546 
1547  auto num_keys_lv = executor_->cgen_state_->emitExternalCall(
1548  "get_num_buckets_for_bounds",
1549  get_int_type(32, LL_CONTEXT),
1550  {array_ptr,
1551  LL_INT(0),
1552  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1553  LL_FP(inverse_bucket_sizes_for_dimension_[1])});
1554  num_keys_lv->setName("num_keys_lv");
1555 
1556  return {num_keys_lv, array_ptr};
1557 }
1558 
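 // codegenMatchingSet has two paths: for ManyToMany layouts it probes every bucket
 // overlapped by the outer bounds via get_candidate_rows, collecting candidate row ids
 // into a fixed-size stack array; otherwise it computes a composite key index and
 // delegates to the baseline one-to-many matching-set codegen.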
1559 HashJoinMatchingSet OverlapsJoinHashTable::codegenMatchingSet(
1560  const CompilationOptions& co,
1561  const size_t index) {
1562  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1563  if (getHashType() == HashType::ManyToMany) {
1564  VLOG(1) << "Building codegenMatchingSet for ManyToMany";
1565  const auto key_component_width = getKeyComponentWidth();
1566  CHECK(key_component_width == 4 || key_component_width == 8);
1567  auto many_to_many_args = codegenManyKey(co);
1568  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1569  const auto composite_dict_ptr_type =
1570  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1571  const auto composite_key_dict =
1572  hash_ptr->getType()->isPointerTy()
1573  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1574  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1575  const auto key_component_count = getKeyComponentCount();
1576 
1577  auto one_to_many_ptr = hash_ptr;
1578 
1579  if (one_to_many_ptr->getType()->isPointerTy()) {
1580  one_to_many_ptr =
1581  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1582  } else {
1583  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1584  }
1585 
1586  const auto composite_key_dict_size = offsetBufferOff();
1587  one_to_many_ptr =
1588  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1589 
1590  // NOTE(jclay): A fixed array of size 200 is allocated on the stack.
1591  // this is likely the maximum value we can do that is safe to use across
1592  // all supported GPU architectures.
1593  const int max_array_size = 200;
1594  const auto arr_type = get_int_array_type(32, max_array_size, LL_CONTEXT);
1595  const auto out_arr_lv = LL_BUILDER.CreateAlloca(arr_type);
1596  out_arr_lv->setName("out_arr");
1597 
1598  const auto casted_out_arr_lv =
1599  LL_BUILDER.CreatePointerCast(out_arr_lv, arr_type->getPointerTo());
1600 
1601  const auto element_ptr = LL_BUILDER.CreateGEP(arr_type, casted_out_arr_lv, LL_INT(0));
1602 
1603  auto rowid_ptr_i32 =
1604  LL_BUILDER.CreatePointerCast(element_ptr, llvm::Type::getInt32PtrTy(LL_CONTEXT));
1605 
1606  const auto candidate_count_lv = executor_->cgen_state_->emitExternalCall(
1607  "get_candidate_rows",
1608  llvm::Type::getInt64Ty(LL_CONTEXT),
1609  {
1610  rowid_ptr_i32,
1611  LL_INT(max_array_size),
1612  many_to_many_args[1],
1613  LL_INT(0),
1614  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1615  LL_FP(inverse_bucket_sizes_for_dimension_[1]),
1616  many_to_many_args[0],
1617  LL_INT(key_component_count), // key_component_count
1618  composite_key_dict, // ptr to hash table
1619  LL_INT(getEntryCount()), // entry_count
1620  LL_INT(composite_key_dict_size), // offset_buffer_ptr_offset
1621  LL_INT(getEntryCount() * sizeof(int32_t)) // sub_buff_size
1622  });
1623 
1624  const auto slot_lv = LL_INT(int64_t(0));
1625 
1626  return {rowid_ptr_i32, candidate_count_lv, slot_lv};
1627  } else {
1628  VLOG(1) << "Building codegenMatchingSet for Baseline";
1629  // TODO: duplicated w/ BaselineJoinHashTable -- push into the hash table builder?
1630  const auto key_component_width = getKeyComponentWidth();
1631  CHECK(key_component_width == 4 || key_component_width == 8);
1632  auto key_buff_lv = codegenKey(co);
1634  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1635  const auto composite_dict_ptr_type =
1636  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1637  const auto composite_key_dict =
1638  hash_ptr->getType()->isPointerTy()
1639  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1640  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1641  const auto key_component_count = getKeyComponentCount();
1642  const auto key = executor_->cgen_state_->emitExternalCall(
1643  "get_composite_key_index_" + std::to_string(key_component_width * 8),
1644  get_int_type(64, LL_CONTEXT),
1645  {key_buff_lv,
1646  LL_INT(key_component_count),
1647  composite_key_dict,
1648  LL_INT(getEntryCount())});
1649  auto one_to_many_ptr = hash_ptr;
1650  if (one_to_many_ptr->getType()->isPointerTy()) {
1651  one_to_many_ptr =
1652  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1653  } else {
1654  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1655  }
1656  const auto composite_key_dict_size = offsetBufferOff();
1657  one_to_many_ptr =
1658  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1659  return HashJoin::codegenMatchingSet(
1660  std::vector<llvm::Value*>{
1661  one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(getEntryCount() - 1)},
1662  false,
1663  false,
1664  false,
1665  getComponentBufferSize(),
1666  executor_);
1667  }
1668  UNREACHABLE();
1669  return HashJoinMatchingSet{};
1670 }
1671 
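 // Debug helper: decodes the (possibly GPU-resident) hash table buffer into a
 // human-readable string via HashTable::toString.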
1672 std::string OverlapsJoinHashTable::toString(const ExecutorDeviceType device_type,
1673  const int device_id,
1674  bool raw) const {
1675  auto buffer = getJoinHashBuffer(device_type, device_id);
1676  CHECK_LT(device_id, hash_tables_for_device_.size());
1677  auto hash_table = hash_tables_for_device_[device_id];
1678  CHECK(hash_table);
1679  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1680 #ifdef HAVE_CUDA
1681  std::unique_ptr<int8_t[]> buffer_copy;
1682  if (device_type == ExecutorDeviceType::GPU) {
1683  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1684  CHECK(executor_);
1685  auto data_mgr = executor_->getDataMgr();
1686  auto device_allocator = data_mgr->createGpuAllocator(device_id);
1687 
1688  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1689  }
1690  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1691 #else
1692  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1693 #endif // HAVE_CUDA
1694  auto ptr2 = ptr1 + offsetBufferOff();
1695  auto ptr3 = ptr1 + countBufferOff();
1696  auto ptr4 = ptr1 + payloadBufferOff();
1697  CHECK(hash_table);
1698  const auto layout = getHashType();
1699  return HashTable::toString(
1700  "geo",
1701  getHashTypeString(layout),
1702  getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1703  getKeyComponentWidth(),
1704  hash_table->getEntryCount(),
1705  ptr1,
1706  ptr2,
1707  ptr3,
1708  ptr4,
1709  buffer_size,
1710  raw);
1711 }
1712 
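 // Debug/validation helper: decodes the hash table buffer into a std::set of entries,
 // mirroring toString() above but in a form that tests can compare directly.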
1713 std::set<DecodedJoinHashBufferEntry> OverlapsJoinHashTable::toSet(
1714  const ExecutorDeviceType device_type,
1715  const int device_id) const {
1716  auto buffer = getJoinHashBuffer(device_type, device_id);
1717  auto hash_table = getHashTableForDevice(device_id);
1718  CHECK(hash_table);
1719  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1720 #ifdef HAVE_CUDA
1721  std::unique_ptr<int8_t[]> buffer_copy;
1722  if (device_type == ExecutorDeviceType::GPU) {
1723  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1724  CHECK(executor_);
1725  auto data_mgr = executor_->getDataMgr();
1726  auto allocator = data_mgr->createGpuAllocator(device_id);
1727 
1728  allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1729  }
1730  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1731 #else
1732  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1733 #endif // HAVE_CUDA
1734  auto ptr2 = ptr1 + offsetBufferOff();
1735  auto ptr3 = ptr1 + countBufferOff();
1736  auto ptr4 = ptr1 + payloadBufferOff();
1737  const auto layout = getHashType();
1738  return HashTable::toSet(getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1739  getKeyComponentWidth(),
1740  hash_table->getEntryCount(),
1741  ptr1,
1742  ptr2,
1743  ptr3,
1744  ptr4,
1745  buffer_size);
1746 }
1747 
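 // Overlaps hash tables are built on CPU by default; a GPU build is attempted only when
 // the overlaps_allow_gpu_build query hint is delivered, GPUs are present, and the
 // requested memory level is GPU.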
1748 Data_Namespace::MemoryLevel OverlapsJoinHashTable::getEffectiveMemoryLevel(
1749  const std::vector<InnerOuter>& inner_outer_pairs) const {
1750  // always build on CPU
1751  if (query_hint_.isAnyQueryHintDelivered() &&
1752  query_hint_.overlaps_allow_gpu_build) {
1753  if (this->executor_->getDataMgr()->gpusPresent() &&
1754  memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
1755  VLOG(1) << "A user forces to build GPU hash table for this overlaps join operator";
1756  return Data_Namespace::MemoryLevel::GPU_LEVEL;
1757  }
1758  }
1759  return Data_Namespace::MemoryLevel::CPU_LEVEL;
1760 }
1761 
1762 int OverlapsJoinHashTable::getInnerTableId() const noexcept {
1763  try {
1764  return HashJoin::getInnerTableId(inner_outer_pairs_);
1765  } catch (...) {
1766  CHECK(false);
1767  }
1768  return 0;
1769 }
1770 
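 // Looks up a previously built CPU hash table in the recycler, keyed by the query plan
 // hash, item type, and device identifier; returns nullptr on a cache miss.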
1771 std::shared_ptr<HashTable> OverlapsJoinHashTable::initHashTableOnCpuFromCache(
1772  QueryPlanHash key,
1773  CacheItemType item_type,
1774  DeviceIdentifier device_identifier) {
1775  auto timer = DEBUG_TIMER(__func__);
1776  VLOG(1) << "Checking CPU hash table cache.";
1777  CHECK(hash_table_cache_);
1778  HashtableCacheMetaInfo meta_info;
1779  meta_info.overlaps_meta_info = getOverlapsHashTableMetaInfo();
1780  auto cached_hashtable =
1781  hash_table_cache_->getItemFromCache(key, item_type, device_identifier, meta_info);
1782  if (cached_hashtable) {
1783  return cached_hashtable;
1784  }
1785  return nullptr;
1786 }
1787 
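 // On a cache hit, reuses the cached table's entry and emitted-key counts (the entry
 // count is halved, apparently reversing the 2x padding applied at build time) so the
 // approximate tuple count step can be skipped.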
1788 std::optional<std::pair<size_t, size_t>>
1789 OverlapsJoinHashTable::getApproximateTupleCountFromCache(
1790  QueryPlanHash key,
1791  CacheItemType item_type,
1792  DeviceIdentifier device_identifier) {
1793  CHECK(hash_table_cache_);
1794  HashtableCacheMetaInfo metaInfo;
1795  metaInfo.overlaps_meta_info = getOverlapsHashTableMetaInfo();
1796  auto cached_hashtable =
1797  hash_table_cache_->getItemFromCache(key, item_type, device_identifier, metaInfo);
1798  if (cached_hashtable) {
1799  return std::make_pair(cached_hashtable->getEntryCount() / 2,
1800  cached_hashtable->getEmittedKeysCount());
1801  }
1802  return std::nullopt;
1803 }
1804 
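 // Caches a freshly built hash table; only CPU-resident tables are eligible (the CHECK
 // below rejects anything holding a GPU buffer), and the entry records the buffer size
 // and build time alongside the overlaps metadata.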
1805 void OverlapsJoinHashTable::putHashTableOnCpuToCache(
1806  QueryPlanHash key,
1807  CacheItemType item_type,
1808  std::shared_ptr<HashTable> hashtable_ptr,
1809  DeviceIdentifier device_identifier,
1810  size_t hashtable_building_time) {
1811  CHECK(hash_table_cache_);
1812  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
1813  HashtableCacheMetaInfo meta_info;
1814  meta_info.overlaps_meta_info = getOverlapsHashTableMetaInfo();
1815  hash_table_cache_->putItemToCache(
1816  key,
1817  hashtable_ptr,
1818  item_type,
1819  device_identifier,
1820  hashtable_ptr->getHashTableBufferSize(ExecutorDeviceType::CPU),
1821  hashtable_building_time,
1822  meta_info);
1823 }