OmniSciDB  a5dc49c757
BoundingBoxIntersectJoinHashTable.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/BoundingBoxIntersectJoinHashTable.h"
18 
21 #include "QueryEngine/Execute.h"
29 #include "QueryEngine/enums.h"
30 
31 std::unique_ptr<HashtableRecycler> BoundingBoxIntersectJoinHashTable::hash_table_cache_ =
32  std::make_unique<HashtableRecycler>(CacheItemType::BBOX_INTERSECT_HT,
34 std::unique_ptr<BoundingBoxIntersectTuningParamRecycler>
35  BoundingBoxIntersectJoinHashTable::auto_tuner_cache_ =
36  std::make_unique<BoundingBoxIntersectTuningParamRecycler>();
37 
39 std::shared_ptr<BoundingBoxIntersectJoinHashTable>
41  const std::shared_ptr<Analyzer::BinOper> condition,
42  const std::vector<InputTableInfo>& query_infos,
43  const Data_Namespace::MemoryLevel memory_level,
44  const JoinType join_type,
45  const int device_count,
46  ColumnCacheMap& column_cache,
47  Executor* executor,
48  const HashTableBuildDagMap& hashtable_build_dag_map,
49  const RegisteredQueryHint& query_hints,
50  const TableIdToNodeMap& table_id_to_node_map) {
51  decltype(std::chrono::steady_clock::now()) ts1, ts2;
52  auto copied_query_hints = query_hints;
53  if (query_hints.force_one_to_many_hash_join) {
54  LOG(INFO) << "Ignoring query hint \'force_one_to_many_hash_join\' for bounding box "
55  "intersection";
56  copied_query_hints.force_one_to_many_hash_join = false;
57  }
58  if (query_hints.force_baseline_hash_join) {
59  LOG(INFO) << "Ignoring query hint \'force_baseline_hash_join\' for bounding box "
60  "intersection";
61  copied_query_hints.force_baseline_hash_join = false;
62  }
63  std::vector<InnerOuter> inner_outer_pairs;
64  if (const auto range_expr =
65  dynamic_cast<const Analyzer::RangeOper*>(condition->get_right_operand())) {
66  return RangeJoinHashTable::getInstance(condition,
67  range_expr,
68  query_infos,
69  memory_level,
70  join_type,
71  device_count,
72  column_cache,
73  executor,
74  hashtable_build_dag_map,
75  copied_query_hints,
76  table_id_to_node_map);
77  } else {
78  inner_outer_pairs =
79  HashJoin::normalizeColumnPairs(condition.get(), executor->getTemporaryTables())
80  .first;
81  }
82  CHECK(!inner_outer_pairs.empty());
83 
84  const auto getHashTableType =
85  [](const std::shared_ptr<Analyzer::BinOper> condition,
86  const std::vector<InnerOuter>& inner_outer_pairs) -> HashType {
87  HashType layout = HashType::OneToMany;
88  if (condition->is_bbox_intersect_oper()) {
89  CHECK_EQ(inner_outer_pairs.size(), size_t(1));
90  if (inner_outer_pairs[0].first->get_type_info().is_array() &&
91  inner_outer_pairs[0].second->get_type_info().is_array() &&
92  // Bounds vs constructed points, former should yield ManyToMany
93  inner_outer_pairs[0].second->get_type_info().get_size() == 32) {
94  layout = HashType::ManyToMany;
95  }
96  }
97  return layout;
98  };
99 
100  const auto layout = getHashTableType(condition, inner_outer_pairs);
101 
102  if (VLOGGING(1)) {
103  VLOG(1) << "Building geo hash table " << getHashTypeString(layout)
104  << " for qual: " << condition->toString();
105  ts1 = std::chrono::steady_clock::now();
106  }
107 
108  const auto qi_0 = query_infos[0].info.getNumTuplesUpperBound();
109  const auto qi_1 = query_infos[1].info.getNumTuplesUpperBound();
110 
111  VLOG(1) << "table_key = " << query_infos[0].table_key << " has " << qi_0 << " tuples.";
112  VLOG(1) << "table_key = " << query_infos[1].table_key << " has " << qi_1 << " tuples.";
113 
114  const auto& query_info =
115  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
116  .info;
117  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
118  if (total_entries > HashJoin::MAX_NUM_HASH_ENTRIES) {
119  throw TooManyHashEntries();
120  }
121 
122  auto join_hash_table =
123  std::make_shared<BoundingBoxIntersectJoinHashTable>(condition,
124  join_type,
125  query_infos,
126  memory_level,
127  column_cache,
128  executor,
129  inner_outer_pairs,
130  device_count,
131  copied_query_hints,
132  hashtable_build_dag_map,
133  table_id_to_node_map);
134  try {
135  join_hash_table->reify(layout);
136  } catch (const HashJoinFail& e) {
137  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
138  "involved in bounding box intersection | ") +
139  e.what());
140  } catch (const ColumnarConversionNotSupported& e) {
141  throw HashJoinFail(
142  std::string("Could not build hash tables for bounding box intersection | "
143  "Inner table too big. Attempt manual table reordering "
144  "or create a single fragment inner table. | ") +
145  e.what());
146  } catch (const JoinHashTableTooBig& e) {
147  throw e;
148  } catch (const std::exception& e) {
149  throw HashJoinFail(
150  std::string("Failed to build hash tables for bounding box intersection | ") +
151  e.what());
152  }
153  if (VLOGGING(1)) {
154  ts2 = std::chrono::steady_clock::now();
155  VLOG(1) << "Built geo hash table " << getHashTypeString(layout) << " in "
156  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
157  << " ms";
158  }
159  return join_hash_table;
160 }
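
// Illustrative note (not part of the original file): getInstance() above is the
// factory entry point for this hash table type. It drops hash-join hints that do
// not apply to bounding box intersection, forwards RangeOper conditions to
// RangeJoinHashTable, chooses OneToMany unless both operands are arrays and the
// outer side is a 32-byte bounds array (ManyToMany), and rewraps build failures
// as HashJoinFail with a bounding-box-specific message.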
161 
162 namespace {
163 
164 std::vector<double> correct_uninitialized_bucket_sizes_to_thresholds(
165  const std::vector<double>& bucket_sizes,
166  const std::vector<double>& bucket_thresholds,
167  const double initial_value) {
168  std::vector<double> corrected_bucket_sizes(bucket_sizes);
169  for (size_t i = 0; i != bucket_sizes.size(); ++i) {
170  if (bucket_sizes[i] == initial_value) {
171  corrected_bucket_sizes[i] = bucket_thresholds[i];
172  }
173  }
174  return corrected_bucket_sizes;
175 }
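
// Illustrative worked example (not part of the original file): any dimension whose
// computed bucket size is still at the initial sentinel value falls back to its
// threshold. With initial_value = 0.0:
//   bucket_sizes      = {0.0, 0.25}
//   bucket_thresholds = {0.1, 0.50}
//   result            = {0.1, 0.25}   // dim 0 was uninitialized, dim 1 kept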
176 
177 std::vector<double> compute_bucket_sizes(
178  const std::vector<double>& bucket_thresholds,
179  const Data_Namespace::MemoryLevel effective_memory_level,
180  const JoinColumn& join_column,
181  const JoinColumnTypeInfo& join_column_type,
182  const std::vector<InnerOuter>& inner_outer_pairs,
183  const Executor* executor) {
184  // No coalesced keys for bounding box intersection yet
185  CHECK_EQ(inner_outer_pairs.size(), 1u);
186 
187  const auto col = inner_outer_pairs[0].first;
188  CHECK(col);
189  const auto col_ti = col->get_type_info();
190  CHECK(col_ti.is_array());
191 
192  // TODO: Compute the number of dimensions for keys used to perform bounding box
193  // intersection
194  const size_t num_dims{2};
195  const double initial_bin_value{0.0};
196  std::vector<double> bucket_sizes(num_dims, initial_bin_value);
197  CHECK_EQ(bucket_thresholds.size(), num_dims);
198 
199  VLOG(1) << "Computing x and y bucket sizes for bounding box intersection with maximum "
200  "bucket size "
201  << std::to_string(bucket_thresholds[0]) << ", "
202  << std::to_string(bucket_thresholds[1]);
203 
204  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
205  const int thread_count = cpu_threads();
206  compute_bucket_sizes_on_cpu(
207  bucket_sizes, join_column, join_column_type, bucket_thresholds, thread_count);
208  }
209 #ifdef HAVE_CUDA
210  else {
211  // Note that we compute the bucket sizes using only a single GPU
212  const int device_id = 0;
213  auto data_mgr = executor->getDataMgr();
214  CudaAllocator allocator(
215  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
216  auto device_bucket_sizes_gpu =
217  transfer_vector_of_flat_objects_to_gpu(bucket_sizes, allocator);
218  auto join_column_gpu = transfer_flat_object_to_gpu(join_column, allocator);
219  auto join_column_type_gpu = transfer_flat_object_to_gpu(join_column_type, allocator);
220  auto device_bucket_thresholds_gpu =
221  transfer_vector_of_flat_objects_to_gpu(bucket_thresholds, allocator);
222 
223  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
224  join_column_gpu,
225  join_column_type_gpu,
226  device_bucket_thresholds_gpu);
227  allocator.copyFromDevice(reinterpret_cast<int8_t*>(bucket_sizes.data()),
228  reinterpret_cast<int8_t*>(device_bucket_sizes_gpu),
229  bucket_sizes.size() * sizeof(double));
230  }
231 #endif
232  const auto corrected_bucket_sizes = correct_uninitialized_bucket_sizes_to_thresholds(
233  bucket_sizes, bucket_thresholds, initial_bin_value);
234 
235  VLOG(1) << "Computed x and y bucket sizes for bounding box intersection: ("
236  << corrected_bucket_sizes[0] << ", " << corrected_bucket_sizes[1] << ")";
237 
238  return corrected_bucket_sizes;
239 }
240 
241 struct HashTableProps {
242  HashTableProps(const size_t entry_count,
243  const size_t emitted_keys_count,
244  const size_t hash_table_size,
245  const std::vector<double>& bucket_sizes)
246  : entry_count(entry_count)
247  , emitted_keys_count(emitted_keys_count)
248  , keys_per_bin(entry_count == 0 ? std::numeric_limits<double>::max()
249  : emitted_keys_count / (entry_count / 2.0))
250  , hash_table_size(hash_table_size)
251  , bucket_sizes(bucket_sizes) {}
252 
253  static HashTableProps invalid() { return HashTableProps(0, 0, 0, {}); }
254 
255  size_t entry_count;
256  size_t emitted_keys_count;
257  double keys_per_bin;
258  size_t hash_table_size;
259  std::vector<double> bucket_sizes;
260 };
261 
262 std::ostream& operator<<(std::ostream& os, const HashTableProps& props) {
263  os << " entry_count: " << props.entry_count << ", emitted_keys "
264  << props.emitted_keys_count << ", hash table size " << props.hash_table_size
265  << ", keys per bin " << props.keys_per_bin;
266  return os;
267 }
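
// Illustrative worked example (not part of the original file): keys_per_bin is the
// average number of emitted keys per bucket; entry_count counts slots, two per
// bucket, hence the divisor entry_count / 2.0. For entry_count = 2,000 and
// emitted_keys_count = 5,000:
//   keys_per_bin = 5000 / (2000 / 2.0) = 5.0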
268 
269 struct TuningState {
270  TuningState(const size_t bbox_intersect_max_table_size_bytes,
271  const double bbox_intersect_target_entries_per_bin)
272  : crt_props(HashTableProps::invalid())
273  , prev_props(HashTableProps::invalid())
274  , chosen_bbox_intersect_threshold(-1)
275  , crt_step(0)
276  , crt_reverse_search_iteration(0)
277  , bbox_intersect_max_table_size_bytes(bbox_intersect_max_table_size_bytes)
278  , bbox_intersect_target_entries_per_bin(bbox_intersect_target_entries_per_bin) {}
279 
280  // current and previous props, allows for easy backtracking
281  HashTableProps crt_props;
282  HashTableProps prev_props;
283 
284  // value we are tuning for
285  double chosen_bbox_intersect_threshold;
286  enum class TuningDirection { SMALLER, LARGER };
287  TuningDirection tuning_direction{TuningDirection::SMALLER};
288 
289  // various constants / state
290  size_t crt_step; // 1 indexed
291  size_t crt_reverse_search_iteration; // 1 indexed
292  const size_t bbox_intersect_max_table_size_bytes;
293  const double bbox_intersect_target_entries_per_bin;
294  const size_t max_reverse_search_iterations{8};
295 
299  bool operator()(const HashTableProps& new_props,
300  const double new_bbox_intersect_threshold) {
301  prev_props = crt_props;
302  crt_props = new_props;
303  crt_step++;
304 
305  if (hashTableTooBig() || keysPerBinIncreasing()) {
306  if (hashTableTooBig()) {
307  VLOG(1) << "Reached hash table size limit: "
308  << bbox_intersect_max_table_size_bytes << " with "
309  << crt_props.hash_table_size << " byte hash table, "
310  << crt_props.keys_per_bin << " keys per bin.";
311  } else if (keysPerBinIncreasing()) {
312  VLOG(1) << "Keys per bin increasing from " << prev_props.keys_per_bin << " to "
313  << crt_props.keys_per_bin;
314  CHECK(previousIterationValid());
315  }
316  if (previousIterationValid()) {
317  VLOG(1) << "Using previous threshold value " << chosen_bbox_intersect_threshold;
318  crt_props = prev_props;
319  return false;
320  } else {
321  CHECK(hashTableTooBig());
322  crt_reverse_search_iteration++;
323  chosen_bbox_intersect_threshold = new_bbox_intersect_threshold;
324 
325  if (crt_reverse_search_iteration == max_reverse_search_iterations) {
326  VLOG(1) << "Hit maximum number (" << max_reverse_search_iterations
327  << ") of reverse tuning iterations. Aborting tuning";
328  // use the crt props, but don't bother trying to tune any farther
329  return false;
330  }
331 
332  if (crt_reverse_search_iteration > 1 &&
333  crt_props.hash_table_size == prev_props.hash_table_size) {
334  // hash table size is not changing, bail
335  VLOG(1) << "Hash table size not decreasing (" << crt_props.hash_table_size
336  << " bytes) and still above maximum allowed size ("
337  << bbox_intersect_max_table_size_bytes << " bytes). Aborting tuning";
338  return false;
339  }
340 
341  // if the hash table is too big on the very first step, change direction towards
342  // larger bins to see if a slightly smaller hash table will fit
343  if (crt_step == 1 && crt_reverse_search_iteration == 1) {
344  VLOG(1)
345  << "First iteration of tuning led to hash table size over "
346  "limit. Reversing search to try larger bin sizes (previous threshold: "
347  << chosen_bbox_intersect_threshold << ")";
348  // Need to change direction of tuning to tune "up" towards larger bins
349  tuning_direction = TuningDirection::LARGER;
350  }
351  return true;
352  }
353  UNREACHABLE();
354  }
355 
356  chosen_bbox_intersect_threshold = new_bbox_intersect_threshold;
357 
358  if (keysPerBinUnderThreshold()) {
359  VLOG(1) << "Hash table reached size " << crt_props.hash_table_size
360  << " with keys per bin " << crt_props.keys_per_bin << " under threshold "
361  << bbox_intersect_target_entries_per_bin
362  << ". Terminating bucket size loop.";
363  return false;
364  }
365 
366  if (crt_reverse_search_iteration > 0) {
367  // We always take the first tuning iteration that succeeds when reversing
368  // direction; if we reach this point we have not yet had a successful iteration and
369  // we are "backtracking" our search by making bin sizes larger
370  VLOG(1) << "On reverse (larger tuning direction) search found a workable"
371  << " hash table size of " << crt_props.hash_table_size
372  << " with keys per bin " << crt_props.keys_per_bin
373  << ". Terminating bucket size loop.";
374  return false;
375  }
376 
377  return true;
378  }
379 
380  bool hashTableTooBig() const {
381  return crt_props.hash_table_size > bbox_intersect_max_table_size_bytes;
382  }
383 
384  bool keysPerBinIncreasing() const {
385  return crt_props.keys_per_bin > prev_props.keys_per_bin;
386  }
387 
388  bool previousIterationValid() const {
389  return tuning_direction == TuningDirection::SMALLER && crt_step > 1;
390  }
391 
392  bool keysPerBinUnderThreshold() const {
393  return crt_props.keys_per_bin < bbox_intersect_target_entries_per_bin;
394  }
395 };
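
// Illustrative sketch (not part of the original file) of how a caller is expected
// to drive TuningState; the real loop appears later in this file:
//   TuningState tuning_state(max_table_size_bytes, target_entries_per_bin);
//   while (tuner.tuneOneStep(tuning_state.tuning_direction)) {
//     HashTableProps props = /* size a candidate hash table */;
//     if (!tuning_state(props, tuner.getMinBucketSize())) {
//       break;  // accept tuning_state.crt_props / chosen_bbox_intersect_threshold
//     }
//   }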
396 
397 class BucketSizeTuner {
398  public:
399  BucketSizeTuner(const double bucket_threshold,
400  const double step,
401  const double min_threshold,
402  const Data_Namespace::MemoryLevel effective_memory_level,
403  const std::vector<ColumnsForDevice>& columns_per_device,
404  const std::vector<InnerOuter>& inner_outer_pairs,
405  const size_t table_tuple_count,
406  const Executor* executor)
407  : num_dims_(2) // Todo: allow varying number of dims
408  , bucket_thresholds_(/*count=*/num_dims_, /*value=*/bucket_threshold)
409  , step_(step)
410  , min_threshold_(min_threshold)
411  , effective_memory_level_(effective_memory_level)
412  , columns_per_device_(columns_per_device)
413  , inner_outer_pairs_(inner_outer_pairs)
414  , table_tuple_count_(table_tuple_count)
415  , executor_(executor) {
416  CHECK(!columns_per_device_.empty());
417  }
418 
419  bool tuneOneStep() { return tuneOneStep(TuningState::TuningDirection::SMALLER, step_); }
420 
421  bool tuneOneStep(const TuningState::TuningDirection tuning_direction) {
422  return tuneOneStep(tuning_direction, step_);
423  }
424 
425  bool tuneOneStep(const TuningState::TuningDirection tuning_direction,
426  const double step_overide) {
427  if (table_tuple_count_ == 0) {
428  return false;
429  }
430  if (tuning_direction == TuningState::TuningDirection::SMALLER) {
431  return tuneSmallerOneStep(step_overide);
432  }
433  return tuneLargerOneStep(step_overide);
434  }
435 
436  auto getMinBucketSize() const {
437  return *std::min_element(bucket_thresholds_.begin(), bucket_thresholds_.end());
438  }
439 
446  std::vector<double> getInverseBucketSizes() {
447  if (num_steps_ == 0) {
448  CHECK_EQ(current_bucket_sizes_.size(), static_cast<size_t>(0));
449  current_bucket_sizes_ = computeBucketSizes();
450  }
451  CHECK_EQ(current_bucket_sizes_.size(), num_dims_);
452  std::vector<double> inverse_bucket_sizes;
453  for (const auto s : current_bucket_sizes_) {
454  inverse_bucket_sizes.emplace_back(1.0 / s);
455  }
456  return inverse_bucket_sizes;
457  }
458 
459  private:
460  bool bucketThresholdsBelowMinThreshold() const {
461  for (const auto& t : bucket_thresholds_) {
462  if (t < min_threshold_) {
463  return true;
464  }
465  }
466  return false;
467  }
468 
469  std::vector<double> computeBucketSizes() const {
470  if (table_tuple_count_ == 0) {
471  return std::vector<double>(/*count=*/num_dims_, /*val=*/0);
472  }
473  return compute_bucket_sizes(bucket_thresholds_,
474  effective_memory_level_,
475  columns_per_device_.front().join_columns[0],
476  columns_per_device_.front().join_column_types[0],
477  inner_outer_pairs_,
478  executor_);
479  }
480 
481  bool tuneSmallerOneStep(const double step_overide) {
482  if (!current_bucket_sizes_.empty()) {
483  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
484  bucket_thresholds_ = current_bucket_sizes_;
485  for (auto& t : bucket_thresholds_) {
486  t /= step_overide;
487  }
488  }
489  if (bucketThresholdsBelowMinThreshold()) {
490  VLOG(1) << "Aborting tuning for bounding box intersection as at least one bucket "
491  "size is below min threshold";
492  return false;
493  }
494  const auto next_bucket_sizes = computeBucketSizes();
495  if (next_bucket_sizes == current_bucket_sizes_) {
496  VLOG(1) << "Aborting tuning for bounding box intersection as bucket size is no "
497  "longer changing.";
498  return false;
499  }
500 
501  current_bucket_sizes_ = next_bucket_sizes;
502  num_steps_++;
503  return true;
504  }
505 
506  bool tuneLargerOneStep(const double step_overide) {
507  if (!current_bucket_sizes_.empty()) {
508  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
509  bucket_thresholds_ = current_bucket_sizes_;
510  }
511  // If current_bucket_sizes was empty, we will start from our initial threshold
512  for (auto& t : bucket_thresholds_) {
513  t *= step_overide;
514  }
515  // When tuning up, do not dynamically compute bucket_sizes, as compute_bucket_sizes as
516  // written will pick the largest bin size below the threshold, meaning our bucket_size
517  // will never increase beyond the size of the largest polygon. This could mean that we
518  // can never make the bucket sizes large enough to get our hash table below the
519  // maximum size. Possible todo: enable a templated version of compute_bucket_sizes that
520  // optionally finds the smallest extent above the threshold, mirroring the default
521  // behavior of finding the largest extent below the threshold, and use that variant here.
522  current_bucket_sizes_ = bucket_thresholds_;
523  num_steps_++;
524  return true;
525  }
526 
527  size_t num_dims_;
528  std::vector<double> bucket_thresholds_;
529  size_t num_steps_{0};
530  const double step_;
531  const double min_threshold_;
532  const Data_Namespace::MemoryLevel effective_memory_level_;
533  const std::vector<ColumnsForDevice>& columns_per_device_;
534  const std::vector<InnerOuter>& inner_outer_pairs_;
535  const size_t table_tuple_count_;
536  const Executor* executor_;
537 
538  std::vector<double> current_bucket_sizes_;
539 
540  friend std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner);
541 };
542 
543 std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner) {
544  os << "Step Num: " << tuner.num_steps_ << ", Threshold: " << std::fixed << "("
545  << tuner.bucket_thresholds_[0] << ", " << tuner.bucket_thresholds_[1] << ")"
546  << ", Step Size: " << std::fixed << tuner.step_ << ", Min: " << std::fixed
547  << tuner.min_threshold_;
548  return os;
549 }
550 
551 } // namespace
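
// Illustrative note (not part of the original file): downstream code works with
// inverse bucket sizes (1.0 / bucket_size) so that bucket keys can be computed
// with a multiplication instead of a division; e.g. a bucket size of 0.25 is
// stored as 4.0. The presumed bucketing arithmetic is sketched near the end of
// this file, after the key codegen routine.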
552 
553 void BoundingBoxIntersectJoinHashTable::reifyWithLayout(const HashType layout) {
554  auto timer = DEBUG_TIMER(__func__);
556  const auto& query_info =
557  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs_), query_infos_)
558  .info;
559  auto [db_id, table_id] = HashJoin::getInnerTableId(inner_outer_pairs_);
560  VLOG(1) << "Reify with layout " << getHashTypeString(layout) << " for db_id: " << db_id
561  << ", table_id: " << table_id;
562  if (query_info.fragments.empty()) {
563  return;
564  }
565 
566  auto bbox_intersect_max_table_size_bytes = g_bbox_intersect_max_table_size_bytes;
567  std::optional<double> bbox_intersect_threshold_override;
568  double bbox_intersect_target_entries_per_bin = g_bbox_intersect_target_entries_per_bin;
569  auto skip_hashtable_caching = false;
571  VLOG(1) << "Setting bounding box intersection bucket threshold "
572  "\'bbox_intersect_bucket_threshold\' via "
573  "query hint: "
575  bbox_intersect_threshold_override = query_hints_.bbox_intersect_bucket_threshold;
576  }
578  std::ostringstream oss;
579  oss << "User requests to change a threshold \'bbox_intersect_max_table_size_bytes\' "
580  "via "
581  "query hint";
582  if (!bbox_intersect_threshold_override.has_value()) {
583  oss << ": " << bbox_intersect_max_table_size_bytes << " -> "
585  bbox_intersect_max_table_size_bytes = query_hints_.bbox_intersect_max_size;
586  } else {
587  oss << ", but is skipped since the query hint also changes the threshold "
588  "\'bbox_intersect_bucket_threshold\'";
589  }
590  VLOG(1) << oss.str();
591  }
593  VLOG(1) << "User requests to skip caching join hashtable for bounding box "
594  "intersection and its tuned "
595  "parameters for this query";
596  skip_hashtable_caching = true;
597  }
599  VLOG(1) << "User requests to change a threshold \'bbox_intersect_keys_per_bin\' via "
600  "query "
601  "hint: "
602  << bbox_intersect_target_entries_per_bin << " -> "
604  bbox_intersect_target_entries_per_bin = query_hints_.bbox_intersect_keys_per_bin;
605  }
606 
607  auto data_mgr = executor_->getDataMgr();
608  // We prioritize the CPU when building a join hash table for bounding box
609  // intersection, but if a GPU is available and the user provides a hint we
610  // selectively allow the GPU to build it. However, if the user forces CPU as the
611  // execution device type, we must not use the GPU to build it.
612  auto allow_gpu_hashtable_build =
615  if (allow_gpu_hashtable_build) {
616  if (data_mgr->gpusPresent() &&
618  VLOG(1) << "User hint forces building a GPU hash table for bounding box intersection";
619  } else {
620  allow_gpu_hashtable_build = false;
621  VLOG(1) << "User hint requests a GPU hash table for bounding box intersection, "
622  "but it is skipped since either no GPU is present or CPU execution mode "
623  "is set";
624  }
625  }
626 
627  std::vector<ColumnsForDevice> columns_per_device;
628  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
630  allow_gpu_hashtable_build) {
631  for (int device_id = 0; device_id < device_count_; ++device_id) {
632  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
633  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
634  }
635  }
636 
637  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
638  const auto shard_count = shardCount();
639  size_t total_num_tuples = 0;
640  for (int device_id = 0; device_id < device_count_; ++device_id) {
641  fragments_per_device.emplace_back(
642  shard_count
643  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
644  : query_info.fragments);
645  const size_t crt_num_tuples =
646  std::accumulate(fragments_per_device.back().begin(),
647  fragments_per_device.back().end(),
648  size_t(0),
649  [](const auto& sum, const auto& fragment) {
650  return sum + fragment.getNumTuples();
651  });
652  total_num_tuples += crt_num_tuples;
653  const auto columns_for_device =
654  fetchColumnsForDevice(fragments_per_device.back(),
655  device_id,
657  allow_gpu_hashtable_build
658  ? dev_buff_owners[device_id].get()
659  : nullptr);
660  columns_per_device.push_back(columns_for_device);
661  }
662 
663  // try to extract cache key for hash table and its relevant info
664  auto hashtable_access_path_info =
666  {},
667  condition_->get_optype(),
668  join_type_,
671  shard_count,
672  fragments_per_device,
673  executor_);
674  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
675  hashtable_cache_meta_info_ = hashtable_access_path_info.meta_info;
676  table_keys_ = hashtable_access_path_info.table_keys;
677 
678  auto get_inner_table_key = [this]() {
679  auto col_var = inner_outer_pairs_.front().first;
680  return col_var->getTableKey();
681  };
682 
683  if (table_keys_.empty()) {
684  const auto& table_key = get_inner_table_key();
687  }
688  CHECK(!table_keys_.empty());
689 
690  if (bbox_intersect_threshold_override) {
691  // compute bucket sizes based on the user provided threshold
692  BucketSizeTuner tuner(/*initial_threshold=*/*bbox_intersect_threshold_override,
693  /*step=*/1.0,
694  /*min_threshold=*/0.0,
696  columns_per_device,
698  total_num_tuples,
699  executor_);
700  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
701 
702  auto [entry_count, emitted_keys_count] =
703  computeHashTableCounts(shard_count,
704  inverse_bucket_sizes,
705  columns_per_device,
706  bbox_intersect_max_table_size_bytes,
707  *bbox_intersect_threshold_override);
708  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
709  // reifyImpl will check the hash table cache for an appropriate hash table w/ those
710  // bucket sizes (or within tolerances); if such a hash table exists, use it, otherwise
711  // build one
712  generateCacheKey(bbox_intersect_max_table_size_bytes,
713  *bbox_intersect_threshold_override,
714  inverse_bucket_sizes,
715  fragments_per_device,
716  device_count_);
717  reifyImpl(columns_per_device,
718  query_info,
719  layout,
720  shard_count,
721  entry_count,
722  emitted_keys_count,
723  skip_hashtable_caching,
724  bbox_intersect_max_table_size_bytes,
725  *bbox_intersect_threshold_override);
726  } else {
727  double bbox_intersect_bucket_threshold = std::numeric_limits<double>::max();
728  generateCacheKey(bbox_intersect_max_table_size_bytes,
729  bbox_intersect_bucket_threshold,
730  {},
731  fragments_per_device,
732  device_count_);
733  std::vector<size_t> per_device_chunk_key;
734  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
735  get_inner_table_key().table_id > 0) {
736  for (int device_id = 0; device_id < device_count_; ++device_id) {
738  boost::hash_combine(
739  chunk_key_hash,
740  HashJoin::collectFragmentIds(fragments_per_device[device_id]));
741  per_device_chunk_key.push_back(chunk_key_hash);
744  columns_per_device.front().join_columns.front().num_elems,
745  chunk_key_hash,
746  condition_->get_optype(),
747  bbox_intersect_max_table_size_bytes,
748  bbox_intersect_bucket_threshold,
749  {}};
750  hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
751  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_[device_id],
752  table_keys_);
753  }
754  }
755 
756  auto cached_bucket_threshold = auto_tuner_cache_->getItemFromCache(
757  hashtable_cache_key_.front(),
760  if (cached_bucket_threshold) {
761  bbox_intersect_bucket_threshold = cached_bucket_threshold->bucket_threshold;
762  auto inverse_bucket_sizes = cached_bucket_threshold->bucket_sizes;
763  setBoundingBoxIntersectionMetaInfo(bbox_intersect_max_table_size_bytes,
764  bbox_intersect_bucket_threshold,
765  inverse_bucket_sizes);
766  generateCacheKey(bbox_intersect_max_table_size_bytes,
767  bbox_intersect_bucket_threshold,
768  inverse_bucket_sizes,
769  fragments_per_device,
770  device_count_);
771 
772  if (auto hash_table =
773  hash_table_cache_->getItemFromCache(hashtable_cache_key_[device_count_],
776  std::nullopt)) {
777  // if we already have a built hash table, we can skip the scans required for
778  // computing bucket size and tuple count
779  // reset as the hash table sizes can vary a bit
780  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
781  CHECK(hash_table);
782 
783  VLOG(1) << "Using cached hash table bucket size";
784 
785  reifyImpl(columns_per_device,
786  query_info,
787  layout,
788  shard_count,
789  hash_table->getEntryCount(),
790  hash_table->getEmittedKeysCount(),
791  skip_hashtable_caching,
792  bbox_intersect_max_table_size_bytes,
793  bbox_intersect_bucket_threshold);
794  } else {
795  VLOG(1) << "Computing bucket size for cached bucket threshold";
796  // compute bucket size using our cached tuner value
797  BucketSizeTuner tuner(/*initial_threshold=*/bbox_intersect_bucket_threshold,
798  /*step=*/1.0,
799  /*min_threshold=*/0.0,
801  columns_per_device,
803  total_num_tuples,
804  executor_);
805 
806  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
807 
808  auto [entry_count, emitted_keys_count] =
809  computeHashTableCounts(shard_count,
810  inverse_bucket_sizes,
811  columns_per_device,
812  bbox_intersect_max_table_size_bytes,
813  bbox_intersect_bucket_threshold);
814  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
815 
816  generateCacheKey(bbox_intersect_max_table_size_bytes,
817  bbox_intersect_bucket_threshold,
818  inverse_bucket_sizes,
819  fragments_per_device,
820  device_count_);
821 
822  reifyImpl(columns_per_device,
823  query_info,
824  layout,
825  shard_count,
826  entry_count,
827  emitted_keys_count,
828  skip_hashtable_caching,
829  bbox_intersect_max_table_size_bytes,
830  bbox_intersect_bucket_threshold);
831  }
832  } else {
833  // compute bucket size using the auto tuner
834  BucketSizeTuner tuner(
835  /*initial_threshold=*/bbox_intersect_bucket_threshold,
836  /*step=*/2.0,
837  /*min_threshold=*/1e-7,
839  columns_per_device,
841  total_num_tuples,
842  executor_);
843 
844  VLOG(1) << "Running auto tune logic for bounding box intersection with parameters: "
845  << tuner;
846 
847  // manages the tuning state machine
848  TuningState tuning_state(bbox_intersect_max_table_size_bytes,
849  bbox_intersect_target_entries_per_bin);
850  while (tuner.tuneOneStep(tuning_state.tuning_direction)) {
851  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
852 
853  const auto [crt_entry_count, crt_emitted_keys_count] =
854  computeHashTableCounts(shard_count,
855  inverse_bucket_sizes,
856  columns_per_device,
857  tuning_state.bbox_intersect_max_table_size_bytes,
858  tuning_state.chosen_bbox_intersect_threshold);
859  const size_t hash_table_size = calculateHashTableSize(
860  inverse_bucket_sizes.size(), crt_emitted_keys_count, crt_entry_count);
861  HashTableProps crt_props(crt_entry_count,
862  crt_emitted_keys_count,
863  hash_table_size,
864  inverse_bucket_sizes);
865  VLOG(1) << "Tuner output: " << tuner << " with properties " << crt_props;
866 
867  const auto should_continue = tuning_state(crt_props, tuner.getMinBucketSize());
869  tuning_state.crt_props.bucket_sizes, columns_per_device, device_count_);
870  if (!should_continue) {
871  break;
872  }
873  }
874 
875  const auto& crt_props = tuning_state.crt_props;
876  // sanity check that the hash table size has not changed. this is a fairly
877  // inexpensive check to ensure the above algorithm is consistent
878  const size_t hash_table_size =
880  crt_props.emitted_keys_count,
881  crt_props.entry_count);
882  CHECK_EQ(crt_props.hash_table_size, hash_table_size);
883 
885  hash_table_size > bbox_intersect_max_table_size_bytes) {
886  VLOG(1) << "Could not find suitable parameters to create hash "
887  "table for bounding box intersectionunder max allowed size ("
888  << bbox_intersect_max_table_size_bytes << ") bytes.";
889  throw TooBigHashTableForBoundingBoxIntersect(bbox_intersect_max_table_size_bytes);
890  }
891 
892  VLOG(1) << "Final tuner output: " << tuner << " with properties " << crt_props;
894  VLOG(1) << "Final bucket sizes: ";
895  for (size_t dim = 0; dim < inverse_bucket_sizes_for_dimension_.size(); dim++) {
896  VLOG(1) << "dim[" << dim
897  << "]: " << 1.0 / inverse_bucket_sizes_for_dimension_[dim];
898  }
899  CHECK_GE(tuning_state.chosen_bbox_intersect_threshold, double(0));
900  generateCacheKey(tuning_state.bbox_intersect_max_table_size_bytes,
901  tuning_state.chosen_bbox_intersect_threshold,
902  {},
903  fragments_per_device,
904  device_count_);
905  const auto candidate_auto_tuner_cache_key = hashtable_cache_key_.front();
906  if (skip_hashtable_caching) {
907  VLOG(1) << "Skip to add tuned parameters to auto tuner";
908  } else {
909  AutoTunerMetaInfo meta_info{tuning_state.bbox_intersect_max_table_size_bytes,
910  tuning_state.chosen_bbox_intersect_threshold,
912  auto_tuner_cache_->putItemToCache(candidate_auto_tuner_cache_key,
913  meta_info,
916  0,
917  0);
918  }
919  bbox_intersect_bucket_threshold = tuning_state.chosen_bbox_intersect_threshold;
920  reifyImpl(columns_per_device,
921  query_info,
922  layout,
923  shard_count,
924  crt_props.entry_count,
925  crt_props.emitted_keys_count,
926  skip_hashtable_caching,
927  bbox_intersect_max_table_size_bytes,
928  bbox_intersect_bucket_threshold);
929  }
930  }
931 }
932 
933 size_t BoundingBoxIntersectJoinHashTable::calculateHashTableSize(
934  size_t number_of_dimensions,
935  size_t emitted_keys_count,
936  size_t entry_count) const {
937  const auto key_component_width = getKeyComponentWidth();
938  const auto key_component_count = number_of_dimensions;
939  const auto entry_size = key_component_count * key_component_width;
940  const auto keys_for_all_rows = emitted_keys_count;
941  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
942  const size_t hash_table_size =
943  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
944  return hash_table_size;
945 }
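
// Illustrative sketch (not part of the original file): the size formula above,
// restated as a free function under the assumption of an 8-byte key component
// width (see getKeyComponentWidth()). With 2 dimensions, 1,000 entries, and
// 5,000 emitted keys it yields 2*8*1000 + (2*1000 + 5000)*4 = 44,000 bytes.
// Assumes <cstdint> and <cstddef> are available for the types used.
namespace bbox_intersect_size_sketch {
inline size_t hash_table_size_bytes(const size_t num_dims,
                                    const size_t entry_count,
                                    const size_t emitted_keys_count,
                                    const size_t key_component_width = 8) {
  const size_t entry_size = num_dims * key_component_width;
  const size_t one_to_many_hash_entries = 2 * entry_count + emitted_keys_count;
  return entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
}
}  // namespace bbox_intersect_size_sketch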
946 
947 ColumnsForDevice BoundingBoxIntersectJoinHashTable::fetchColumnsForDevice(
948  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
949  const int device_id,
950  DeviceAllocator* dev_buff_owner) {
951  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
952 
953  std::vector<JoinColumn> join_columns;
954  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
955  std::vector<JoinColumnTypeInfo> join_column_types;
956  std::vector<std::shared_ptr<void>> malloc_owner;
957  for (const auto& inner_outer_pair : inner_outer_pairs_) {
958  const auto inner_col = inner_outer_pair.first;
959  const auto inner_cd = get_column_descriptor_maybe(inner_col->getColumnKey());
960  if (inner_cd && inner_cd->isVirtualCol) {
962  }
963  join_columns.emplace_back(fetchJoinColumn(inner_col,
964  fragments,
965  effective_memory_level,
966  device_id,
967  chunks_owner,
968  dev_buff_owner,
969  malloc_owner,
970  executor_,
971  &column_cache_));
972  const auto& ti = inner_col->get_type_info();
973  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
974  0,
975  0,
976  inline_int_null_value<int64_t>(),
977  false,
978  0,
980  CHECK(ti.is_array())
981  << "Bounding box intersection currently only supported for arrays.";
982  }
983  return {join_columns, join_column_types, chunks_owner, {}, malloc_owner};
984 }
985 
986 std::pair<size_t, size_t> BoundingBoxIntersectJoinHashTable::computeHashTableCounts(
987  const size_t shard_count,
988  const std::vector<double>& inverse_bucket_sizes_for_dimension,
989  std::vector<ColumnsForDevice>& columns_per_device,
990  const size_t chosen_max_hashtable_size,
991  const double chosen_bucket_threshold) {
992  CHECK(!inverse_bucket_sizes_for_dimension.empty());
993  const auto [tuple_count, emitted_keys_count] =
994  approximateTupleCount(inverse_bucket_sizes_for_dimension,
995  columns_per_device,
996  chosen_max_hashtable_size,
997  chosen_bucket_threshold);
998  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
999 
1000  return std::make_pair(
1001  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
1002  emitted_keys_count);
1003 }
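
// Illustrative note (not part of the original file): the entry count is twice the
// approximate distinct bucket-key count (never less than 2), e.g. an approximate
// tuple count of 10,000 gives 20,000 entries before get_entries_per_device()
// distributes them across shards and devices; the emitted key count is passed
// through unchanged.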
1004 
1005 std::pair<size_t, size_t> BoundingBoxIntersectJoinHashTable::approximateTupleCount(
1006  const std::vector<double>& inverse_bucket_sizes_for_dimension,
1007  std::vector<ColumnsForDevice>& columns_per_device,
1008  const size_t chosen_max_hashtable_size,
1009  const double chosen_bucket_threshold) {
1010  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
1011  CountDistinctDescriptor count_distinct_desc{
1013  0,
1014  0,
1015  11,
1016  true,
1017  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
1020  1};
1021  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
1022 
1023  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
1024  if (columns_per_device.front().join_columns.front().num_elems == 0) {
1025  return std::make_pair(0, 0);
1026  }
1027 
1028  // TODO: state management in here should be revisited, but this should be safe enough
1029  // for now
1030  // re-compute bucket counts per device based on global bucket size
1031  for (size_t device_id = 0; device_id < columns_per_device.size(); ++device_id) {
1032  auto& columns_for_device = columns_per_device[device_id];
1033  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
1035  }
1036 
1037  // Number of keys must match dimension of buckets
1038  CHECK_EQ(columns_per_device.front().join_columns.size(),
1039  columns_per_device.front().join_buckets.size());
1040  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1041  // Note that this path assumes each device has the same hash table (for GPU hash
1042  // join w/ hash table built on CPU)
1043  const auto cached_count_info =
1047  if (cached_count_info) {
1048  VLOG(1) << "Using a cached tuple count: " << cached_count_info->first
1049  << ", emitted keys count: " << cached_count_info->second;
1050  return *cached_count_info;
1051  }
1052  int thread_count = cpu_threads();
1053  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
1054  auto hll_result = &hll_buffer_all_cpus[0];
1055 
1056  std::vector<int32_t> num_keys_for_row;
1057  // TODO(adb): support multi-column bounding box intersection
1058  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
1059 
1061  hll_result,
1062  num_keys_for_row,
1063  count_distinct_desc.bitmap_sz_bits,
1064  padded_size_bytes,
1065  columns_per_device.front().join_columns,
1066  columns_per_device.front().join_column_types,
1067  columns_per_device.front().join_buckets,
1068  thread_count);
1069  for (int i = 1; i < thread_count; ++i) {
1070  hll_unify(hll_result,
1071  hll_result + i * padded_size_bytes,
1072  size_t(1) << count_distinct_desc.bitmap_sz_bits);
1073  }
1074  return std::make_pair(
1075  hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
1076  static_cast<size_t>(num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0));
1077  }
1078 #ifdef HAVE_CUDA
1079  auto data_mgr = executor_->getDataMgr();
1080  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
1081  for (auto& host_hll_buffer : host_hll_buffers) {
1082  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
1083  }
1084  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
1085  std::vector<std::future<void>> approximate_distinct_device_threads;
1086  for (int device_id = 0; device_id < device_count_; ++device_id) {
1087  approximate_distinct_device_threads.emplace_back(std::async(
1089  [device_id,
1090  &columns_per_device,
1091  &count_distinct_desc,
1092  data_mgr,
1093  &host_hll_buffers,
1094  &emitted_keys_count_device_threads] {
1095  auto allocator = std::make_unique<CudaAllocator>(
1096  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1097  auto device_hll_buffer =
1098  allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
1099  data_mgr->getCudaMgr()->zeroDeviceMem(
1100  device_hll_buffer,
1101  count_distinct_desc.bitmapPaddedSizeBytes(),
1102  device_id,
1104  const auto& columns_for_device = columns_per_device[device_id];
1105  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
1106  columns_for_device.join_columns, *allocator);
1107 
1108  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
1109  const auto& inverse_bucket_sizes_for_dimension =
1110  columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
1111  auto inverse_bucket_sizes_gpu = allocator->alloc(
1112  inverse_bucket_sizes_for_dimension.size() * sizeof(double));
1113  allocator->copyToDevice(
1114  inverse_bucket_sizes_gpu,
1115  inverse_bucket_sizes_for_dimension.data(),
1116  inverse_bucket_sizes_for_dimension.size() * sizeof(double));
1117  const size_t row_counts_buffer_sz =
1118  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
1119  auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
1120  data_mgr->getCudaMgr()->zeroDeviceMem(
1121  row_counts_buffer,
1122  row_counts_buffer_sz,
1123  device_id,
1125  const auto key_handler = BoundingBoxIntersectKeyHandler(
1126  inverse_bucket_sizes_for_dimension.size(),
1127  join_columns_gpu,
1128  reinterpret_cast<double*>(inverse_bucket_sizes_gpu));
1129  const auto key_handler_gpu =
1130  transfer_flat_object_to_gpu(key_handler, *allocator);
1132  reinterpret_cast<uint8_t*>(device_hll_buffer),
1133  count_distinct_desc.bitmap_sz_bits,
1134  reinterpret_cast<int32_t*>(row_counts_buffer),
1135  key_handler_gpu,
1136  columns_for_device.join_columns[0].num_elems);
1137 
1138  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
1139  allocator->copyFromDevice(
1140  &host_emitted_keys_count,
1141  row_counts_buffer +
1142  (columns_per_device.front().join_columns[0].num_elems - 1) *
1143  sizeof(int32_t),
1144  sizeof(int32_t));
1145 
1146  auto& host_hll_buffer = host_hll_buffers[device_id];
1147  allocator->copyFromDevice(&host_hll_buffer[0],
1148  device_hll_buffer,
1149  count_distinct_desc.bitmapPaddedSizeBytes());
1150  }));
1151  }
1152  for (auto& child : approximate_distinct_device_threads) {
1153  child.get();
1154  }
1155  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
1156  auto& result_hll_buffer = host_hll_buffers.front();
1157  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
1158  for (int device_id = 1; device_id < device_count_; ++device_id) {
1159  auto& host_hll_buffer = host_hll_buffers[device_id];
1160  hll_unify(hll_result,
1161  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
1162  size_t(1) << count_distinct_desc.bitmap_sz_bits);
1163  }
1164  const size_t emitted_keys_count =
1165  std::accumulate(emitted_keys_count_device_threads.begin(),
1166  emitted_keys_count_device_threads.end(),
1167  0);
1168  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
1169  emitted_keys_count);
1170 #else
1171  UNREACHABLE();
1172  return {0, 0};
1173 #endif // HAVE_CUDA
1174 }
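
// Illustrative note (not part of the original file): the pair returned above is
// (approximate distinct bucket-key count from the HLL sketch, total emitted key
// count). On CPU the emitted-key total is the last element of num_keys_for_row
// (presumably a running count across rows); on GPU it is read back from the last
// slot of each device's row_counts_buffer and summed across devices.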
1175 
1176 void BoundingBoxIntersectJoinHashTable::setInverseBucketSizeInfo(
1177  const std::vector<double>& inverse_bucket_sizes,
1178  std::vector<ColumnsForDevice>& columns_per_device,
1179  const size_t device_count) {
1180  // set global bucket size
1181  inverse_bucket_sizes_for_dimension_ = inverse_bucket_sizes;
1182 
1183  // re-compute bucket counts per device based on global bucket size
1184  CHECK_EQ(columns_per_device.size(), static_cast<size_t>(device_count));
1185  for (size_t device_id = 0; device_id < device_count; ++device_id) {
1186  auto& columns_for_device = columns_per_device[device_id];
1187  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension_,
1189  }
1190 }
1191 
1192 size_t BoundingBoxIntersectJoinHashTable::getKeyComponentWidth() const {
1193  return 8;
1194 }
1195 
1199 }
1200 
1202  auto timer = DEBUG_TIMER(__func__);
1203  CHECK_LT(0, device_count_);
1205 
1206  CHECK(condition_->is_bbox_intersect_oper());
1207  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
1208  HashType layout;
1209  if (inner_outer_pairs_[0].second->get_type_info().is_fixlen_array() &&
1210  inner_outer_pairs_[0].second->get_type_info().get_size() == 32) {
1211  // bounds array
1212  layout = HashType::ManyToMany;
1213  } else {
1214  layout = HashType::OneToMany;
1215  }
1216  try {
1217  reifyWithLayout(layout);
1218  return;
1219  } catch (const JoinHashTableTooBig& e) {
1220  throw e;
1221  } catch (const std::exception& e) {
1222  VLOG(1) << "Caught exception while building baseline hash table for bounding box "
1223  "intersection: "
1224  << e.what();
1225  throw;
1226  }
1227 }
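
// Illustrative note (not part of the original file): the 32-byte fixed-length
// array checked above is assumed to be a bounds array of four doubles
// (x_min, y_min, x_max, y_max). A bounding box can cover several buckets, hence
// ManyToMany; a single point maps to exactly one bucket per dimension, so
// everything else uses OneToMany.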
1228 
1229 void BoundingBoxIntersectJoinHashTable::reifyImpl(
1230  std::vector<ColumnsForDevice>& columns_per_device,
1231  const Fragmenter_Namespace::TableInfo& query_info,
1232  const HashType layout,
1233  const size_t shard_count,
1234  const size_t entry_count,
1235  const size_t emitted_keys_count,
1236  const bool skip_hashtable_caching,
1237  const size_t chosen_max_hashtable_size,
1238  const double chosen_bucket_threshold) {
1239  std::vector<std::future<void>> init_threads;
1240  chosen_bbox_intersect_bucket_threshold_ = chosen_bucket_threshold;
1241  chosen_bbox_intersect_max_table_size_bytes_ = chosen_max_hashtable_size;
1245 
1246  for (int device_id = 0; device_id < device_count_; ++device_id) {
1247  const auto fragments =
1248  shard_count
1249  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
1250  : query_info.fragments;
1251  init_threads.push_back(std::async(std::launch::async,
1253  this,
1254  columns_per_device[device_id],
1255  layout,
1256  entry_count,
1257  emitted_keys_count,
1258  skip_hashtable_caching,
1259  device_id,
1261  }
1262  for (auto& init_thread : init_threads) {
1263  init_thread.wait();
1264  }
1265  for (auto& init_thread : init_threads) {
1266  init_thread.get();
1267  }
1268 }
1269 
1270 void BoundingBoxIntersectJoinHashTable::reifyForDevice(
1271  const ColumnsForDevice& columns_for_device,
1272  const HashType layout,
1273  const size_t entry_count,
1274  const size_t emitted_keys_count,
1275  const bool skip_hashtable_caching,
1276  const int device_id,
1277  const logger::ThreadLocalIds parent_thread_local_ids) {
1278  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1279  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
1280  CHECK_EQ(getKeyComponentWidth(), size_t(8));
1282  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
1283  BaselineHashTableEntryInfo hash_table_entry_info(entry_count,
1284  emitted_keys_count,
1285  sizeof(int32_t),
1288  layout,
1289  false);
1290  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1291  VLOG(1) << "Building join hash table for bounding box intersection on CPU.";
1292  auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
1293  columns_for_device.join_column_types,
1294  columns_for_device.join_buckets,
1295  hash_table_entry_info,
1296  skip_hashtable_caching);
1297  CHECK(hash_table);
1298 
1299 #ifdef HAVE_CUDA
1300  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
1301  auto gpu_hash_table = copyCpuHashTableToGpu(hash_table, device_id);
1302  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1303  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
1304  } else {
1305 #else
1306  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1307 #endif
1308  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
1309  hash_tables_for_device_[0] = hash_table;
1310 #ifdef HAVE_CUDA
1311  }
1312 #endif
1313  } else {
1314 #ifdef HAVE_CUDA
1315  auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
1316  columns_for_device.join_column_types,
1317  columns_for_device.join_buckets,
1318  hash_table_entry_info,
1319  device_id);
1320  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1321  hash_tables_for_device_[device_id] = std::move(hash_table);
1322 #else
1323  UNREACHABLE();
1324 #endif
1325  }
1326 }
1327 
1329  const std::vector<JoinColumn>& join_columns,
1330  const std::vector<JoinColumnTypeInfo>& join_column_types,
1331  const std::vector<JoinBucketInfo>& join_bucket_info,
1332  const BaselineHashTableEntryInfo hash_table_entry_info,
1333  const bool skip_hashtable_caching) {
1334  auto timer = DEBUG_TIMER(__func__);
1335  decltype(std::chrono::steady_clock::now()) ts1, ts2;
1336  ts1 = std::chrono::steady_clock::now();
1337  CHECK(!join_columns.empty());
1338  CHECK(!join_bucket_info.empty());
1339  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1340  auto const hash_table_layout = hash_table_entry_info.getHashTableLayout();
1341  if (auto generic_hash_table =
1345  if (auto hash_table =
1346  std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
1347  VLOG(1) << "Using cached CPU hash table for initialization.";
1348  // See if a hash table of a different layout was returned.
1349  // If it was OneToMany, we can reuse it on ManyToMany.
1350  if (hash_table_layout == HashType::ManyToMany &&
1351  hash_table->getLayout() == HashType::OneToMany) {
1352  // use the cached hash table
1354  return hash_table;
1355  }
1356  if (hash_table_layout == hash_table->getLayout()) {
1357  return hash_table;
1358  }
1359  }
1360  }
1361  CHECK(layoutRequiresAdditionalBuffers(hash_table_layout));
1362  const auto key_component_count =
1363  join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();
1364 
1365  const auto key_handler = BoundingBoxIntersectKeyHandler(
1366  key_component_count,
1367  &join_columns[0],
1368  join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());
1371  dummy_str_proxy_translation_maps_ptrs_and_offsets;
1372  const auto err =
1373  builder.initHashTableOnCpu(&key_handler,
1375  join_columns,
1376  join_column_types,
1377  join_bucket_info,
1378  dummy_str_proxy_translation_maps_ptrs_and_offsets,
1379  hash_table_entry_info,
1380  join_type_,
1381  executor_,
1382  query_hints_);
1383  ts2 = std::chrono::steady_clock::now();
1384  if (err) {
1385  throw HashJoinFail(std::string("Unrecognized error when initializing CPU hash table "
1386  "for bounding box intersection(") +
1387  std::to_string(err) + std::string(")"));
1388  }
1389  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
1390  if (skip_hashtable_caching) {
1391  VLOG(1) << "Skip to cache join hashtable for bounding box intersection";
1392  } else {
1393  auto hashtable_build_time =
1394  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
1397  hash_table,
1399  hashtable_build_time);
1400  }
1401  return hash_table;
1402 }
1403 
1404 #ifdef HAVE_CUDA
1405 
1406 std::shared_ptr<BaselineHashTable> BoundingBoxIntersectJoinHashTable::initHashTableOnGpu(
1407  const std::vector<JoinColumn>& join_columns,
1408  const std::vector<JoinColumnTypeInfo>& join_column_types,
1409  const std::vector<JoinBucketInfo>& join_bucket_info,
1410  const BaselineHashTableEntryInfo hash_table_entry_info,
1411  const size_t device_id) {
1412  BaselineJoinHashTableBuilder builder;
1413 
1414  VLOG(1) << "Building join hash table for bounding box intersection on GPU.";
1415 
1417  auto data_mgr = executor_->getDataMgr();
1418  CudaAllocator allocator(
1419  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1420  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
1421  CHECK_EQ(join_columns.size(), 1u);
1422  CHECK(!join_bucket_info.empty());
1423  auto& inverse_bucket_sizes_for_dimension =
1424  join_bucket_info[0].inverse_bucket_sizes_for_dimension;
1425  auto inverse_bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
1426  inverse_bucket_sizes_for_dimension, allocator);
1427  const auto key_handler =
1428  BoundingBoxIntersectKeyHandler(inverse_bucket_sizes_for_dimension.size(),
1429  join_columns_gpu,
1430  inverse_bucket_sizes_gpu);
1431 
1432  const auto err = builder.initHashTableOnGpu(&key_handler,
1433  join_columns,
1434  join_type_,
1435  hash_table_entry_info,
1436  device_id,
1437  executor_,
1438  query_hints_);
1439  if (err) {
1440  throw HashJoinFail(std::string("Unrecognized error when initializing GPU hash table "
1441  "for bounding box intersection (") +
1442  std::to_string(err) + std::string(")"));
1443  }
1444  return builder.getHashTable();
1445 }
1446 
1447 std::shared_ptr<BaselineHashTable>
1448 BoundingBoxIntersectJoinHashTable::copyCpuHashTableToGpu(
1449  std::shared_ptr<BaselineHashTable>& cpu_hash_table,
1450  const size_t device_id) {
1452 
1453  auto data_mgr = executor_->getDataMgr();
1454 
1455  // copy hash table to GPU
1456  BaselineJoinHashTableBuilder gpu_builder;
1457  gpu_builder.allocateDeviceMemory(
1458  cpu_hash_table->getHashTableEntryInfo(), device_id, executor_, query_hints_);
1459  std::shared_ptr<BaselineHashTable> gpu_hash_table = gpu_builder.getHashTable();
1460  CHECK(gpu_hash_table);
1461  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
1462  CHECK(gpu_buffer_ptr);
1463 
1464  CHECK_LE(cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
1465  gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));
1466  auto device_allocator = std::make_unique<CudaAllocator>(
1467  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1468  device_allocator->copyToDevice(
1469  gpu_buffer_ptr,
1470  cpu_hash_table->getCpuBuffer(),
1471  cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU));
1472  return gpu_hash_table;
1473 }
1474 
1475 #endif // HAVE_CUDA
1476 
1477 #define LL_CONTEXT executor_->cgen_state_->context_
1478 #define LL_BUILDER executor_->cgen_state_->ir_builder_
1479 #define LL_INT(v) executor_->cgen_state_->llInt(v)
1480 #define LL_FP(v) executor_->cgen_state_->llFp(v)
1481 #define ROW_FUNC executor_->cgen_state_->row_func_
1482 
1483 llvm::Value* BoundingBoxIntersectJoinHashTable::codegenKey(const CompilationOptions& co) {
1484  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1485  const auto key_component_width = getKeyComponentWidth();
1486  CHECK(key_component_width == 4 || key_component_width == 8);
1487  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
1488  llvm::Value* key_buff_lv{nullptr};
1489  switch (key_component_width) {
1490  case 4:
1491  key_buff_lv =
1492  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
1493  break;
1494  case 8:
1495  key_buff_lv =
1496  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
1497  break;
1498  default:
1499  CHECK(false);
1500  }
1501 
1502  const auto& inner_outer_pair = inner_outer_pairs_[0];
1503  const auto outer_geo = inner_outer_pair.second;
1504  const auto outer_geo_ti = outer_geo->get_type_info();
1505 
1506  llvm::Value* arr_ptr = nullptr;
1507  CodeGenerator code_generator(executor_);
1508  CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
1509 
1510  if (outer_geo_ti.is_geometry()) {
1511  // TODO(adb): for points we will use the coords array, but for other geometries we
1512  // will need to use the bounding box. For now only support points.
1513  CHECK_EQ(outer_geo_ti.get_type(), kPOINT);
1514 
1515  if (const auto outer_geo_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_geo)) {
1516  const auto outer_geo_col_lvs = code_generator.codegen(outer_geo_col, true, co);
1517  CHECK_EQ(outer_geo_col_lvs.size(), size_t(1));
1518  auto column_key = outer_geo_col->getColumnKey();
1519  column_key.column_id = column_key.column_id + 1;
1520  const auto coords_cd = Catalog_Namespace::get_metadata_for_column(column_key);
1521  CHECK(coords_cd);
1522 
1523  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1524  "array_buff",
1525  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1526  {outer_geo_col_lvs.front(), code_generator.posArg(outer_geo_col)});
1527  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
1528  << "Bounding box intersection only supports TINYINT coordinates columns.";
1529  arr_ptr = code_generator.castArrayPointer(array_ptr,
1530  coords_cd->columnType.get_elem_type());
1531  } else if (const auto outer_geo_function_operator =
1532  dynamic_cast<const Analyzer::GeoOperator*>(outer_geo)) {
1533  // Process points dynamically constructed by geo function operators
1534  const auto outer_geo_function_operator_lvs =
1535  code_generator.codegen(outer_geo_function_operator, true, co);
1536  CHECK_EQ(outer_geo_function_operator_lvs.size(), size_t(2));
1537  arr_ptr = outer_geo_function_operator_lvs.front();
1538  } else if (const auto outer_geo_expr =
1539  dynamic_cast<const Analyzer::GeoExpr*>(outer_geo)) {
1540  UNREACHABLE() << outer_geo_expr->toString();
1541  }
1542  } else if (outer_geo_ti.is_fixlen_array()) {
1543  // Process dynamically constructed points
1544  const auto outer_geo_cast_coord_array =
1545  dynamic_cast<const Analyzer::UOper*>(outer_geo);
1546  CHECK_EQ(outer_geo_cast_coord_array->get_optype(), kCAST);
1547  const auto outer_geo_coord_array = dynamic_cast<const Analyzer::ArrayExpr*>(
1548  outer_geo_cast_coord_array->get_operand());
1549  CHECK(outer_geo_coord_array);
1550  CHECK(outer_geo_coord_array->isLocalAlloc());
1551  CHECK_EQ(outer_geo_coord_array->getElementCount(), 2);
1552  auto elem_size = (outer_geo_ti.get_compression() == kENCODING_GEOINT)
1553  ? sizeof(int32_t)
1554  : sizeof(double);
1555  CHECK_EQ(outer_geo_ti.get_size(), int(2 * elem_size));
1556  const auto outer_geo_constructed_lvs = code_generator.codegen(outer_geo, true, co);
1557  // CHECK_EQ(outer_geo_constructed_lvs.size(), size_t(2)); // Pointer and size
1558  const auto array_ptr = outer_geo_constructed_lvs.front(); // Just need the pointer
1559  arr_ptr = LL_BUILDER.CreateGEP(
1560  array_ptr->getType()->getScalarType()->getPointerElementType(),
1561  array_ptr,
1562  LL_INT(0));
1563  arr_ptr = code_generator.castArrayPointer(array_ptr, SQLTypeInfo(kTINYINT, true));
1564  }
1565  if (!arr_ptr) {
1566  LOG(FATAL)
1567  << "Bounding box intersection currently only supports geospatial columns and "
1568  "constructed points.";
1569  }
1570 
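 // For each of the two spatial dimensions, map the coordinate to a bucket index
 // by scaling with the inverse bucket size (compressed GEOINT and double coords
 // use different runtime helpers), then sign-extend the 64-bit bucket key to the
 // key component width and store it into the key buffer.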
1571  for (size_t i = 0; i < 2; i++) {
1572  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
1573  key_buff_lv->getType()->getScalarType()->getPointerElementType(),
1574  key_buff_lv,
1575  LL_INT(i));
1576 
1577  // Note that get_bucket_key_for_range_compressed will need to be specialized for
1578  // future compression schemes
1579  auto bucket_key =
1580  outer_geo_ti.get_compression() == kENCODING_GEOINT
1581  ? executor_->cgen_state_->emitExternalCall(
1582  "get_bucket_key_for_range_compressed",
1583  get_int_type(64, LL_CONTEXT),
1584  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])})
1585  : executor_->cgen_state_->emitExternalCall(
1586  "get_bucket_key_for_range_double",
1587  get_int_type(64, LL_CONTEXT),
1588  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])});
1589  const auto col_lv = LL_BUILDER.CreateSExt(
1590  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
1591  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
1592  }
1593  return key_buff_lv;
1594 }
1595 
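 // For ManyToMany layouts the probe side supplies bounds rather than a single
 // point: codegenManyKey returns the number of candidate bucket keys covered by
 // the bounds plus a pointer to the raw coords buffer; the per-bucket probing
 // happens later inside get_candidate_rows.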
1596 std::vector<llvm::Value*> BoundingBoxIntersectJoinHashTable::codegenManyKey(
1597  const CompilationOptions& co) {
1598  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1599  const auto key_component_width = getKeyComponentWidth();
1600  CHECK(key_component_width == 4 || key_component_width == 8);
1601  auto hash_table = getHashTableForDevice(size_t(0));
1602  CHECK(hash_table);
1604 
1605  VLOG(1) << "Performing codegen for ManyToMany";
1606  const auto& inner_outer_pair = inner_outer_pairs_[0];
1607  const auto outer_col = inner_outer_pair.second;
1608 
1609  CodeGenerator code_generator(executor_);
1610  const auto col_lvs = code_generator.codegen(outer_col, true, co);
1611  CHECK_EQ(col_lvs.size(), size_t(1));
1612 
1613  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
1614  CHECK(outer_col_var);
1615  const auto coords_cd =
1616  Catalog_Namespace::get_metadata_for_column(outer_col_var->getColumnKey());
1617  CHECK(coords_cd);
1618 
1619  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1620  "array_buff",
1621  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1622  {col_lvs.front(), code_generator.posArg(outer_col)});
1623 
1624  // TODO(jclay): this seems to cast to double, and causes the GPU build to fail.
1625  // const auto arr_ptr =
1626  // code_generator.castArrayPointer(array_ptr,
1627  // coords_cd->columnType.get_elem_type());
1628  array_ptr->setName("array_ptr");
1629 
1630  auto num_keys_lv = executor_->cgen_state_->emitExternalCall(
1631  "get_num_buckets_for_bounds",
1632  get_int_type(32, LL_CONTEXT),
1633  {array_ptr,
1634  LL_INT(0),
1635  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1636  LL_FP(inverse_bucket_sizes_for_dimension_[1])});
1637  num_keys_lv->setName("num_keys_lv");
1638 
1639  return {num_keys_lv, array_ptr};
1640 }
1641 
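 // codegenMatchingSet emits the probe-side lookup: the ManyToMany branch scans
 // all buckets overlapped by the probe bounds, while the baseline branch hashes
 // a single composite bucket key and walks the one-to-many offset/count/payload
 // buffers.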
1642 HashJoinMatchingSet BoundingBoxIntersectJoinHashTable::codegenMatchingSet(
1643  const CompilationOptions& co,
1644  const size_t index) {
1645  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1646  if (getHashType() == HashType::ManyToMany) {
1647  VLOG(1) << "Building codegenMatchingSet for ManyToMany";
1648  const auto key_component_width = getKeyComponentWidth();
1649  CHECK(key_component_width == 4 || key_component_width == 8);
1650  auto many_to_many_args = codegenManyKey(co);
1651  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1652  const auto composite_dict_ptr_type =
1653  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1654  const auto composite_key_dict =
1655  hash_ptr->getType()->isPointerTy()
1656  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1657  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1658  const auto key_component_count = getKeyComponentCount();
1659 
1660  auto one_to_many_ptr = hash_ptr;
1661 
1662  if (one_to_many_ptr->getType()->isPointerTy()) {
1663  one_to_many_ptr =
1664  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1665  } else {
1666  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1667  }
1668 
1669  const auto composite_key_dict_size = offsetBufferOff();
1670  one_to_many_ptr =
1671  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1672 
1673  const auto arr_type = get_int_array_type(32, kMaxBBoxOverlapsCount, LL_CONTEXT);
1674  const auto out_arr_lv = LL_BUILDER.CreateAlloca(arr_type);
1675  out_arr_lv->setName("out_arr");
1676 
1677  const auto casted_out_arr_lv =
1678  LL_BUILDER.CreatePointerCast(out_arr_lv, arr_type->getPointerTo());
1679 
1680  const auto element_ptr = LL_BUILDER.CreateGEP(arr_type, casted_out_arr_lv, LL_INT(0));
1681 
1682  auto rowid_ptr_i32 =
1683  LL_BUILDER.CreatePointerCast(element_ptr, llvm::Type::getInt32PtrTy(LL_CONTEXT));
1684 
1685  const auto error_code_ptr = LL_BUILDER.CreateAlloca(
1686  get_int_type(32, LL_CONTEXT), nullptr, "candidate_rows_error_code");
1687  LL_BUILDER.CreateStore(LL_INT(int32_t(0)), error_code_ptr);
1688 
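 // get_candidate_rows fills the fixed-size out_arr (up to kMaxBBoxOverlapsCount
 // int32 row ids) with candidate rows from the buckets the probe bounds overlap
 // and returns the candidate count; the error-code slot initialized to zero above
 // is set to BBOX_OVERLAPS_LIMIT_EXCEEDED when more candidates are found than fit.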
1689  const auto candidate_count_lv = executor_->cgen_state_->emitExternalCall(
1690  "get_candidate_rows",
1691  llvm::Type::getInt64Ty(LL_CONTEXT),
1692  {rowid_ptr_i32,
1693  error_code_ptr,
1694  LL_INT(kMaxBBoxOverlapsCount),
1695  many_to_many_args[1],
1696  LL_INT(0),
1697  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1698  LL_FP(inverse_bucket_sizes_for_dimension_[1]),
1699  many_to_many_args[0],
1700  LL_INT(key_component_count), // key_component_count
1701  composite_key_dict, // ptr to hash table
1702  LL_INT(getEntryCount()), // entry_count
1703  LL_INT(composite_key_dict_size), // offset_buffer_ptr_offset
1704  LL_INT(getEntryCount() * sizeof(int32_t)), // sub_buff_size
1705  LL_INT(int32_t(heavyai::ErrorCode::BBOX_OVERLAPS_LIMIT_EXCEEDED))});
1706 
1707  const auto slot_lv = LL_INT(int64_t(0));
1708  auto error_code_lv = LL_BUILDER.CreateLoad(
1709  error_code_ptr->getType()->getPointerElementType(), error_code_ptr);
1710  return {rowid_ptr_i32, candidate_count_lv, slot_lv, error_code_lv};
1711  } else {
1712  VLOG(1) << "Building codegenMatchingSet for Baseline";
1713  // TODO: duplicated w/ BaselineJoinHashTable -- push into the hash table builder?
1714  const auto key_component_width = getKeyComponentWidth();
1715  CHECK(key_component_width == 4 || key_component_width == 8);
1716  auto key_buff_lv = codegenKey(co);
1718  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1719  const auto composite_dict_ptr_type =
1720  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1721  const auto composite_key_dict =
1722  hash_ptr->getType()->isPointerTy()
1723  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1724  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1725  const auto key_component_count = getKeyComponentCount();
1726  const auto key = executor_->cgen_state_->emitExternalCall(
1727  "get_composite_key_index_" + std::to_string(key_component_width * 8),
1728  get_int_type(64, LL_CONTEXT),
1729  {key_buff_lv,
1730  LL_INT(key_component_count),
1731  composite_key_dict,
1732  LL_INT(getEntryCount())});
1733  auto one_to_many_ptr = hash_ptr;
1734  if (one_to_many_ptr->getType()->isPointerTy()) {
1735  one_to_many_ptr =
1736  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1737  } else {
1738  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1739  }
1740  const auto composite_key_dict_size = offsetBufferOff();
1741  one_to_many_ptr =
1742  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
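 // The one-to-many buffers (offsets, counts, payload) start right after the
 // composite key dictionary, i.e. at offsetBufferOff(); HashJoin::codegenMatchingSet
 // emits the standard one-to-many matching-set lookup for the resolved key.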
1743  return HashJoin::codegenMatchingSet(
1744  std::vector<llvm::Value*>{
1745  one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(getEntryCount() - 1)},
1746  false,
1747  false,
1748  false,
1749  getComponentBufferSize(),
1750  executor_);
1751  }
1752  UNREACHABLE();
1753  return HashJoinMatchingSet{};
1754 }
1755 
1756 std::string BoundingBoxIntersectJoinHashTable::toString(
1757  const ExecutorDeviceType device_type,
1758  const int device_id,
1759  bool raw) const {
1760  auto buffer = getJoinHashBuffer(device_type, device_id);
1761  if (!buffer) {
1762  return "EMPTY";
1763  }
1764  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1765  auto hash_table = hash_tables_for_device_[device_id];
1766  CHECK(hash_table);
1767  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
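 // GPU-resident tables are first copied into a host-side staging buffer so the
 // decode below can read them; CPU tables are decoded in place.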
1768 #ifdef HAVE_CUDA
1769  std::unique_ptr<int8_t[]> buffer_copy;
1770  if (device_type == ExecutorDeviceType::GPU) {
1771  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1772  CHECK(executor_);
1773  auto data_mgr = executor_->getDataMgr();
1774  auto device_allocator = std::make_unique<CudaAllocator>(
1775  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1776 
1777  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1778  }
1779  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1780 #else
1781  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1782 #endif // HAVE_CUDA
1783  auto ptr2 = ptr1 + offsetBufferOff();
1784  auto ptr3 = ptr1 + countBufferOff();
1785  auto ptr4 = ptr1 + payloadBufferOff();
1786  CHECK(hash_table);
1787  const auto layout = getHashType();
1788  return HashTable::toString(
1789  "geo",
1790  getHashTypeString(layout),
1791  getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1792  getKeyComponentWidth(),
1793  hash_table->getEntryCount(),
1794  ptr1,
1795  ptr2,
1796  ptr3,
1797  ptr4,
1798  buffer_size,
1799  raw);
1800 }
1801 
1802 std::set<DecodedJoinHashBufferEntry> BoundingBoxIntersectJoinHashTable::toSet(
1803  const ExecutorDeviceType device_type,
1804  const int device_id) const {
1805  auto buffer = getJoinHashBuffer(device_type, device_id);
1806  auto hash_table = getHashTableForDevice(device_id);
1807  CHECK(hash_table);
1808  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1809 #ifdef HAVE_CUDA
1810  std::unique_ptr<int8_t[]> buffer_copy;
1811  if (device_type == ExecutorDeviceType::GPU) {
1812  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1813  CHECK(executor_);
1814  auto data_mgr = executor_->getDataMgr();
1815  auto allocator = std::make_unique<CudaAllocator>(
1816  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1817 
1818  allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1819  }
1820  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1821 #else
1822  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1823 #endif // HAVE_CUDA
1824  auto ptr2 = ptr1 + offsetBufferOff();
1825  auto ptr3 = ptr1 + countBufferOff();
1826  auto ptr4 = ptr1 + payloadBufferOff();
1827  const auto layout = getHashType();
1828  return HashTable::toSet(getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1829  getKeyComponentWidth(),
1830  hash_table->getEntryCount(),
1831  ptr1,
1832  ptr2,
1833  ptr3,
1834  ptr4,
1835  buffer_size);
1836 }
1837 
1838 Data_Namespace::MemoryLevel BoundingBoxIntersectJoinHashTable::getEffectiveMemoryLevel(
1839  const std::vector<InnerOuter>& inner_outer_pairs) const {
1840  if (query_hints_.isHintRegistered(QueryHint::kBBoxIntersectAllowGpuBuild) &&
1841  query_hints_.bbox_intersect_allow_gpu_build &&
1842  this->executor_->getDataMgr()->gpusPresent() &&
1843  memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
1844  return Data_Namespace::MemoryLevel::GPU_LEVEL;
1845  }
1846  // otherwise, try to build on CPU
1847  return Data_Namespace::MemoryLevel::CPU_LEVEL;
1848 }
1849 
1850 shared::TableKey BoundingBoxIntersectJoinHashTable::getInnerTableId() const noexcept {
1851  try {
1852  return HashJoin::getInnerTableId(inner_outer_pairs_);
1853  } catch (...) {
1854  CHECK(false);
1855  }
1856  return {};
1857 }
1858 
1859 std::shared_ptr<HashTable> BoundingBoxIntersectJoinHashTable::initHashTableOnCpuFromCache(
1860  QueryPlanHash key,
1861  CacheItemType item_type,
1862  DeviceIdentifier device_identifier) {
1863  auto timer = DEBUG_TIMER(__func__);
1864  VLOG(1) << "Checking CPU hash table cache.";
1865  CHECK(hash_table_cache_);
1866  HashtableCacheMetaInfo meta_info;
1867  meta_info.bbox_intersect_meta_info = getBoundingBoxIntersectMetaInfo();
1868  auto cached_hashtable =
1869  hash_table_cache_->getItemFromCache(key, item_type, device_identifier, meta_info);
1870  if (cached_hashtable) {
1871  return cached_hashtable;
1872  }
1873  return nullptr;
1874 }
1875 
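 // If a hash table built for the same query plan key is already cached, reuse its
 // counts instead of re-running the tuple-count approximation; the cached entry
 // count is halved below, assuming it was sized at twice the approximated tuple
 // count when the table was built.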
1876 std::optional<std::pair<size_t, size_t>>
1877 BoundingBoxIntersectJoinHashTable::getApproximateTupleCountFromCache(
1878  QueryPlanHash key,
1879  CacheItemType item_type,
1880  DeviceIdentifier device_identifier) {
1881  CHECK(hash_table_cache_);
1882  HashtableCacheMetaInfo metaInfo;
1883  metaInfo.bbox_intersect_meta_info = getBoundingBoxIntersectMetaInfo();
1884  auto cached_hashtable =
1885  hash_table_cache_->getItemFromCache(key, item_type, device_identifier, metaInfo);
1886  if (cached_hashtable) {
1887  return std::make_pair(cached_hashtable->getEntryCount() / 2,
1888  cached_hashtable->getEmittedKeysCount());
1889  }
1890  return std::nullopt;
1891 }
1892 
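 // Only CPU-resident hash tables are cached (the CHECK below rejects tables that
 // still own a GPU buffer); bounding-box metadata and the registered query hints
 // are stored alongside the cache entry.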
1893 void BoundingBoxIntersectJoinHashTable::putHashTableOnCpuToCache(
1894  QueryPlanHash key,
1895  CacheItemType item_type,
1896  std::shared_ptr<HashTable> hashtable_ptr,
1897  DeviceIdentifier device_identifier,
1898  size_t hashtable_building_time) {
1899  CHECK(hash_table_cache_);
1900  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
1901  HashtableCacheMetaInfo meta_info;
1902  meta_info.bbox_intersect_meta_info = getBoundingBoxIntersectMetaInfo();
1903  meta_info.registered_query_hint = query_hints_;
1904  hash_table_cache_->putItemToCache(
1905  key,
1906  hashtable_ptr,
1907  item_type,
1908  device_identifier,
1909  hashtable_ptr->getHashTableBufferSize(ExecutorDeviceType::CPU),
1910  hashtable_building_time,
1911  meta_info);
1912 }
1913 
1914 bool BoundingBoxIntersectJoinHashTable::isBitwiseEq() const {
1915  return condition_->get_optype() == kBW_EQ;
1916 }