OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RangeJoinHashTable.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
20 #include "QueryEngine/Execute.h"
26 
27 // clang-format off
70 // clang-format on
71 
// Factory for a range join hash table.
//
// The table is built over the column whose id is one greater than that of the
// geometry column on the left-hand side of `range_expr`; the probe side is the
// left operand of `condition`.
//
// Throws HashJoinFail when the left operand of `range_expr` is not a geometry
// ColumnVar, or when building the table fails with HashJoinFail /
// ColumnarConversionNotSupported. Throws TooManyHashEntries when twice
// query_info.getNumTuplesUpperBound() exceeds the int32_t maximum. Any other
// std::exception raised during the build is reported through LOG(FATAL).
//
// `query_hint` is not referenced in the lines visible here.
// NOTE(review): this listing skips source lines 121 and 137 (the numbering
// jumps 120->122 and 136->138), so the `shard_count` initializer and the call
// that owns the arguments on line 138 are incomplete in this view.
72 std::shared_ptr<RangeJoinHashTable> RangeJoinHashTable::getInstance(
73  const std::shared_ptr<Analyzer::BinOper> condition,
74  const Analyzer::RangeOper* range_expr,
75  const std::vector<InputTableInfo>& query_infos,
76  const Data_Namespace::MemoryLevel memory_level,
77  const JoinType join_type,
78  const int device_count,
79  ColumnCacheMap& column_cache,
80  Executor* executor,
81  const HashTableBuildDagMap& hashtable_build_dag_map,
82  const RegisteredQueryHint& query_hint,
83  const TableIdToNodeMap& table_id_to_node_map) {
84  // the hash table is built over the LHS of the range oper. we then use the lhs
85  // of the bin oper + the rhs of the range oper for the probe
86  auto range_expr_col_var =
87  dynamic_cast<const Analyzer::ColumnVar*>(range_expr->get_left_operand());
88  if (!range_expr_col_var || !range_expr_col_var->get_type_info().is_geometry()) {
89  throw HashJoinFail("Could not build hash tables for range join | " +
90  range_expr->toString());
91  }
92  auto cat = executor->getCatalog();
93  CHECK(cat);
94  CHECK(range_expr_col_var->get_type_info().is_geometry());
95 
// Look up the column that directly follows the geometry column (column id + 1).
// Presumably this is the hidden physical coords column of the point -- verify
// against the catalog layout.
96  auto coords_cd = cat->getMetadataForColumn(range_expr_col_var->get_table_id(),
97  range_expr_col_var->get_column_id() + 1);
98  CHECK(coords_cd);
99 
// Synthesize a ColumnVar for that column, reusing the range-table index of the
// geometry column; it becomes the inner (build-side) join column.
100  auto range_join_inner_col_expr =
101  makeExpr<Analyzer::ColumnVar>(coords_cd->columnType,
102  coords_cd->tableId,
103  coords_cd->columnId,
104  range_expr_col_var->get_rte_idx());
105 
106  std::vector<InnerOuter> inner_outer_pairs;
107  inner_outer_pairs.emplace_back(
108  InnerOuter{dynamic_cast<Analyzer::ColumnVar*>(range_join_inner_col_expr.get()),
109  condition->get_left_operand()});
110 
111  const auto& query_info =
112  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
113  .info;
114 
// Refuse inner tables whose doubled tuple upper bound does not fit in int32_t.
115  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
116  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
117  throw TooManyHashEntries();
118  }
119 
// Shard count is only computed for GPU_LEVEL; otherwise it is 0.
120  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
122  condition.get(), executor, inner_outer_pairs)
123  : 0;
124 
125  auto join_hash_table = std::make_shared<RangeJoinHashTable>(condition,
126  join_type,
127  range_expr,
128  range_join_inner_col_expr,
129  query_infos,
130  memory_level,
131  column_cache,
132  executor,
133  inner_outer_pairs,
134  device_count,
135  hashtable_build_dag_map,
136  table_id_to_node_map);
138  HashJoin::getInnerTableId(inner_outer_pairs), shard_count, executor);
// Build with the OneToMany layout and translate build failures to HashJoinFail.
// NOTE(review): the first handler's message talks about a "1-to-1
// correspondence ... equijoin" although a OneToMany range join table is being
// built here -- the wording looks inherited from another join type; confirm.
139  try {
140  join_hash_table->reifyWithLayout(HashType::OneToMany);
141  } catch (const HashJoinFail& e) {
142  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
143  "involved in equijoin | ") +
144  e.what());
145  } catch (const ColumnarConversionNotSupported& e) {
146  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
147  e.what());
148  } catch (const std::exception& e) {
149  LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
150  << e.what();
151  }
152 
153  return join_hash_table;
154 }
155 
// Body of the routine that materializes the hash table for a given layout
// (presumably RangeJoinHashTable::reifyWithLayout -- its signature, source line
// 156, is not part of this listing, and several other lines are missing too).
//
// Visible steps:
//   1. Require layout == OneToMany and return early when the inner table has
//      no fragments.
//   2. Collect the fragments per device and fetch the join columns per device.
//   3. Derive the inverse bucket sizes from the right operand of range_expr_.
//   4. Compute the hash table cache keys, with a fallback key when the
//      plan-derived key is invalid.
//   5. Try to reuse a cached CPU hash table; on a hit, install it (copying it
//      to each GPU in one branch) and return.
//   6. Otherwise compute entry counts and build one table per device, waiting
//      for every build task to finish.
157  auto timer = DEBUG_TIMER(__func__);
158  CHECK(layout == HashType::OneToMany);
159 
160  const auto& query_info =
162  .info;
163 
164  if (query_info.fragments.empty()) {
165  return;
166  }
167 
// NOTE(review): the two string pieces below concatenate without a separating
// space ("...<layout>for table_id: ..."); the log text is left untouched here.
168  VLOG(1) << "Reify with layout " << getHashTypeString(layout)
169  << "for table_id: " << getInnerTableId();
170 
171  std::vector<ColumnsForDevice> columns_per_device;
172  const auto catalog = executor_->getCatalog();
173  CHECK(catalog);
174 
175  auto& data_mgr = catalog->getDataMgr();
176  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
177  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
178  const auto shard_count = shardCount();
// Per device: take only that device's shards when the table is sharded,
// otherwise all fragments, then fetch the join columns for the device.
179  for (int device_id = 0; device_id < device_count_; ++device_id) {
180  fragments_per_device.emplace_back(
181  shard_count
182  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
183  : query_info.fragments);
185  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
186  &data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
187  }
188  // for overlaps join, we need to fetch columns regardless of the availability of
189  // cached hash table to calculate various params, i.e., bucket size info todo
190  // (yoonmin) : relax this
191  const auto columns_for_device =
192  fetchColumnsForDevice(fragments_per_device[device_id],
193  device_id,
195  ? dev_buff_owners[device_id].get()
196  : nullptr);
197  columns_per_device.push_back(columns_for_device);
198  }
199 
201 
// The right operand of the range expression must be an 8-byte floating point
// constant; its double value is used as the bucket range.
202  const auto bucket_range =
203  dynamic_cast<const Analyzer::Constant*>(range_expr_->get_right_operand());
204 
205  CHECK(bucket_range);
206  CHECK(bucket_range->get_type_info().is_fp() &&
207  bucket_range->get_type_info().get_size() == 8); // TODO
208 
209  const auto bucket_range_datum = bucket_range->get_constval();
210 
// The same inverse (1 / range) is appended twice -- presumably one entry per
// spatial dimension (x and y).
211  inverse_bucket_sizes_for_dimension_.emplace_back(1. / bucket_range_datum.doubleval);
212  inverse_bucket_sizes_for_dimension_.emplace_back(1. / bucket_range_datum.doubleval);
213 
215  inverse_bucket_sizes_for_dimension_, columns_per_device, device_count_);
216 
218 
219  // to properly lookup cached hash table, we need to use join columns listed as lhs and
220  // rhs of the overlaps op instead of physical (and hidden) column tailored to range join
221  // expr in other words, we need to use geometry column (point) instead of its hidden
222  // array column i.e., see `get_physical_cols` function
223  std::vector<InnerOuter> inner_outer_pairs_for_cache_lookup;
224  inner_outer_pairs_for_cache_lookup.emplace_back(InnerOuter{
225  dynamic_cast<const Analyzer::ColumnVar*>(range_expr_->get_left_operand()),
226  condition_->get_left_operand()});
227  auto hashtable_access_path_info =
228  HashtableRecycler::getHashtableAccessPathInfo(inner_outer_pairs_for_cache_lookup,
229  {},
230  condition_->get_optype(),
231  join_type_,
234  shard_count,
235  fragments_per_device,
236  executor_);
237  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
238  table_keys_ = hashtable_access_path_info.table_keys;
244  fragments_per_device,
245  device_count_);
246 
// Fallback: when the plan-derived cache key is invalid and the inner table id
// is positive, derive a per-device key from the chunk keys combined with that
// device's fragment ids, and register it for the table key.
// NOTE(review): per_device_chunk_key is constructed with device_count_ elements
// and then also push_back'd once per device, so it ends up with
// 2 * device_count_ entries; it is not read in the lines visible here.
247  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
248  inner_outer_pairs_for_cache_lookup.front().first->get_table_id() > 0) {
249  std::vector<int> alternative_table_key{
252  const auto table_keys =
253  std::unordered_set<size_t>{boost::hash_value(alternative_table_key)};
254  std::vector<size_t> per_device_chunk_key(device_count_);
255 
256  for (int device_id = 0; device_id < device_count_; ++device_id) {
257  auto chunk_key_hash = boost::hash_value(composite_key_info_.cache_key_chunks);
258  boost::hash_combine(chunk_key_hash,
259  HashJoin::collectFragmentIds(fragments_per_device[device_id]));
260  per_device_chunk_key.push_back(chunk_key_hash);
262  inner_outer_pairs_for_cache_lookup,
263  columns_per_device.front().join_columns.front().num_elems,
264  chunk_key_hash,
265  condition_->get_optype(),
268  {}};
269  hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
270  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_[device_id],
271  table_keys);
272  }
273  }
274 
// Cache lookup, performed while holding cpu_hash_table_buff_mutex_. The lookup
// uses the first cache key only.
276  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
277  if (auto generic_hash_table =
278  initHashTableOnCpuFromCache(hashtable_cache_key_.front(),
281  if (auto hash_table =
282  std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
283  // See if a hash table of a different layout was returned.
284  // If it was OneToMany, we can reuse it on ManyToMany.
285  if (layout == HashType::ManyToMany &&
286  hash_table->getLayout() == HashType::OneToMany) {
287  // use the cached hash table
289  }
290 
292 #ifdef HAVE_CUDA
// With CUDA: copy the cached CPU table to every device.
293  for (int device_id = 0; device_id < device_count_; ++device_id) {
294  auto gpu_hash_table = copyCpuHashTableToGpu(hash_table,
295  layout,
296  hash_table->getEntryCount(),
297  hash_table->getEmittedKeysCount(),
298  device_id);
299  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
300  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
301  }
302 #else
303  UNREACHABLE();
304 #endif
305  } else {
307  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
308  // do not move hash_table to keep valid ptr of it within the hash table recycler
309  hash_tables_for_device_[0] = hash_table;
310  }
311  return;
312  }
313  }
314  }
315 
// Cache miss: size the table and build it.
316  auto [entry_count, emitted_keys_count] =
317  computeRangeHashTableCounts(shard_count, columns_per_device);
318 
319  size_t hash_table_size = OverlapsJoinHashTable::calculateHashTableSize(
320  inverse_bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
321 
322  VLOG(1) << "Finalized range join hash table: entry count " << entry_count
323  << " hash table size " << hash_table_size;
324 
// One build task per device. All futures are waited on first and then get() is
// called on each (std::future::get rethrows an exception stored by the task).
325  std::vector<std::future<void>> init_threads;
326  for (int device_id = 0; device_id < device_count_; ++device_id) {
327  init_threads.push_back(
330  this,
331  /* columns_for_device */ columns_per_device[device_id],
332  /* layout_type */ layout,
333  /* entry_count */ entry_count,
334  /* emitted_keys_count */ emitted_keys_count,
335  /* device_id */ device_id,
336  /* parent_thread_id */ logger::thread_id()));
337  }
338  for (auto& init_thread : init_threads) {
339  init_thread.wait();
340  }
341  for (auto& init_thread : init_threads) {
342  init_thread.get();
343  }
344 }
345 
// Per-device build routine (its name and first parameter are on source line
// 346, which is not part of this listing; the body refers to that parameter as
// `columns_for_device`).
//
// Builds the hash table for `device_id` with the given layout and counts and
// stores it in hash_tables_for_device_. Two paths are visible:
//   * CPU build via initHashTableOnCpu; under HAVE_CUDA the result is either
//     copied to the GPU for `device_id` or stored in slot 0.
//   * GPU build via initHashTableOnGpu (HAVE_CUDA only; otherwise UNREACHABLE).
// The conditions selecting between these paths are on lines missing from this
// listing (356, 367, 374) -- presumably they test the memory level; confirm.
// `parent_thread_id` is handed to DEBUG_TIMER_NEW_THREAD.
347  const HashType layout,
348  const size_t entry_count,
349  const size_t emitted_keys_count,
350  const int device_id,
351  const logger::ThreadId parent_thread_id) {
352  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
// Only 8-byte key components are accepted here.
353  CHECK_EQ(getKeyComponentWidth(), size_t(8));
355 
357  VLOG(1) << "Building range join hash table on CPU.";
358  auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
359  columns_for_device.join_column_types,
360  columns_for_device.join_buckets,
361  layout,
362  entry_count,
363  emitted_keys_count);
364  CHECK(hash_table);
365 
366 #ifdef HAVE_CUDA
368  auto gpu_hash_table = copyCpuHashTableToGpu(
369  hash_table, layout, entry_count, emitted_keys_count, device_id);
370  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
371  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
372  } else {
373 #else
375 #endif
// CPU-resident result: exactly one slot is expected and the table goes there.
376  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
377  hash_tables_for_device_[0] = std::move(hash_table);
378 #ifdef HAVE_CUDA
379  }
380 #endif
381  } else {
382 #ifdef HAVE_CUDA
383  auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
384  columns_for_device.join_column_types,
385  columns_for_device.join_buckets,
386  layout,
387  entry_count,
388  emitted_keys_count,
389  device_id);
390  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
391  hash_tables_for_device_[device_id] = std::move(hash_table);
392 #else
393  UNREACHABLE();
394 #endif
395  }
396 }
397 // #endif
398 
// Builds the range join hash table directly on a GPU (compiled only when
// HAVE_CUDA is defined).
//
// Transfers the join columns and the inverse bucket sizes of the first bucket
// info entry to the device, wraps them in a RangeKeyHandler and delegates to
// `builder.initHashTableOnGpu`. Exactly one join column and a non-empty
// `join_bucket_info` are required (CHECKed after the column transfer).
//
// Returns the table produced by the builder. Throws HashJoinFail when the
// builder reports a non-zero error code.
//
// `join_column_types` is not referenced in the visible lines. The declaration
// of `builder` and two call arguments are on lines missing from this listing
// (408, 412, 435-436).
399 #ifdef HAVE_CUDA
400 std::shared_ptr<BaselineHashTable> RangeJoinHashTable::initHashTableOnGpu(
401  const std::vector<JoinColumn>& join_columns,
402  const std::vector<JoinColumnTypeInfo>& join_column_types,
403  const std::vector<JoinBucketInfo>& join_bucket_info,
404  const HashType layout,
405  const size_t entry_count,
406  const size_t emitted_keys_count,
407  const size_t device_id) {
409 
410  VLOG(1) << "Building range join hash table on GPU.";
411 
413  auto data_mgr = executor_->getDataMgr();
414  CudaAllocator allocator(
415  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
416  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
417  CHECK_EQ(join_columns.size(), 1u);
418  CHECK(!join_bucket_info.empty());
419 
// Only the first bucket info entry is used.
420  auto& inverse_bucket_sizes_for_dimension =
421  join_bucket_info[0].inverse_bucket_sizes_for_dimension;
422 
423  auto bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
424  inverse_bucket_sizes_for_dimension, allocator);
425 
// The key handler receives the device-side copies of the columns and sizes.
426  const auto key_handler = RangeKeyHandler(isInnerColCompressed(),
427  inverse_bucket_sizes_for_dimension.size(),
428  join_columns_gpu,
429  bucket_sizes_gpu);
430 
431  const auto err = builder.initHashTableOnGpu(&key_handler,
432  join_columns,
433  layout,
434  join_type_,
437  entry_count,
438  emitted_keys_count,
439  device_id,
440  executor_);
441  if (err) {
442  throw HashJoinFail(
443  std::string("Unrecognized error when initializing GPU range join hash table (") +
444  std::to_string(err) + std::string(")"));
445  }
446  return builder.getHashTable();
447 }
448 #endif
449 
// Builds the range join hash table on the CPU and measures how long the build
// takes.
//
// The key handler is set up from the first join column and the inverse bucket
// sizes of the first bucket info entry; the number of key components equals
// the number of inverse bucket sizes. Both `join_columns` and
// `join_bucket_info` must be non-empty.
//
// Returns the table produced by the builder. Throws HashJoinFail when the
// builder reports a non-zero error code.
//
// The elapsed time (steady_clock, in milliseconds) is passed, together with
// the table, to a call whose head is on a line missing from this listing --
// presumably the CPU hash table cache insertion; confirm. Other missing lines
// include the `builder` declaration and parts of the key handler construction.
450 std::shared_ptr<BaselineHashTable> RangeJoinHashTable::initHashTableOnCpu(
451  const std::vector<JoinColumn>& join_columns,
452  const std::vector<JoinColumnTypeInfo>& join_column_types,
453  const std::vector<JoinBucketInfo>& join_bucket_info,
454  const HashType layout,
455  const size_t entry_count,
456  const size_t emitted_keys_count) {
457  auto timer = DEBUG_TIMER(__func__);
// ts1/ts2 bracket the builder call; their difference is the build time.
458  decltype(std::chrono::steady_clock::now()) ts1, ts2;
459  ts1 = std::chrono::steady_clock::now();
460  const auto composite_key_info =
462  CHECK(!join_columns.empty());
463  CHECK(!join_bucket_info.empty());
464 
466  const auto key_component_count =
467  join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();
468 
469  auto key_handler =
471  key_component_count,
472  &join_columns[0],
473  join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());
474 
477  dummy_str_proxy_translation_maps_ptrs_and_offsets;
478  const auto err =
479  builder.initHashTableOnCpu(&key_handler,
480  composite_key_info,
481  join_columns,
482  join_column_types,
483  join_bucket_info,
484  dummy_str_proxy_translation_maps_ptrs_and_offsets,
485  entry_count,
486  emitted_keys_count,
487  layout,
488  join_type_,
491  ts2 = std::chrono::steady_clock::now();
492  if (err) {
493  throw HashJoinFail(std::string("Unrecognized error when initializing CPU "
494  "range join hash table (") +
495  std::to_string(err) + std::string(")"));
496  }
497  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
498  auto hashtable_build_time =
499  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
502  hash_table,
504  hashtable_build_time);
505  return hash_table;
506 }
507 
// Parameter list and body of the routine that sizes the hash table (presumably
// computeRangeHashTableCounts -- the signature line is not part of this
// listing, nor is the call that produces the tuple estimate).
//
// Returns a pair:
//   first  -- get_entries_per_device() applied to twice the estimated tuple
//             count, where the tuple count is clamped to at least 1;
//   second -- the emitted keys count taken from the estimate, unchanged.
509  const size_t shard_count,
510  std::vector<ColumnsForDevice>& columns_per_device) {
512  const auto [tuple_count, emitted_keys_count] =
514  columns_per_device,
// max(..., 1) keeps the entry count non-zero even for an estimate of 0.
517  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
518 
519  return std::make_pair(
520  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
521  emitted_keys_count);
522 }
523 
// Parameter list and body of the routine that estimates the number of distinct
// keys and the number of emitted keys (presumably approximateTupleCount -- the
// signature line is not part of this listing).
//
// Uses a count-distinct descriptor with bitmap_sz_bits = 11 and approximate =
// true, and reads/merges the result with hll_unify / hll_size.
//
// Returns a pair (hll_size of the merged buffer, emitted keys count), or (0, 0)
// when the first join column of the first device has no elements.
//
// Two paths are visible:
//   * A host path that fills one buffer slice per CPU thread and merges the
//     slices into the first one. Its guard is on a line missing from this
//     listing -- presumably a CPU memory level check; confirm.
//   * A HAVE_CUDA path that runs one std::async task per device and merges the
//     per-device host buffers. Without HAVE_CUDA this part is UNREACHABLE.
//
// `chosen_max_hashtable_size` and `chosen_bucket_threshold` are not referenced
// in the visible lines.
525  const std::vector<double>& inverse_bucket_sizes_for_dimension,
526  std::vector<ColumnsForDevice>& columns_per_device,
527  const size_t chosen_max_hashtable_size,
528  const double chosen_bucket_threshold) {
529 #ifdef _WIN32
530  // WIN32 needs have C++20 set for designated initialisation to work
531  CountDistinctDescriptor count_distinct_desc{
533  0,
534  11,
535  true,
539  1,
540  };
541 #else
542  CountDistinctDescriptor count_distinct_desc{
544  .min_val = 0,
545  .bitmap_sz_bits = 11,
546  .approximate = true,
550  .sub_bitmap_count = 1,
551  };
552 #endif
553  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
554 
555  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
556  if (columns_per_device.front().join_columns.front().num_elems == 0) {
557  return std::make_pair(0, 0);
558  }
559 
560  for (auto& columns_for_device : columns_per_device) {
561  columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
563  }
564 
565  // Number of keys must match dimension of buckets
566  CHECK_EQ(columns_per_device.front().join_columns.size(),
567  columns_per_device.front().join_buckets.size());
569  const auto composite_key_info =
// One padded buffer slice per CPU thread, stored back to back; hll_result
// points at the first slice, which also receives the merged result.
571  int thread_count = cpu_threads();
572  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
573  auto hll_result = &hll_buffer_all_cpus[0];
574 
575  std::vector<int32_t> num_keys_for_row;
576  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
577 
579  num_keys_for_row,
580  count_distinct_desc.bitmap_sz_bits,
581  padded_size_bytes,
582  columns_per_device.front().join_columns,
583  columns_per_device.front().join_column_types,
584  columns_per_device.front().join_buckets,
586  thread_count);
587 
// Merge the slices of threads 1..thread_count-1 into the first slice.
588  for (int i = 1; i < thread_count; ++i) {
589  hll_unify(hll_result,
590  hll_result + i * padded_size_bytes,
591  1 << count_distinct_desc.bitmap_sz_bits);
592  }
// The emitted keys count is the last element of num_keys_for_row -- presumably
// a running total written by the call above; confirm.
593  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
594  num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
595  }
596 #ifdef HAVE_CUDA
597  auto& data_mgr = executor_->getCatalog()->getDataMgr();
598  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
599  for (auto& host_hll_buffer : host_hll_buffers) {
600  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
601  }
602  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
603  std::vector<std::future<void>> approximate_distinct_device_threads;
// One task per device. Each task writes only to its own element of
// host_hll_buffers and emitted_keys_count_device_threads (indexed by device_id).
604  for (int device_id = 0; device_id < device_count_; ++device_id) {
605  approximate_distinct_device_threads.emplace_back(std::async(
607  [device_id,
608  &columns_per_device,
609  &count_distinct_desc,
610  &data_mgr,
611  &host_hll_buffers,
612  &emitted_keys_count_device_threads,
613  this] {
614  auto allocator = std::make_unique<CudaAllocator>(
615  &data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
616  auto device_hll_buffer =
617  allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
618  data_mgr.getCudaMgr()->zeroDeviceMem(
619  device_hll_buffer,
620  count_distinct_desc.bitmapPaddedSizeBytes(),
621  device_id,
623  const auto& columns_for_device = columns_per_device[device_id];
624  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
625  columns_for_device.join_columns, *allocator);
626 
627  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
628  const auto& bucket_sizes_for_dimension =
629  columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
630  auto bucket_sizes_gpu =
631  allocator->alloc(bucket_sizes_for_dimension.size() * sizeof(double));
632  allocator->copyToDevice(bucket_sizes_gpu,
633  bucket_sizes_for_dimension.data(),
634  bucket_sizes_for_dimension.size() * sizeof(double));
// NOTE(review): the row counts buffer is sized from the FIRST device's column
// (columns_per_device.front()), while the element count handed to the call
// below comes from this device's columns_for_device -- confirm that both
// always agree.
635  const size_t row_counts_buffer_sz =
636  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
637  auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
638  data_mgr.getCudaMgr()->zeroDeviceMem(
639  row_counts_buffer,
640  row_counts_buffer_sz,
641  device_id,
643  const auto key_handler =
645  bucket_sizes_for_dimension.size(),
646  join_columns_gpu,
647  reinterpret_cast<double*>(bucket_sizes_gpu));
648  const auto key_handler_gpu =
649  transfer_flat_object_to_gpu(key_handler, *allocator);
651  reinterpret_cast<uint8_t*>(device_hll_buffer),
652  count_distinct_desc.bitmap_sz_bits,
653  reinterpret_cast<int32_t*>(row_counts_buffer),
654  key_handler_gpu,
655  columns_for_device.join_columns[0].num_elems,
656  executor_->blockSize(),
657  executor_->gridSize());
658 
// Copy back the last int32_t of the row counts buffer as this device's count.
// NOTE(review): the destination is a size_t element but only sizeof(int32_t)
// bytes are written; this appears to rely on the element being pre-zeroed and
// on a little-endian host -- confirm.
659  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
660  allocator->copyFromDevice(
661  &host_emitted_keys_count,
662  row_counts_buffer +
663  (columns_per_device.front().join_columns[0].num_elems - 1) *
664  sizeof(int32_t),
665  sizeof(int32_t));
666 
667  auto& host_hll_buffer = host_hll_buffers[device_id];
668  allocator->copyFromDevice(&host_hll_buffer[0],
669  device_hll_buffer,
670  count_distinct_desc.bitmapPaddedSizeBytes());
671  }));
672  }
673  for (auto& child : approximate_distinct_device_threads) {
674  child.get();
675  }
// Merge the buffers of devices 1..device_count_-1 into device 0's buffer and
// sum the per-device emitted key counts.
677  auto& result_hll_buffer = host_hll_buffers.front();
678  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
679  for (int device_id = 1; device_id < device_count_; ++device_id) {
680  auto& host_hll_buffer = host_hll_buffers[device_id];
681  hll_unify(hll_result,
682  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
683  1 << count_distinct_desc.bitmap_sz_bits);
684  }
685  size_t emitted_keys_count = 0;
686  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
687  emitted_keys_count += emitted_keys_count_device;
688  }
689  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
690  emitted_keys_count);
691 #else
692  UNREACHABLE();
693  return {0, 0};
694 #endif // HAVE_CUDA
695 }
696 
// Shorthand for the code generation state reached through executor_. Each
// macro expands to a member (or a member call) of executor_->cgen_state_, so
// they are only usable where a variable named `executor_` is in scope.
//   LL_CONTEXT -- context_        LL_BUILDER -- ir_builder_
//   LL_INT(v)  -- llInt(v)        LL_FP(v)   -- llFp(v)
//   ROW_FUNC   -- row_func_
697 #define LL_CONTEXT executor_->cgen_state_->context_
698 #define LL_BUILDER executor_->cgen_state_->ir_builder_
699 #define LL_INT(v) executor_->cgen_state_->llInt(v)
700 #define LL_FP(v) executor_->cgen_state_->llFp(v)
701 #define ROW_FUNC executor_->cgen_state_->row_func_
702 
// Last parameter and body of the routine that emits IR to build the probe key
// (presumably codegenKey -- the signature's first line, source line 703, is not
// part of this listing; the body also uses a `co` parameter declared there).
//
// Emits an alloca for the key buffer and, for a POINT outer column, stores two
// key components: for each of the two dimensions the bucket key returned by an
// external function plus the offset unpacked from `offset_ptr` for that
// dimension. Returns the key buffer value.
//
// Only a geometry outer column is handled; any other type ends in LOG(FATAL).
// `offset_ptr` is loaded as a single value holding two 32-bit offsets: the low
// 32 bits are used for dimension 0 and the next 32 bits for dimension 1.
704  llvm::Value* offset_ptr) {
705  const auto key_component_width = getKeyComponentWidth();
706  CHECK(key_component_width == 4 || key_component_width == 8);
// NOTE(review): key_size_lv is component count * component width (a byte
// count), yet it is passed as the array size of an alloca whose element type is
// already int32/int64 -- this appears to reserve more elements than there are
// key components; confirm whether that is intended.
707  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
708  llvm::Value* key_buff_lv{nullptr};
709  switch (key_component_width) {
710  case 4:
711  key_buff_lv =
712  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
713  break;
714  case 8:
715  key_buff_lv =
716  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
717  break;
718  default:
719  CHECK(false);
720  }
721 
// Only the first inner/outer pair is used; its outer side is the probe column.
722  const auto& inner_outer_pair = inner_outer_pairs_[0];
723  const auto outer_col = inner_outer_pair.second;
724  const auto outer_col_ti = outer_col->get_type_info();
725 
726  if (outer_col_ti.is_geometry()) {
727  CodeGenerator code_generator(executor_);
728  // TODO(adb): for points we will use the coords array, but for other
729  // geometries we will need to use the bounding box. For now only support
730  // points.
731  CHECK_EQ(outer_col_ti.get_type(), kPOINT);
732  CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
733 
734  const auto col_lvs = code_generator.codegen(outer_col, true, co);
735  CHECK_EQ(col_lvs.size(), size_t(1));
736 
737  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
738  CHECK(outer_col_var);
739 
// Metadata of the column that directly follows the outer geometry column
// (column id + 1); its element type must be TINYINT (checked below).
740  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
741  outer_col_var->get_table_id(), outer_col_var->get_column_id() + 1);
742  CHECK(coords_cd);
743 
744  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
745  "array_buff",
746  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
747  {col_lvs.front(), code_generator.posArg(outer_col)});
748  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
749  << "Only TINYINT coordinates columns are supported in geo overlaps "
750  "hash join.";
751 
752  const auto arr_ptr =
753  code_generator.castArrayPointer(array_ptr, coords_cd->columnType.get_elem_type());
754 
755  // load and unpack offsets
756  const auto offset =
757  LL_BUILDER.CreateLoad(offset_ptr->getType()->getPointerElementType(),
758  offset_ptr,
759  "packed_bucket_offset");
// x offset: truncate the loaded value to its low 32 bits.
760  const auto x_offset =
761  LL_BUILDER.CreateTrunc(offset, llvm::Type::getInt32Ty(LL_CONTEXT));
762 
// y offset: logical shift right by 32, then truncate to 32 bits.
763  const auto y_offset_shifted =
764  LL_BUILDER.CreateLShr(offset, LL_INT(static_cast<int64_t>(32)));
765  const auto y_offset =
766  LL_BUILDER.CreateTrunc(y_offset_shifted, llvm::Type::getInt32Ty(LL_CONTEXT));
767 
// Sign-extend both offsets to 64 bits before adding them to the bucket keys.
768  const auto x_bucket_offset =
769  LL_BUILDER.CreateSExt(x_offset, llvm::Type::getInt64Ty(LL_CONTEXT));
770  const auto y_bucket_offset =
771  LL_BUILDER.CreateSExt(y_offset, llvm::Type::getInt64Ty(LL_CONTEXT));
772 
// One key component per dimension: i == 0 uses the x offset, i == 1 the y one.
773  for (size_t i = 0; i < 2; i++) {
774  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
775  key_buff_lv->getType()->getScalarType()->getPointerElementType(),
776  key_buff_lv,
777  LL_INT(i));
778 
779  const auto funcName = isProbeCompressed() ? "get_bucket_key_for_range_compressed"
780  : "get_bucket_key_for_range_double";
781 
782  // Note that get_bucket_key_for_range_compressed will need to be
783  // specialized for future compression schemes
784  auto bucket_key = executor_->cgen_state_->emitExternalCall(
785  funcName,
788 
789  auto bucket_key_shifted = i == 0
790  ? LL_BUILDER.CreateAdd(x_bucket_offset, bucket_key)
791  : LL_BUILDER.CreateAdd(y_bucket_offset, bucket_key);
792 
793  const auto col_lv = LL_BUILDER.CreateSExt(
794  bucket_key_shifted, get_int_type(key_component_width * 8, LL_CONTEXT));
795  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
796  }
797  } else {
798  LOG(FATAL) << "Range join key currently only supported for geospatial types.";
799  }
800  return key_buff_lv;
801 }
802 
// Parameter list and body of the routine that emits the matching-set lookup
// for a probe key shifted by `range_offset` (presumably
// codegenMatchingSetWithOffset -- the signature's first line, source line 803,
// is not part of this listing, nor is the head of the final call on line 845).
//
// Visible steps:
//   1. Emit the key buffer via codegenKey(co, range_offset).
//   2. Load the hash table pointer for table `index` and view it as a pointer
//      to key-component-width integers (the composite key dictionary).
//   3. Call the external "get_composite_key_index_<bits>" function to resolve
//      the key to an index into the dictionary.
//   4. Compute the integer address that lies offsetBufferOff() bytes past the
//      hash table pointer and pass it, the key index, 0 and entry count - 1 to
//      the final call, with sharding, nullability and bitwise-eq all false.
804  const CompilationOptions& co,
805  const size_t index,
806  llvm::Value* range_offset) {
807  const auto key_component_width = getKeyComponentWidth();
808  CHECK(key_component_width == 4 || key_component_width == 8);
809 
810  auto key_buff_lv = codegenKey(co, range_offset);
812 
813  auto hash_ptr = codegenHashTableLoad(index, executor_);
814  const auto composite_dict_ptr_type =
815  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
816 
// hash_ptr may be either a pointer or an integer value; cast accordingly.
817  const auto composite_key_dict =
818  hash_ptr->getType()->isPointerTy()
819  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
820  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
821 
822  const auto key_component_count = getKeyComponentCount();
823 
824  const auto funcName =
825  "get_composite_key_index_" + std::to_string(key_component_width * 8);
826 
827  const auto key = executor_->cgen_state_->emitExternalCall(funcName,
829  {key_buff_lv,
830  LL_INT(key_component_count),
831  composite_key_dict,
832  LL_INT(getEntryCount())});
833 
// Work with the table address as a 64-bit integer so a byte offset can be added.
834  auto one_to_many_ptr = hash_ptr;
835  if (one_to_many_ptr->getType()->isPointerTy()) {
836  one_to_many_ptr =
837  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
838  } else {
839  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
840  }
841  const auto composite_key_dict_size = offsetBufferOff();
842  one_to_many_ptr =
843  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
844 
846  /* hash_join_idx_args_in */ {one_to_many_ptr,
847  key,
848  LL_INT(int64_t(0)),
849  LL_INT(getEntryCount() - 1)},
850  /* is_sharded */ false,
851  /* is_nullable */ false,
852  /* is_bw_eq */ false,
853  /* sub_buff_size */ getComponentBufferSize(),
854  /* executor */ executor_);
855 }
static std::vector< int > collectFragmentIds(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments)
Definition: HashJoin.cpp:446
#define LL_INT(v)
#define CHECK_EQ(x, y)
Definition: Logger.h:231
int getInnerTableId() const noexceptoverride
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
JoinType
Definition: sqldefs.h:136
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
std::string cat(Ts &&...args)
llvm::Value * codegenKey(const CompilationOptions &co, llvm::Value *offset)
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:257
std::shared_ptr< HashTable > initHashTableOnCpuFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
static bool isInvalidHashTableCacheKey(const std::vector< QueryPlanHash > &cache_keys)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:95
HashType getHashType() const noexceptoverride
std::vector< ChunkKey > cache_key_chunks
Definition: HashJoin.h:119
T * transfer_flat_object_to_gpu(const T &object, DeviceAllocator &allocator)
#define LOG(tag)
Definition: Logger.h:217
void hll_unify(T1 *lhs, T2 *rhs, const size_t m)
Definition: HyperLogLog.h:109
llvm::Value * posArg(const Analyzer::Expr *) const
Definition: ColumnIR.cpp:515
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:351
#define UNREACHABLE()
Definition: Logger.h:267
void reifyWithLayout(const HashType layout) override
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:375
CountDistinctImplType impl_type_
void putHashTableOnCpuToCache(QueryPlanHash key, CacheItemType item_type, std::shared_ptr< HashTable > hashtable_ptr, DeviceIdentifier device_identifier, size_t hashtable_building_time)
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
size_t hll_size(const T *M, const size_t bitmap_sz_bits)
Definition: HyperLogLog.h:90
#define CHECK_GT(x, y)
Definition: Logger.h:235
#define LL_CONTEXT
const Expr * get_left_operand() const
Definition: Analyzer.h:546
std::string to_string(char const *&&v)
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
#define LL_FP(v)
const std::shared_ptr< Analyzer::BinOper > condition_
#define LL_BUILDER
const std::vector< JoinColumnTypeInfo > join_column_types
Definition: HashJoin.h:101
void approximate_distinct_tuples_on_device_range(uint8_t *hll_buffer, const uint32_t b, int32_t *row_counts_buffer, const RangeKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
future< Result > async(Fn &&fn, Args &&...args)
std::string toString() const override
Definition: Analyzer.cpp:2669
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
std::shared_ptr< BaselineHashTable > initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const HashType layout, const size_t entry_count, const size_t emitted_keys_count)
QueryPlanHash getAlternativeCacheKey(AlternativeCacheKeyForOverlapsHashJoin &info)
std::pair< size_t, size_t > approximateTupleCount(const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &columns_per_device, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold) override
const std::vector< InputTableInfo > & query_infos_
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
std::vector< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
static std::shared_ptr< RangeJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const Analyzer::RangeOper *range_expr, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
HashTableBuildDagMap hashtable_build_dag_map_
int initHashTableOnGpu(KEY_HANDLER *key_handler, const std::vector< JoinColumn > &join_columns, const HashType layout, const JoinType join_type, const size_t key_component_width, const size_t key_component_count, const size_t keyspace_entry_count, const size_t emitted_keys_count, const int device_id, const Executor *executor)
std::pair< size_t, size_t > computeRangeHashTableCounts(const size_t shard_count, std::vector< ColumnsForDevice > &columns_per_device)
std::vector< QueryPlanHash > hashtable_cache_key_
virtual int getInnerTableId() const noexcept=0
HashJoinMatchingSet codegenMatchingSetWithOffset(const CompilationOptions &, const size_t, llvm::Value *)
const double bucket_threshold_
const Expr * get_right_operand() const
Definition: Analyzer.h:547
static void checkHashJoinReplicationConstraint(const int table_id, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:771
void setOverlapsHashtableMetaInfo(size_t max_table_size_bytes, double bucket_threshold, std::vector< double > &bucket_sizes)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
std::vector< llvm::Value * > codegen(const Analyzer::Expr *, const bool fetch_columns, const CompilationOptions &)
Definition: IRCodegen.cpp:30
std::vector< double > inverse_bucket_sizes_for_dimension_
#define CHECK_LT(x, y)
Definition: Logger.h:233
std::optional< HashType > layout_override_
std::pair< std::vector< const int32_t * >, std::vector< int32_t >> StrProxyTranslationMapsPtrsAndOffsets
bool isProbeCompressed() const
std::unique_ptr< BaselineHashTable > getHashTable()
static std::string getHashTypeString(HashType ht) noexcept
Definition: HashJoin.h:154
void setInverseBucketSizeInfo(const std::vector< double > &inverse_bucket_sizes, std::vector< ColumnsForDevice > &columns_per_device, const size_t device_count)
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
ColumnsForDevice fetchColumnsForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner)
uint64_t ThreadId
Definition: Logger.h:363
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
size_t offsetBufferOff() const noexcept override
bool isInnerColCompressed() const
ThreadId thread_id()
Definition: Logger.cpp:817
void approximate_distinct_tuples_range(uint8_t *hll_buffer_all_cpus, std::vector< int32_t > &row_counts, const uint32_t b, const size_t padded_size_bytes, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_buckets_per_key, const bool is_compressed, const int thread_count)
#define CHECK(condition)
Definition: Logger.h:223
#define DEBUG_TIMER(name)
Definition: Logger.h:370
void reifyForDevice(const ColumnsForDevice &columns_for_device, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const int device_id, const logger::ThreadId parent_thread_id)
const Data_Namespace::MemoryLevel memory_level_
void generateCacheKey(const size_t max_hashtable_size, const double bucket_threshold, const std::vector< double > &bucket_sizes, std::vector< std::vector< Fragmenter_Namespace::FragmentInfo >> &fragments_per_device, int device_count)
size_t getComponentBufferSize() const noexcept override
static std::unique_ptr< HashtableRecycler > hash_table_cache_
std::vector< InnerOuter > inner_outer_pairs_
std::unordered_set< size_t > table_keys_
Data_Namespace::MemoryLevel effective_memory_level_
T * transfer_vector_of_flat_objects_to_gpu(const std::vector< T > &vec, DeviceAllocator &allocator)
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
std::vector< JoinBucketInfo > join_buckets
Definition: HashJoin.h:103
const Analyzer::RangeOper * range_expr_
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
int cpu_threads()
Definition: thread_count.h:24
static HashtableAccessPathInfo getHashtableAccessPathInfo(const std::vector< InnerOuter > &inner_outer_pairs, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs, const SQLOps op_type, const JoinType join_type, const HashTableBuildDagMap &hashtable_build_dag_map, int device_count, int shard_count, const std::vector< std::vector< Fragmenter_Namespace::FragmentInfo >> &frags_for_device, Executor *executor)
CompositeKeyInfo composite_key_info_
HashType
Definition: HashTable.h:19
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:100
#define VLOG(n)
Definition: Logger.h:317
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:150
const size_t max_hashtable_size_
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs={})
Definition: HashJoin.cpp:455