RangeJoinHashTable.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/Execute.h"

// clang-format off
// clang-format on

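// Builds a hash table over the LHS of the range operator for use in range joins
// (e.g. distance-bounded geospatial joins). getInstance() validates that the inner
// side is a geometry column, sets up the inner/outer column pairs, and reifies a
// OneToMany hash table across the requested devices.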
std::shared_ptr<RangeJoinHashTable> RangeJoinHashTable::getInstance(
    const std::shared_ptr<Analyzer::BinOper> condition,
    const Analyzer::RangeOper* range_expr,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const JoinType join_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    const RegisteredQueryHint& query_hint,
    const TableIdToNodeMap& table_id_to_node_map) {
  // The hash table is built over the LHS of the range oper. We then use the LHS
  // of the bin oper + the RHS of the range oper for the probe.
  auto range_expr_col_var =
      dynamic_cast<const Analyzer::ColumnVar*>(range_expr->get_left_operand());
  if (!range_expr_col_var || !range_expr_col_var->get_type_info().is_geometry()) {
    throw HashJoinFail("Could not build hash tables for range join | " +
                       range_expr->toString());
  }
  auto cat = executor->getCatalog();
  CHECK(cat);
  CHECK(range_expr_col_var->get_type_info().is_geometry());

  auto coords_cd = cat->getMetadataForColumn(range_expr_col_var->get_table_id(),
                                             range_expr_col_var->get_column_id() + 1);
  CHECK(coords_cd);

  auto range_join_inner_col_expr =
      makeExpr<Analyzer::ColumnVar>(coords_cd->columnType,
                                    coords_cd->tableId,
                                    coords_cd->columnId,
                                    range_expr_col_var->get_rte_idx());

  std::vector<InnerOuter> inner_outer_pairs;
  inner_outer_pairs.emplace_back(
      InnerOuter{dynamic_cast<Analyzer::ColumnVar*>(range_join_inner_col_expr.get()),
                 condition->get_left_operand()});

  const auto& query_info =
      get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
          .info;

  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw TooManyHashEntries();
  }

  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
                               ? BaselineJoinHashTable::getShardCountForCondition(
                                     condition.get(), executor, inner_outer_pairs)
                               : 0;

  auto hashtable_cache_key_string =
      HashtableRecycler::getHashtableKeyString(inner_outer_pairs,
                                               condition->get_optype(),
                                               join_type,
                                               hashtable_build_dag_map,
                                               executor);

  auto join_hash_table =
      std::make_shared<RangeJoinHashTable>(condition,
                                           join_type,
                                           range_expr,
                                           range_join_inner_col_expr,
                                           query_infos,
                                           memory_level,
                                           column_cache,
                                           executor,
                                           inner_outer_pairs,
                                           device_count,
                                           hashtable_cache_key_string.first,
                                           hashtable_cache_key_string.second,
                                           hashtable_build_dag_map,
                                           table_id_to_node_map);
  HashJoin::checkHashJoinReplicationConstraint(
      HashJoin::getInnerTableId(inner_outer_pairs), shard_count, executor);
  try {
    join_hash_table->reifyWithLayout(HashType::OneToMany);
  } catch (const HashJoinFail& e) {
    throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
                                   "involved in equijoin | ") +
                       e.what());
  } catch (const ColumnarConversionNotSupported& e) {
    throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
                       e.what());
  } catch (const std::exception& e) {
    LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
               << e.what();
  }

  return join_hash_table;
}

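// Reifies the hash table for the given layout: fetches the inner columns per device,
// derives the (inverse) bucket sizes from the constant on the RHS of the range
// operator, sizes the table via computeRangeHashTableCounts(), and then builds the
// per-device hash tables asynchronously.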
void RangeJoinHashTable::reifyWithLayout(const HashType layout) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK(layout == HashType::OneToMany);

  const auto& query_info =
      get_inner_query_info(getInnerTableId(), query_infos_).info;

  if (query_info.fragments.empty()) {
    return;
  }

  VLOG(1) << "Reify with layout " << getHashTypeString(layout)
          << " for table_id: " << getInnerTableId();

  std::vector<ColumnsForDevice> columns_per_device;
  const auto catalog = executor_->getCatalog();
  CHECK(catalog);

  auto& data_mgr = catalog->getDataMgr();
  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      dev_buff_owners.emplace_back(
          std::make_unique<CudaAllocator>(&data_mgr, device_id));
    }
  }
  const auto shard_count = shardCount();
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    const auto fragments =
        shard_count
            ? only_shards_for_device(query_info.fragments, device_id, device_count_)
            : query_info.fragments;
    const auto columns_for_device =
        fetchColumnsForDevice(fragments,
                              device_id,
                              memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
                                  ? dev_buff_owners[device_id].get()
                                  : nullptr);
    columns_per_device.push_back(columns_for_device);
  }

  const auto bucket_range =
      dynamic_cast<const Analyzer::Constant*>(range_expr_->get_right_operand());

  CHECK(bucket_range);
  CHECK(bucket_range->get_type_info().is_fp() &&
        bucket_range->get_type_info().get_size() == 8);  // TODO

  const auto bucket_range_datum = bucket_range->get_constval();

  inverse_bucket_sizes_for_dimension_.emplace_back(1. / bucket_range_datum.doubleval);
  inverse_bucket_sizes_for_dimension_.emplace_back(1. / bucket_range_datum.doubleval);

  setInverseBucketSizeInfo(
      inverse_bucket_sizes_for_dimension_, columns_per_device, device_count_);

  auto [entry_count, emitted_keys_count] =
      computeRangeHashTableCounts(shard_count, columns_per_device);

  size_t hash_table_size = OverlapsJoinHashTable::calculateHashTableSize(
      inverse_bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);

  VLOG(1) << "Finalized range join hash table: entry count " << entry_count
          << " hash table size " << hash_table_size;

  std::vector<std::future<void>> init_threads;
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    const auto fragments =
        shard_count
            ? only_shards_for_device(query_info.fragments, device_id, device_count_)
            : query_info.fragments;
    init_threads.push_back(
        std::async(std::launch::async,
                   &RangeJoinHashTable::reifyForDevice,
                   this,
                   /* columns_for_device */ columns_per_device[device_id],
                   /* layout_type */ layout,
                   /* entry_count */ entry_count,
                   /* emitted_keys_count */ emitted_keys_count,
                   /* device_id */ device_id,
                   /* parent_thread_id */ logger::thread_id()));
  }
  for (auto& init_thread : init_threads) {
    init_thread.wait();
  }
  for (auto& init_thread : init_threads) {
    init_thread.get();
  }
}

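// Builds the hash table for a single device. The table is constructed on the CPU when
// the effective memory level is CPU_LEVEL (and copied to the GPU if the final storage
// level is GPU_LEVEL); otherwise it is built directly on the GPU.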
void RangeJoinHashTable::reifyForDevice(const ColumnsForDevice& columns_for_device,
                                        const HashType layout,
                                        const size_t entry_count,
                                        const size_t emitted_keys_count,
                                        const int device_id,
                                        const logger::ThreadId parent_thread_id) {
  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
  CHECK_EQ(getKeyComponentWidth(), size_t(8));

  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);

  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
    VLOG(1) << "Building range join hash table on CPU.";
    auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
                                         columns_for_device.join_column_types,
                                         columns_for_device.join_buckets,
                                         layout,
                                         entry_count,
                                         emitted_keys_count);
    CHECK(hash_table);

#ifdef HAVE_CUDA
    if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
      auto gpu_hash_table = copyCpuHashTableToGpu(
          std::move(hash_table), layout, entry_count, emitted_keys_count, device_id);
      CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
      hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
    } else {
#else
    CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
#endif
      CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
      hash_tables_for_device_[0] = std::move(hash_table);
#ifdef HAVE_CUDA
    }
#endif
  } else {
#ifdef HAVE_CUDA
    auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
                                         columns_for_device.join_column_types,
                                         columns_for_device.join_buckets,
                                         layout,
                                         entry_count,
                                         emitted_keys_count,
                                         device_id);
    CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
    hash_tables_for_device_[device_id] = std::move(hash_table);
#else
    UNREACHABLE();
#endif
  }
}
// #endif

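// GPU build path: transfers the join columns and inverse bucket sizes to the device
// and delegates the actual table construction to the baseline hash table builder via
// a RangeKeyHandler.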
#ifdef HAVE_CUDA
std::shared_ptr<BaselineHashTable> RangeJoinHashTable::initHashTableOnGpu(
    const std::vector<JoinColumn>& join_columns,
    const std::vector<JoinColumnTypeInfo>& join_column_types,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const HashType layout,
    const size_t entry_count,
    const size_t emitted_keys_count,
    const size_t device_id) {

  VLOG(1) << "Building range join hash table on GPU.";

  BaselineJoinHashTableBuilder builder;
  auto data_mgr = executor_->getDataMgr();
  CudaAllocator allocator(data_mgr, device_id);
  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
  CHECK_EQ(join_columns.size(), 1u);
  CHECK(!join_bucket_info.empty());

  auto& inverse_bucket_sizes_for_dimension =
      join_bucket_info[0].inverse_bucket_sizes_for_dimension;

  auto bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
      inverse_bucket_sizes_for_dimension, allocator);

  const auto key_handler = RangeKeyHandler(isInnerColCompressed(),
                                           inverse_bucket_sizes_for_dimension.size(),
                                           join_columns_gpu,
                                           bucket_sizes_gpu);

  const auto err = builder.initHashTableOnGpu(&key_handler,
                                              join_columns,
                                              layout,
                                              join_type_,
                                              getKeyComponentWidth(),
                                              getKeyComponentCount(),
                                              entry_count,
                                              emitted_keys_count,
                                              device_id,
                                              executor_);
  if (err) {
    throw HashJoinFail(
        std::string("Unrecognized error when initializing GPU range join hash table (") +
        std::to_string(err) + std::string(")"));
  }
  return builder.getHashTable();
}
#endif

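// CPU build path: first consults the hash table recycler cache (keyed on the query
// plan DAG, or an alternative key when the DAG is unavailable) and only falls back to
// building a new table with the baseline builder on a cache miss. The finished table
// is placed back into the cache along with its build time.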
std::shared_ptr<BaselineHashTable> RangeJoinHashTable::initHashTableOnCpu(
    const std::vector<JoinColumn>& join_columns,
    const std::vector<JoinColumnTypeInfo>& join_column_types,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const HashType layout,
    const size_t entry_count,
    const size_t emitted_keys_count) {
  auto timer = DEBUG_TIMER(__func__);
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  ts1 = std::chrono::steady_clock::now();
  const auto composite_key_info =
      HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
  CHECK(!join_columns.empty());
  CHECK(!join_bucket_info.empty());

  if ((query_plan_dag_.compare(EMPTY_QUERY_PLAN) == 0 ||
       hashtable_cache_key_ == EMPTY_HASHED_PLAN_DAG_KEY) &&
      inner_outer_pairs_.front().first->get_table_id() > 0) {
    // Sometimes we cannot retrieve the query plan DAG, so fall back to the recycler
    // cache with an alternative cache key when the hash table is built over a
    // non-temporary table.
    AlternativeCacheKeyForOverlapsHashJoin cache_key{inner_outer_pairs_,
                                                     join_columns.front().num_elems,
                                                     composite_key_info_.cache_key_chunks,
                                                     condition_->get_optype(),
                                                     max_hashtable_size_,
                                                     bucket_threshold_,
                                                     inverse_bucket_sizes_for_dimension_};
    hashtable_cache_key_ = getAlternativeCacheKey(cache_key);
    VLOG(2) << "Use alternative hashtable cache key due to unavailable query plan dag "
               "extraction (hashtable_cache_key: "
            << hashtable_cache_key_ << ")";
  }

  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
  if (auto generic_hash_table =
          initHashTableOnCpuFromCache(hashtable_cache_key_,
                                      CacheItemType::OVERLAPS_HT,
                                      DataRecycler::CPU_DEVICE_IDENTIFIER)) {
    if (auto hash_table =
            std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
      // See if a hash table of a different layout was returned.
      // If it was OneToMany, we can reuse it on ManyToMany.
      if (layout == HashType::ManyToMany &&
          hash_table->getLayout() == HashType::OneToMany) {
        // Use the cached hash table.
        layout_override_ = HashType::ManyToMany;
        return hash_table;
      }
    }
  }

  const auto key_component_count =
      join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();

  auto key_handler =
      RangeKeyHandler(isInnerColCompressed(),
                      key_component_count,
                      &join_columns[0],
                      join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());

  BaselineJoinHashTableBuilder builder;
  const auto err = builder.initHashTableOnCpu(&key_handler,
                                              composite_key_info,
                                              join_columns,
                                              join_column_types,
                                              join_bucket_info,
                                              entry_count,
                                              emitted_keys_count,
                                              layout,
                                              join_type_,
                                              getKeyComponentWidth(),
                                              getKeyComponentCount());
  ts2 = std::chrono::steady_clock::now();
  if (err) {
    throw HashJoinFail(std::string("Unrecognized error when initializing CPU "
                                   "range join hash table (") +
                       std::to_string(err) + std::string(")"));
  }
  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
  auto hashtable_build_time =
      std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
  putHashTableOnCpuToCache(hashtable_cache_key_,
                           CacheItemType::OVERLAPS_HT,
                           hash_table,
                           DataRecycler::CPU_DEVICE_IDENTIFIER,
                           hashtable_build_time);
  return hash_table;
}

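// Sizes the hash table: the entry count is twice the approximate distinct tuple count
// (at least one), distributed across devices/shards; the emitted keys count is passed
// through from approximateTupleCount().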
std::pair<size_t, size_t> RangeJoinHashTable::computeRangeHashTableCounts(
    const size_t shard_count,
    std::vector<ColumnsForDevice>& columns_per_device) {
  const auto [tuple_count, emitted_keys_count] =
      approximateTupleCount(inverse_bucket_sizes_for_dimension_,
                            columns_per_device,
                            max_hashtable_size_,
                            bucket_threshold_);
  const auto entry_count = 2 * std::max(tuple_count, size_t(1));

  return std::make_pair(
      get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
      emitted_keys_count);
}

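// Approximates the number of distinct keys (and the number of emitted keys) using
// HyperLogLog bitmaps, either on the CPU across worker threads or on each GPU,
// unifying the per-thread/per-device bitmaps at the end.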
std::pair<size_t, size_t> RangeJoinHashTable::approximateTupleCount(
    const std::vector<double>& inverse_bucket_sizes_for_dimension,
    std::vector<ColumnsForDevice>& columns_per_device,
    const size_t chosen_max_hashtable_size,
    const double chosen_bucket_threshold) {
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
#ifdef _WIN32
  // WIN32 needs to have C++20 set for designated initialisation to work
  CountDistinctDescriptor count_distinct_desc{
      CountDistinctImplType::Bitmap,
      0,
      11,
      true,
      effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
          ? ExecutorDeviceType::GPU
          : ExecutorDeviceType::CPU,
      1,
  };
#else
  CountDistinctDescriptor count_distinct_desc{
      .impl_type_ = CountDistinctImplType::Bitmap,
      .min_val = 0,
      .bitmap_sz_bits = 11,
      .approximate = true,
      .device_type = effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
                         ? ExecutorDeviceType::GPU
                         : ExecutorDeviceType::CPU,
      .sub_bitmap_count = 1,
  };
#endif
  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();

  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
  if (columns_per_device.front().join_columns.front().num_elems == 0) {
    return std::make_pair(0, 0);
  }

  for (auto& columns_for_device : columns_per_device) {
    columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
                                     inner_outer_pairs_);
  }

  // Number of keys must match dimension of buckets
  CHECK_EQ(columns_per_device.front().join_columns.size(),
           columns_per_device.front().join_buckets.size());
  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
    const auto composite_key_info =
        HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);

    const auto cached_count_info =
        getApproximateTupleCountFromCache(hashtable_cache_key_,
                                          CacheItemType::OVERLAPS_HT,
                                          DataRecycler::CPU_DEVICE_IDENTIFIER);
    if (cached_count_info.has_value() && cached_count_info.value().first) {
      VLOG(1) << "Using a cached tuple count: " << cached_count_info.value().first
              << ", emitted keys count: " << cached_count_info.value().second;
      return std::make_pair(cached_count_info.value().first,
                            cached_count_info.value().second);
    }
    int thread_count = cpu_threads();
    std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
    auto hll_result = &hll_buffer_all_cpus[0];

    std::vector<int32_t> num_keys_for_row;
    num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);

    approximate_distinct_tuples_range(hll_result,
                                      num_keys_for_row,
                                      count_distinct_desc.bitmap_sz_bits,
                                      padded_size_bytes,
                                      columns_per_device.front().join_columns,
                                      columns_per_device.front().join_column_types,
                                      columns_per_device.front().join_buckets,
                                      isInnerColCompressed(),
                                      thread_count);

    for (int i = 1; i < thread_count; ++i) {
      hll_unify(hll_result,
                hll_result + i * padded_size_bytes,
                1 << count_distinct_desc.bitmap_sz_bits);
    }
    return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
                          num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
  }
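  // GPU path: each device computes an HLL bitmap and a running per-row key count on
  // its own fragments; the host then unifies the bitmaps and sums the emitted key
  // counts read back from the last entry of each device's row-counts buffer.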
#ifdef HAVE_CUDA
  auto& data_mgr = executor_->getCatalog()->getDataMgr();
  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
  for (auto& host_hll_buffer : host_hll_buffers) {
    host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
  }
  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
  std::vector<std::future<void>> approximate_distinct_device_threads;
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    approximate_distinct_device_threads.emplace_back(std::async(
        std::launch::async,
        [device_id,
         &columns_per_device,
         &count_distinct_desc,
         &data_mgr,
         &host_hll_buffers,
         &emitted_keys_count_device_threads,
         this] {
          auto allocator = data_mgr.createGpuAllocator(device_id);
          auto device_hll_buffer =
              allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
          data_mgr.getCudaMgr()->zeroDeviceMem(
              device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
          const auto& columns_for_device = columns_per_device[device_id];
          auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
              columns_for_device.join_columns, *allocator);

          CHECK_GT(columns_for_device.join_buckets.size(), 0u);
          const auto& bucket_sizes_for_dimension =
              columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
          auto bucket_sizes_gpu =
              allocator->alloc(bucket_sizes_for_dimension.size() * sizeof(double));
          allocator->copyToDevice(bucket_sizes_gpu,
                                  bucket_sizes_for_dimension.data(),
                                  bucket_sizes_for_dimension.size() * sizeof(double));
          const size_t row_counts_buffer_sz =
              columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
          auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
          data_mgr.getCudaMgr()->zeroDeviceMem(
              row_counts_buffer, row_counts_buffer_sz, device_id);
          const auto key_handler =
              RangeKeyHandler(isInnerColCompressed(),
                              bucket_sizes_for_dimension.size(),
                              join_columns_gpu,
                              reinterpret_cast<double*>(bucket_sizes_gpu));
          const auto key_handler_gpu =
              transfer_flat_object_to_gpu(key_handler, *allocator);
          approximate_distinct_tuples_on_device_range(
              reinterpret_cast<uint8_t*>(device_hll_buffer),
              count_distinct_desc.bitmap_sz_bits,
              reinterpret_cast<int32_t*>(row_counts_buffer),
              key_handler_gpu,
              columns_for_device.join_columns[0].num_elems,
              executor_->blockSize(),
              executor_->gridSize());

          auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
          allocator->copyFromDevice(
              &host_emitted_keys_count,
              row_counts_buffer +
                  (columns_per_device.front().join_columns[0].num_elems - 1) *
                      sizeof(int32_t),
              sizeof(int32_t));

          auto& host_hll_buffer = host_hll_buffers[device_id];
          allocator->copyFromDevice(&host_hll_buffer[0],
                                    device_hll_buffer,
                                    count_distinct_desc.bitmapPaddedSizeBytes());
        }));
  }
  for (auto& child : approximate_distinct_device_threads) {
    child.get();
  }
  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
  auto& result_hll_buffer = host_hll_buffers.front();
  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
  for (int device_id = 1; device_id < device_count_; ++device_id) {
    auto& host_hll_buffer = host_hll_buffers[device_id];
    hll_unify(hll_result,
              reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
              1 << count_distinct_desc.bitmap_sz_bits);
  }
  size_t emitted_keys_count = 0;
  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
    emitted_keys_count += emitted_keys_count_device;
  }
  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
                        emitted_keys_count);
#else
  UNREACHABLE();
  return {0, 0};
#endif  // HAVE_CUDA
}

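// Shorthand for the executor's LLVM code generation state, used by the codegen
// methods below.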
#define LL_CONTEXT executor_->cgen_state_->context_
#define LL_BUILDER executor_->cgen_state_->ir_builder_
#define LL_INT(v) executor_->cgen_state_->llInt(v)
#define LL_FP(v) executor_->cgen_state_->llFp(v)
#define ROW_FUNC executor_->cgen_state_->row_func_

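// Generates the probe-side key for a range join. For point geometries, the key for
// each of the two dimensions is the bucketized probe coordinate plus a per-dimension
// offset taken from the packed 64-bit value pointed to by offset_ptr.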
llvm::Value* RangeJoinHashTable::codegenKey(const CompilationOptions& co,
                                            llvm::Value* offset_ptr) {
  const auto key_component_width = getKeyComponentWidth();
  CHECK(key_component_width == 4 || key_component_width == 8);
  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
  llvm::Value* key_buff_lv{nullptr};
  switch (key_component_width) {
    case 4:
      key_buff_lv =
          LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
      break;
    case 8:
      key_buff_lv =
          LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
      break;
    default:
      CHECK(false);
  }

  const auto& inner_outer_pair = inner_outer_pairs_[0];
  const auto outer_col = inner_outer_pair.second;
  const auto outer_col_ti = outer_col->get_type_info();

  if (outer_col_ti.is_geometry()) {
    CodeGenerator code_generator(executor_);
    // TODO(adb): for points we will use the coords array, but for other
    // geometries we will need to use the bounding box. For now only support
    // points.
    CHECK_EQ(outer_col_ti.get_type(), kPOINT);
    CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));

    const auto col_lvs = code_generator.codegen(outer_col, true, co);
    CHECK_EQ(col_lvs.size(), size_t(1));

    const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
    CHECK(outer_col_var);

    const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
        outer_col_var->get_table_id(), outer_col_var->get_column_id() + 1);
    CHECK(coords_cd);

    const auto array_ptr = executor_->cgen_state_->emitExternalCall(
        "array_buff",
        llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
        {col_lvs.front(), code_generator.posArg(outer_col)});
    CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
        << "Only TINYINT coordinates columns are supported in geo overlaps "
           "hash join.";

    const auto arr_ptr =
        code_generator.castArrayPointer(array_ptr, coords_cd->columnType.get_elem_type());

    // load and unpack offsets
    const auto offset = LL_BUILDER.CreateLoad(offset_ptr, "packed_bucket_offset");
    const auto x_offset =
        LL_BUILDER.CreateTrunc(offset, llvm::Type::getInt32Ty(LL_CONTEXT));

    const auto y_offset_shifted =
        LL_BUILDER.CreateLShr(offset, LL_INT(static_cast<int64_t>(32)));
    const auto y_offset =
        LL_BUILDER.CreateTrunc(y_offset_shifted, llvm::Type::getInt32Ty(LL_CONTEXT));

    const auto x_bucket_offset =
        LL_BUILDER.CreateSExt(x_offset, llvm::Type::getInt64Ty(LL_CONTEXT));
    const auto y_bucket_offset =
        LL_BUILDER.CreateSExt(y_offset, llvm::Type::getInt64Ty(LL_CONTEXT));

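    // For each of the two dimensions, bucketize the probe coordinate (compressed or
    // double, depending on the probe column) and add the corresponding unpacked
    // offset (x from the low 32 bits, y from the high 32 bits) before storing the
    // key component.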
    for (size_t i = 0; i < 2; i++) {
      const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));

      const auto funcName = isProbeCompressed() ? "get_bucket_key_for_range_compressed"
                                                : "get_bucket_key_for_range_double";

      // Note that get_bucket_key_for_range_compressed will need to be
      // specialized for future compression schemes
      auto bucket_key = executor_->cgen_state_->emitExternalCall(
          funcName,
          get_int_type(64, LL_CONTEXT),
          {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])});

      auto bucket_key_shifted = i == 0
                                    ? LL_BUILDER.CreateAdd(x_bucket_offset, bucket_key)
                                    : LL_BUILDER.CreateAdd(y_bucket_offset, bucket_key);

      const auto col_lv = LL_BUILDER.CreateSExt(
          bucket_key_shifted, get_int_type(key_component_width * 8, LL_CONTEXT));
      LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
    }
  } else {
    LOG(FATAL) << "Range join key currently only supported for geospatial types.";
  }
  return key_buff_lv;
}

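// Generates the probe: looks up the composite key index in the key dictionary portion
// of the hash table, then resolves the matching row set from the one-to-many buffers
// that follow it (offsetBufferOff() bytes into the table).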
HashJoinMatchingSet RangeJoinHashTable::codegenMatchingSetWithOffset(
    const CompilationOptions& co,
    const size_t index,
    llvm::Value* range_offset) {
  const auto key_component_width = getKeyComponentWidth();
  CHECK(key_component_width == 4 || key_component_width == 8);

  auto key_buff_lv = codegenKey(co, range_offset);

  auto hash_ptr = codegenHashTableLoad(index, executor_);
  const auto composite_dict_ptr_type =
      llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);

  const auto composite_key_dict =
      hash_ptr->getType()->isPointerTy()
          ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
          : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);

  const auto key_component_count = getKeyComponentCount();

  const auto funcName =
      "get_composite_key_index_" + std::to_string(key_component_width * 8);

  const auto key = executor_->cgen_state_->emitExternalCall(funcName,
                                                            get_int_type(64, LL_CONTEXT),
                                                            {key_buff_lv,
                                                             LL_INT(key_component_count),
                                                             composite_key_dict,
                                                             LL_INT(getEntryCount())});

  auto one_to_many_ptr = hash_ptr;
  if (one_to_many_ptr->getType()->isPointerTy()) {
    one_to_many_ptr =
        LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
  } else {
    CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
  }
  const auto composite_key_dict_size = offsetBufferOff();
  one_to_many_ptr =
      LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));

  return HashJoin::codegenMatchingSet(
      /* hash_join_idx_args_in */ {one_to_many_ptr,
                                   key,
                                   LL_INT(int64_t(0)),
                                   LL_INT(getEntryCount() - 1)},
      /* is_sharded */ false,
      /* is_nullable */ false,
      /* is_bw_eq */ false,
      /* sub_buff_size */ getComponentBufferSize(),
      /* executor */ executor_);
}