OmniSciDB  f17484ade4
BoundingBoxIntersectJoinHashTable.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
21 #include "QueryEngine/Execute.h"
29 
30 std::unique_ptr<HashtableRecycler> BoundingBoxIntersectJoinHashTable::hash_table_cache_ =
31  std::make_unique<HashtableRecycler>(CacheItemType::BBOX_INTERSECT_HT,
33 std::unique_ptr<BoundingBoxIntersectTuningParamRecycler>
34  BoundingBoxIntersectJoinHashTable::auto_tuner_cache_ =
35  std::make_unique<BoundingBoxIntersectTuningParamRecycler>();
36 
38 std::shared_ptr<BoundingBoxIntersectJoinHashTable>
39 BoundingBoxIntersectJoinHashTable::getInstance(
40  const std::shared_ptr<Analyzer::BinOper> condition,
41  const std::vector<InputTableInfo>& query_infos,
42  const Data_Namespace::MemoryLevel memory_level,
43  const JoinType join_type,
44  const int device_count,
45  ColumnCacheMap& column_cache,
46  Executor* executor,
47  const HashTableBuildDagMap& hashtable_build_dag_map,
48  const RegisteredQueryHint& query_hints,
49  const TableIdToNodeMap& table_id_to_node_map) {
50  decltype(std::chrono::steady_clock::now()) ts1, ts2;
51  auto copied_query_hints = query_hints;
52  if (query_hints.force_one_to_many_hash_join) {
53  LOG(INFO) << "Ignoring query hint \'force_one_to_many_hash_join\' for bounding box "
54  "intersection";
55  copied_query_hints.force_one_to_many_hash_join = false;
56  }
57  if (query_hints.force_baseline_hash_join) {
58  LOG(INFO) << "Ignoring query hint \'force_baseline_hash_join\' for bounding box "
59  "intersection";
60  copied_query_hints.force_baseline_hash_join = false;
61  }
62  std::vector<InnerOuter> inner_outer_pairs;
63  if (const auto range_expr =
64  dynamic_cast<const Analyzer::RangeOper*>(condition->get_right_operand())) {
65  return RangeJoinHashTable::getInstance(condition,
66  range_expr,
67  query_infos,
68  memory_level,
69  join_type,
70  device_count,
71  column_cache,
72  executor,
73  hashtable_build_dag_map,
74  copied_query_hints,
75  table_id_to_node_map);
76  } else {
77  inner_outer_pairs =
78  HashJoin::normalizeColumnPairs(condition.get(), executor->getTemporaryTables())
79  .first;
80  }
81  CHECK(!inner_outer_pairs.empty());
82 
83  const auto getHashTableType =
84  [](const std::shared_ptr<Analyzer::BinOper> condition,
85  const std::vector<InnerOuter>& inner_outer_pairs) -> HashType {
86  HashType layout = HashType::OneToMany;
87  if (condition->is_bbox_intersect_oper()) {
88  CHECK_EQ(inner_outer_pairs.size(), size_t(1));
89  if (inner_outer_pairs[0].first->get_type_info().is_array() &&
90  inner_outer_pairs[0].second->get_type_info().is_array() &&
91  // Bounds vs constructed points, former should yield ManyToMany
92  inner_outer_pairs[0].second->get_type_info().get_size() == 32) {
93  layout = HashType::ManyToMany;
94  }
95  }
96  return layout;
97  };
98 
99  const auto layout = getHashTableType(condition, inner_outer_pairs);
100 
101  if (VLOGGING(1)) {
102  VLOG(1) << "Building geo hash table " << getHashTypeString(layout)
103  << " for qual: " << condition->toString();
104  ts1 = std::chrono::steady_clock::now();
105  }
106 
107  const auto qi_0 = query_infos[0].info.getNumTuplesUpperBound();
108  const auto qi_1 = query_infos[1].info.getNumTuplesUpperBound();
109 
110  VLOG(1) << "table_key = " << query_infos[0].table_key << " has " << qi_0 << " tuples.";
111  VLOG(1) << "table_key = " << query_infos[1].table_key << " has " << qi_1 << " tuples.";
112 
113  const auto& query_info =
114  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
115  .info;
116  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
117  if (total_entries > HashJoin::MAX_NUM_HASH_ENTRIES) {
118  throw TooManyHashEntries();
119  }
120 
121  auto join_hash_table =
122  std::make_shared<BoundingBoxIntersectJoinHashTable>(condition,
123  join_type,
124  query_infos,
125  memory_level,
126  column_cache,
127  executor,
128  inner_outer_pairs,
129  device_count,
130  copied_query_hints,
131  hashtable_build_dag_map,
132  table_id_to_node_map);
133  try {
134  join_hash_table->reify(layout);
135  } catch (const HashJoinFail& e) {
136  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
137  "involved in bounding box intersection | ") +
138  e.what());
139  } catch (const ColumnarConversionNotSupported& e) {
140  throw HashJoinFail(
141  std::string("Could not build hash tables for bounding box intersection | "
142  "Inner table too big. Attempt manual table reordering "
143  "or create a single fragment inner table. | ") +
144  e.what());
145  } catch (const JoinHashTableTooBig& e) {
146  throw e;
147  } catch (const std::exception& e) {
148  throw HashJoinFail(
149  std::string("Failed to build hash tables for bounding box intersection | ") +
150  e.what());
151  }
152  if (VLOGGING(1)) {
153  ts2 = std::chrono::steady_clock::now();
154  VLOG(1) << "Built geo hash table " << getHashTypeString(layout) << " in "
155  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
156  << " ms";
157  }
158  return join_hash_table;
159 }
160 
161 namespace {
162 
163 std::vector<double> correct_uninitialized_bucket_sizes_to_thresholds(
164  const std::vector<double>& bucket_sizes,
165  const std::vector<double>& bucket_thresholds,
166  const double initial_value) {
167  std::vector<double> corrected_bucket_sizes(bucket_sizes);
168  for (size_t i = 0; i != bucket_sizes.size(); ++i) {
169  if (bucket_sizes[i] == initial_value) {
170  corrected_bucket_sizes[i] = bucket_thresholds[i];
171  }
172  }
173  return corrected_bucket_sizes;
174 }
175 
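// The bucketed spatial join below decomposes each bounding box into the set of
// fixed-size x/y bins it overlaps and uses the bin index as the hash key. As an
// illustrative (made-up) example, with a bucket size of 0.1 a coordinate of
// 12.34 falls in bin floor(12.34 / 0.1) = 123; smaller buckets emit more keys
// per box but place fewer boxes in each bin.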
176 std::vector<double> compute_bucket_sizes(
177  const std::vector<double>& bucket_thresholds,
178  const Data_Namespace::MemoryLevel effective_memory_level,
179  const JoinColumn& join_column,
180  const JoinColumnTypeInfo& join_column_type,
181  const std::vector<InnerOuter>& inner_outer_pairs,
182  const Executor* executor) {
183  // No coalesced keys for bounding box intersection yet
184  CHECK_EQ(inner_outer_pairs.size(), 1u);
185 
186  const auto col = inner_outer_pairs[0].first;
187  CHECK(col);
188  const auto col_ti = col->get_type_info();
189  CHECK(col_ti.is_array());
190 
191  // TODO: Compute the number of dimensions for keys used to perform bounding box
192  // intersection
193  const size_t num_dims{2};
194  const double initial_bin_value{0.0};
195  std::vector<double> bucket_sizes(num_dims, initial_bin_value);
196  CHECK_EQ(bucket_thresholds.size(), num_dims);
197 
198  VLOG(1) << "Computing x and y bucket sizes for bounding box intersection with maximum "
199  "bucket size "
200  << std::to_string(bucket_thresholds[0]) << ", "
201  << std::to_string(bucket_thresholds[1]);
202 
203  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
204  const int thread_count = cpu_threads();
205  compute_bucket_sizes_on_cpu(
206  bucket_sizes, join_column, join_column_type, bucket_thresholds, thread_count);
207  }
208 #ifdef HAVE_CUDA
209  else {
210  // Note that we compute the bucket sizes using only a single GPU
211  const int device_id = 0;
212  auto data_mgr = executor->getDataMgr();
213  CudaAllocator allocator(
214  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
215  auto device_bucket_sizes_gpu =
216  transfer_vector_of_flat_objects_to_gpu(bucket_sizes, allocator);
217  auto join_column_gpu = transfer_flat_object_to_gpu(join_column, allocator);
218  auto join_column_type_gpu = transfer_flat_object_to_gpu(join_column_type, allocator);
219  auto device_bucket_thresholds_gpu =
220  transfer_vector_of_flat_objects_to_gpu(bucket_thresholds, allocator);
221 
222  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
223  join_column_gpu,
224  join_column_type_gpu,
225  device_bucket_thresholds_gpu);
226  allocator.copyFromDevice(reinterpret_cast<int8_t*>(bucket_sizes.data()),
227  reinterpret_cast<int8_t*>(device_bucket_sizes_gpu),
228  bucket_sizes.size() * sizeof(double));
229  }
230 #endif
231  const auto corrected_bucket_sizes = correct_uninitialized_bucket_sizes_to_thresholds(
232  bucket_sizes, bucket_thresholds, initial_bin_value);
233 
234  VLOG(1) << "Computed x and y bucket sizes for bounding box intersection: ("
235  << corrected_bucket_sizes[0] << ", " << corrected_bucket_sizes[1] << ")";
236 
237  return corrected_bucket_sizes;
238 }
239 
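// HashTableProps collects the metrics the tuner compares between steps. Since
// entry_count holds two slots per bin (see computeHashTableCounts), the
// keys_per_bin metric is emitted_keys_count / (entry_count / 2.0). Illustrative
// numbers: entry_count = 2,000 with emitted_keys_count = 10,000 gives
// keys_per_bin = 10.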
240 struct HashTableProps {
241  HashTableProps(const size_t entry_count,
242  const size_t emitted_keys_count,
243  const size_t hash_table_size,
244  const std::vector<double>& bucket_sizes)
245  : entry_count(entry_count)
246  , emitted_keys_count(emitted_keys_count)
247  , keys_per_bin(entry_count == 0 ? std::numeric_limits<double>::max()
248  : emitted_keys_count / (entry_count / 2.0))
249  , hash_table_size(hash_table_size)
250  , bucket_sizes(bucket_sizes) {}
251 
252  static HashTableProps invalid() { return HashTableProps(0, 0, 0, {}); }
253 
254  size_t entry_count;
255  size_t emitted_keys_count;
256  double keys_per_bin;
257  size_t hash_table_size;
258  std::vector<double> bucket_sizes;
259 };
260 
261 std::ostream& operator<<(std::ostream& os, const HashTableProps& props) {
262  os << " entry_count: " << props.entry_count << ", emitted_keys "
263  << props.emitted_keys_count << ", hash table size " << props.hash_table_size
264  << ", keys per bin " << props.keys_per_bin;
265  return os;
266 }
267 
268 struct TuningState {
269  TuningState(const size_t bbox_intersect_max_table_size_bytes,
270  const double bbox_intersect_target_entries_per_bin)
271  : crt_props(HashTableProps::invalid())
272  , prev_props(HashTableProps::invalid())
273  , chosen_bbox_intersect_threshold(-1)
274  , crt_step(0)
275  , crt_reverse_search_iteration(0)
276  , bbox_intersect_max_table_size_bytes(bbox_intersect_max_table_size_bytes)
277  , bbox_intersect_target_entries_per_bin(bbox_intersect_target_entries_per_bin) {}
278 
279  // current and previous props, allowing for easy backtracking
280  HashTableProps crt_props;
281  HashTableProps prev_props;
282 
283  // value we are tuning for
284  double chosen_bbox_intersect_threshold;
285  enum class TuningDirection { SMALLER, LARGER };
286  TuningDirection tuning_direction{TuningDirection::SMALLER};
287 
288  // various constants / state
289  size_t crt_step; // 1 indexed
290  size_t crt_reverse_search_iteration; // 1 indexed
291  const size_t bbox_intersect_max_table_size_bytes;
292  const double bbox_intersect_target_entries_per_bin;
293  const size_t max_reverse_search_iterations{8};
294 
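// Advances the tuning state machine by one step. Returns true to keep tuning
// and false to stop with the currently chosen threshold. The default direction
// is SMALLER (shrinking bins); if the very first step already exceeds the size
// budget, the search reverses toward LARGER bins for at most
// max_reverse_search_iterations steps.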
298  bool operator()(const HashTableProps& new_props,
299  const bool new_bbox_intersect_threshold) {
300  prev_props = crt_props;
301  crt_props = new_props;
302  crt_step++;
303 
304  if (hashTableTooBig() || keysPerBinIncreasing()) {
305  if (hashTableTooBig()) {
306  VLOG(1) << "Reached hash table size limit: "
307  << bbox_intersect_max_table_size_bytes << " with "
308  << crt_props.hash_table_size << " byte hash table, "
309  << crt_props.keys_per_bin << " keys per bin.";
310  } else if (keysPerBinIncreasing()) {
311  VLOG(1) << "Keys per bin increasing from " << prev_props.keys_per_bin << " to "
312  << crt_props.keys_per_bin;
313  CHECK(previousIterationValid());
314  }
315  if (previousIterationValid()) {
316  VLOG(1) << "Using previous threshold value " << chosen_bbox_intersect_threshold;
317  crt_props = prev_props;
318  return false;
319  } else {
320  CHECK(hashTableTooBig());
321  crt_reverse_search_iteration++;
322  chosen_bbox_intersect_threshold = new_bbox_intersect_threshold;
323 
324  if (crt_reverse_search_iteration == max_reverse_search_iterations) {
325  VLOG(1) << "Hit maximum number (" << max_reverse_search_iterations
326  << ") of reverse tuning iterations. Aborting tuning";
327  // use the crt props, but don't bother trying to tune any farther
328  return false;
329  }
330 
331  if (crt_reverse_search_iteration > 1 &&
332  crt_props.hash_table_size == prev_props.hash_table_size) {
333  // hash table size is not changing, bail
334  VLOG(1) << "Hash table size not decreasing (" << crt_props.hash_table_size
335  << " bytes) and still above maximum allowed size ("
336  << bbox_intersect_max_table_size_bytes << " bytes). Aborting tuning";
337  return false;
338  }
339 
340  // if the hash table is too big on the very first step, change direction towards
341  // larger bins to see if a slightly smaller hash table will fit
342  if (crt_step == 1 && crt_reverse_search_iteration == 1) {
343  VLOG(1)
344  << "First iteration of tuning led to hash table size over "
345  "limit. Reversing search to try larger bin sizes (previous threshold: "
346  << chosen_bbox_intersect_threshold << ")";
347  // Need to change direction of tuning to tune "up" towards larger bins
348  tuning_direction = TuningDirection::LARGER;
349  }
350  return true;
351  }
352  UNREACHABLE();
353  }
354 
355  chosen_bbox_intersect_threshold = new_bbox_intersect_threshold;
356 
357  if (keysPerBinUnderThreshold()) {
358  VLOG(1) << "Hash table reached size " << crt_props.hash_table_size
359  << " with keys per bin " << crt_props.keys_per_bin << " under threshold "
360  << bbox_intersect_target_entries_per_bin
361  << ". Terminating bucket size loop.";
362  return false;
363  }
364 
365  if (crt_reverse_search_iteration > 0) {
366  // We always take the first tuning iteration that succeeds when reversing
367  // direction, since reaching this point means we have not yet had a successful
368  // iteration and are "backtracking" the search by making bin sizes larger
369  VLOG(1) << "On reverse (larger tuning direction) search found workable "
370  << " hash table size of " << crt_props.hash_table_size
371  << " with keys per bin " << crt_props.keys_per_bin
372  << ". Terminating bucket size loop.";
373  return false;
374  }
375 
376  return true;
377  }
378 
379  bool hashTableTooBig() const {
380  return crt_props.hash_table_size > bbox_intersect_max_table_size_bytes;
381  }
382 
383  bool keysPerBinIncreasing() const {
384  return crt_props.keys_per_bin > prev_props.keys_per_bin;
385  }
386 
387  bool previousIterationValid() const {
388  return tuning_direction == TuningDirection::SMALLER && crt_step > 1;
389  }
390 
391  bool keysPerBinUnderThreshold() const {
392  return crt_props.keys_per_bin < bbox_intersect_target_entries_per_bin;
393  }
394 };
395 
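// BucketSizeTuner proposes candidate bucket thresholds for the tuning state
// machine above. Tuning "smaller" divides each per-dimension threshold by step_
// and recomputes bucket sizes from the data; tuning "larger" multiplies the
// thresholds by step_ directly. Illustrative example: with step = 2.0 and an
// initial threshold of 0.4, successive smaller steps try 0.2, 0.1, 0.05, ...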
396 class BucketSizeTuner {
397  public:
398  BucketSizeTuner(const double bucket_threshold,
399  const double step,
400  const double min_threshold,
401  const Data_Namespace::MemoryLevel effective_memory_level,
402  const std::vector<ColumnsForDevice>& columns_per_device,
403  const std::vector<InnerOuter>& inner_outer_pairs,
404  const size_t table_tuple_count,
405  const Executor* executor)
406  : num_dims_(2) // Todo: allow varying number of dims
407  , bucket_thresholds_(/*count=*/num_dims_, /*value=*/bucket_threshold)
408  , step_(step)
409  , min_threshold_(min_threshold)
410  , effective_memory_level_(effective_memory_level)
411  , columns_per_device_(columns_per_device)
412  , inner_outer_pairs_(inner_outer_pairs)
413  , table_tuple_count_(table_tuple_count)
414  , executor_(executor) {
415  CHECK(!columns_per_device_.empty());
416  }
417 
418  bool tuneOneStep() { return tuneOneStep(TuningState::TuningDirection::SMALLER, step_); }
419 
420  bool tuneOneStep(const TuningState::TuningDirection tuning_direction) {
421  return tuneOneStep(tuning_direction, step_);
422  }
423 
424  bool tuneOneStep(const TuningState::TuningDirection tuning_direction,
425  const double step_overide) {
426  if (table_tuple_count_ == 0) {
427  return false;
428  }
429  if (tuning_direction == TuningState::TuningDirection::SMALLER) {
430  return tuneSmallerOneStep(step_overide);
431  }
432  return tuneLargerOneStep(step_overide);
433  }
434 
435  auto getMinBucketSize() const {
436  return *std::min_element(bucket_thresholds_.begin(), bucket_thresholds_.end());
437  }
438 
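// Returns the reciprocal of each computed bucket size, which is the form the
// runtime key functions consume (a coordinate is multiplied by the inverse
// bucket size rather than divided). Bucket sizes are computed lazily on the
// first call. Illustrative example: a bucket size of 0.25 yields an inverse
// bucket size of 4.0.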
445  std::vector<double> getInverseBucketSizes() {
446  if (num_steps_ == 0) {
447  CHECK_EQ(current_bucket_sizes_.size(), static_cast<size_t>(0));
448  current_bucket_sizes_ = computeBucketSizes();
449  }
450  CHECK_EQ(current_bucket_sizes_.size(), num_dims_);
451  std::vector<double> inverse_bucket_sizes;
452  for (const auto s : current_bucket_sizes_) {
453  inverse_bucket_sizes.emplace_back(1.0 / s);
454  }
455  return inverse_bucket_sizes;
456  }
457 
458  private:
459  bool bucketThresholdsBelowMinThreshold() const {
460  for (const auto& t : bucket_thresholds_) {
461  if (t < min_threshold_) {
462  return true;
463  }
464  }
465  return false;
466  }
467 
468  std::vector<double> computeBucketSizes() const {
469  if (table_tuple_count_ == 0) {
470  return std::vector<double>(/*count=*/num_dims_, /*val=*/0);
471  }
472  return compute_bucket_sizes(bucket_thresholds_,
473  effective_memory_level_,
474  columns_per_device_.front().join_columns[0],
475  columns_per_device_.front().join_column_types[0],
476  inner_outer_pairs_,
477  executor_);
478  }
479 
480  bool tuneSmallerOneStep(const double step_overide) {
481  if (!current_bucket_sizes_.empty()) {
482  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
483  bucket_thresholds_ = current_bucket_sizes_;
484  for (auto& t : bucket_thresholds_) {
485  t /= step_overide;
486  }
487  }
488  if (bucketThresholdsBelowMinThreshold()) {
489  VLOG(1) << "Aborting tuning for bounding box intersection as at least one bucket "
490  "size is below min threshold";
491  return false;
492  }
493  const auto next_bucket_sizes = computeBucketSizes();
494  if (next_bucket_sizes == current_bucket_sizes_) {
495  VLOG(1) << "Aborting tuning for bounding box intersection as bucket size is no "
496  "longer changing.";
497  return false;
498  }
499 
500  current_bucket_sizes_ = next_bucket_sizes;
501  num_steps_++;
502  return true;
503  }
504 
505  bool tuneLargerOneStep(const double step_overide) {
506  if (!current_bucket_sizes_.empty()) {
507  CHECK_EQ(current_bucket_sizes_.size(), bucket_thresholds_.size());
508  bucket_thresholds_ = current_bucket_sizes_;
509  }
510  // If current_bucket_sizes was empty, we will start from our initial threshold
511  for (auto& t : bucket_thresholds_) {
512  t *= step_overide;
513  }
514  // When tuning up, do not dynamically compute bucket_sizes: compute_bucket_sizes as
515  // written picks the largest bin size below the threshold, so our bucket_size would
516  // never increase beyond the size of the largest polygon. This could mean we can
517  // never make the bucket sizes large enough to get our hash table below the maximum
518  // size. Possible todo: enable a templated version of compute_bucket_sizes that
519  // optionally finds the smallest extent above the threshold, to mirror the default
520  // behavior of finding the largest extent below the threshold, and use that variant here
521  current_bucket_sizes_ = bucket_thresholds_;
522  num_steps_++;
523  return true;
524  }
525 
526  size_t num_dims_;
527  std::vector<double> bucket_thresholds_;
528  size_t num_steps_{0};
529  const double step_;
530  const double min_threshold_;
531  const Data_Namespace::MemoryLevel effective_memory_level_;
532  const std::vector<ColumnsForDevice>& columns_per_device_;
533  const std::vector<InnerOuter>& inner_outer_pairs_;
534  const size_t table_tuple_count_;
535  const Executor* executor_;
536 
537  std::vector<double> current_bucket_sizes_;
538 
539  friend std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner);
540 };
541 
542 std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner) {
543  os << "Step Num: " << tuner.num_steps_ << ", Threshold: " << std::fixed << "("
544  << tuner.bucket_thresholds_[0] << ", " << tuner.bucket_thresholds_[1] << ")"
545  << ", Step Size: " << std::fixed << tuner.step_ << ", Min: " << std::fixed
546  << tuner.min_threshold_;
547  return os;
548 }
549 
550 } // namespace
551 
552 void BoundingBoxIntersectJoinHashTable::reifyWithLayout(const HashType layout) {
553  auto timer = DEBUG_TIMER(__func__);
555  const auto& query_info =
556  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs_), query_infos_)
557  .info;
558  auto [db_id, table_id] = HashJoin::getInnerTableId(inner_outer_pairs_);
559  VLOG(1) << "Reify with layout " << getHashTypeString(layout) << "for db_id: " << db_id
560  << ", table_id: " << table_id;
561  if (query_info.fragments.empty()) {
562  return;
563  }
564 
565  auto bbox_intersect_max_table_size_bytes = g_bbox_intersect_max_table_size_bytes;
566  std::optional<double> bbox_intersect_threshold_override;
567  double bbox_intersect_target_entries_per_bin = g_bbox_intersect_target_entries_per_bin;
568  auto skip_hashtable_caching = false;
570  VLOG(1) << "Setting bounding box intersection bucket threshold "
571  "\'bbox_intersect_bucket_threshold\' via "
572  "query hint: "
574  bbox_intersect_threshold_override = query_hints_.bbox_intersect_bucket_threshold;
575  }
577  std::ostringstream oss;
578  oss << "User requests to change a threshold \'bbox_intersect_max_table_size_bytes\' "
579  "via "
580  "query hint";
581  if (!bbox_intersect_threshold_override.has_value()) {
582  oss << ": " << bbox_intersect_max_table_size_bytes << " -> "
584  bbox_intersect_max_table_size_bytes = query_hints_.bbox_intersect_max_size;
585  } else {
586  oss << ", but is skipped since the query hint also changes the threshold "
587  "\'bbox_intersect_bucket_threshold\'";
588  }
589  VLOG(1) << oss.str();
590  }
592  VLOG(1) << "User requests to skip caching join hashtable for bounding box "
593  "intersection and its tuned "
594  "parameters for this query";
595  skip_hashtable_caching = true;
596  }
598  VLOG(1) << "User requests to change a threshold \'bbox_intersect_keys_per_bin\' via "
599  "query "
600  "hint: "
601  << bbox_intersect_target_entries_per_bin << " -> "
603  bbox_intersect_target_entries_per_bin = query_hints_.bbox_intersect_keys_per_bin;
604  }
605 
606  auto data_mgr = executor_->getDataMgr();
607  // We prioritize the CPU when building a join hashtable for bounding box intersection,
608  // but if a GPU is available and the corresponding query hint is given we selectively
609  // allow the GPU to build it. However, even if a GPU is present, we must not use it
610  // when the user forces CPU as the execution device type.
611  auto allow_gpu_hashtable_build =
614  if (allow_gpu_hashtable_build) {
615  if (data_mgr->gpusPresent() &&
617  VLOG(1) << "A user forces to build GPU hash table for bounding box intersection";
618  } else {
619  allow_gpu_hashtable_build = false;
620  VLOG(1) << "A user forces to build GPU hash table for bounding box intersection "
621  "but we skip it since either GPU is not presented or CPU execution mode "
622  "is set";
623  }
624  }
625 
626  std::vector<ColumnsForDevice> columns_per_device;
627  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
629  allow_gpu_hashtable_build) {
630  for (int device_id = 0; device_id < device_count_; ++device_id) {
631  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
632  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
633  }
634  }
635 
636  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
637  const auto shard_count = shardCount();
638  size_t total_num_tuples = 0;
639  for (int device_id = 0; device_id < device_count_; ++device_id) {
640  fragments_per_device.emplace_back(
641  shard_count
642  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
643  : query_info.fragments);
644  const size_t crt_num_tuples =
645  std::accumulate(fragments_per_device.back().begin(),
646  fragments_per_device.back().end(),
647  size_t(0),
648  [](const auto& sum, const auto& fragment) {
649  return sum + fragment.getNumTuples();
650  });
651  total_num_tuples += crt_num_tuples;
652  const auto columns_for_device =
653  fetchColumnsForDevice(fragments_per_device.back(),
654  device_id,
656  allow_gpu_hashtable_build
657  ? dev_buff_owners[device_id].get()
658  : nullptr);
659  columns_per_device.push_back(columns_for_device);
660  }
661 
662  // try to extract cache key for hash table and its relevant info
663  auto hashtable_access_path_info =
665  {},
666  condition_->get_optype(),
667  join_type_,
670  shard_count,
671  fragments_per_device,
672  executor_);
673  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
674  hashtable_cache_meta_info_ = hashtable_access_path_info.meta_info;
675  table_keys_ = hashtable_access_path_info.table_keys;
676 
677  auto get_inner_table_key = [this]() {
678  auto col_var = inner_outer_pairs_.front().first;
679  return col_var->getTableKey();
680  };
681 
682  if (table_keys_.empty()) {
683  const auto& table_key = get_inner_table_key();
686  }
687  CHECK(!table_keys_.empty());
688 
689  if (bbox_intersect_threshold_override) {
690  // compute bucket sizes based on the user provided threshold
691  BucketSizeTuner tuner(/*initial_threshold=*/*bbox_intersect_threshold_override,
692  /*step=*/1.0,
693  /*min_threshold=*/0.0,
695  columns_per_device,
697  total_num_tuples,
698  executor_);
699  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
700 
701  auto [entry_count, emitted_keys_count] =
702  computeHashTableCounts(shard_count,
703  inverse_bucket_sizes,
704  columns_per_device,
705  bbox_intersect_max_table_size_bytes,
706  *bbox_intersect_threshold_override);
707  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
708  // reifyImpl will check the hash table cache for an appropriate hash table w/ those
709  // bucket sizes (or within tolerances); if such a hash table exists, use it, otherwise
710  // build one
711  generateCacheKey(bbox_intersect_max_table_size_bytes,
712  *bbox_intersect_threshold_override,
713  inverse_bucket_sizes,
714  fragments_per_device,
715  device_count_);
716  reifyImpl(columns_per_device,
717  query_info,
718  layout,
719  shard_count,
720  entry_count,
721  emitted_keys_count,
722  skip_hashtable_caching,
723  bbox_intersect_max_table_size_bytes,
724  *bbox_intersect_threshold_override);
725  } else {
726  double bbox_intersect_bucket_threshold = std::numeric_limits<double>::max();
727  generateCacheKey(bbox_intersect_max_table_size_bytes,
728  bbox_intersect_bucket_threshold,
729  {},
730  fragments_per_device,
731  device_count_);
732  std::vector<size_t> per_device_chunk_key;
733  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
734  get_inner_table_key().table_id > 0) {
735  for (int device_id = 0; device_id < device_count_; ++device_id) {
737  boost::hash_combine(
738  chunk_key_hash,
739  HashJoin::collectFragmentIds(fragments_per_device[device_id]));
740  per_device_chunk_key.push_back(chunk_key_hash);
743  columns_per_device.front().join_columns.front().num_elems,
744  chunk_key_hash,
745  condition_->get_optype(),
746  bbox_intersect_max_table_size_bytes,
747  bbox_intersect_bucket_threshold,
748  {}};
749  hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
750  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_[device_id],
751  table_keys_);
752  }
753  }
754 
755  auto cached_bucket_threshold = auto_tuner_cache_->getItemFromCache(
756  hashtable_cache_key_.front(),
759  if (cached_bucket_threshold) {
760  bbox_intersect_bucket_threshold = cached_bucket_threshold->bucket_threshold;
761  auto inverse_bucket_sizes = cached_bucket_threshold->bucket_sizes;
762  setBoundingBoxIntersectionMetaInfo(bbox_intersect_max_table_size_bytes,
763  bbox_intersect_bucket_threshold,
764  inverse_bucket_sizes);
765  generateCacheKey(bbox_intersect_max_table_size_bytes,
766  bbox_intersect_bucket_threshold,
767  inverse_bucket_sizes,
768  fragments_per_device,
769  device_count_);
770 
771  if (auto hash_table =
772  hash_table_cache_->getItemFromCache(hashtable_cache_key_[device_count_],
775  std::nullopt)) {
776  // if we already have a built hash table, we can skip the scans required for
777  // computing bucket size and tuple count
778  // reset as the hash table sizes can vary a bit
779  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
780  CHECK(hash_table);
781 
782  VLOG(1) << "Using cached hash table bucket size";
783 
784  reifyImpl(columns_per_device,
785  query_info,
786  layout,
787  shard_count,
788  hash_table->getEntryCount(),
789  hash_table->getEmittedKeysCount(),
790  skip_hashtable_caching,
791  bbox_intersect_max_table_size_bytes,
792  bbox_intersect_bucket_threshold);
793  } else {
794  VLOG(1) << "Computing bucket size for cached bucket threshold";
795  // compute bucket size using our cached tuner value
796  BucketSizeTuner tuner(/*initial_threshold=*/bbox_intersect_bucket_threshold,
797  /*step=*/1.0,
798  /*min_threshold=*/0.0,
800  columns_per_device,
802  total_num_tuples,
803  executor_);
804 
805  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
806 
807  auto [entry_count, emitted_keys_count] =
808  computeHashTableCounts(shard_count,
809  inverse_bucket_sizes,
810  columns_per_device,
811  bbox_intersect_max_table_size_bytes,
812  bbox_intersect_bucket_threshold);
813  setInverseBucketSizeInfo(inverse_bucket_sizes, columns_per_device, device_count_);
814 
815  generateCacheKey(bbox_intersect_max_table_size_bytes,
816  bbox_intersect_bucket_threshold,
817  inverse_bucket_sizes,
818  fragments_per_device,
819  device_count_);
820 
821  reifyImpl(columns_per_device,
822  query_info,
823  layout,
824  shard_count,
825  entry_count,
826  emitted_keys_count,
827  skip_hashtable_caching,
828  bbox_intersect_max_table_size_bytes,
829  bbox_intersect_bucket_threshold);
830  }
831  } else {
832  // compute bucket size using the auto tuner
833  BucketSizeTuner tuner(
834  /*initial_threshold=*/bbox_intersect_bucket_threshold,
835  /*step=*/2.0,
836  /*min_threshold=*/1e-7,
838  columns_per_device,
840  total_num_tuples,
841  executor_);
842 
843  VLOG(1) << "Running auto tune logic for bounding box intersection with parameters: "
844  << tuner;
845 
846  // manages the tuning state machine
847  TuningState tuning_state(bbox_intersect_max_table_size_bytes,
848  bbox_intersect_target_entries_per_bin);
849  while (tuner.tuneOneStep(tuning_state.tuning_direction)) {
850  const auto inverse_bucket_sizes = tuner.getInverseBucketSizes();
851 
852  const auto [crt_entry_count, crt_emitted_keys_count] =
853  computeHashTableCounts(shard_count,
854  inverse_bucket_sizes,
855  columns_per_device,
856  tuning_state.bbox_intersect_max_table_size_bytes,
857  tuning_state.chosen_bbox_intersect_threshold);
858  const size_t hash_table_size = calculateHashTableSize(
859  inverse_bucket_sizes.size(), crt_emitted_keys_count, crt_entry_count);
860  HashTableProps crt_props(crt_entry_count,
861  crt_emitted_keys_count,
862  hash_table_size,
863  inverse_bucket_sizes);
864  VLOG(1) << "Tuner output: " << tuner << " with properties " << crt_props;
865 
866  const auto should_continue = tuning_state(crt_props, tuner.getMinBucketSize());
868  tuning_state.crt_props.bucket_sizes, columns_per_device, device_count_);
869  if (!should_continue) {
870  break;
871  }
872  }
873 
874  const auto& crt_props = tuning_state.crt_props;
875  // sanity check that the hash table size has not changed. this is a fairly
876  // inexpensive check to ensure the above algorithm is consistent
877  const size_t hash_table_size =
879  crt_props.emitted_keys_count,
880  crt_props.entry_count);
881  CHECK_EQ(crt_props.hash_table_size, hash_table_size);
882 
884  hash_table_size > bbox_intersect_max_table_size_bytes) {
885  VLOG(1) << "Could not find suitable parameters to create hash "
886  "table for bounding box intersectionunder max allowed size ("
887  << bbox_intersect_max_table_size_bytes << ") bytes.";
888  throw TooBigHashTableForBoundingBoxIntersect(bbox_intersect_max_table_size_bytes);
889  }
890 
891  VLOG(1) << "Final tuner output: " << tuner << " with properties " << crt_props;
893  VLOG(1) << "Final bucket sizes: ";
894  for (size_t dim = 0; dim < inverse_bucket_sizes_for_dimension_.size(); dim++) {
895  VLOG(1) << "dim[" << dim
896  << "]: " << 1.0 / inverse_bucket_sizes_for_dimension_[dim];
897  }
898  CHECK_GE(tuning_state.chosen_bbox_intersect_threshold, double(0));
899  generateCacheKey(tuning_state.bbox_intersect_max_table_size_bytes,
900  tuning_state.chosen_bbox_intersect_threshold,
901  {},
902  fragments_per_device,
903  device_count_);
904  const auto candidate_auto_tuner_cache_key = hashtable_cache_key_.front();
905  if (skip_hashtable_caching) {
906  VLOG(1) << "Skip to add tuned parameters to auto tuner";
907  } else {
908  AutoTunerMetaInfo meta_info{tuning_state.bbox_intersect_max_table_size_bytes,
909  tuning_state.chosen_bbox_intersect_threshold,
911  auto_tuner_cache_->putItemToCache(candidate_auto_tuner_cache_key,
912  meta_info,
915  0,
916  0);
917  }
918  bbox_intersect_bucket_threshold = tuning_state.chosen_bbox_intersect_threshold;
919  reifyImpl(columns_per_device,
920  query_info,
921  layout,
922  shard_count,
923  crt_props.entry_count,
924  crt_props.emitted_keys_count,
925  skip_hashtable_caching,
926  bbox_intersect_max_table_size_bytes,
927  bbox_intersect_bucket_threshold);
928  }
929  }
930 }
931 
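// Size model used below: entry_size * entry_count bytes for the keys plus
// (2 * entry_count + emitted_keys_count) int32 slots for the one-to-many
// offset/count/payload buffers. Illustrative numbers: 8-byte key components in
// 2 dimensions (entry_size = 16) with entry_count = 1,000,000 and
// emitted_keys_count = 5,000,000 give 16 MB + 7,000,000 * 4 B = 44 MB.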
932 size_t BoundingBoxIntersectJoinHashTable::calculateHashTableSize(
933  size_t number_of_dimensions,
934  size_t emitted_keys_count,
935  size_t entry_count) const {
936  const auto key_component_width = getKeyComponentWidth();
937  const auto key_component_count = number_of_dimensions;
938  const auto entry_size = key_component_count * key_component_width;
939  const auto keys_for_all_rows = emitted_keys_count;
940  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
941  const size_t hash_table_size =
942  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
943  return hash_table_size;
944 }
945 
946 ColumnsForDevice BoundingBoxIntersectJoinHashTable::fetchColumnsForDevice(
947  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
948  const int device_id,
949  DeviceAllocator* dev_buff_owner) {
950  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
951 
952  std::vector<JoinColumn> join_columns;
953  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
954  std::vector<JoinColumnTypeInfo> join_column_types;
955  std::vector<std::shared_ptr<void>> malloc_owner;
956  for (const auto& inner_outer_pair : inner_outer_pairs_) {
957  const auto inner_col = inner_outer_pair.first;
958  const auto inner_cd = get_column_descriptor_maybe(inner_col->getColumnKey());
959  if (inner_cd && inner_cd->isVirtualCol) {
961  }
962  join_columns.emplace_back(fetchJoinColumn(inner_col,
963  fragments,
964  effective_memory_level,
965  device_id,
966  chunks_owner,
967  dev_buff_owner,
968  malloc_owner,
969  executor_,
970  &column_cache_));
971  const auto& ti = inner_col->get_type_info();
972  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
973  0,
974  0,
975  inline_int_null_value<int64_t>(),
976  false,
977  0,
979  CHECK(ti.is_array())
980  << "Bounding box intersection currently only supported for arrays.";
981  }
982  return {join_columns, join_column_types, chunks_owner, {}, malloc_owner};
983 }
984 
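// Converts the approximate number of distinct bucket keys into a per-device
// entry count. The factor of 2 below doubles the estimated bucket count,
// keeping the table roughly half full, and mirrors the
// 2 * getNumTuplesUpperBound() sizing check in getInstance().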
985 std::pair<size_t, size_t> BoundingBoxIntersectJoinHashTable::computeHashTableCounts(
986  const size_t shard_count,
987  const std::vector<double>& inverse_bucket_sizes_for_dimension,
988  std::vector<ColumnsForDevice>& columns_per_device,
989  const size_t chosen_max_hashtable_size,
990  const double chosen_bucket_threshold) {
991  CHECK(!inverse_bucket_sizes_for_dimension.empty());
992  const auto [tuple_count, emitted_keys_count] =
993  approximateTupleCount(inverse_bucket_sizes_for_dimension,
994  columns_per_device,
995  chosen_max_hashtable_size,
996  chosen_bucket_threshold);
997  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
998 
999  return std::make_pair(
1000  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
1001  emitted_keys_count);
1002 }
1003 
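// Estimates (a) the number of distinct bucket keys via HyperLogLog and (b) the
// number of emitted keys, i.e. the total number of (bucket, row) pairs produced
// by decomposing every bounding box. On CPU, per-thread HLL buffers are unified
// and the running per-row key counts provide the emitted total; on GPU, the
// last element of the per-row count buffer is copied back for the same purpose.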
1004 std::pair<size_t, size_t> BoundingBoxIntersectJoinHashTable::approximateTupleCount(
1005  const std::vector<double>& inverse_bucket_sizes_for_dimension,
1006  std::vector<ColumnsForDevice>& columns_per_device,
1007  const size_t chosen_max_hashtable_size,
1008  const double chosen_bucket_threshold) {
1009  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
1010  CountDistinctDescriptor count_distinct_desc{
1012  0,
1013  11,
1014  true,
1015  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
1018  1};
1019  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
1020 
1021  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
1022  if (columns_per_device.front().join_columns.front().num_elems == 0) {
1023  return std::make_pair(0, 0);
1024  }
1025 
1026  // TODO: state management in here should be revisited, but this should be safe enough
1027  // for now
1028  // re-compute bucket counts per device based on global bucket size
1029  for (size_t device_id = 0; device_id < columns_per_device.size(); ++device_id) {
1030  auto& columns_for_device = columns_per_device[device_id];
1031  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
1033  }
1034 
1035  // Number of keys must match dimension of buckets
1036  CHECK_EQ(columns_per_device.front().join_columns.size(),
1037  columns_per_device.front().join_buckets.size());
1038  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1039  // Note that this path assumes each device has the same hash table (for GPU hash
1040  // join w/ hash table built on CPU)
1041  const auto cached_count_info =
1045  if (cached_count_info) {
1046  VLOG(1) << "Using a cached tuple count: " << cached_count_info->first
1047  << ", emitted keys count: " << cached_count_info->second;
1048  return *cached_count_info;
1049  }
1050  int thread_count = cpu_threads();
1051  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
1052  auto hll_result = &hll_buffer_all_cpus[0];
1053 
1054  std::vector<int32_t> num_keys_for_row;
1055  // TODO(adb): support multi-column bounding box intersection
1056  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
1057 
1059  hll_result,
1060  num_keys_for_row,
1061  count_distinct_desc.bitmap_sz_bits,
1062  padded_size_bytes,
1063  columns_per_device.front().join_columns,
1064  columns_per_device.front().join_column_types,
1065  columns_per_device.front().join_buckets,
1066  thread_count);
1067  for (int i = 1; i < thread_count; ++i) {
1068  hll_unify(hll_result,
1069  hll_result + i * padded_size_bytes,
1070  size_t(1) << count_distinct_desc.bitmap_sz_bits);
1071  }
1072  return std::make_pair(
1073  hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
1074  static_cast<size_t>(num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0));
1075  }
1076 #ifdef HAVE_CUDA
1077  auto data_mgr = executor_->getDataMgr();
1078  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
1079  for (auto& host_hll_buffer : host_hll_buffers) {
1080  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
1081  }
1082  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
1083  std::vector<std::future<void>> approximate_distinct_device_threads;
1084  for (int device_id = 0; device_id < device_count_; ++device_id) {
1085  approximate_distinct_device_threads.emplace_back(std::async(
1087  [device_id,
1088  &columns_per_device,
1089  &count_distinct_desc,
1090  data_mgr,
1091  &host_hll_buffers,
1092  &emitted_keys_count_device_threads] {
1093  auto allocator = std::make_unique<CudaAllocator>(
1094  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1095  auto device_hll_buffer =
1096  allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
1097  data_mgr->getCudaMgr()->zeroDeviceMem(
1098  device_hll_buffer,
1099  count_distinct_desc.bitmapPaddedSizeBytes(),
1100  device_id,
1102  const auto& columns_for_device = columns_per_device[device_id];
1103  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
1104  columns_for_device.join_columns, *allocator);
1105 
1106  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
1107  const auto& inverse_bucket_sizes_for_dimension =
1108  columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
1109  auto inverse_bucket_sizes_gpu = allocator->alloc(
1110  inverse_bucket_sizes_for_dimension.size() * sizeof(double));
1111  allocator->copyToDevice(
1112  inverse_bucket_sizes_gpu,
1113  inverse_bucket_sizes_for_dimension.data(),
1114  inverse_bucket_sizes_for_dimension.size() * sizeof(double));
1115  const size_t row_counts_buffer_sz =
1116  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
1117  auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
1118  data_mgr->getCudaMgr()->zeroDeviceMem(
1119  row_counts_buffer,
1120  row_counts_buffer_sz,
1121  device_id,
1123  const auto key_handler = BoundingBoxIntersectKeyHandler(
1124  inverse_bucket_sizes_for_dimension.size(),
1125  join_columns_gpu,
1126  reinterpret_cast<double*>(inverse_bucket_sizes_gpu));
1127  const auto key_handler_gpu =
1128  transfer_flat_object_to_gpu(key_handler, *allocator);
1130  reinterpret_cast<uint8_t*>(device_hll_buffer),
1131  count_distinct_desc.bitmap_sz_bits,
1132  reinterpret_cast<int32_t*>(row_counts_buffer),
1133  key_handler_gpu,
1134  columns_for_device.join_columns[0].num_elems);
1135 
1136  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
1137  allocator->copyFromDevice(
1138  &host_emitted_keys_count,
1139  row_counts_buffer +
1140  (columns_per_device.front().join_columns[0].num_elems - 1) *
1141  sizeof(int32_t),
1142  sizeof(int32_t));
1143 
1144  auto& host_hll_buffer = host_hll_buffers[device_id];
1145  allocator->copyFromDevice(&host_hll_buffer[0],
1146  device_hll_buffer,
1147  count_distinct_desc.bitmapPaddedSizeBytes());
1148  }));
1149  }
1150  for (auto& child : approximate_distinct_device_threads) {
1151  child.get();
1152  }
1153  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
1154  auto& result_hll_buffer = host_hll_buffers.front();
1155  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
1156  for (int device_id = 1; device_id < device_count_; ++device_id) {
1157  auto& host_hll_buffer = host_hll_buffers[device_id];
1158  hll_unify(hll_result,
1159  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
1160  size_t(1) << count_distinct_desc.bitmap_sz_bits);
1161  }
1162  const size_t emitted_keys_count =
1163  std::accumulate(emitted_keys_count_device_threads.begin(),
1164  emitted_keys_count_device_threads.end(),
1165  0);
1166  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
1167  emitted_keys_count);
1168 #else
1169  UNREACHABLE();
1170  return {0, 0};
1171 #endif // HAVE_CUDA
1172 }
1173 
1174 void BoundingBoxIntersectJoinHashTable::setInverseBucketSizeInfo(
1175  const std::vector<double>& inverse_bucket_sizes,
1176  std::vector<ColumnsForDevice>& columns_per_device,
1177  const size_t device_count) {
1178  // set global bucket size
1179  inverse_bucket_sizes_for_dimension_ = inverse_bucket_sizes;
1180 
1181  // re-compute bucket counts per device based on global bucket size
1182  CHECK_EQ(columns_per_device.size(), static_cast<size_t>(device_count));
1183  for (size_t device_id = 0; device_id < device_count; ++device_id) {
1184  auto& columns_for_device = columns_per_device[device_id];
1185  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension_,
1187  }
1188 }
1189 
1190 size_t BoundingBoxIntersectJoinHashTable::getKeyComponentWidth() const {
1191  return 8;
1192 }
1193 
1194 size_t BoundingBoxIntersectJoinHashTable::getKeyComponentCount() const {
1195  CHECK(!inverse_bucket_sizes_for_dimension_.empty());
1196  return inverse_bucket_sizes_for_dimension_.size();
1197 }
1198 
1199 void BoundingBoxIntersectJoinHashTable::reify(const HashType preferred_layout) {
1200  auto timer = DEBUG_TIMER(__func__);
1201  CHECK_LT(0, device_count_);
1203 
1204  CHECK(condition_->is_bbox_intersect_oper());
1205  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
1206  HashType layout;
1207  if (inner_outer_pairs_[0].second->get_type_info().is_fixlen_array() &&
1208  inner_outer_pairs_[0].second->get_type_info().get_size() == 32) {
1209  // bounds array
1210  layout = HashType::ManyToMany;
1211  } else {
1212  layout = HashType::OneToMany;
1213  }
1214  try {
1215  reifyWithLayout(layout);
1216  return;
1217  } catch (const JoinHashTableTooBig& e) {
1218  throw e;
1219  } catch (const std::exception& e) {
1220  VLOG(1) << "Caught exception while building baseline hash table for bounding box "
1221  "intersection: "
1222  << e.what();
1223  throw;
1224  }
1225 }
1226 
1227 void BoundingBoxIntersectJoinHashTable::reifyImpl(
1228  std::vector<ColumnsForDevice>& columns_per_device,
1229  const Fragmenter_Namespace::TableInfo& query_info,
1230  const HashType layout,
1231  const size_t shard_count,
1232  const size_t entry_count,
1233  const size_t emitted_keys_count,
1234  const bool skip_hashtable_caching,
1235  const size_t chosen_max_hashtable_size,
1236  const double chosen_bucket_threshold) {
1237  std::vector<std::future<void>> init_threads;
1238  chosen_bbox_intersect_bucket_threshold_ = chosen_bucket_threshold;
1239  chosen_bbox_intersect_max_table_size_bytes_ = chosen_max_hashtable_size;
1243 
1244  for (int device_id = 0; device_id < device_count_; ++device_id) {
1245  const auto fragments =
1246  shard_count
1247  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
1248  : query_info.fragments;
1249  init_threads.push_back(std::async(std::launch::async,
1251  this,
1252  columns_per_device[device_id],
1253  layout,
1254  entry_count,
1255  emitted_keys_count,
1256  skip_hashtable_caching,
1257  device_id,
1259  }
1260  for (auto& init_thread : init_threads) {
1261  init_thread.wait();
1262  }
1263  for (auto& init_thread : init_threads) {
1264  init_thread.get();
1265  }
1266 }
1267 
1268 void BoundingBoxIntersectJoinHashTable::reifyForDevice(
1269  const ColumnsForDevice& columns_for_device,
1270  const HashType layout,
1271  const size_t entry_count,
1272  const size_t emitted_keys_count,
1273  const bool skip_hashtable_caching,
1274  const int device_id,
1275  const logger::ThreadLocalIds parent_thread_local_ids) {
1276  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1277  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
1278  CHECK_EQ(getKeyComponentWidth(), size_t(8));
1280  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
1281  BaselineHashTableEntryInfo hash_table_entry_info(entry_count,
1282  emitted_keys_count,
1283  sizeof(int32_t),
1286  layout,
1287  false);
1288  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1289  VLOG(1) << "Building join hash table for bounding box intersection on CPU.";
1290  auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
1291  columns_for_device.join_column_types,
1292  columns_for_device.join_buckets,
1293  hash_table_entry_info,
1294  skip_hashtable_caching);
1295  CHECK(hash_table);
1296 
1297 #ifdef HAVE_CUDA
1299  auto gpu_hash_table = copyCpuHashTableToGpu(hash_table, device_id);
1300  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1301  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
1302  } else {
1303 #else
1304  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1305 #endif
1306  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
1307  hash_tables_for_device_[0] = hash_table;
1308 #ifdef HAVE_CUDA
1309  }
1310 #endif
1311  } else {
1312 #ifdef HAVE_CUDA
1313  auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
1314  columns_for_device.join_column_types,
1315  columns_for_device.join_buckets,
1316  hash_table_entry_info,
1317  device_id);
1318  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1319  hash_tables_for_device_[device_id] = std::move(hash_table);
1320 #else
1321  UNREACHABLE();
1322 #endif
1323  }
1324 }
1325 
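// CPU build path. The builder first probes the hash table recycler using the
// cache key assembled in reifyWithLayout; a cached OneToMany table can also
// satisfy a ManyToMany request (see the layout check below). Only on a cache
// miss is the table actually built, and, unless caching is skipped, it is then
// stored together with its build time.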
1326 std::shared_ptr<BaselineHashTable> BoundingBoxIntersectJoinHashTable::initHashTableOnCpu(
1327  const std::vector<JoinColumn>& join_columns,
1328  const std::vector<JoinColumnTypeInfo>& join_column_types,
1329  const std::vector<JoinBucketInfo>& join_bucket_info,
1330  const BaselineHashTableEntryInfo hash_table_entry_info,
1331  const bool skip_hashtable_caching) {
1332  auto timer = DEBUG_TIMER(__func__);
1333  decltype(std::chrono::steady_clock::now()) ts1, ts2;
1334  ts1 = std::chrono::steady_clock::now();
1335  CHECK(!join_columns.empty());
1336  CHECK(!join_bucket_info.empty());
1337  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1338  auto const hash_table_layout = hash_table_entry_info.getHashTableLayout();
1339  if (auto generic_hash_table =
1343  if (auto hash_table =
1344  std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
1345  VLOG(1) << "Using cached CPU hash table for initialization.";
1346  // See if a hash table of a different layout was returned.
1347  // If it was OneToMany, we can reuse it on ManyToMany.
1348  if (hash_table_layout == HashType::ManyToMany &&
1349  hash_table->getLayout() == HashType::OneToMany) {
1350  // use the cached hash table
1352  return hash_table;
1353  }
1354  if (hash_table_layout == hash_table->getLayout()) {
1355  return hash_table;
1356  }
1357  }
1358  }
1359  CHECK(layoutRequiresAdditionalBuffers(hash_table_layout));
1360  const auto key_component_count =
1361  join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();
1362 
1363  const auto key_handler = BoundingBoxIntersectKeyHandler(
1364  key_component_count,
1365  &join_columns[0],
1366  join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());
1369  dummy_str_proxy_translation_maps_ptrs_and_offsets;
1370  const auto err =
1371  builder.initHashTableOnCpu(&key_handler,
1373  join_columns,
1374  join_column_types,
1375  join_bucket_info,
1376  dummy_str_proxy_translation_maps_ptrs_and_offsets,
1377  hash_table_entry_info,
1378  join_type_,
1379  executor_,
1380  query_hints_);
1381  ts2 = std::chrono::steady_clock::now();
1382  if (err) {
1383  throw HashJoinFail(std::string("Unrecognized error when initializing CPU hash table "
1384  "for bounding box intersection(") +
1385  std::to_string(err) + std::string(")"));
1386  }
1387  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
1388  if (skip_hashtable_caching) {
1389  VLOG(1) << "Skip to cache join hashtable for bounding box intersection";
1390  } else {
1391  auto hashtable_build_time =
1392  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
1395  hash_table,
1397  hashtable_build_time);
1398  }
1399  return hash_table;
1400 }
1401 
1402 #ifdef HAVE_CUDA
1403 
1404 std::shared_ptr<BaselineHashTable> BoundingBoxIntersectJoinHashTable::initHashTableOnGpu(
1405  const std::vector<JoinColumn>& join_columns,
1406  const std::vector<JoinColumnTypeInfo>& join_column_types,
1407  const std::vector<JoinBucketInfo>& join_bucket_info,
1408  const BaselineHashTableEntryInfo hash_table_entry_info,
1409  const size_t device_id) {
1411 
1412  VLOG(1) << "Building join hash table for bounding box intersection on GPU.";
1413 
1414  BaselineJoinHashTableBuilder builder;
1415  auto data_mgr = executor_->getDataMgr();
1416  CudaAllocator allocator(
1417  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1418  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
1419  CHECK_EQ(join_columns.size(), 1u);
1420  CHECK(!join_bucket_info.empty());
1421  auto& inverse_bucket_sizes_for_dimension =
1422  join_bucket_info[0].inverse_bucket_sizes_for_dimension;
1423  auto inverse_bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
1424  inverse_bucket_sizes_for_dimension, allocator);
1425  const auto key_handler =
1426  BoundingBoxIntersectKeyHandler(inverse_bucket_sizes_for_dimension.size(),
1427  join_columns_gpu,
1428  inverse_bucket_sizes_gpu);
1429 
1430  const auto err = builder.initHashTableOnGpu(&key_handler,
1431  join_columns,
1432  join_type_,
1433  hash_table_entry_info,
1434  device_id,
1435  executor_,
1436  query_hints_);
1437  if (err) {
1438  throw HashJoinFail(std::string("Unrecognized error when initializing GPU hash table "
1439  "for bounding box intersection (") +
1440  std::to_string(err) + std::string(")"));
1441  }
1442  return builder.getHashTable();
1443 }
1444 
1445 std::shared_ptr<BaselineHashTable>
1446 BoundingBoxIntersectJoinHashTable::copyCpuHashTableToGpu(
1447  std::shared_ptr<BaselineHashTable>& cpu_hash_table,
1448  const size_t device_id) {
1450 
1451  auto data_mgr = executor_->getDataMgr();
1452 
1453  // copy hash table to GPU
1454  BaselineJoinHashTableBuilder gpu_builder;
1455  gpu_builder.allocateDeviceMemory(
1456  cpu_hash_table->getHashTableEntryInfo(), device_id, executor_, query_hints_);
1457  std::shared_ptr<BaselineHashTable> gpu_hash_table = gpu_builder.getHashTable();
1458  CHECK(gpu_hash_table);
1459  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
1460  CHECK(gpu_buffer_ptr);
1461 
1462  CHECK_LE(cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
1463  gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));
1464  auto device_allocator = std::make_unique<CudaAllocator>(
1465  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1466  device_allocator->copyToDevice(
1467  gpu_buffer_ptr,
1468  cpu_hash_table->getCpuBuffer(),
1469  cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU));
1470  return gpu_hash_table;
1471 }
1472 
1473 #endif // HAVE_CUDA
1474 
1475 #define LL_CONTEXT executor_->cgen_state_->context_
1476 #define LL_BUILDER executor_->cgen_state_->ir_builder_
1477 #define LL_INT(v) executor_->cgen_state_->llInt(v)
1478 #define LL_FP(v) executor_->cgen_state_->llFp(v)
1479 #define ROW_FUNC executor_->cgen_state_->row_func_
1480 
1481 llvm::Value* BoundingBoxIntersectJoinHashTable::codegenKey(const CompilationOptions& co) {
1482  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1483  const auto key_component_width = getKeyComponentWidth();
1484  CHECK(key_component_width == 4 || key_component_width == 8);
1485  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
1486  llvm::Value* key_buff_lv{nullptr};
1487  switch (key_component_width) {
1488  case 4:
1489  key_buff_lv =
1490  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
1491  break;
1492  case 8:
1493  key_buff_lv =
1494  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
1495  break;
1496  default:
1497  CHECK(false);
1498  }
1499 
1500  const auto& inner_outer_pair = inner_outer_pairs_[0];
1501  const auto outer_geo = inner_outer_pair.second;
1502  const auto outer_geo_ti = outer_geo->get_type_info();
1503 
1504  llvm::Value* arr_ptr = nullptr;
1505  CodeGenerator code_generator(executor_);
1506  CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
1507 
1508  if (outer_geo_ti.is_geometry()) {
1509  // TODO(adb): for points we will use the coords array, but for other geometries we
1510  // will need to use the bounding box. For now only support points.
1511  CHECK_EQ(outer_geo_ti.get_type(), kPOINT);
1512 
1513  if (const auto outer_geo_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_geo)) {
1514  const auto outer_geo_col_lvs = code_generator.codegen(outer_geo_col, true, co);
1515  CHECK_EQ(outer_geo_col_lvs.size(), size_t(1));
1516  auto column_key = outer_geo_col->getColumnKey();
1517  column_key.column_id = column_key.column_id + 1;
1518  const auto coords_cd = Catalog_Namespace::get_metadata_for_column(column_key);
1519  CHECK(coords_cd);
1520 
1521  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1522  "array_buff",
1523  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1524  {outer_geo_col_lvs.front(), code_generator.posArg(outer_geo_col)});
1525  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
1526  << "Bounding box intersection only supports TINYINT coordinates columns.";
1527  arr_ptr = code_generator.castArrayPointer(array_ptr,
1528  coords_cd->columnType.get_elem_type());
1529  } else if (const auto outer_geo_function_operator =
1530  dynamic_cast<const Analyzer::GeoOperator*>(outer_geo)) {
1531  // Process points dynamically constructed by geo function operators
1532  const auto outer_geo_function_operator_lvs =
1533  code_generator.codegen(outer_geo_function_operator, true, co);
1534  CHECK_EQ(outer_geo_function_operator_lvs.size(), size_t(2));
1535  arr_ptr = outer_geo_function_operator_lvs.front();
1536  } else if (const auto outer_geo_expr =
1537  dynamic_cast<const Analyzer::GeoExpr*>(outer_geo)) {
1538  UNREACHABLE() << outer_geo_expr->toString();
1539  }
1540  } else if (outer_geo_ti.is_fixlen_array()) {
1541  // Process dynamically constructed points
1542  const auto outer_geo_cast_coord_array =
1543  dynamic_cast<const Analyzer::UOper*>(outer_geo);
1544  CHECK_EQ(outer_geo_cast_coord_array->get_optype(), kCAST);
1545  const auto outer_geo_coord_array = dynamic_cast<const Analyzer::ArrayExpr*>(
1546  outer_geo_cast_coord_array->get_operand());
1547  CHECK(outer_geo_coord_array);
1548  CHECK(outer_geo_coord_array->isLocalAlloc());
1549  CHECK_EQ(outer_geo_coord_array->getElementCount(), 2);
1550  auto elem_size = (outer_geo_ti.get_compression() == kENCODING_GEOINT)
1551  ? sizeof(int32_t)
1552  : sizeof(double);
1553  CHECK_EQ(outer_geo_ti.get_size(), int(2 * elem_size));
1554  const auto outer_geo_constructed_lvs = code_generator.codegen(outer_geo, true, co);
1555  // CHECK_EQ(outer_geo_constructed_lvs.size(), size_t(2)); // Pointer and size
1556  const auto array_ptr = outer_geo_constructed_lvs.front(); // Just need the pointer
1557  arr_ptr = LL_BUILDER.CreateGEP(
1558  array_ptr->getType()->getScalarType()->getPointerElementType(),
1559  array_ptr,
1560  LL_INT(0));
1561  arr_ptr = code_generator.castArrayPointer(array_ptr, SQLTypeInfo(kTINYINT, true));
1562  }
1563  if (!arr_ptr) {
1564  LOG(FATAL)
1565  << "Bounding box intersection currently only supports geospatial columns and "
1566  "constructed points.";
1567  }
1568 
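  // For each of the two dimensions, map the probe point's coordinate to an integer
  // bucket key by scaling with the inverse bucket size; GEOINT-compressed and
  // uncompressed (double) coordinates go through separate runtime helpers. The 64-bit
  // bucket key is then converted to the key component width and stored into key_buff_lv.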
1569  for (size_t i = 0; i < 2; i++) {
1570  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
1571  key_buff_lv->getType()->getScalarType()->getPointerElementType(),
1572  key_buff_lv,
1573  LL_INT(i));
1574 
1575  // Note that get_bucket_key_for_range_compressed will need to be specialized for
1576  // future compression schemes
1577  auto bucket_key =
1578  outer_geo_ti.get_compression() == kENCODING_GEOINT
1579  ? executor_->cgen_state_->emitExternalCall(
1580  "get_bucket_key_for_range_compressed",
1581  get_int_type(64, LL_CONTEXT),
1582  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])})
1583  : executor_->cgen_state_->emitExternalCall(
1584  "get_bucket_key_for_range_double",
1585  get_int_type(64, LL_CONTEXT),
1586  {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])});
1587  const auto col_lv = LL_BUILDER.CreateSExt(
1588  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
1589  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
1590  }
1591  return key_buff_lv;
1592 }
1593 
1594 std::vector<llvm::Value*> BoundingBoxIntersectJoinHashTable::codegenManyKey(
1595     const CompilationOptions& co) {
1596  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1597  const auto key_component_width = getKeyComponentWidth();
1598  CHECK(key_component_width == 4 || key_component_width == 8);
1599  auto hash_table = getHashTableForDevice(size_t(0));
1600  CHECK(hash_table);
1601  CHECK(getHashType() == HashType::ManyToMany);
1602 
1603  VLOG(1) << "Performing codegen for ManyToMany";
1604  const auto& inner_outer_pair = inner_outer_pairs_[0];
1605  const auto outer_col = inner_outer_pair.second;
1606 
1607  CodeGenerator code_generator(executor_);
1608  const auto col_lvs = code_generator.codegen(outer_col, true, co);
1609  CHECK_EQ(col_lvs.size(), size_t(1));
1610 
1611  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
1612  CHECK(outer_col_var);
1613  const auto coords_cd =
1614  Catalog_Namespace::get_metadata_for_column(outer_col_var->getColumnKey());
1615  CHECK(coords_cd);
1616 
1617  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1618  "array_buff",
1619  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1620  {col_lvs.front(), code_generator.posArg(outer_col)});
1621 
1622  // TODO(jclay): this seems to cast to double, and causes the GPU build to fail.
1623  // const auto arr_ptr =
1624  // code_generator.castArrayPointer(array_ptr,
1625  // coords_cd->columnType.get_elem_type());
1626  array_ptr->setName("array_ptr");
1627 
1628  auto num_keys_lv = executor_->cgen_state_->emitExternalCall(
1629  "get_num_buckets_for_bounds",
1630  get_int_type(32, LL_CONTEXT),
1631  {array_ptr,
1632  LL_INT(0),
1633  LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1634  LL_FP(inverse_bucket_sizes_for_dimension_[1])});
1635  num_keys_lv->setName("num_keys_lv");
1636 
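  // Return the pair consumed by codegenMatchingSet below: the bucket count computed by
  // get_num_buckets_for_bounds (num_keys_lv) and the pointer to the row's bounds array
  // (array_ptr).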
1637  return {num_keys_lv, array_ptr};
1638 }
1639 
1640 HashJoinMatchingSet BoundingBoxIntersectJoinHashTable::codegenMatchingSet(
1641     const CompilationOptions& co,
1642  const size_t index) {
1643  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1644  if (getHashType() == HashType::ManyToMany) {
1645  VLOG(1) << "Building codegenMatchingSet for ManyToMany";
1646  const auto key_component_width = getKeyComponentWidth();
1647  CHECK(key_component_width == 4 || key_component_width == 8);
1648  auto many_to_many_args = codegenManyKey(co);
1649  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1650  const auto composite_dict_ptr_type =
1651  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1652  const auto composite_key_dict =
1653  hash_ptr->getType()->isPointerTy()
1654  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1655  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
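    // hash_ptr may surface either as a pointer or as a raw i64 depending on how the
    // hash table buffer was loaded, hence the PointerCast vs. IntToPtr choice above.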
1656  const auto key_component_count = getKeyComponentCount();
1657 
1658  auto one_to_many_ptr = hash_ptr;
1659 
1660  if (one_to_many_ptr->getType()->isPointerTy()) {
1661  one_to_many_ptr =
1662  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1663  } else {
1664  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1665  }
1666 
1667  const auto composite_key_dict_size = offsetBufferOff();
1668  one_to_many_ptr =
1669  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1670 
1671  // NOTE(jclay): A fixed array of size 200 is allocated on the stack.
1672  // This is likely the largest size that is safe to use across
1673  // all supported GPU architectures.
1674  const int max_array_size = 200;
1675  const auto arr_type = get_int_array_type(32, max_array_size, LL_CONTEXT);
1676  const auto out_arr_lv = LL_BUILDER.CreateAlloca(arr_type);
1677  out_arr_lv->setName("out_arr");
1678 
1679  const auto casted_out_arr_lv =
1680  LL_BUILDER.CreatePointerCast(out_arr_lv, arr_type->getPointerTo());
1681 
1682  const auto element_ptr = LL_BUILDER.CreateGEP(arr_type, casted_out_arr_lv, LL_INT(0));
1683 
1684  auto rowid_ptr_i32 =
1685  LL_BUILDER.CreatePointerCast(element_ptr, llvm::Type::getInt32PtrTy(LL_CONTEXT));
1686 
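    // get_candidate_rows is expected to walk every bucket covered by the probe bounds,
    // write up to max_array_size matching row ids into the stack array through
    // rowid_ptr_i32, and return how many were written; that count becomes the size of
    // the matching set returned below.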
1687  const auto candidate_count_lv = executor_->cgen_state_->emitExternalCall(
1688  "get_candidate_rows",
1689  llvm::Type::getInt64Ty(LL_CONTEXT),
1690  {
1691  rowid_ptr_i32,
1692  LL_INT(max_array_size),
1693  many_to_many_args[1],
1694          LL_INT(0),
1695          LL_FP(inverse_bucket_sizes_for_dimension_[0]),
1696          LL_FP(inverse_bucket_sizes_for_dimension_[1]),
1697          many_to_many_args[0],
1698  LL_INT(key_component_count), // key_component_count
1699  composite_key_dict, // ptr to hash table
1700  LL_INT(getEntryCount()), // entry_count
1701  LL_INT(composite_key_dict_size), // offset_buffer_ptr_offset
1702  LL_INT(getEntryCount() * sizeof(int32_t)) // sub_buff_size
1703  });
1704 
1705  const auto slot_lv = LL_INT(int64_t(0));
1706 
1707  return {rowid_ptr_i32, candidate_count_lv, slot_lv};
1708  } else {
1709  VLOG(1) << "Building codegenMatchingSet for Baseline";
1710  // TODO: duplicated w/ BaselineJoinHashTable -- push into the hash table builder?
1711  const auto key_component_width = getKeyComponentWidth();
1712  CHECK(key_component_width == 4 || key_component_width == 8);
1713  auto key_buff_lv = codegenKey(co);
1715  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1716  const auto composite_dict_ptr_type =
1717  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1718  const auto composite_key_dict =
1719  hash_ptr->getType()->isPointerTy()
1720  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1721  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1722  const auto key_component_count = getKeyComponentCount();
1723  const auto key = executor_->cgen_state_->emitExternalCall(
1724  "get_composite_key_index_" + std::to_string(key_component_width * 8),
1725  get_int_type(64, LL_CONTEXT),
1726  {key_buff_lv,
1727  LL_INT(key_component_count),
1728  composite_key_dict,
1729  LL_INT(getEntryCount())});
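    // The call above looks up the two-component bucket key in the composite key
    // dictionary and yields the entry index; the one-to-many offset/count buffers that
    // start at offsetBufferOff() are then probed with that index.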
1730  auto one_to_many_ptr = hash_ptr;
1731  if (one_to_many_ptr->getType()->isPointerTy()) {
1732  one_to_many_ptr =
1733  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1734  } else {
1735  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1736  }
1737  const auto composite_key_dict_size = offsetBufferOff();
1738  one_to_many_ptr =
1739  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1740  return HashJoin::codegenMatchingSet(
1741      std::vector<llvm::Value*>{
1742  one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(getEntryCount() - 1)},
1743  false,
1744  false,
1745  false,
1746      getComponentBufferSize(),
1747      executor_);
1748  }
1749  UNREACHABLE();
1750  return HashJoinMatchingSet{};
1751 }
1752 
1753 std::string BoundingBoxIntersectJoinHashTable::toString(
1754     const ExecutorDeviceType device_type,
1755  const int device_id,
1756  bool raw) const {
1757  auto buffer = getJoinHashBuffer(device_type, device_id);
1758  if (!buffer) {
1759  return "EMPTY";
1760  }
1761  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1762  auto hash_table = hash_tables_for_device_[device_id];
1763  CHECK(hash_table);
1764  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1765 #ifdef HAVE_CUDA
1766  std::unique_ptr<int8_t[]> buffer_copy;
1767  if (device_type == ExecutorDeviceType::GPU) {
1768  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1769  CHECK(executor_);
1770  auto data_mgr = executor_->getDataMgr();
1771  auto device_allocator = std::make_unique<CudaAllocator>(
1772  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1773 
1774  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1775  }
1776  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1777 #else
1778  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1779 #endif // HAVE_CUDA
1780  auto ptr2 = ptr1 + offsetBufferOff();
1781  auto ptr3 = ptr1 + countBufferOff();
1782  auto ptr4 = ptr1 + payloadBufferOff();
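  // ptr2/ptr3/ptr4 mark the offset, count, and payload sub-buffers that follow the
  // composite key dictionary; HashTable::toString decodes them into readable form.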
1783  CHECK(hash_table);
1784  const auto layout = getHashType();
1785  return HashTable::toString(
1786  "geo",
1787  getHashTypeString(layout),
1788  getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1789      getKeyComponentWidth(),
1790      hash_table->getEntryCount(),
1791  ptr1,
1792  ptr2,
1793  ptr3,
1794  ptr4,
1795  buffer_size,
1796  raw);
1797 }
1798 
1799 std::set<DecodedJoinHashBufferEntry> BoundingBoxIntersectJoinHashTable::toSet(
1800  const ExecutorDeviceType device_type,
1801  const int device_id) const {
1802  auto buffer = getJoinHashBuffer(device_type, device_id);
1803  auto hash_table = getHashTableForDevice(device_id);
1804  CHECK(hash_table);
1805  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1806 #ifdef HAVE_CUDA
1807  std::unique_ptr<int8_t[]> buffer_copy;
1808  if (device_type == ExecutorDeviceType::GPU) {
1809  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1810  CHECK(executor_);
1811  auto data_mgr = executor_->getDataMgr();
1812  auto allocator = std::make_unique<CudaAllocator>(
1813  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1814 
1815  allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1816  }
1817  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1818 #else
1819  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1820 #endif // HAVE_CUDA
1821  auto ptr2 = ptr1 + offsetBufferOff();
1822  auto ptr3 = ptr1 + countBufferOff();
1823  auto ptr4 = ptr1 + payloadBufferOff();
1824  const auto layout = getHashType();
1825  return HashTable::toSet(getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1826                          getKeyComponentWidth(),
1827                          hash_table->getEntryCount(),
1828  ptr1,
1829  ptr2,
1830  ptr3,
1831  ptr4,
1832  buffer_size);
1833 }
1834 
1835 Data_Namespace::MemoryLevel BoundingBoxIntersectJoinHashTable::getEffectiveMemoryLevel(
1836     const std::vector<InnerOuter>& inner_outer_pairs) const {
1837  if (query_hints_.isHintRegistered(QueryHint::kBBoxIntersectAllowGpuBuild) &&
1838      query_hints_.bbox_intersect_allow_gpu_build &&
1839      this->executor_->getDataMgr()->gpusPresent() &&
1840      memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
1841    return Data_Namespace::MemoryLevel::GPU_LEVEL;
1842  }
1843  // otherwise, try to build on CPU
1844  return Data_Namespace::MemoryLevel::CPU_LEVEL;
1845 }
1846 
1847 shared::TableKey BoundingBoxIntersectJoinHashTable::getInnerTableId() const noexcept {
1848  try {
1849    return HashJoin::getInnerTableId(inner_outer_pairs_);
1850  } catch (...) {
1851  CHECK(false);
1852  }
1853  return {};
1854 }
1855 
1856 std::shared_ptr<HashTable> BoundingBoxIntersectJoinHashTable::initHashTableOnCpuFromCache(
1857     QueryPlanHash key,
1858  CacheItemType item_type,
1859  DeviceIdentifier device_identifier) {
1860  auto timer = DEBUG_TIMER(__func__);
1861  VLOG(1) << "Checking CPU hash table cache.";
1862  CHECK(hash_table_cache_);
1863  HashtableCacheMetaInfo meta_info;
1864  meta_info.bbox_intersect_meta_info = getBoundingBoxIntersectMetaInfo();
1865  auto cached_hashtable =
1866  hash_table_cache_->getItemFromCache(key, item_type, device_identifier, meta_info);
1867  if (cached_hashtable) {
1868  return cached_hashtable;
1869  }
1870  return nullptr;
1871 }
1872 
1873 std::optional<std::pair<size_t, size_t>>
1874 BoundingBoxIntersectJoinHashTable::getApproximateTupleCountFromCache(
1875     QueryPlanHash key,
1876  CacheItemType item_type,
1877  DeviceIdentifier device_identifier) {
1878  CHECK(hash_table_cache_);
1879  HashtableCacheMetaInfo metaInfo;
1880  metaInfo.bbox_intersect_meta_info = getBoundingBoxIntersectMetaInfo();
1881  auto cached_hashtable =
1882  hash_table_cache_->getItemFromCache(key, item_type, device_identifier, metaInfo);
1883  if (cached_hashtable) {
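    // The builder sizes the hash table at roughly twice the estimated tuple count, so
    // halving getEntryCount() here presumably recovers the original entry-count estimate
    // to return alongside the emitted-keys count.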
1884  return std::make_pair(cached_hashtable->getEntryCount() / 2,
1885  cached_hashtable->getEmittedKeysCount());
1886  }
1887  return std::nullopt;
1888 }
1889 
1890 void BoundingBoxIntersectJoinHashTable::putHashTableOnCpuToCache(
1891     QueryPlanHash key,
1892  CacheItemType item_type,
1893  std::shared_ptr<HashTable> hashtable_ptr,
1894  DeviceIdentifier device_identifier,
1895  size_t hashtable_building_time) {
1896  CHECK(hash_table_cache_);
1897  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
1898  HashtableCacheMetaInfo meta_info;
1899  meta_info.bbox_intersect_meta_info = getBoundingBoxIntersectMetaInfo();
1900  meta_info.registered_query_hint = query_hints_;
1901  hash_table_cache_->putItemToCache(
1902  key,
1903  hashtable_ptr,
1904  item_type,
1905  device_identifier,
1906  hashtable_ptr->getHashTableBufferSize(ExecutorDeviceType::CPU),
1907  hashtable_building_time,
1908  meta_info);
1909 }
1910 
1911 bool BoundingBoxIntersectJoinHashTable::isBitwiseEq() const {
1912  return condition_->get_optype() == kBW_EQ;
1913 }