RangeJoinHashTable.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/Execute.h"

// clang-format off
// clang-format on

std::shared_ptr<RangeJoinHashTable> RangeJoinHashTable::getInstance(
    const std::shared_ptr<Analyzer::BinOper> condition,
    const Analyzer::RangeOper* range_expr,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const JoinType join_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    const RegisteredQueryHint& query_hints,
    const TableIdToNodeMap& table_id_to_node_map) {
  // The hash table is built over the LHS of the range oper. We then use the LHS
  // of the bin oper + the RHS of the range oper for the probe.
  auto range_expr_col_var =
      dynamic_cast<const Analyzer::ColumnVar*>(range_expr->get_left_operand());
  if (!range_expr_col_var || !range_expr_col_var->get_type_info().is_geometry()) {
    throw HashJoinFail("Could not build hash tables for range join | " +
                       range_expr->toString());
  }

  CHECK(range_expr_col_var->get_type_info().is_geometry());

  auto coords_column_key = range_expr_col_var->getColumnKey();
  coords_column_key.column_id = coords_column_key.column_id + 1;
  const auto coords_cd = Catalog_Namespace::get_metadata_for_column(coords_column_key);
  CHECK(coords_cd);

  auto range_join_inner_col_expr = makeExpr<Analyzer::ColumnVar>(
      coords_cd->columnType, coords_column_key, range_expr_col_var->get_rte_idx());

  std::vector<InnerOuter> inner_outer_pairs;
  inner_outer_pairs.emplace_back(
      InnerOuter{dynamic_cast<Analyzer::ColumnVar*>(range_join_inner_col_expr.get()),
                 condition->get_left_operand()});

  const auto& query_info =
      get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
          .info;

  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw TooManyHashEntries();
  }

  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
                               ? BaselineJoinHashTable::getShardCountForCondition(
                                     condition.get(), executor, inner_outer_pairs)
                               : 0;

  auto join_hash_table = std::make_shared<RangeJoinHashTable>(condition,
                                                              join_type,
                                                              range_expr,
                                                              range_join_inner_col_expr,
                                                              query_infos,
                                                              memory_level,
                                                              column_cache,
                                                              executor,
                                                              inner_outer_pairs,
                                                              device_count,
                                                              query_hints,
                                                              hashtable_build_dag_map,
                                                              table_id_to_node_map);
  HashJoin::checkHashJoinReplicationConstraint(
      HashJoin::getInnerTableId(inner_outer_pairs), shard_count, executor);
  try {
    join_hash_table->reifyWithLayout(HashType::OneToMany);
  } catch (const HashJoinFail& e) {
    throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
                                   "involved in equijoin | ") +
                       e.what());
  } catch (const ColumnarConversionNotSupported& e) {
    throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
                       e.what());
  } catch (const JoinHashTableTooBig& e) {
    throw e;
  } catch (const std::exception& e) {
    LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
               << e.what();
  }

  return join_hash_table;
}
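// Illustrative sketch (not part of the original source): getInstance() sizes the
// prospective hash table at twice the inner table's tuple-count upper bound and bails
// out early if that no longer fits in int32_t. A standalone version of that guard,
// where the helper name and exception type are hypothetical:
#include <cstdint>
#include <limits>
#include <stdexcept>

inline size_t checked_range_join_total_entries(size_t num_tuples_upper_bound) {
  const size_t total_entries = 2 * num_tuples_upper_bound;
  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw std::runtime_error("too many hash entries for a range join hash table");
  }
  return total_entries;
}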

void RangeJoinHashTable::reifyWithLayout(const HashType layout) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK(layout == HashType::OneToMany);

  const auto& query_info =
      get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs_), query_infos_)
          .info;

  if (query_info.fragments.empty()) {
    return;
  }

  const auto& table_key = getInnerTableId();
  VLOG(1) << "Reify with layout " << getHashTypeString(layout) << " for " << table_key;

  std::vector<ColumnsForDevice> columns_per_device;

  auto data_mgr = executor_->getDataMgr();
  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
  const auto shard_count = shardCount();
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    fragments_per_device.emplace_back(
        shard_count
            ? only_shards_for_device(query_info.fragments, device_id, device_count_)
            : query_info.fragments);
    if (memory_level_ == Data_Namespace::GPU_LEVEL) {
      dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
          data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
    }
    // For the overlaps join, we need to fetch columns regardless of the availability of
    // a cached hash table, to calculate various params, i.e., bucket size info.
    // TODO (yoonmin): relax this.
    const auto columns_for_device =
        fetchColumnsForDevice(fragments_per_device[device_id],
                              device_id,
                              memory_level_ == Data_Namespace::GPU_LEVEL
                                  ? dev_buff_owners[device_id].get()
                                  : nullptr);
    columns_per_device.push_back(columns_for_device);
  }

  const auto bucket_range =
      dynamic_cast<const Analyzer::Constant*>(range_expr_->get_right_operand());

  CHECK(bucket_range);
  CHECK(bucket_range->get_type_info().is_fp() &&
        bucket_range->get_type_info().get_size() == 8);  // TODO

  const auto bucket_range_datum = bucket_range->get_constval();

  inverse_bucket_sizes_for_dimension_.emplace_back(1. / bucket_range_datum.doubleval);
  inverse_bucket_sizes_for_dimension_.emplace_back(1. / bucket_range_datum.doubleval);

  setInverseBucketSizeInfo(
      inverse_bucket_sizes_for_dimension_, columns_per_device, device_count_);

  effective_memory_level_ = getEffectiveMemoryLevel(inner_outer_pairs_);

  // To properly look up a cached hash table, we need to use the join columns listed as
  // LHS and RHS of the overlaps op instead of the physical (and hidden) column tailored
  // to the range join expr. In other words, we need to use the geometry column (point)
  // instead of its hidden array column; see the `get_physical_cols` function.
  std::vector<InnerOuter> inner_outer_pairs_for_cache_lookup;
  inner_outer_pairs_for_cache_lookup.emplace_back(InnerOuter{
      dynamic_cast<const Analyzer::ColumnVar*>(range_expr_->get_left_operand()),
      condition_->get_left_operand()});
  auto hashtable_access_path_info =
      HashtableRecycler::getHashtableAccessPathInfo(inner_outer_pairs_for_cache_lookup,
                                                    {},
                                                    condition_->get_optype(),
                                                    join_type_,
                                                    hashtable_build_dag_map_,
                                                    device_count_,
                                                    shard_count,
                                                    fragments_per_device,
                                                    executor_);
  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
  table_keys_ = hashtable_access_path_info.table_keys;

  auto get_inner_table_key = [&inner_outer_pairs_for_cache_lookup]() {
    auto col_var = inner_outer_pairs_for_cache_lookup.front().first;
    return col_var->getTableKey();
  };

  if (table_keys_.empty()) {
    const auto& inner_table_key = get_inner_table_key();
    table_keys_ = DataRecyclerUtil::getAlternativeTableKeys(
        composite_key_info_.cache_key_chunks, inner_table_key);
  }
  CHECK(!table_keys_.empty());

  generateCacheKey(max_hashtable_size_,
                   bucket_threshold_,
                   inverse_bucket_sizes_for_dimension_,
                   fragments_per_device,
                   device_count_);

  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
      get_inner_table_key().table_id > 0) {
    std::vector<size_t> per_device_chunk_key;
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      auto chunk_key_hash = boost::hash_value(composite_key_info_.cache_key_chunks);
      boost::hash_combine(chunk_key_hash,
                          HashJoin::collectFragmentIds(fragments_per_device[device_id]));
      per_device_chunk_key.push_back(chunk_key_hash);
      AlternativeCacheKeyForOverlapsHashJoin cache_key{
          inner_outer_pairs_for_cache_lookup,
          columns_per_device.front().join_columns.front().num_elems,
          chunk_key_hash,
          condition_->get_optype(),
          max_hashtable_size_,
          bucket_threshold_,
          {}};
      hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
      hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_[device_id],
                                                     table_keys_);
    }
  }

  if (effective_memory_level_ == Data_Namespace::CPU_LEVEL) {
    std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
    if (auto generic_hash_table =
            initHashTableOnCpuFromCache(hashtable_cache_key_.front(),
                                        CacheItemType::OVERLAPS_HT,
                                        DataRecyclerUtil::CPU_DEVICE_IDENTIFIER)) {
      if (auto hash_table =
              std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
        // See if a hash table of a different layout was returned.
        // If it was OneToMany, we can reuse it on ManyToMany.
        if (layout == HashType::ManyToMany &&
            hash_table->getLayout() == HashType::OneToMany) {
          // use the cached hash table
          layout_override_ = HashType::ManyToMany;
        }

        if (memory_level_ == Data_Namespace::GPU_LEVEL) {
#ifdef HAVE_CUDA
          for (int device_id = 0; device_id < device_count_; ++device_id) {
            auto gpu_hash_table = copyCpuHashTableToGpu(hash_table,
                                                        layout,
                                                        hash_table->getEntryCount(),
                                                        hash_table->getEmittedKeysCount(),
                                                        device_id);
            CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
            hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
          }
#else
          UNREACHABLE();
#endif
        } else {
          CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
          // do not move hash_table, to keep a valid ptr to it within the hash table
          // recycler
          hash_tables_for_device_[0] = hash_table;
        }
        return;
      }
    }
  }

  auto [entry_count, emitted_keys_count] =
      computeRangeHashTableCounts(shard_count, columns_per_device);

  size_t hash_table_size = OverlapsJoinHashTable::calculateHashTableSize(
      inverse_bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);

  VLOG(1) << "Finalized range join hash table: entry count " << entry_count
          << " hash table size " << hash_table_size;

  std::vector<std::future<void>> init_threads;
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    init_threads.push_back(
        std::async(std::launch::async,
                   &RangeJoinHashTable::reifyForDevice,
                   this,
                   /* columns_for_device */ columns_per_device[device_id],
                   /* layout_type */ layout,
                   /* entry_count */ entry_count,
                   /* emitted_keys_count */ emitted_keys_count,
                   /* device_id */ device_id,
                   /* parent_thread_local_ids */ logger::thread_local_ids()));
  }
  for (auto& init_thread : init_threads) {
    init_thread.wait();
  }
  for (auto& init_thread : init_threads) {
    init_thread.get();
  }
}
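// Illustrative sketch (not part of the original source): the RHS of the RangeOper is a
// double constant giving the bucket edge length; reifyWithLayout() stores its reciprocal
// once per key dimension (x and y), and the generated probe code later maps a coordinate
// to a bucket index roughly as floor(coord * inverse_bucket_size). A standalone version
// of that arithmetic, with hypothetical helper names:
#include <cmath>
#include <cstdint>
#include <vector>

inline std::vector<double> make_inverse_bucket_sizes(double bucket_range) {
  return {1.0 / bucket_range, 1.0 / bucket_range};  // one entry per dimension of a point
}

inline int64_t bucket_index(double coord, double inverse_bucket_size) {
  return static_cast<int64_t>(std::floor(coord * inverse_bucket_size));
}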

void RangeJoinHashTable::reifyForDevice(
    const ColumnsForDevice& columns_for_device,
    const HashType layout,
    const size_t entry_count,
    const size_t emitted_keys_count,
    const int device_id,
    const logger::ThreadLocalIds parent_thread_local_ids) {
  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
  CHECK_EQ(getKeyComponentWidth(), size_t(8));

  if (effective_memory_level_ == Data_Namespace::CPU_LEVEL) {
    VLOG(1) << "Building range join hash table on CPU.";
    auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
                                         columns_for_device.join_column_types,
                                         columns_for_device.join_buckets,
                                         layout,
                                         entry_count,
                                         emitted_keys_count);
    CHECK(hash_table);

#ifdef HAVE_CUDA
    if (memory_level_ == Data_Namespace::GPU_LEVEL) {
      auto gpu_hash_table = copyCpuHashTableToGpu(
          hash_table, layout, entry_count, emitted_keys_count, device_id);
      CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
      hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
    } else {
#else
#endif
      CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
      hash_tables_for_device_[0] = std::move(hash_table);
#ifdef HAVE_CUDA
    }
#endif
  } else {
#ifdef HAVE_CUDA
    auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
                                         columns_for_device.join_column_types,
                                         columns_for_device.join_buckets,
                                         layout,
                                         entry_count,
                                         emitted_keys_count,
                                         device_id);
    CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
    hash_tables_for_device_[device_id] = std::move(hash_table);
#else
    UNREACHABLE();
#endif
  }
}
// #endif

#ifdef HAVE_CUDA
std::shared_ptr<BaselineHashTable> RangeJoinHashTable::initHashTableOnGpu(
    const std::vector<JoinColumn>& join_columns,
    const std::vector<JoinColumnTypeInfo>& join_column_types,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const HashType layout,
    const size_t entry_count,
    const size_t emitted_keys_count,
    const size_t device_id) {
  VLOG(1) << "Building range join hash table on GPU.";

  BaselineJoinHashTableBuilder builder;
  auto data_mgr = executor_->getDataMgr();
  CudaAllocator allocator(
      data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
  CHECK_EQ(join_columns.size(), 1u);
  CHECK(!join_bucket_info.empty());

  auto& inverse_bucket_sizes_for_dimension =
      join_bucket_info[0].inverse_bucket_sizes_for_dimension;

  auto bucket_sizes_gpu = transfer_vector_of_flat_objects_to_gpu(
      inverse_bucket_sizes_for_dimension, allocator);

  const auto key_handler = RangeKeyHandler(isInnerColCompressed(),
                                           inverse_bucket_sizes_for_dimension.size(),
                                           join_columns_gpu,
                                           bucket_sizes_gpu);

  const auto err = builder.initHashTableOnGpu(&key_handler,
                                              join_columns,
                                              layout,
                                              join_type_,
                                              getKeyComponentWidth(),
                                              getKeyComponentCount(),
                                              entry_count,
                                              emitted_keys_count,
                                              device_id,
                                              executor_,
                                              query_hints_);
  if (err) {
    throw HashJoinFail(
        std::string("Unrecognized error when initializing GPU range join hash table (") +
        std::to_string(err) + std::string(")"));
  }
  return builder.getHashTable();
}
#endif

std::shared_ptr<BaselineHashTable> RangeJoinHashTable::initHashTableOnCpu(
    const std::vector<JoinColumn>& join_columns,
    const std::vector<JoinColumnTypeInfo>& join_column_types,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const HashType layout,
    const size_t entry_count,
    const size_t emitted_keys_count) {
  auto timer = DEBUG_TIMER(__func__);
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  ts1 = std::chrono::steady_clock::now();
  const auto composite_key_info =
      HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
  CHECK(!join_columns.empty());
  CHECK(!join_bucket_info.empty());

  const auto key_component_count =
      join_bucket_info[0].inverse_bucket_sizes_for_dimension.size();

  auto key_handler =
      RangeKeyHandler(isInnerColCompressed(),
                      key_component_count,
                      &join_columns[0],
                      join_bucket_info[0].inverse_bucket_sizes_for_dimension.data());

  BaselineJoinHashTableBuilder builder;
  const StrProxyTranslationMapsPtrsAndOffsets
      dummy_str_proxy_translation_maps_ptrs_and_offsets;
  const auto err =
      builder.initHashTableOnCpu(&key_handler,
                                 composite_key_info,
                                 join_columns,
                                 join_column_types,
                                 join_bucket_info,
                                 dummy_str_proxy_translation_maps_ptrs_and_offsets,
                                 entry_count,
                                 emitted_keys_count,
                                 layout,
                                 join_type_,
                                 getKeyComponentWidth(),
                                 getKeyComponentCount(),
                                 query_hints_);
  ts2 = std::chrono::steady_clock::now();
  if (err) {
    throw HashJoinFail(std::string("Unrecognized error when initializing CPU "
                                   "range join hash table (") +
                       std::to_string(err) + std::string(")"));
  }
  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
  auto hashtable_build_time =
      std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
  putHashTableOnCpuToCache(hashtable_cache_key_.front(),
                           CacheItemType::OVERLAPS_HT,
                           hash_table,
                           DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
                           hashtable_build_time);
  return hash_table;
}

std::pair<size_t, size_t> RangeJoinHashTable::computeRangeHashTableCounts(
    const size_t shard_count,
    std::vector<ColumnsForDevice>& columns_per_device) {
  const auto [tuple_count, emitted_keys_count] =
      approximateTupleCount(inverse_bucket_sizes_for_dimension_,
                            columns_per_device,
                            max_hashtable_size_,
                            bucket_threshold_);
  const auto entry_count = 2 * std::max(tuple_count, size_t(1));

  return std::make_pair(
      get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
      emitted_keys_count);
}
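// Illustrative sketch (not part of the original source): computeRangeHashTableCounts()
// sizes the keyspace at twice the approximate distinct-tuple count (never less than 2)
// before get_entries_per_device() splits it across devices. A standalone version of just
// the doubling step, with a hypothetical name:
#include <algorithm>
#include <cstddef>

inline size_t range_join_keyspace_entries(size_t approx_tuple_count) {
  // mirrors `2 * std::max(tuple_count, size_t(1))` above
  return 2 * std::max(approx_tuple_count, static_cast<size_t>(1));
}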

std::pair<size_t, size_t> RangeJoinHashTable::approximateTupleCount(
    const std::vector<double>& inverse_bucket_sizes_for_dimension,
    std::vector<ColumnsForDevice>& columns_per_device,
    const size_t chosen_max_hashtable_size,
    const double chosen_bucket_threshold) {
#ifdef _WIN32
  // WIN32 needs to have C++20 set for designated initialization to work
  CountDistinctDescriptor count_distinct_desc{
      CountDistinctImplType::Bitmap,
      0,
      11,
      true,
      effective_memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
          ? ExecutorDeviceType::GPU
          : ExecutorDeviceType::CPU,
      1,
  };
#else
  CountDistinctDescriptor count_distinct_desc{
      .impl_type_ = CountDistinctImplType::Bitmap,
      .min_val = 0,
      .bitmap_sz_bits = 11,
      .approximate = true,
      .device_type = effective_memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
                         ? ExecutorDeviceType::GPU
                         : ExecutorDeviceType::CPU,
      .sub_bitmap_count = 1,
  };
#endif
  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();

  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
  if (columns_per_device.front().join_columns.front().num_elems == 0) {
    return std::make_pair(0, 0);
  }

  for (auto& columns_for_device : columns_per_device) {
    columns_for_device.setBucketInfo(inverse_bucket_sizes_for_dimension,
                                     inner_outer_pairs_);
  }

  // Number of keys must match dimension of buckets
  CHECK_EQ(columns_per_device.front().join_columns.size(),
           columns_per_device.front().join_buckets.size());
  if (effective_memory_level_ == Data_Namespace::MemoryLevel::CPU_LEVEL) {
    const auto composite_key_info =
        HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
    int thread_count = cpu_threads();
    std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
    auto hll_result = &hll_buffer_all_cpus[0];

    std::vector<int32_t> num_keys_for_row;
    num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);

    approximate_distinct_tuples_range(hll_result,
                                      num_keys_for_row,
                                      count_distinct_desc.bitmap_sz_bits,
                                      padded_size_bytes,
                                      columns_per_device.front().join_columns,
                                      columns_per_device.front().join_column_types,
                                      columns_per_device.front().join_buckets,
                                      isInnerColCompressed(),
                                      thread_count);

    for (int i = 1; i < thread_count; ++i) {
      hll_unify(hll_result,
                hll_result + i * padded_size_bytes,
                1 << count_distinct_desc.bitmap_sz_bits);
    }
    return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
                          num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
  }
#ifdef HAVE_CUDA
  auto& data_mgr = *executor_->getDataMgr();
  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
  for (auto& host_hll_buffer : host_hll_buffers) {
    host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
  }
  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
  std::vector<std::future<void>> approximate_distinct_device_threads;
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    approximate_distinct_device_threads.emplace_back(std::async(
        std::launch::async,
        [device_id,
         &columns_per_device,
         &count_distinct_desc,
         &data_mgr,
         &host_hll_buffers,
         &emitted_keys_count_device_threads,
         this] {
          auto allocator = std::make_unique<CudaAllocator>(
              &data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
          auto device_hll_buffer =
              allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
          data_mgr.getCudaMgr()->zeroDeviceMem(
              device_hll_buffer,
              count_distinct_desc.bitmapPaddedSizeBytes(),
              device_id,
              getQueryEngineCudaStreamForDevice(device_id));
          const auto& columns_for_device = columns_per_device[device_id];
          auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
              columns_for_device.join_columns, *allocator);

          CHECK_GT(columns_for_device.join_buckets.size(), 0u);
          const auto& bucket_sizes_for_dimension =
              columns_for_device.join_buckets[0].inverse_bucket_sizes_for_dimension;
          auto bucket_sizes_gpu =
              allocator->alloc(bucket_sizes_for_dimension.size() * sizeof(double));
          allocator->copyToDevice(bucket_sizes_gpu,
                                  bucket_sizes_for_dimension.data(),
                                  bucket_sizes_for_dimension.size() * sizeof(double));
          const size_t row_counts_buffer_sz =
              columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
          auto row_counts_buffer = allocator->alloc(row_counts_buffer_sz);
          data_mgr.getCudaMgr()->zeroDeviceMem(
              row_counts_buffer,
              row_counts_buffer_sz,
              device_id,
              getQueryEngineCudaStreamForDevice(device_id));
          const auto key_handler =
              RangeKeyHandler(isInnerColCompressed(),
                              bucket_sizes_for_dimension.size(),
                              join_columns_gpu,
                              reinterpret_cast<double*>(bucket_sizes_gpu));
          const auto key_handler_gpu =
              transfer_flat_object_to_gpu(key_handler, *allocator);
          approximate_distinct_tuples_on_device_range(
              reinterpret_cast<uint8_t*>(device_hll_buffer),
              count_distinct_desc.bitmap_sz_bits,
              reinterpret_cast<int32_t*>(row_counts_buffer),
              key_handler_gpu,
              columns_for_device.join_columns[0].num_elems,
              executor_->blockSize(),
              executor_->gridSize());

          auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
          allocator->copyFromDevice(
              &host_emitted_keys_count,
              row_counts_buffer +
                  (columns_per_device.front().join_columns[0].num_elems - 1) *
                      sizeof(int32_t),
              sizeof(int32_t));

          auto& host_hll_buffer = host_hll_buffers[device_id];
          allocator->copyFromDevice(&host_hll_buffer[0],
                                    device_hll_buffer,
                                    count_distinct_desc.bitmapPaddedSizeBytes());
        }));
  }
  for (auto& child : approximate_distinct_device_threads) {
    child.get();
  }
  auto& result_hll_buffer = host_hll_buffers.front();
  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
  for (int device_id = 1; device_id < device_count_; ++device_id) {
    auto& host_hll_buffer = host_hll_buffers[device_id];
    hll_unify(hll_result,
              reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
              1 << count_distinct_desc.bitmap_sz_bits);
  }
  size_t emitted_keys_count = 0;
  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
    emitted_keys_count += emitted_keys_count_device;
  }
  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
                        emitted_keys_count);
#else
  UNREACHABLE();
  return {0, 0};
#endif  // HAVE_CUDA
}
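// Illustrative sketch (not part of the original source): per-thread (CPU path) and
// per-device (GPU path) HyperLogLog buffers are merged with hll_unify() before
// hll_size() reads the cardinality estimate. The standard way to union two HLL sketches
// is a register-wise max, which this simplified stand-in shows:
#include <algorithm>
#include <cstdint>
#include <vector>

inline void hll_union_registers(std::vector<int8_t>& lhs, const std::vector<int8_t>& rhs) {
  // both sketches must have the same register count, i.e. 1 << bitmap_sz_bits
  for (size_t i = 0; i < lhs.size() && i < rhs.size(); ++i) {
    lhs[i] = std::max(lhs[i], rhs[i]);
  }
}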

#define LL_CONTEXT executor_->cgen_state_->context_
#define LL_BUILDER executor_->cgen_state_->ir_builder_
#define LL_INT(v) executor_->cgen_state_->llInt(v)
#define LL_FP(v) executor_->cgen_state_->llFp(v)
#define ROW_FUNC executor_->cgen_state_->row_func_

llvm::Value* RangeJoinHashTable::codegenKey(const CompilationOptions& co,
                                            llvm::Value* offset_ptr) {
  const auto key_component_width = getKeyComponentWidth();
  CHECK(key_component_width == 4 || key_component_width == 8);
  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
  llvm::Value* key_buff_lv{nullptr};
  switch (key_component_width) {
    case 4:
      key_buff_lv =
          LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
      break;
    case 8:
      key_buff_lv =
          LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
      break;
    default:
      CHECK(false);
  }

  const auto& inner_outer_pair = inner_outer_pairs_[0];
  const auto outer_col = inner_outer_pair.second;
  const auto outer_col_ti = outer_col->get_type_info();

  if (outer_col_ti.is_geometry()) {
    CodeGenerator code_generator(executor_);
    // TODO(adb): for points we will use the coords array, but for other
    // geometries we will need to use the bounding box. For now only support
    // points.
    CHECK_EQ(outer_col_ti.get_type(), kPOINT);
    CHECK_EQ(inverse_bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));

    llvm::Value* arr_ptr{nullptr};
    // prepare the point column (array) ptr to generate code for the hash table key
    if (auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col)) {
      const auto col_lvs = code_generator.codegen(outer_col, true, co);
      CHECK_EQ(col_lvs.size(), size_t(1));
      auto column_key = outer_col_var->getColumnKey();
      column_key.column_id = column_key.column_id + 1;
      const auto coords_cd = Catalog_Namespace::get_metadata_for_column(column_key);
      CHECK(coords_cd);
      const auto coords_ti = coords_cd->columnType;

      const auto array_buff_ptr = executor_->cgen_state_->emitExternalCall(
          "array_buff",
          llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
          {col_lvs.front(), code_generator.posArg(outer_col)});
      CHECK(array_buff_ptr);
      CHECK(coords_ti.get_elem_type().get_type() == kTINYINT)
          << "Only TINYINT coordinates columns are supported in geo overlaps "
             "hash join.";
      arr_ptr =
          code_generator.castArrayPointer(array_buff_ptr, coords_ti.get_elem_type());
    } else if (auto geo_expr_outer_col =
                   dynamic_cast<const Analyzer::GeoOperator*>(outer_col)) {
      const auto geo_expr_name = geo_expr_outer_col->getName();
      if (func_resolve(geo_expr_name, "ST_Point"sv, "ST_Transform"sv, "ST_Centroid"sv)) {
        // Note that ST_SetSRID changes the type info of the column and is handled by the
        // translation phase, so when we use ST_SETSRID(ST_POINT(x, y), 4326)
        // as a join column expression, we recognize it as ST_POINT (with SRID 4326).
        const auto col_lvs = code_generator.codegen(outer_col, true, co);
        // The listed functions keep the point coordinates in a local variable (call it S)
        // corresponding to the pointer that col_lvs[0] holds.
        // Thus, all we need is to retrieve the necessary coordinate from S by varying
        // its offset (i.e., i == 0 means the x coordinate).
        arr_ptr = LL_BUILDER.CreatePointerCast(
            col_lvs[0], llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_));
      } else {
        throw std::runtime_error(
            "RHS key of the range join operator has a geospatial function which is not "
            "supported yet: " +
            geo_expr_name);
      }
    } else {
      throw std::runtime_error("Range join operator has an invalid rhs key: " +
                               outer_col->toString());
    }

    // load and unpack offsets
    const auto offset =
        LL_BUILDER.CreateLoad(offset_ptr->getType()->getPointerElementType(),
                              offset_ptr,
                              "packed_bucket_offset");
    const auto x_offset =
        LL_BUILDER.CreateTrunc(offset, llvm::Type::getInt32Ty(LL_CONTEXT));

    const auto y_offset_shifted =
        LL_BUILDER.CreateLShr(offset, LL_INT(static_cast<int64_t>(32)));
    const auto y_offset =
        LL_BUILDER.CreateTrunc(y_offset_shifted, llvm::Type::getInt32Ty(LL_CONTEXT));

    const auto x_bucket_offset =
        LL_BUILDER.CreateSExt(x_offset, llvm::Type::getInt64Ty(LL_CONTEXT));
    const auto y_bucket_offset =
        LL_BUILDER.CreateSExt(y_offset, llvm::Type::getInt64Ty(LL_CONTEXT));

    for (size_t i = 0; i < 2; i++) {
      const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
          key_buff_lv->getType()->getScalarType()->getPointerElementType(),
          key_buff_lv,
          LL_INT(i));

      const auto funcName = isProbeCompressed() ? "get_bucket_key_for_range_compressed"
                                                : "get_bucket_key_for_range_double";

      // Note that get_bucket_key_for_range_compressed will need to be
      // specialized for future compression schemes
      auto bucket_key = executor_->cgen_state_->emitExternalCall(
          funcName,
          get_int_type(64, LL_CONTEXT),
          {arr_ptr, LL_INT(i), LL_FP(inverse_bucket_sizes_for_dimension_[i])});

      auto bucket_key_shifted = i == 0
                                    ? LL_BUILDER.CreateAdd(x_bucket_offset, bucket_key)
                                    : LL_BUILDER.CreateAdd(y_bucket_offset, bucket_key);

      const auto col_lv = LL_BUILDER.CreateSExt(
          bucket_key_shifted, get_int_type(key_component_width * 8, LL_CONTEXT));
      LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
    }
  } else {
    LOG(FATAL) << "Range join key currently only supported for geospatial types.";
  }
  return key_buff_lv;
}
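// Illustrative sketch (not part of the original source): codegenKey() receives the probe
// bucket offset packed into one 64-bit value (x in the low 32 bits, y in the high 32
// bits) and emits Trunc/LShr/SExt IR to recover each half before adding it to the
// corresponding bucket key. The same unpacking in plain C++:
#include <cstdint>

struct BucketOffsets {
  int64_t x;
  int64_t y;
};

inline BucketOffsets unpack_bucket_offsets(int64_t packed) {
  const int32_t x = static_cast<int32_t>(packed);                               // Trunc
  const int32_t y = static_cast<int32_t>(static_cast<uint64_t>(packed) >> 32);  // LShr + Trunc
  return BucketOffsets{static_cast<int64_t>(x), static_cast<int64_t>(y)};       // SExt
}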

HashJoinMatchingSet RangeJoinHashTable::codegenMatchingSetWithOffset(
    const CompilationOptions& co,
    const size_t index,
    llvm::Value* range_offset) {
  const auto key_component_width = getKeyComponentWidth();
  CHECK(key_component_width == 4 || key_component_width == 8);

  auto key_buff_lv = codegenKey(co, range_offset);
  CHECK(getHashType() == HashType::OneToMany);

  auto hash_ptr = codegenHashTableLoad(index, executor_);
  const auto composite_dict_ptr_type =
      llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);

  const auto composite_key_dict =
      hash_ptr->getType()->isPointerTy()
          ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
          : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);

  const auto key_component_count = getKeyComponentCount();

  const auto funcName =
      "get_composite_key_index_" + std::to_string(key_component_width * 8);

  const auto key = executor_->cgen_state_->emitExternalCall(funcName,
                                                            get_int_type(64, LL_CONTEXT),
                                                            {key_buff_lv,
                                                             LL_INT(key_component_count),
                                                             composite_key_dict,
                                                             LL_INT(getEntryCount())});

  auto one_to_many_ptr = hash_ptr;
  if (one_to_many_ptr->getType()->isPointerTy()) {
    one_to_many_ptr =
        LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
  } else {
    CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
  }
  const auto composite_key_dict_size = offsetBufferOff();
  one_to_many_ptr =
      LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));

  return HashJoin::codegenMatchingSet(
      /* hash_join_idx_args_in */ {one_to_many_ptr,
                                   key,
                                   LL_INT(int64_t(0)),
                                   LL_INT(getEntryCount() - 1)},
      /* is_sharded */ false,
      /* is_nullable */ false,
      /* is_bw_eq */ false,
      /* sub_buff_size */ getComponentBufferSize(),
      /* executor */ executor_);
}
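// Illustrative sketch (not part of the original source): the generated matching-set code
// treats the hash table as a composite-key section followed by per-entry buffers, so the
// probe pointer is the table base plus offsetBufferOff(). A hypothetical helper doing
// the same address arithmetic as the IR above:
#include <cstdint>

inline int64_t one_to_many_section_base(int64_t hash_table_addr,
                                        int64_t composite_key_dict_size_bytes) {
  // mirrors `one_to_many_ptr = hash_ptr + offsetBufferOff()` in the generated code
  return hash_table_addr + composite_key_dict_size_bytes;
}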