OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RangeJoinHashTable.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 
19 #include "QueryEngine/Execute.h"
25 
26 // clang-format off
69 // clang-format on
70 
// Factory for a range-join hash table.
//
// Validates that the left operand of `range_expr` is a geometry ColumnVar
// (otherwise throws HashJoinFail), resolves the catalog column stored at
// column_id + 1 of that geometry column and wraps it in a new ColumnVar that
// becomes the inner (build-side) column, paired with the left operand of
// `condition` as the outer (probe-side) expression. The table is then built
// with reifyWithLayout(HashType::OneToMany).
//
// Throws:
//   HashJoinFail        - invalid range expression, or a HashJoinFail /
//                         ColumnarConversionNotSupported raised while building.
//   TooManyHashEntries  - 2 * the inner table's tuple upper bound exceeds INT32_MAX.
//   JoinHashTableTooBig - rethrown to the caller.
// Any other std::exception raised while building is reported via LOG(FATAL).
71 std::shared_ptr<RangeJoinHashTable> RangeJoinHashTable::getInstance(
72  const std::shared_ptr<Analyzer::BinOper> condition,
73  const Analyzer::RangeOper* range_expr,
74  const std::vector<InputTableInfo>& query_infos,
75  const Data_Namespace::MemoryLevel memory_level,
76  const JoinType join_type,
77  const int device_count,
78  ColumnCacheMap& column_cache,
79  Executor* executor,
80  const HashTableBuildDagMap& hashtable_build_dag_map,
81  const RegisteredQueryHint& query_hints,
82  const TableIdToNodeMap& table_id_to_node_map) {
83  // the hash table is built over the LHS of the range oper. we then use the lhs
84  // of the bin oper + the rhs of the range oper for the probe
85  auto range_expr_col_var =
86  dynamic_cast<const Analyzer::ColumnVar*>(range_expr->get_left_operand());
87  if (!range_expr_col_var || !range_expr_col_var->get_type_info().is_geometry()) {
88  throw HashJoinFail("Could not build hash tables for range join | " +
89  range_expr->toString());
90  }
91  auto cat = executor->getCatalog();
92  CHECK(cat);
93  CHECK(range_expr_col_var->get_type_info().is_geometry());
94 
// NOTE(review): column_id + 1 is presumably the geometry's physical (hidden)
// coords array column - confirm against get_physical_cols.
95  auto coords_cd = cat->getMetadataForColumn(range_expr_col_var->get_table_id(),
96  range_expr_col_var->get_column_id() + 1);
97  CHECK(coords_cd);
98 
99  auto range_join_inner_col_expr =
100  makeExpr<Analyzer::ColumnVar>(coords_cd->columnType,
101  coords_cd->tableId,
102  coords_cd->columnId,
103  range_expr_col_var->get_rte_idx());
104 
105  std::vector<InnerOuter> inner_outer_pairs;
106  inner_outer_pairs.emplace_back(
107  InnerOuter{dynamic_cast<Analyzer::ColumnVar*>(range_join_inner_col_expr.get()),
108  condition->get_left_operand()});
109 
110  const auto& query_info =
111  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
112  .info;
113 
// Reject inner tables whose doubled tuple upper bound does not fit in int32_t.
114  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
115  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
116  throw TooManyHashEntries();
117  }
118 
// Sharding is only considered for GPU-level builds; other levels use 0.
119  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
121  condition.get(), executor, inner_outer_pairs)
122  : 0;
123 
124  auto join_hash_table = std::make_shared<RangeJoinHashTable>(condition,
125  join_type,
126  range_expr,
127  range_join_inner_col_expr,
128  query_infos,
129  memory_level,
130  column_cache,
131  executor,
132  inner_outer_pairs,
133  device_count,
134  query_hints,
135  hashtable_build_dag_map,
136  table_id_to_node_map);
138  HashJoin::getInnerTableId(inner_outer_pairs), shard_count, executor);
// Range join tables are always built with the OneToMany layout; build
// failures are translated below.
// NOTE(review): the messages below mention a "1-to-1 correspondence" and an
// "equijoin" although this builds a OneToMany range-join table; consider
// rewording them.
139  try {
140  join_hash_table->reifyWithLayout(HashType::OneToMany);
141  } catch (const HashJoinFail& e) {
142  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
143  "involved in equijoin | ") +
144  e.what());
145  } catch (const ColumnarConversionNotSupported& e) {
146  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
147  e.what());
// NOTE(review): `throw e;` throws a copy of `e` with static type
// JoinHashTableTooBig; a bare `throw;` would rethrow the original exception
// object without copying or slicing it.
148  } catch (const JoinHashTableTooBig& e) {
149  throw e;
// Any other failure is treated as fatal.
150  } catch (const std::exception& e) {
151  LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
152  << e.what();
153  }
154 
155  return join_hash_table;
156 }
157 
// reifyWithLayout body (its signature line is not part of this listing).
// Builds the range-join hash table for every device using `layout`, which must
// be HashType::OneToMany:
//   1. fetch the inner join columns per device (always, even when a cached
//      table exists, because they are needed to compute the parameters below);
//   2. derive the inverse bucket size from the constant RHS of range_expr_;
//   3. compute hash-table cache keys and probe the CPU hash-table cache;
//   4. on a miss, size the table and build it for each device asynchronously.
159  auto timer = DEBUG_TIMER(__func__);
160  CHECK(layout == HashType::OneToMany);
161 
162  const auto& query_info =
164  .info;
165 
// Nothing to build when the inner table has no fragments.
166  if (query_info.fragments.empty()) {
167  return;
168  }
169 
// NOTE(review): the second literal lacks a leading space, so the layout name
// and "for table_id:" run together in the log output.
170  VLOG(1) << "Reify with layout " << getHashTypeString(layout)
171  << "for table_id: " << getInnerTableId();
172 
173  std::vector<ColumnsForDevice> columns_per_device;
174  const auto catalog = executor_->getCatalog();
175  CHECK(catalog);
176 
177  auto& data_mgr = catalog->getDataMgr();
178  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
179  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
180  const auto shard_count = shardCount();
// Per device: select the fragments it reads (only its own shards when the
// table is sharded, otherwise all fragments) and create a CudaAllocator that
// owns its device buffers (the guard on omitted listing line 186 presumably
// restricts this to GPU builds - confirm).
181  for (int device_id = 0; device_id < device_count_; ++device_id) {
182  fragments_per_device.emplace_back(
183  shard_count
184  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
185  : query_info.fragments);
187  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
188  &data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
189  }
190  // for overlaps join, we need to fetch columns regardless of the availability of
191  // cached hash table to calculate various params, i.e., bucket size info todo
192  // (yoonmin) : relax this
193  const auto columns_for_device =
194  fetchColumnsForDevice(fragments_per_device[device_id],
195  device_id,
197  ? dev_buff_owners[device_id].get()
198  : nullptr);
199  columns_per_device.push_back(columns_for_device);
200  }
201 
203 
// The bucket width comes from the RHS of the range operator, which must be an
// 8-byte floating-point constant.
204  const auto bucket_range =
205  dynamic_cast<const Analyzer::Constant*>(range_expr_->get_right_operand());
206 
207  CHECK(bucket_range);
208  CHECK(bucket_range->get_type_info().is_fp() &&
209  bucket_range->get_type_info().get_size() == 8); // TODO
210 
211  const auto bucket_range_datum = bucket_range->get_constval();
212 
// The same inverse bucket width is used for both key dimensions.
// NOTE(review): a zero bucket width is not rejected before the division.
// These calls also append: if this method can run more than once on the same
// object, the vector grows past the two entries that codegenKey CHECKs for.
213  inverse_bucket_sizes_for_dimension_.emplace_back(1. / bucket_range_datum.doubleval);
214  inverse_bucket_sizes_for_dimension_.emplace_back(1. / bucket_range_datum.doubleval);
215 
217  inverse_bucket_sizes_for_dimension_, columns_per_device, device_count_);
218 
220 
221  // to properly lookup cached hash table, we need to use join columns listed as lhs and
222  // rhs of the overlaps op instead of physical (and hidden) column tailored to range join
223  // expr in other words, we need to use geometry column (point) instead of its hidden
224  // array column i.e., see `get_physical_cols` function
225  std::vector<InnerOuter> inner_outer_pairs_for_cache_lookup;
226  inner_outer_pairs_for_cache_lookup.emplace_back(InnerOuter{
227  dynamic_cast<const Analyzer::ColumnVar*>(range_expr_->get_left_operand()),
228  condition_->get_left_operand()});
229  auto hashtable_access_path_info =
230  HashtableRecycler::getHashtableAccessPathInfo(inner_outer_pairs_for_cache_lookup,
231  {},
232  condition_->get_optype(),
233  join_type_,
236  shard_count,
237  fragments_per_device,
238  executor_);
239  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
240  table_keys_ = hashtable_access_path_info.table_keys;
241 
242  auto get_inner_table_id = [&inner_outer_pairs_for_cache_lookup]() {
243  return inner_outer_pairs_for_cache_lookup.front().first->get_table_id();
244  };
245 
// If the access-path info produced no table keys, rebuild them from the
// database id and the inner table id (the callee is on omitted listing lines
// 247-248).
246  if (table_keys_.empty()) {
249  executor_->getCatalog()->getDatabaseId(),
250  get_inner_table_id());
251  }
252  CHECK(!table_keys_.empty());
253 
259  fragments_per_device,
260  device_count_);
261 
// When the DAG-based cache key is unusable and the inner table id is positive,
// derive a per-device alternative cache key from the composite key's chunk keys
// and that device's fragment ids, then register it against table_keys_.
// NOTE(review): the element count hashed into every device's key is taken from
// columns_per_device.front(), not from the current device's columns - confirm
// this is intended. per_device_chunk_key is filled but never read in the
// visible code.
262  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
263  get_inner_table_id() > 0) {
264  std::vector<size_t> per_device_chunk_key;
265  for (int device_id = 0; device_id < device_count_; ++device_id) {
266  auto chunk_key_hash = boost::hash_value(composite_key_info_.cache_key_chunks);
267  boost::hash_combine(chunk_key_hash,
268  HashJoin::collectFragmentIds(fragments_per_device[device_id]));
269  per_device_chunk_key.push_back(chunk_key_hash);
271  inner_outer_pairs_for_cache_lookup,
272  columns_per_device.front().join_columns.front().num_elems,
273  chunk_key_hash,
274  condition_->get_optype(),
277  {}};
278  hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
279  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_[device_id],
280  table_keys_);
281  }
282  }
283 
// Probe the CPU hash-table cache while holding cpu_hash_table_buff_mutex_. On a
// hit, publish the cached table and return early: either copy it to every
// device (CUDA builds; the selecting condition is on omitted listing line 300)
// or store it as the single entry in slot 0.
285  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
286  if (auto generic_hash_table =
287  initHashTableOnCpuFromCache(hashtable_cache_key_.front(),
290  if (auto hash_table =
291  std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
292  // See if a hash table of a different layout was returned.
293  // If it was OneToMany, we can reuse it on ManyToMany.
// NOTE(review): unreachable as written, because `layout` was CHECKed to be
// OneToMany at the top of this function.
294  if (layout == HashType::ManyToMany &&
295  hash_table->getLayout() == HashType::OneToMany) {
296  // use the cached hash table
298  }
299 
301 #ifdef HAVE_CUDA
302  for (int device_id = 0; device_id < device_count_; ++device_id) {
303  auto gpu_hash_table = copyCpuHashTableToGpu(hash_table,
304  layout,
305  hash_table->getEntryCount(),
306  hash_table->getEmittedKeysCount(),
307  device_id);
308  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
309  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
310  }
311 #else
312  UNREACHABLE();
313 #endif
314  } else {
316  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
317  // do not move hash_table to keep valid ptr of it within the hash table recycler
318  hash_tables_for_device_[0] = hash_table;
319  }
320  return;
321  }
322  }
323  }
324 
// Cache miss: size the table from the fetched columns.
325  auto [entry_count, emitted_keys_count] =
326  computeRangeHashTableCounts(shard_count, columns_per_device);
327 
328  size_t hash_table_size = OverlapsJoinHashTable::calculateHashTableSize(
329  inverse_bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
330 
331  VLOG(1) << "Finalized range join hash table: entry count " << entry_count
332  << " hash table size " << hash_table_size;
333 
// Launch one build task per device with the arguments below (the launch call
// is on omitted listing lines 337-338). wait() on every task first, then get()
// each one so that any exception raised inside a task is rethrown here.
334  std::vector<std::future<void>> init_threads;
335  for (int device_id = 0; device_id < device_count_; ++device_id) {
336  init_threads.push_back(
339  this,
340  /* columns_for_device */ columns_per_device[device_id],
341  /* layout_type */ layout,
342  /* entry_count */ entry_count,
343  /* emitted_keys_count */ emitted_keys_count,
344  /* device_id */ device_id,
345  /* parent_thread_local_ids */ logger::thread_local_ids()));
346  }
347  for (auto& init_thread : init_threads) {
348  init_thread.wait();
349  }
350  for (auto& init_thread : init_threads) {
351  init_thread.get();
352  }
353 }
354 
356  const ColumnsForDevice& columns_for_device,
357  const HashType layout,
358  const size_t entry_count,
359  const size_t emitted_keys_count,
360  const int device_id,
361  const logger::ThreadLocalIds parent_thread_local_ids) {
// Per-device build body (its name line is omitted from this listing;
// presumably reifyForDevice).
// Builds the range-join hash table for one device and stores it in
// hash_tables_for_device_. It first sets up thread-local logging ids for a new
// thread from parent_thread_local_ids. Only 8-byte key components are
// supported. Two paths are visible:
//   - a CPU build, which is then either copied to GPU `device_id` (CUDA builds)
//     or stored in slot 0;
//   - a direct GPU build (CUDA only).
// The conditions selecting between them are on omitted listing lines 365, 367
// and 378.
362  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
363  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
364  CHECK_EQ(getKeyComponentWidth(), size_t(8));
366 
368  VLOG(1) << "Building range join hash table on CPU.";
369  auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
370  columns_for_device.join_column_types,
371  columns_for_device.join_buckets,
372  layout,
373  entry_count,
374  emitted_keys_count);
375  CHECK(hash_table);
376 
377 #ifdef HAVE_CUDA
// Copy the CPU-built table into this device's GPU slot.
379  auto gpu_hash_table = copyCpuHashTableToGpu(
380  hash_table, layout, entry_count, emitted_keys_count, device_id);
381  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
382  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
383  } else {
384 #else
386 #endif
// Keep the CPU-built table as the single entry in slot 0.
387  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
388  hash_tables_for_device_[0] = std::move(hash_table);
389 #ifdef HAVE_CUDA
390  }
391 #endif
392  } else {
// Direct GPU build; this path requires CUDA support.
393 #ifdef HAVE_CUDA
394  auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
395  columns_for_device.join_column_types,
396  columns_for_device.join_buckets,
397  layout,
398  entry_count,
399  emitted_keys_count,
400  device_id);
401  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
402  hash_tables_for_device_[device_id] = std::move(hash_table);
403 #else
404  UNREACHABLE();
405 #endif
406  }
407 }
// NOTE(review): stray commented-out "#endif" with no matching directive in this
// listing; it looks safe to delete.
408 // #endif
409 
410 #ifdef HAVE_CUDA
// Builds the range-join hash table directly on GPU `device_id` (CUDA builds
// only). It copies the single join column and the per-dimension inverse bucket
// sizes of join_bucket_info[0] to the device, wraps them in a RangeKeyHandler
// and delegates to `builder` (declared on an omitted listing line). Returns the
// builder's hash table.
// Throws HashJoinFail if the builder reports a non-zero error code.
// NOTE(review): join_column_types is not referenced in the visible lines. The
// join columns are also transferred before CHECK_EQ(join_columns.size(), 1u)
// validates the input.
411 std::shared_ptr<BaselineHashTable> RangeJoinHashTable::initHashTableOnGpu(
412  const std::vector<JoinColumn>& join_columns,
413  const std::vector<JoinColumnTypeInfo>& join_column_types,
414  const std::vector<JoinBucketInfo>& join_bucket_info,
415  const HashType layout,
416  const size_t entry_count,
417  const size_t emitted_keys_count,
418  const size_t device_id) {
420 
421  VLOG(1) << "Building range join hash table on GPU.";
422 
424  auto data_mgr = executor_->getDataMgr();
425  CudaAllocator allocator(
426  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
427  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
428  CHECK_EQ(join_columns.size(), 1u);
429  CHECK(!join_bucket_info.empty());
430 
431  auto& inverse_bucket_sizes_for_dimension =
432  join_bucket_info[0].inverse_bucket_sizes_for_dimension;
433 
434  auto bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
435  inverse_bucket_sizes_for_dimension, allocator);
436 
// The key handler sees device copies of the join column and of the bucket
// sizes; the key has one component per bucket dimension.
437  const auto key_handler = RangeKeyHandler(isInnerColCompressed(),
438  inverse_bucket_sizes_for_dimension.size(),
439  join_columns_gpu,
440  bucket_sizes_gpu);
441 
442  const auto err = builder.initHashTableOnGpu(&key_handler,
443  join_columns,
444  layout,
445  join_type_,
448  entry_count,
449  emitted_keys_count,
450  device_id,
451  executor_,
452  query_hints_);
453  if (err) {
454  throw HashJoinFail(
455  std::string("Unrecognized error when initializing GPU range join hash table (") +
456  std::to_string(err) + std::string(")"));
457  }
458  return builder.getHashTable();
459 }
460 #endif
461 
// Builds the range-join hash table on the CPU and returns it.
// The key handler uses the first join column and the inverse bucket sizes of
// join_bucket_info[0]; the number of key components equals the number of
// bucket dimensions. The build time is measured in milliseconds and passed,
// together with the table, to the call on listing lines 513-517. That call is
// partially omitted; its visible arguments match putHashTableOnCpuToCache -
// confirm.
// Throws HashJoinFail if the builder reports a non-zero error code.
462 std::shared_ptr<BaselineHashTable> RangeJoinHashTable::initHashTableOnCpu(
463  const std::vector<JoinColumn>& join_columns,
464  const std::vector<JoinColumnTypeInfo>& join_column_types,
465  const std::vector<JoinBucketInfo>& join_bucket_info,
466  const HashType layout,
467  const size_t entry_count,
468  const size_t emitted_keys_count) {
469  auto timer = DEBUG_TIMER(__func__);
470  decltype(std::chrono::steady_clock::now()) ts1, ts2;
471  ts1 = std::chrono::steady_clock::now();
472  const auto composite_key_info =
474  CHECK(!join_columns.empty());
475  CHECK(!join_bucket_info.empty());
476 
// One key component per bucket dimension.
478  const auto key_component_count =
479  join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();
480 
481  auto key_handler =
483  key_component_count,
484  &join_columns[0],
485  join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());
486 
// Range joins do not use string-dictionary proxy translation; an empty
// placeholder is passed instead (presumably - its declaration is on an
// omitted listing line).
489  dummy_str_proxy_translation_maps_ptrs_and_offsets;
490  const auto err =
491  builder.initHashTableOnCpu(&key_handler,
492  composite_key_info,
493  join_columns,
494  join_column_types,
495  join_bucket_info,
496  dummy_str_proxy_translation_maps_ptrs_and_offsets,
497  entry_count,
498  emitted_keys_count,
499  layout,
500  join_type_,
503  query_hints_);
504  ts2 = std::chrono::steady_clock::now();
505  if (err) {
506  throw HashJoinFail(std::string("Unrecognized error when initializing CPU "
507  "range join hash table (") +
508  std::to_string(err) + std::string(")"));
509  }
510  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
511  auto hashtable_build_time =
512  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
515  hash_table,
517  hashtable_build_time);
518  return hash_table;
519 }
520 
522  const size_t shard_count,
523  std::vector<ColumnsForDevice>& columns_per_device) {
// computeRangeHashTableCounts body (its name line is omitted from this listing).
// Returns {per-device entry count, emitted keys count}. The entry count is
// twice the approximate tuple count (clamped to at least 1) and is split across
// devices/shards by get_entries_per_device. The tuple estimate comes from the
// call spanning omitted listing lines 526-529 (presumably approximateTupleCount).
525  const auto [tuple_count, emitted_keys_count] =
527  columns_per_device,
530  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
531 
532  return std::make_pair(
533  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
534  emitted_keys_count);
535 }
536 
538  const std::vector<double>& inverse_bucket_sizes_for_dimension,
539  std::vector<ColumnsForDevice>& columns_per_device,
540  const size_t chosen_max_hashtable_size,
541  const double chosen_bucket_threshold) {
// approximateTupleCount body (its name line is omitted from this listing).
// Estimates {approximate distinct key count, emitted keys count} for the join
// columns with HyperLogLog sketches (an 11-bit bitmap, per count_distinct_desc).
//   - Returns {0, 0} when the first device's first join column has no elements.
//   - Attaches inverse_bucket_sizes_for_dimension to every device's columns.
//   - CPU path: one sketch per CPU thread, merged with hll_unify.
//   - GPU path (CUDA): one async task per device; the sketches are merged and
//     the per-device emitted key counts summed.
// NOTE(review): chosen_max_hashtable_size and chosen_bucket_threshold are not
// referenced in the visible lines.
542 #ifdef _WIN32
543  // WIN32 needs have C++20 set for designated initialisation to work
544  CountDistinctDescriptor count_distinct_desc{
546  0,
547  11,
548  true,
552  1,
553  };
554 #else
555  CountDistinctDescriptor count_distinct_desc{
557  .min_val = 0,
558  .bitmap_sz_bits = 11,
559  .approximate = true,
563  .sub_bitmap_count = 1,
564  };
565 #endif
566  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
567 
568  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
569  if (columns_per_device.front().join_columns.front().num_elems == 0) {
570  return std::make_pair(0, 0);
571  }
572 
573  for (auto& columns_for_device : columns_per_device) {
574  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
576  }
577 
578  // Number of keys must match dimension of buckets
579  CHECK_EQ(columns_per_device.front().join_columns.size(),
580  columns_per_device.front().join_buckets.size());
// CPU estimation path (its selecting condition is on omitted listing line 581).
// Each of the thread_count threads fills its own padded_size_bytes-sized sketch
// slice of hll_buffer_all_cpus. The slices are then merged into the first one.
// The emitted keys count is the last entry of num_keys_for_row, which is filled
// by the call on omitted listing lines 591/598 (presumably as running totals -
// confirm).
582  const auto composite_key_info =
584  int thread_count = cpu_threads();
585  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
586  auto hll_result = &hll_buffer_all_cpus[0];
587 
588  std::vector<int32_t> num_keys_for_row;
589  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
590 
592  num_keys_for_row,
593  count_distinct_desc.bitmap_sz_bits,
594  padded_size_bytes,
595  columns_per_device.front().join_columns,
596  columns_per_device.front().join_column_types,
597  columns_per_device.front().join_buckets,
599  thread_count);
600 
601  for (int i = 1; i < thread_count; ++i) {
602  hll_unify(hll_result,
603  hll_result + i * padded_size_bytes,
604  1 << count_distinct_desc.bitmap_sz_bits);
605  }
606  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
607  num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
608  }
// GPU estimation path: one async task per device fills that device's host
// sketch buffer and emitted-keys slot.
609 #ifdef HAVE_CUDA
610  auto& data_mgr = executor_->getCatalog()->getDataMgr();
611  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
612  for (auto& host_hll_buffer : host_hll_buffers) {
613  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
614  }
615  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
616  std::vector<std::future<void>> approximate_distinct_device_threads;
617  for (int device_id = 0; device_id < device_count_; ++device_id) {
618  approximate_distinct_device_threads.emplace_back(std::async(
620  [device_id,
621  &columns_per_device,
622  &count_distinct_desc,
623  &data_mgr,
624  &host_hll_buffers,
625  &emitted_keys_count_device_threads,
626  this] {
627  auto allocator = std::make_unique<CudaAllocator>(
628  &data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
629  auto device_hll_buffer =
630  allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
631  data_mgr.getCudaMgr()->zeroDeviceMem(
632  device_hll_buffer,
633  count_distinct_desc.bitmapPaddedSizeBytes(),
634  device_id,
636  const auto& columns_for_device = columns_per_device[device_id];
637  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
638  columns_for_device.join_columns, *allocator);
639 
640  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
641  const auto& bucket_sizes_for_dimension =
642  columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
643  auto bucket_sizes_gpu =
644  allocator->alloc(bucket_sizes_for_dimension.size() * sizeof(double));
645  allocator->copyToDevice(bucket_sizes_gpu,
646  bucket_sizes_for_dimension.data(),
647  bucket_sizes_for_dimension.size() * sizeof(double));
// NOTE(review): the row-count buffer size (and the read offset further down)
// use columns_per_device.front() rather than this device's
// columns_for_device - confirm this is intended when devices hold different
// fragments.
648  const size_t row_counts_buffer_sz =
649  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
650  auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
651  data_mgr.getCudaMgr()->zeroDeviceMem(
652  row_counts_buffer,
653  row_counts_buffer_sz,
654  device_id,
656  const auto key_handler =
658  bucket_sizes_for_dimension.size(),
659  join_columns_gpu,
660  reinterpret_cast<double*>(bucket_sizes_gpu));
661  const auto key_handler_gpu =
662  transfer_flat_object_to_gpu(key_handler, *allocator);
664  reinterpret_cast<uint8_t*>(device_hll_buffer),
665  count_distinct_desc.bitmap_sz_bits,
666  reinterpret_cast<int32_t*>(row_counts_buffer),
667  key_handler_gpu,
668  columns_for_device.join_columns[0].num_elems,
669  executor_->blockSize(),
670  executor_->gridSize());
671 
// Read back the last int32 row count as this device's emitted keys count.
// NOTE(review): this copies sizeof(int32_t) bytes into a size_t slot. The
// result is correct only because the slot was zero-initialized and the host
// is little-endian.
672  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
673  allocator->copyFromDevice(
674  &host_emitted_keys_count,
675  row_counts_buffer +
676  (columns_per_device.front().join_columns[0].num_elems - 1) *
677  sizeof(int32_t),
678  sizeof(int32_t));
679 
680  auto& host_hll_buffer = host_hll_buffers[device_id];
681  allocator->copyFromDevice(&host_hll_buffer[0],
682  device_hll_buffer,
683  count_distinct_desc.bitmapPaddedSizeBytes());
684  }));
685  }
// get() waits for each task and rethrows any exception it raised.
686  for (auto& child : approximate_distinct_device_threads) {
687  child.get();
688  }
// Merge the per-device sketches into the first buffer and sum the per-device
// emitted key counts.
690  auto& result_hll_buffer = host_hll_buffers.front();
691  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
692  for (int device_id = 1; device_id < device_count_; ++device_id) {
693  auto& host_hll_buffer = host_hll_buffers[device_id];
694  hll_unify(hll_result,
695  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
696  1 << count_distinct_desc.bitmap_sz_bits);
697  }
698  size_t emitted_keys_count = 0;
699  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
700  emitted_keys_count += emitted_keys_count_device;
701  }
702  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
703  emitted_keys_count);
704 #else
705  UNREACHABLE();
706  return {0, 0};
707 #endif // HAVE_CUDA
708 }
709 
// Shorthands for the executor's code-generation state, used by the codegen
// methods that follow. They expand to expressions on `executor_`, so they only
// work inside member functions where `executor_` is in scope.
// NOTE(review): LL_FP and ROW_FUNC are not used in the visible code, and no
// matching #undef is visible in this listing.
710 #define LL_CONTEXT executor_->cgen_state_->context_
711 #define LL_BUILDER executor_->cgen_state_->ir_builder_
712 #define LL_INT(v) executor_->cgen_state_->llInt(v)
713 #define LL_FP(v) executor_->cgen_state_->llFp(v)
714 #define ROW_FUNC executor_->cgen_state_->row_func_
715 
717  llvm::Value* offset_ptr) {
// codegenKey body (its name/first-parameter line is omitted from this listing).
// Emits LLVM IR that builds the range-join probe key in a stack buffer (alloca)
// and returns a pointer to that buffer.
// Only POINT outer expressions are supported, in one of two forms:
//   - a ColumnVar, read through its coords array column (column_id + 1, with
//     TINYINT elements);
//   - an ST_Point / ST_Transform / ST_Centroid GeoOperator.
// `offset_ptr` points to a packed bucket offset: the low 32 bits are the x
// offset and the next 32 bits the y offset. Each is sign-extended and added to
// the corresponding bucket key.
// Error handling:
//   - other geo expressions throw std::runtime_error;
//   - non-geometry outer types hit LOG(FATAL).
718  const auto key_component_width = getKeyComponentWidth();
719  CHECK(key_component_width == 4 || key_component_width == 8);
// NOTE(review): key_size_lv is a byte count (components * width), but
// CreateAlloca treats its size operand as a number of elements of the
// already width-sized integer type. The buffer is therefore over-allocated by
// a factor of key_component_width. This is harmless, but
// LL_INT(getKeyComponentCount()) alone would suffice.
720  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
721  llvm::Value* key_buff_lv{nullptr};
722  switch (key_component_width) {
723  case 4:
724  key_buff_lv =
725  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
726  break;
727  case 8:
728  key_buff_lv =
729  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
730  break;
731  default:
732  CHECK(false);
733  }
734 
735  const auto& inner_outer_pair = inner_outer_pairs_[0];
736  const auto outer_col = inner_outer_pair.second;
737  const auto outer_col_ti = outer_col->get_type_info();
738 
739  if (outer_col_ti.is_geometry()) {
740  CodeGenerator code_generator(executor_);
741  // TODO(adb): for points we will use the coords array, but for other
742  // geometries we will need to use the bounding box. For now only support
743  // points.
744  CHECK_EQ(outer_col_ti.get_type(), kPOINT);
745  CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
746 
747  llvm::Value* arr_ptr{nullptr};
748  // prepare point column (arr) ptr to generate code for hash table key
749  if (auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col)) {
750  const auto col_lvs = code_generator.codegen(outer_col, true, co);
751  CHECK_EQ(col_lvs.size(), size_t(1));
752  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
753  outer_col_var->get_table_id(), outer_col_var->get_column_id() + 1);
754  CHECK(coords_cd);
755  const auto coords_ti = coords_cd->columnType;
756 
757  const auto array_buff_ptr = executor_->cgen_state_->emitExternalCall(
758  "array_buff",
759  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
760  {col_lvs.front(), code_generator.posArg(outer_col)});
761  CHECK(array_buff_ptr);
762  CHECK(coords_ti.get_elem_type().get_type() == kTINYINT)
763  << "Only TINYINT coordinates columns are supported in geo overlaps "
764  "hash join.";
765  arr_ptr =
766  code_generator.castArrayPointer(array_buff_ptr, coords_ti.get_elem_type());
767  } else if (auto geo_expr_outer_col =
768  dynamic_cast<const Analyzer::GeoOperator*>(outer_col)) {
769  const auto geo_expr_name = geo_expr_outer_col->getName();
770  if (func_resolve(geo_expr_name, "ST_Point"sv, "ST_Transform"sv, "ST_Centroid"sv)) {
771  // note that ST_SetSRID changes type info of the column, and is handled by
772  // translation phase, so when we use ST_SETSRID(ST_POINT(x, y), 4326)
773  // as a join column expression, we recognize it as ST_POINT (with SRID as 4326)
774  const auto col_lvs = code_generator.codegen(outer_col, true, co);
775  // listed functions keep point coordinates in the local variable (let say S)
776  // which is corresponding to the pointer that col_lvs[0] holds
777  // thus, all we need is to retrieve necessary coordinate from the S by varying
778  // its offset (i.e., i == 0 means x coordinate)
779  arr_ptr = LL_BUILDER.CreatePointerCast(
780  col_lvs[0], llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_));
781  } else {
782  throw std::runtime_error(
783  "RHS key of the range join operator has a geospatial function which is not "
784  "supported yet: " +
785  geo_expr_name);
786  }
787  } else {
788  throw std::runtime_error("Range join operator has an invalid rhs key: " +
789  outer_col->toString());
790  }
791 
792  // load and unpack offsets
793  const auto offset =
794  LL_BUILDER.CreateLoad(offset_ptr->getType()->getPointerElementType(),
795  offset_ptr,
796  "packed_bucket_offset");
797  const auto x_offset =
798  LL_BUILDER.CreateTrunc(offset, llvm::Type::getInt32Ty(LL_CONTEXT));
799 
800  const auto y_offset_shifted =
801  LL_BUILDER.CreateLShr(offset, LL_INT(static_cast<int64_t>(32)));
802  const auto y_offset =
803  LL_BUILDER.CreateTrunc(y_offset_shifted, llvm::Type::getInt32Ty(LL_CONTEXT));
804 
805  const auto x_bucket_offset =
806  LL_BUILDER.CreateSExt(x_offset, llvm::Type::getInt64Ty(LL_CONTEXT));
807  const auto y_bucket_offset =
808  LL_BUILDER.CreateSExt(y_offset, llvm::Type::getInt64Ty(LL_CONTEXT));
809 
// Write one key component per dimension (i == 0: x, i == 1: y). Each component
// is the bucket key returned by the external helper plus that dimension's
// offset, sign-extended to the key component width.
810  for (size_t i = 0; i < 2; i++) {
811  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
812  key_buff_lv->getType()->getScalarType()->getPointerElementType(),
813  key_buff_lv,
814  LL_INT(i));
815 
816  const auto funcName = isProbeCompressed() ? "get_bucket_key_for_range_compressed"
817  : "get_bucket_key_for_range_double";
818 
819  // Note that get_bucket_key_for_range_compressed will need to be
820  // specialized for future compression schemes
821  auto bucket_key = executor_->cgen_state_->emitExternalCall(
822  funcName,
825 
826  auto bucket_key_shifted = i == 0
827  ? LL_BUILDER.CreateAdd(x_bucket_offset, bucket_key)
828  : LL_BUILDER.CreateAdd(y_bucket_offset, bucket_key);
829 
830  const auto col_lv = LL_BUILDER.CreateSExt(
831  bucket_key_shifted, get_int_type(key_component_width * 8, LL_CONTEXT));
832  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
833  }
834  } else {
835  LOG(FATAL) << "Range join key currently only supported for geospatial types.";
836  }
837  return key_buff_lv;
838 }
839 
841  const CompilationOptions& co,
842  const size_t index,
843  llvm::Value* range_offset) {
// Matching-set codegen body (its name line is omitted from this listing;
// presumably codegenMatchingSet).
// Emits IR that:
//   1. builds the probe key with codegenKey(co, range_offset);
//   2. looks the key up in hash table `index` through
//      get_composite_key_index_32/64, chosen by the key component width;
//   3. returns the matching set over the one-to-many buffer, which starts
//      offsetBufferOff() past the hash table base.
// The callee producing the result is on omitted listing line 882 (presumably
// HashJoin::codegenMatchingSet). It receives the entry index range
// [0, getEntryCount() - 1].
844  const auto key_component_width = getKeyComponentWidth();
845  CHECK(key_component_width == 4 || key_component_width == 8);
846 
847  auto key_buff_lv = codegenKey(co, range_offset);
849 
850  auto hash_ptr = codegenHashTableLoad(index, executor_);
851  const auto composite_dict_ptr_type =
852  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
853 
// The loaded hash table base may be either a pointer or an integer address.
854  const auto composite_key_dict =
855  hash_ptr->getType()->isPointerTy()
856  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
857  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
858 
859  const auto key_component_count = getKeyComponentCount();
860 
861  const auto funcName =
862  "get_composite_key_index_" + std::to_string(key_component_width * 8);
863 
864  const auto key = executor_->cgen_state_->emitExternalCall(funcName,
866  {key_buff_lv,
867  LL_INT(key_component_count),
868  composite_key_dict,
869  LL_INT(getEntryCount())});
870 
// Compute the one-to-many buffer address as a 64-bit integer: base address
// plus offsetBufferOff().
871  auto one_to_many_ptr = hash_ptr;
872  if (one_to_many_ptr->getType()->isPointerTy()) {
873  one_to_many_ptr =
874  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
875  } else {
876  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
877  }
878  const auto composite_key_dict_size = offsetBufferOff();
879  one_to_many_ptr =
880  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
881 
883  /* hash_join_idx_args_in */ {one_to_many_ptr,
884  key,
885  LL_INT(int64_t(0)),
886  LL_INT(getEntryCount() - 1)},
887  /* is_sharded */ false,
888  /* is_nullable */ false,
889  /* is_bw_eq */ false,
890  /* sub_buff_size */ getComponentBufferSize(),
891  /* executor */ executor_);
892 }
static std::vector< int > collectFragmentIds(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments)
Definition: HashJoin.cpp:451
#define LL_INT(v)
#define CHECK_EQ(x, y)
Definition: Logger.h:297
auto func_resolve
int getInnerTableId() const noexcept override
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
JoinType
Definition: sqldefs.h:164
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
std::string cat(Ts &&...args)
llvm::Value * codegenKey(const CompilationOptions &co, llvm::Value *offset)
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:257
std::shared_ptr< HashTable > initHashTableOnCpuFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
static bool isInvalidHashTableCacheKey(const std::vector< QueryPlanHash > &cache_keys)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:105
HashType getHashType() const noexcept override
std::vector< ChunkKey > cache_key_chunks
Definition: HashJoin.h:129
T * transfer_flat_object_to_gpu(const T &object, DeviceAllocator &allocator)
#define LOG(tag)
Definition: Logger.h:283
void hll_unify(T1 *lhs, T2 *rhs, const size_t m)
Definition: HyperLogLog.h:107
llvm::Value * posArg(const Analyzer::Expr *) const
Definition: ColumnIR.cpp:582
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:361
llvm::Value * castArrayPointer(llvm::Value *ptr, const SQLTypeInfo &elem_ti)
#define UNREACHABLE()
Definition: Logger.h:333
void reifyWithLayout(const HashType layout) override
int initHashTableOnGpu(KEY_HANDLER *key_handler, const std::vector< JoinColumn > &join_columns, const HashType layout, const JoinType join_type, const size_t key_component_width, const size_t key_component_count, const size_t keyspace_entry_count, const size_t emitted_keys_count, const int device_id, const Executor *executor, const RegisteredQueryHint &query_hint)
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:412
CountDistinctImplType impl_type_
void putHashTableOnCpuToCache(QueryPlanHash key, CacheItemType item_type, std::shared_ptr< HashTable > hashtable_ptr, DeviceIdentifier device_identifier, size_t hashtable_building_time)
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
size_t hll_size(const T *M, const size_t bitmap_sz_bits)
Definition: HyperLogLog.h:88
#define CHECK_GT(x, y)
Definition: Logger.h:301
#define LL_CONTEXT
const Expr * get_left_operand() const
Definition: Analyzer.h:548
std::string to_string(char const *&&v)
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
#define LL_FP(v)
const std::shared_ptr< Analyzer::BinOper > condition_
#define LL_BUILDER
const std::vector< JoinColumnTypeInfo > join_column_types
Definition: HashJoin.h:111
void approximate_distinct_tuples_on_device_range(uint8_t *hll_buffer, const uint32_t b, int32_t *row_counts_buffer, const RangeKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
future< Result > async(Fn &&fn, Args &&...args)
std::string toString() const override
Definition: Analyzer.cpp:2732
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
std::shared_ptr< BaselineHashTable > initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const HashType layout, const size_t entry_count, const size_t emitted_keys_count)
RegisteredQueryHint query_hints_
QueryPlanHash getAlternativeCacheKey(AlternativeCacheKeyForOverlapsHashJoin &info)
std::pair< size_t, size_t > approximateTupleCount(const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &columns_per_device, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold) override
const std::vector< InputTableInfo > & query_infos_
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
std::vector< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
HashTableBuildDagMap hashtable_build_dag_map_
std::pair< size_t, size_t > computeRangeHashTableCounts(const size_t shard_count, std::vector< ColumnsForDevice > &columns_per_device)
std::vector< QueryPlanHash > hashtable_cache_key_
virtual int getInnerTableId() const noexcept=0
HashJoinMatchingSet codegenMatchingSetWithOffset(const CompilationOptions &, const size_t, llvm::Value *)
const double bucket_threshold_
const Expr * get_right_operand() const
Definition: Analyzer.h:549
static void checkHashJoinReplicationConstraint(const int table_id, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:776
void setOverlapsHashtableMetaInfo(size_t max_table_size_bytes, double bucket_threshold, std::vector< double > &bucket_sizes)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
static std::shared_ptr< RangeJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const Analyzer::RangeOper *range_expr, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hints, const TableIdToNodeMap &table_id_to_node_map)
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
std::vector< llvm::Value * > codegen(const Analyzer::Expr *, const bool fetch_columns, const CompilationOptions &)
Definition: IRCodegen.cpp:30
std::vector< double > inverse_bucket_sizes_for_dimension_
#define CHECK_LT(x, y)
Definition: Logger.h:299
std::optional< HashType > layout_override_
std::pair< std::vector< const int32_t * >, std::vector< int32_t >> StrProxyTranslationMapsPtrsAndOffsets
bool isProbeCompressed() const
std::unique_ptr< BaselineHashTable > getHashTable()
static std::string getHashTypeString(HashType ht) noexcept
Definition: HashJoin.h:164
static std::unordered_set< size_t > getAlternativeTableKeys(const std::vector< ChunkKey > &chunk_keys, int db_id, int inner_table_id)
Definition: DataRecycler.h:154
void setInverseBucketSizeInfo(const std::vector< double > &inverse_bucket_sizes, std::vector< ColumnsForDevice > &columns_per_device, const size_t device_count)
LocalIdsScopeGuard setNewThreadId() const
Definition: Logger.cpp:531
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
ColumnsForDevice fetchColumnsForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
size_t offsetBufferOff() const noexceptoverride
bool isInnerColCompressed() const
void approximate_distinct_tuples_range(uint8_t *hll_buffer_all_cpus, std::vector< int32_t > &row_counts, const uint32_t b, const size_t padded_size_bytes, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_buckets_per_key, const bool is_compressed, const int thread_count)
#define CHECK(condition)
Definition: Logger.h:289
#define DEBUG_TIMER(name)
Definition: Logger.h:407
const Data_Namespace::MemoryLevel memory_level_
void generateCacheKey(const size_t max_hashtable_size, const double bucket_threshold, const std::vector< double > &bucket_sizes, std::vector< std::vector< Fragmenter_Namespace::FragmentInfo >> &fragments_per_device, int device_count)
size_t getComponentBufferSize() const noexceptoverride
static std::unique_ptr< HashtableRecycler > hash_table_cache_
std::vector< InnerOuter > inner_outer_pairs_
std::unordered_set< size_t > table_keys_
void reifyForDevice(const ColumnsForDevice &columns_for_device, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const int device_id, const logger::ThreadLocalIds parent_thread_local_ids)
Data_Namespace::MemoryLevel effective_memory_level_
T * transfer_vector_of_flat_objects_to_gpu(const std::vector< T > &vec, DeviceAllocator &allocator)
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
ThreadId thread_id_
Definition: Logger.h:136
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
std::vector< JoinBucketInfo > join_buckets
Definition: HashJoin.h:113
const Analyzer::RangeOper * range_expr_
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
int cpu_threads()
Definition: thread_count.h:25
static HashtableAccessPathInfo getHashtableAccessPathInfo(const std::vector< InnerOuter > &inner_outer_pairs, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs, const SQLOps op_type, const JoinType join_type, const HashTableBuildDagMap &hashtable_build_dag_map, int device_count, int shard_count, const std::vector< std::vector< Fragmenter_Namespace::FragmentInfo >> &frags_for_device, Executor *executor)
CompositeKeyInfo composite_key_info_
HashType
Definition: HashTable.h:19
ThreadLocalIds thread_local_ids()
Definition: Logger.cpp:873
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:110
#define VLOG(n)
Definition: Logger.h:383
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:160
const size_t max_hashtable_size_
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs={})
Definition: HashJoin.cpp:460