BaselineJoinHashTable.cpp

/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/BaselineJoinHashTable.h"

#include <future>

#include "QueryEngine/Execute.h"

// let's only consider the CPU hashtable recycler at this moment
// todo (yoonmin): support GPU hashtable cache without regression
std::unique_ptr<HashtableRecycler> BaselineJoinHashTable::hash_table_cache_ =
    std::make_unique<HashtableRecycler>(CacheItemType::BASELINE_HT,
                                        DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
std::unique_ptr<HashingSchemeRecycler> BaselineJoinHashTable::hash_table_layout_cache_ =
    std::make_unique<HashingSchemeRecycler>();

//! Make hash table from an in-flight SQL query's parse tree etc.
std::shared_ptr<BaselineJoinHashTable> BaselineJoinHashTable::getInstance(
    const std::shared_ptr<Analyzer::BinOper> condition,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const JoinType join_type,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    const RegisteredQueryHint& query_hints,
    const TableIdToNodeMap& table_id_to_node_map) {
  decltype(std::chrono::steady_clock::now()) ts1, ts2;

  if (VLOGGING(1)) {
    VLOG(1) << "Building keyed hash table " << getHashTypeString(preferred_hash_type)
            << " for qual: " << condition->toString();
    ts1 = std::chrono::steady_clock::now();
  }
  auto inner_outer_pairs = HashJoin::normalizeColumnPairs(
      condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
  const auto& inner_outer_cols = inner_outer_pairs.first;
  const auto& col_pairs_string_op_infos = inner_outer_pairs.second;
  auto join_hash_table = std::shared_ptr<BaselineJoinHashTable>(
      new BaselineJoinHashTable(condition,
                                join_type,
                                query_infos,
                                memory_level,
                                column_cache,
                                executor,
                                inner_outer_cols,
                                col_pairs_string_op_infos,
                                device_count,
                                query_hints,
                                hashtable_build_dag_map,
                                table_id_to_node_map));
  try {
    join_hash_table->reify(preferred_hash_type);
  } catch (const TableMustBeReplicated& e) {
    // Throw a runtime error to abort the query
    join_hash_table->freeHashBufferMemory();
    throw std::runtime_error(e.what());
  } catch (const HashJoinFail& e) {
    // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
    // possible)
    join_hash_table->freeHashBufferMemory();
    throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
                                   "involved in equijoin | ") +
                       e.what());
  } catch (const ColumnarConversionNotSupported& e) {
    throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
                       e.what());
  } catch (const OutOfMemory& e) {
    throw HashJoinFail(
        std::string("Ran out of memory while building hash tables for equijoin | ") +
        e.what());
  } catch (const JoinHashTableTooBig& e) {
    throw e;
  } catch (const std::exception& e) {
    throw std::runtime_error(
        std::string("Fatal error while attempting to build hash tables for join: ") +
        e.what());
  }
  if (VLOGGING(1)) {
    ts2 = std::chrono::steady_clock::now();
    VLOG(1) << "Built keyed hash table "
            << getHashTypeString(join_hash_table->getHashType()) << " in "
            << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
            << " ms";
  }
  return join_hash_table;
}
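
// A minimal caller-side sketch (hypothetical variable names; the real call sites
// live in the executor's hash-join framework, which selects between the perfect,
// baseline, and overlaps implementations):
//
//   auto hash_table = BaselineJoinHashTable::getInstance(condition,
//                                                        query_infos,
//                                                        Data_Namespace::CPU_LEVEL,
//                                                        JoinType::INNER,
//                                                        HashType::OneToOne,
//                                                        device_count,
//                                                        column_cache,
//                                                        executor,
//                                                        hashtable_build_dag_map,
//                                                        query_hints,
//                                                        table_id_to_node_map);
//
// A HashJoinFail escaping here is the signal for the caller to retry the join as a
// loop join where possible, per the catch clauses above.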

BaselineJoinHashTable::BaselineJoinHashTable(
    const std::shared_ptr<Analyzer::BinOper> condition,
    const JoinType join_type,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const std::vector<InnerOuter>& inner_outer_pairs,
    const std::vector<InnerOuterStringOpInfos>& col_pairs_string_op_infos,
    const int device_count,
    const RegisteredQueryHint& query_hints,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    const TableIdToNodeMap& table_id_to_node_map)
    : condition_(condition)
    , join_type_(join_type)
    , query_infos_(query_infos)
    , memory_level_(memory_level)
    , executor_(executor)
    , column_cache_(column_cache)
    , inner_outer_pairs_(inner_outer_pairs)
    , inner_outer_string_op_infos_pairs_(col_pairs_string_op_infos)
    , catalog_(executor->getCatalog())
    , device_count_(device_count)
    , query_hints_(query_hints)
    , needs_dict_translation_(false)
    , hashtable_build_dag_map_(hashtable_build_dag_map)
    , table_id_to_node_map_(table_id_to_node_map) {
  CHECK_GT(device_count_, 0);
  hash_tables_for_device_.resize(std::max(device_count_, 1));
}

size_t BaselineJoinHashTable::getShardCountForCondition(
    const Analyzer::BinOper* condition,
    const Executor* executor,
    const std::vector<InnerOuter>& inner_outer_pairs) {
  for (const auto& inner_outer_pair : inner_outer_pairs) {
    const auto pair_shard_count = get_shard_count(inner_outer_pair, executor);
    if (pair_shard_count) {
      return pair_shard_count;
    }
  }
  return 0;
}

std::string BaselineJoinHashTable::toString(const ExecutorDeviceType device_type,
                                            const int device_id,
                                            bool raw) const {
  auto buffer = getJoinHashBuffer(device_type, device_id);
  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
  auto hash_table = hash_tables_for_device_[device_id];
  CHECK(hash_table);
  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);

    auto& data_mgr = catalog_->getDataMgr();
    auto device_allocator = std::make_unique<CudaAllocator>(
        &data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
    device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : buffer;
#else
  auto ptr1 = buffer;
#endif  // HAVE_CUDA
  auto ptr2 = ptr1 + offsetBufferOff();
  auto ptr3 = ptr1 + countBufferOff();
  auto ptr4 = ptr1 + payloadBufferOff();
  CHECK(hash_table);
  const auto layout = getHashType();
  return HashTable::toString(
      "keyed",
      getHashTypeString(layout),
      getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
      getKeyComponentWidth(),
      hash_table->getEntryCount(),
      ptr1,
      ptr2,
      ptr3,
      ptr4,
      buffer_size,
      raw);
}

std::set<DecodedJoinHashBufferEntry> BaselineJoinHashTable::toSet(
    const ExecutorDeviceType device_type,
    const int device_id) const {
  auto buffer = getJoinHashBuffer(device_type, device_id);
  auto hash_table = getHashTableForDevice(device_id);
  CHECK(hash_table);
  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);
    auto& data_mgr = catalog_->getDataMgr();
    auto device_allocator = std::make_unique<CudaAllocator>(
        &data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
    device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : buffer;
#else
  auto ptr1 = buffer;
#endif  // HAVE_CUDA
  auto ptr2 = ptr1 + offsetBufferOff();
  auto ptr3 = ptr1 + countBufferOff();
  auto ptr4 = ptr1 + payloadBufferOff();
  const auto layout = hash_table->getLayout();
  return HashTable::toSet(getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
                          getKeyComponentWidth(),
                          hash_table->getEntryCount(),
                          ptr1,
                          ptr2,
                          ptr3,
                          ptr4,
                          buffer_size);
}

bool needs_dictionary_translation(
    const std::vector<InnerOuter>& inner_outer_pairs,
    const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs,
    const Executor* executor) {
  const auto num_col_pairs = inner_outer_pairs.size();
  CHECK_EQ(num_col_pairs, inner_outer_string_op_infos_pairs.size());
  for (size_t col_pair_idx = 0; col_pair_idx < num_col_pairs; ++col_pair_idx) {
    if (needs_dictionary_translation(inner_outer_pairs[col_pair_idx],
                                     inner_outer_string_op_infos_pairs[col_pair_idx],
                                     executor)) {
      return true;
    }
  }
  return false;
}

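// reify() selects the physical layout before building: overlaps joins force a
// one-to-many layout (many-to-many when the key is an array), and string ops on the
// key columns also force one-to-many, since such ops can map distinct inputs to the
// same output value. If a non-one-to-many build fails, it falls back to one-to-many.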
void BaselineJoinHashTable::reify(const HashType preferred_layout) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK_LT(0, device_count_);
  const auto composite_key_info = HashJoin::getCompositeKeyInfo(
      inner_outer_pairs_, executor_, inner_outer_string_op_infos_pairs_);

  HashJoin::checkHashJoinReplicationConstraint(
      getInnerTableId(), shardCount(), executor_);

  auto layout = preferred_layout;
  if (condition_->is_overlaps_oper()) {
    CHECK_EQ(inner_outer_pairs_.size(), size_t(1));

    if (inner_outer_pairs_[0].second->get_type_info().is_array()) {
      layout = HashType::ManyToMany;
    } else {
      layout = HashType::OneToMany;
    }
    try {
      reifyWithLayout(layout);
      return;
    } catch (const std::exception& e) {
      VLOG(1) << "Caught exception while building overlaps baseline hash table: "
              << e.what();
      throw;
    }
  }

  // Automatically prefer One-To-Many layouts when string operations are involved as
  // these tend to be cardinality-reducing operations.
  // Todo(todd): Ostensibly only string ops on the rhs/inner expression cause rhs dups
  // and so we may be too conservative here, but validate

  for (const auto& inner_outer_string_op_infos : inner_outer_string_op_infos_pairs_) {
    if (inner_outer_string_op_infos.first.size() ||
        inner_outer_string_op_infos.second.size()) {
      layout = HashType::OneToMany;
      break;
    }
  }

  try {
    reifyWithLayout(layout);
  } catch (const std::exception& e) {
    VLOG(1) << "Caught exception while building baseline hash table: " << e.what();
    // In perfect hash we CHECK that the layout is not OneToMany here, but for baseline
    // we are catching all exceptions, so we should determine whether that is safe
    // before we CHECK rather than throw an exception here
    if (layout == HashType::OneToMany) {
      throw;  // rethrow, preserving the original exception type
    }
    // Fall back to a one-to-many layout and retry
    reifyWithLayout(HashType::OneToMany);
  }
}

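// High-level flow of reifyWithLayout() below:
//   1. Bound the entry count (2x the inner table's tuple-count upper bound) and
//      bail out if it overflows int32.
//   2. Split fragments across devices (by shard when the inner table is sharded).
//   3. Derive per-device cache keys from the hashed query plan DAG (or from
//      table/fragment identity when no DAG is available) and probe the CPU hash
//      table recycler.
//   4. On a cache miss, fetch the key columns for each device and build the
//      tables concurrently, one std::async task per device.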
void BaselineJoinHashTable::reifyWithLayout(const HashType layout) {
  const auto& query_info = get_inner_query_info(getInnerTableId(), query_infos_).info;
  if (query_info.fragments.empty()) {
    return;
  }

  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw TooManyHashEntries();
  }

  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
  std::vector<ColumnsForDevice> columns_per_device;
  const auto shard_count = shardCount();
  auto entries_per_device =
      get_entries_per_device(total_entries, shard_count, device_count_, memory_level_);
  auto data_mgr = executor_->getDataMgr();
  // cached hash table lookup logic is similar to that of the perfect join hash table
  // first, prepare fragment lists per device
  std::vector<ChunkKey> chunk_key_per_device;
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    fragments_per_device.emplace_back(
        shard_count
            ? only_shards_for_device(query_info.fragments, device_id, device_count_)
            : query_info.fragments);
    if (memory_level_ == Data_Namespace::GPU_LEVEL) {
      dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
          data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
    }
    const auto chunk_key = genChunkKey(fragments_per_device[device_id]);
    chunk_key_per_device.emplace_back(std::move(chunk_key));
  }

  // prepare per-device cache key
  auto inner_outer_pairs = HashJoin::normalizeColumnPairs(
      condition_.get(), *executor_->getCatalog(), executor_->getTemporaryTables());
  const auto& inner_outer_cols = inner_outer_pairs.first;
  const auto& col_pairs_string_op_infos = inner_outer_pairs.second;
  auto hashtable_access_path_info =
      HashtableRecycler::getHashtableAccessPathInfo(inner_outer_cols,
                                                    col_pairs_string_op_infos,
                                                    condition_->get_optype(),
                                                    join_type_,
                                                    hashtable_build_dag_map_,
                                                    device_count_,
                                                    shard_count,
                                                    fragments_per_device,
                                                    executor_);
  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
  hashtable_cache_meta_info_ = hashtable_access_path_info.meta_info;
  table_keys_ = hashtable_access_path_info.table_keys;

  // the actual chunks fetched per device can be different but they constitute the same
  // table in the same db, so we can exploit this to create an alternative table key
  if (table_keys_.empty()) {
    table_keys_ = DataRecyclerUtil::getAlternativeTableKeys(
        chunk_key_per_device,
        executor_->getCatalog()->getDatabaseId(),
        getInnerTableId());
  }
  CHECK(!table_keys_.empty());

  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
      getInnerTableId() > 0) {
    // sometimes we cannot retrieve the query plan dag, so try to recycle the cached
    // hash table with the old-fashioned cache key if we deal with a hash table of a
    // non-temporary table
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto num_tuples = std::accumulate(fragments_per_device[device_id].begin(),
                                              fragments_per_device[device_id].end(),
                                              size_t(0),
                                              [](const auto& sum, const auto& fragment) {
                                                return sum + fragment.getNumTuples();
                                              });
      AlternativeCacheKeyForBaselineHashJoin cache_key{inner_outer_pairs_,
                                                       inner_outer_string_op_infos_pairs_,
                                                       num_tuples,
                                                       condition_->get_optype(),
                                                       join_type_,
                                                       chunk_key_per_device[device_id]};
      hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
    }
  }

  // register a mapping between cache key and input tables of the hash table
  const auto invalid_cache_key =
      HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_);
  if (!invalid_cache_key) {
    if (!shard_count) {
      hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_.front(),
                                                     table_keys_);
    } else {
      std::for_each(hashtable_cache_key_.cbegin(),
                    hashtable_cache_key_.cend(),
                    [this](QueryPlanHash key) {
                      hash_table_cache_->addQueryPlanDagForTableKeys(key, table_keys_);
                    });
    }
  }

  // now, check whether we have a cached hash table for this join qual
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);

  // todo (yoonmin): support dictionary proxy cache for join including string op(s)
  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
    std::unique_lock<std::mutex> str_proxy_translation_lock(str_proxy_translation_mutex_);
    if (str_proxy_translation_maps_.empty()) {
      const auto composite_key_info = HashJoin::getCompositeKeyInfo(
          inner_outer_pairs_, executor_, inner_outer_string_op_infos_pairs_);
      str_proxy_translation_maps_ = HashJoin::translateCompositeStrDictProxies(
          composite_key_info, inner_outer_string_op_infos_pairs_, executor_);
      CHECK_EQ(str_proxy_translation_maps_.size(), inner_outer_pairs_.size());
    }
  }

  auto allow_hashtable_recycling =
      HashtableRecycler::isSafeToCacheHashtable(table_id_to_node_map_,
                                                needs_dict_translation_,
                                                inner_outer_string_op_infos_pairs_,
                                                getInnerTableId());
  bool has_invalid_cached_hash_table = false;
  if (effective_memory_level == Data_Namespace::CPU_LEVEL &&
      HashtableRecycler::canAccessHashTable(
          allow_hashtable_recycling, invalid_cache_key, join_type_)) {
    // build a hash table on CPU, and we have a chance to recycle the cached one if
    // available
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      auto hash_table =
          initHashTableOnCpuFromCache(hashtable_cache_key_[device_id],
                                      CacheItemType::BASELINE_HT,
                                      DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
      if (hash_table) {
        hash_tables_for_device_[device_id] = hash_table;
      } else {
        has_invalid_cached_hash_table = true;
        break;
      }
    }

    if (has_invalid_cached_hash_table) {
      hash_tables_for_device_.clear();
      hash_tables_for_device_.resize(device_count_);
    } else {
      if (memory_level_ == Data_Namespace::GPU_LEVEL) {
#ifdef HAVE_CUDA
        auto data_mgr = executor_->getDataMgr();
        for (int device_id = 0; device_id < device_count_; ++device_id) {
          auto cpu_hash_table = std::dynamic_pointer_cast<BaselineHashTable>(
              hash_tables_for_device_[device_id]);
          copyCpuHashTableToGpu(cpu_hash_table, device_id, data_mgr);
        }
#else
        UNREACHABLE();
#endif
      }
      return;
    }
  }

  // we have no cached hash table for this qual
  // so, start building the hash table by fetching columns for devices
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    const auto columns_for_device =
        fetchColumnsForDevice(fragments_per_device[device_id],
                              device_id,
                              memory_level_ == Data_Namespace::GPU_LEVEL
                                  ? dev_buff_owners[device_id].get()
                                  : nullptr);
    columns_per_device.push_back(columns_for_device);
  }

  auto hashtable_layout_type = layout;
  size_t emitted_keys_count = 0;
  if (hashtable_layout_type == HashType::OneToMany) {
    CHECK(!columns_per_device.front().join_columns.empty());
    emitted_keys_count = columns_per_device.front().join_columns.front().num_elems;
    size_t tuple_count;
    std::tie(tuple_count, std::ignore) = approximateTupleCount(columns_per_device);
    const auto entry_count = 2 * std::max(tuple_count, size_t(1));

    // reset entries per device with one-to-many info
    entries_per_device =
        get_entries_per_device(entry_count, shard_count, device_count_, memory_level_);
  }
  std::vector<std::future<void>> init_threads;
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    const auto fragments =
        shard_count
            ? only_shards_for_device(query_info.fragments, device_id, device_count_)
            : query_info.fragments;
    init_threads.push_back(std::async(std::launch::async,
                                      &BaselineJoinHashTable::reifyForDevice,
                                      this,
                                      columns_per_device[device_id],
                                      hashtable_layout_type,
                                      device_id,
                                      entries_per_device,
                                      emitted_keys_count,
                                      logger::query_id(),
                                      logger::thread_id()));
  }
  for (auto& init_thread : init_threads) {
    init_thread.wait();
  }
  for (auto& init_thread : init_threads) {
    init_thread.get();
  }
}

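// approximateTupleCount() estimates the number of distinct join keys with
// HyperLogLog so a one-to-many table can be sized before being built. With 11
// bitmap bits there are 2^11 = 2048 HLL registers; each CPU thread (or each GPU)
// fills its own register block, and blocks are merged with hll_unify(), a
// register-wise max, so merging loses no precision. The caller above doubles the
// estimate to keep the table sparse.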
std::pair<size_t, size_t> BaselineJoinHashTable::approximateTupleCount(
    const std::vector<ColumnsForDevice>& columns_per_device) const {
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
  CountDistinctDescriptor count_distinct_desc{
      CountDistinctImplType::Bitmap,
      0,
      11,
      true,
      effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
          ? ExecutorDeviceType::GPU
          : ExecutorDeviceType::CPU,
      1};
  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();

  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());

  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
    int thread_count = cpu_threads();
    std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
    auto hll_result = &hll_buffer_all_cpus[0];

    approximate_distinct_tuples(hll_result,
                                count_distinct_desc.bitmap_sz_bits,
                                padded_size_bytes,
                                columns_per_device.front().join_columns,
                                columns_per_device.front().join_column_types,
                                thread_count);
    for (int i = 1; i < thread_count; ++i) {
      hll_unify(hll_result,
                hll_result + i * padded_size_bytes,
                1 << count_distinct_desc.bitmap_sz_bits);
    }
    return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits), 0);
  }
#ifdef HAVE_CUDA
  auto data_mgr = executor_->getDataMgr();
  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
  for (auto& host_hll_buffer : host_hll_buffers) {
    host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
  }
  std::vector<std::future<void>> approximate_distinct_device_threads;
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    approximate_distinct_device_threads.emplace_back(std::async(
        std::launch::async,
        [device_id,
         &columns_per_device,
         &count_distinct_desc,
         data_mgr,
         &host_hll_buffers] {
          auto allocator = std::make_unique<CudaAllocator>(
              data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
          auto device_hll_buffer =
              allocator->alloc(count_distinct_desc.bitmapPaddedSizeBytes());
          data_mgr->getCudaMgr()->zeroDeviceMem(
              device_hll_buffer,
              count_distinct_desc.bitmapPaddedSizeBytes(),
              device_id,
              getQueryEngineCudaStreamForDevice(device_id));
          const auto& columns_for_device = columns_per_device[device_id];
          auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
              columns_for_device.join_columns, *allocator);
          auto join_column_types_gpu = transfer_vector_of_flat_objects_to_gpu(
              columns_for_device.join_column_types, *allocator);
          const auto key_handler =
              GenericKeyHandler(columns_for_device.join_columns.size(),
                                true,
                                join_columns_gpu,
                                join_column_types_gpu,
                                nullptr,
                                nullptr);
          const auto key_handler_gpu =
              transfer_flat_object_to_gpu(key_handler, *allocator);
          approximate_distinct_tuples_on_device(
              reinterpret_cast<uint8_t*>(device_hll_buffer),
              count_distinct_desc.bitmap_sz_bits,
              key_handler_gpu,
              columns_for_device.join_columns[0].num_elems);

          auto& host_hll_buffer = host_hll_buffers[device_id];
          allocator->copyFromDevice(&host_hll_buffer[0],
                                    device_hll_buffer,
                                    count_distinct_desc.bitmapPaddedSizeBytes());
        }));
  }
  for (auto& child : approximate_distinct_device_threads) {
    child.get();
  }
  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
  auto& result_hll_buffer = host_hll_buffers.front();
  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
  for (int device_id = 1; device_id < device_count_; ++device_id) {
    auto& host_hll_buffer = host_hll_buffers[device_id];
    hll_unify(hll_result,
              reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
              1 << count_distinct_desc.bitmap_sz_bits);
  }
  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits), 0);
#else
  UNREACHABLE();
  return {0, 0};
#endif  // HAVE_CUDA
}

ColumnsForDevice BaselineJoinHashTable::fetchColumnsForDevice(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    DeviceAllocator* dev_buff_owner) {
  const auto effective_memory_level =
      get_effective_memory_level(memory_level_, needs_dict_translation_);

  std::vector<JoinColumn> join_columns;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<JoinColumnTypeInfo> join_column_types;
  std::vector<JoinBucketInfo> join_bucket_info;
  std::vector<std::shared_ptr<void>> malloc_owner;
  for (const auto& inner_outer_pair : inner_outer_pairs_) {
    const auto inner_col = inner_outer_pair.first;
    const auto inner_cd = get_column_descriptor_maybe(
        inner_col->get_column_id(), inner_col->get_table_id(), *catalog_);
    if (inner_cd && inner_cd->isVirtualCol) {
      throw FailedToJoinOnVirtualColumn();
    }
    join_columns.emplace_back(fetchJoinColumn(inner_col,
                                              fragments,
                                              effective_memory_level,
                                              device_id,
                                              chunks_owner,
                                              dev_buff_owner,
                                              malloc_owner,
                                              executor_,
                                              &column_cache_));
    const auto& ti = inner_col->get_type_info();
    join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
                                                      0,
                                                      0,
                                                      inline_fixed_encoding_null_val(ti),
                                                      isBitwiseEq(),
                                                      0,
                                                      get_join_column_type_kind(ti)});
  }
  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
}

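// reifyForDevice() runs on a worker thread (see the std::async fan-out in
// reifyWithLayout), so it first rebinds the parent's query id and debug timer to
// this thread before building the device's table.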
void BaselineJoinHashTable::reifyForDevice(const ColumnsForDevice& columns_for_device,
                                           const HashType layout,
                                           const int device_id,
                                           const size_t entry_count,
                                           const size_t emitted_keys_count,
                                           const logger::QueryId query_id,
                                           const logger::ThreadId parent_thread_id) {
  auto qid_scope_guard = logger::set_thread_local_query_id(query_id);
  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
  const auto effective_memory_level =
      get_effective_memory_level(memory_level_, needs_dict_translation_);
  const auto err = initHashTableForDevice(columns_for_device.join_columns,
                                          columns_for_device.join_column_types,
                                          columns_for_device.join_buckets,
                                          layout,
                                          effective_memory_level,
                                          entry_count,
                                          emitted_keys_count,
                                          device_id);
  if (err) {
    throw HashJoinFail(
        std::string("Unrecognized error when initializing baseline hash table (") +
        std::to_string(err) + std::string(")"));
  }
}

size_t BaselineJoinHashTable::shardCount() const {
  if (memory_level_ != Data_Namespace::GPU_LEVEL) {
    return 0;
  }
  return BaselineJoinHashTable::getShardCountForCondition(
      condition_.get(), executor_, inner_outer_pairs_);
}

size_t BaselineJoinHashTable::getKeyComponentWidth() const {
  for (const auto& inner_outer_pair : inner_outer_pairs_) {
    const auto inner_col = inner_outer_pair.first;
    const auto& inner_col_ti = inner_col->get_type_info();
    if (inner_col_ti.get_logical_size() > 4) {
      CHECK_EQ(8, inner_col_ti.get_logical_size());
      return 8;
    }
  }
  return 4;
}

size_t BaselineJoinHashTable::getKeyComponentCount() const {
  return inner_outer_pairs_.size();
}

Data_Namespace::MemoryLevel BaselineJoinHashTable::getEffectiveMemoryLevel(
    const std::vector<InnerOuter>& inner_outer_pairs) const {
  if (needs_dictionary_translation(
          inner_outer_pairs, inner_outer_string_op_infos_pairs_, executor_)) {
    needs_dict_translation_ = true;
    return Data_Namespace::CPU_LEVEL;
  }
  return memory_level_;
}

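// When the table was built (or recycled) on CPU but the query runs on GPU, the
// finished table is cloned to the device: allocate an identically sized table
// through the builder, then issue a single host-to-device copy of the flat buffer.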
void BaselineJoinHashTable::copyCpuHashTableToGpu(
    std::shared_ptr<BaselineHashTable>& cpu_hash_table,
    const int device_id,
    Data_Namespace::DataMgr* data_mgr) {
  BaselineJoinHashTableBuilder builder;

  builder.allocateDeviceMemory(cpu_hash_table->getLayout(),
                               getKeyComponentWidth(),
                               getKeyComponentCount(),
                               cpu_hash_table->getEntryCount(),
                               cpu_hash_table->getEmittedKeysCount(),
                               device_id,
                               executor_,
                               query_hints_);
  auto gpu_target_hash_table = builder.getHashTable();
  CHECK(gpu_target_hash_table);

  const auto gpu_buff = gpu_target_hash_table->getGpuBuffer();
  CHECK(gpu_buff);
  auto allocator = std::make_unique<CudaAllocator>(
      data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
  allocator->copyToDevice(
      gpu_buff,
      cpu_hash_table->getCpuBuffer(),
      cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU));
  hash_tables_for_device_[device_id] = std::move(gpu_target_hash_table);
}

StrProxyTranslationMapsPtrsAndOffsets decomposeStrDictTranslationMaps(
    const std::vector<const StringDictionaryProxy::IdMap*>& str_proxy_translation_maps) {
  StrProxyTranslationMapsPtrsAndOffsets translation_map_ptrs_and_offsets;
  // First element of the pair is a vector of int32_t* pointing to the translation map
  // "vector"; second element is a vector of int32_t min inner dictionary ids (offsets)
  const size_t num_translation_maps = str_proxy_translation_maps.size();
  translation_map_ptrs_and_offsets.first.reserve(num_translation_maps);
  translation_map_ptrs_and_offsets.second.reserve(num_translation_maps);
  for (const auto& str_proxy_translation_map : str_proxy_translation_maps) {
    if (str_proxy_translation_map) {
      translation_map_ptrs_and_offsets.first.emplace_back(
          str_proxy_translation_map->data());
      translation_map_ptrs_and_offsets.second.emplace_back(
          str_proxy_translation_map->domainStart());
    } else {
      // dummy values
      translation_map_ptrs_and_offsets.first.emplace_back(nullptr);
      translation_map_ptrs_and_offsets.second.emplace_back(0);
    }
  }
  return translation_map_ptrs_and_offsets;
}

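// initHashTableForDevice() is the build workhorse. On the CPU path, only device 0
// builds when the query itself runs on CPU (the table is shared across devices),
// string-dictionary translation maps are decomposed into raw pointer/offset arrays
// for the key handler, and a successful build is offered to the recycler cache. On
// the GPU path, the key columns are transferred to the device and the table is
// built there directly; only its layout is cached.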
int BaselineJoinHashTable::initHashTableForDevice(
    const std::vector<JoinColumn>& join_columns,
    const std::vector<JoinColumnTypeInfo>& join_column_types,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const HashType layout,
    const Data_Namespace::MemoryLevel effective_memory_level,
    const size_t entry_count,
    const size_t emitted_keys_count,
    const int device_id) {
  auto timer = DEBUG_TIMER(__func__);
  const auto key_component_count = getKeyComponentCount();
  int err = 0;
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  ts1 = std::chrono::steady_clock::now();
  auto allow_hashtable_recycling =
      HashtableRecycler::isSafeToCacheHashtable(table_id_to_node_map_,
                                                needs_dict_translation_,
                                                inner_outer_string_op_infos_pairs_,
                                                getInnerTableId());
  HashType hashtable_layout = layout;
  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
    std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);

    const auto composite_key_info = HashJoin::getCompositeKeyInfo(
        inner_outer_pairs_, executor_, inner_outer_string_op_infos_pairs_);

    CHECK(!join_columns.empty());

    if (memory_level_ == Data_Namespace::MemoryLevel::CPU_LEVEL) {
      CHECK_EQ(device_id, 0);
    }
    CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
    std::shared_ptr<HashTable> hash_table{nullptr};
    const auto str_proxy_translation_map_ptrs_and_offsets =
        decomposeStrDictTranslationMaps(str_proxy_translation_maps_);
    BaselineJoinHashTableBuilder builder;

    const auto key_handler =
        GenericKeyHandler(key_component_count,
                          true,
                          &join_columns[0],
                          &join_column_types[0],
                          &str_proxy_translation_map_ptrs_and_offsets.first[0],
                          &str_proxy_translation_map_ptrs_and_offsets.second[0]);
    err = builder.initHashTableOnCpu(&key_handler,
                                     composite_key_info,
                                     join_columns,
                                     join_column_types,
                                     join_bucket_info,
                                     str_proxy_translation_map_ptrs_and_offsets,
                                     entry_count,
                                     join_columns.front().num_elems,
                                     hashtable_layout,
                                     join_type_,
                                     getKeyComponentWidth(),
                                     getKeyComponentCount(),
                                     query_hints_);
    hash_tables_for_device_[device_id] = builder.getHashTable();
    ts2 = std::chrono::steady_clock::now();
    auto hashtable_build_time =
        std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
    if (!err && allow_hashtable_recycling && hash_tables_for_device_[device_id]) {
      // add ht-related items to the cache iff we have a valid hashtable
      putHashTableOnCpuToCache(hashtable_cache_key_[device_id],
                               CacheItemType::BASELINE_HT,
                               hash_tables_for_device_[device_id],
                               DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
                               hashtable_build_time);

      hash_table_layout_cache_->putItemToCache(
          hashtable_cache_key_[device_id],
          hash_tables_for_device_[device_id]->getLayout(),
          CacheItemType::HT_HASHING_SCHEME,
          DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
          0,
          0,
          {});
    }
    // Transfer the hash table to the GPU if we've only built it on CPU
    // but the query runs on GPU (join on dictionary encoded columns).
    // Don't transfer the buffer if there was an error since we'll bail anyway.
    if (memory_level_ == Data_Namespace::GPU_LEVEL && !err) {
#ifdef HAVE_CUDA
      auto cpu_hash_table = std::dynamic_pointer_cast<BaselineHashTable>(
          hash_tables_for_device_[device_id]);
      copyCpuHashTableToGpu(cpu_hash_table, device_id, executor_->getDataMgr());
#else
      CHECK(false);
#endif
    }
  } else {
#ifdef HAVE_CUDA
    BaselineJoinHashTableBuilder builder;

    auto data_mgr = executor_->getDataMgr();
    CudaAllocator allocator(
        data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
    auto join_column_types_gpu =
        transfer_vector_of_flat_objects_to_gpu(join_column_types, allocator);
    auto join_columns_gpu =
        transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
    const auto key_handler = GenericKeyHandler(key_component_count,
                                               true,
                                               join_columns_gpu,
                                               join_column_types_gpu,
                                               nullptr,
                                               nullptr);

    err = builder.initHashTableOnGpu(&key_handler,
                                     join_columns,
                                     hashtable_layout,
                                     join_type_,
                                     getKeyComponentWidth(),
                                     getKeyComponentCount(),
                                     entry_count,
                                     emitted_keys_count,
                                     device_id,
                                     executor_,
                                     query_hints_);
    CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
    hash_tables_for_device_[device_id] = builder.getHashTable();
    if (!err && allow_hashtable_recycling && hash_tables_for_device_[device_id]) {
      // add layout to the cache iff we have a valid hashtable
      hash_table_layout_cache_->putItemToCache(
          hashtable_cache_key_[device_id],
          hash_tables_for_device_[device_id]->getLayout(),
          CacheItemType::HT_HASHING_SCHEME,
          DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
          0,
          0,
          {});
    }
#else
    UNREACHABLE();
#endif
  }
  return err;
}

#define LL_CONTEXT executor_->cgen_state_->context_
#define LL_BUILDER executor_->cgen_state_->ir_builder_
#define LL_INT(v) executor_->cgen_state_->llInt(v)
#define LL_FP(v) executor_->cgen_state_->llFp(v)
#define ROW_FUNC executor_->cgen_state_->row_func_

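// The codegen methods below emit LLVM IR into the current row function rather than
// probing the hash table directly. codegenSlot() services the one-to-one layout via
// the runtime helper baseline_hash_join_idx_{32|64}; codegenMatchingSet() services
// one-to-many by resolving the composite key to an entry index with
// get_composite_key_index_{32|64} and then walking that entry's offset/count/payload
// sections.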
llvm::Value* BaselineJoinHashTable::codegenSlot(const CompilationOptions& co,
                                                const size_t index) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  CHECK(getHashType() == HashType::OneToOne);
  const auto key_component_width = getKeyComponentWidth();
  CHECK(key_component_width == 4 || key_component_width == 8);
  auto key_buff_lv = codegenKey(co);
  const auto hash_ptr = hashPtr(index);
  const auto key_ptr_lv =
      LL_BUILDER.CreatePointerCast(key_buff_lv, llvm::Type::getInt8PtrTy(LL_CONTEXT));
  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
  const auto hash_table = getHashTableForDevice(size_t(0));
  return executor_->cgen_state_->emitExternalCall(
      "baseline_hash_join_idx_" + std::to_string(key_component_width * 8),
      get_int_type(64, LL_CONTEXT),
      {hash_ptr, key_ptr_lv, key_size_lv, LL_INT(hash_table->getEntryCount())});
}

HashJoinMatchingSet BaselineJoinHashTable::codegenMatchingSet(
    const CompilationOptions& co,
    const size_t index) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  const auto hash_table = getHashTableForDevice(size_t(0));
  CHECK(hash_table);
  const auto key_component_width = getKeyComponentWidth();
  CHECK(key_component_width == 4 || key_component_width == 8);
  auto key_buff_lv = codegenKey(co);
  CHECK(getHashType() == HashType::OneToMany);
  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
  const auto composite_dict_ptr_type =
      llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
  const auto composite_key_dict =
      hash_ptr->getType()->isPointerTy()
          ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
          : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
  const auto key_component_count = getKeyComponentCount();
  const auto key = executor_->cgen_state_->emitExternalCall(
      "get_composite_key_index_" + std::to_string(key_component_width * 8),
      get_int_type(64, LL_CONTEXT),
      {key_buff_lv,
       LL_INT(key_component_count),
       composite_key_dict,
       LL_INT(hash_table->getEntryCount())});
  auto one_to_many_ptr = hash_ptr;
  if (one_to_many_ptr->getType()->isPointerTy()) {
    one_to_many_ptr =
        LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
  } else {
    CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
  }
  const auto composite_key_dict_size = offsetBufferOff();
  one_to_many_ptr =
      LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
  return HashJoin::codegenMatchingSet(
      {one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(hash_table->getEntryCount() - 1)},
      false,
      false,
      false,
      getComponentBufferSize(),
      executor_);
}

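// Buffer layout backing the offset helpers below. A one-to-many table is a flat
// buffer of [composite keys][offsets][counts][payload]: the key section occupies
// entry_count * key_component_count * key_component_width bytes, and the offset and
// count sections occupy entry_count * sizeof(int32_t) bytes each (see
// getComponentBufferSize()). A one-to-one table instead stores the payload slot
// inline as one extra component per entry, so the key buffer is the entire table.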
size_t BaselineJoinHashTable::offsetBufferOff() const noexcept {
  return getKeyBufferSize();
}

size_t BaselineJoinHashTable::countBufferOff() const noexcept {
  if (layoutRequiresAdditionalBuffers(getHashType())) {
    return offsetBufferOff() + getComponentBufferSize();
  } else {
    return getKeyBufferSize();
  }
}

size_t BaselineJoinHashTable::payloadBufferOff() const noexcept {
  if (layoutRequiresAdditionalBuffers(getHashType())) {
    return countBufferOff() + getComponentBufferSize();
  } else {
    return getKeyBufferSize();
  }
}

size_t BaselineJoinHashTable::getKeyBufferSize() const noexcept {
  const auto key_component_width = getKeyComponentWidth();
  CHECK(key_component_width == 4 || key_component_width == 8);
  const auto key_component_count = getKeyComponentCount();
  auto hash_table = getHashTableForDevice(size_t(0));
  CHECK(hash_table);
  if (layoutRequiresAdditionalBuffers(hash_table->getLayout())) {
    return hash_table->getEntryCount() * key_component_count * key_component_width;
  } else {
    return hash_table->getEntryCount() * (key_component_count + 1) * key_component_width;
  }
}

size_t BaselineJoinHashTable::getComponentBufferSize() const noexcept {
  const auto hash_table = getHashTableForDevice(size_t(0));
  return hash_table->getEntryCount() * sizeof(int32_t);
}

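// codegenKey() materializes the probe-side composite key on the stack: it allocas
// an array of key_component_width-sized slots, evaluates each outer key expression
// (applying any string ops), sign-extends the value to the component width, and
// stores it into its slot.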
llvm::Value* BaselineJoinHashTable::codegenKey(const CompilationOptions& co) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  const auto key_component_width = getKeyComponentWidth();
  CHECK(key_component_width == 4 || key_component_width == 8);
  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
  llvm::Value* key_buff_lv{nullptr};
  switch (key_component_width) {
    case 4:
      key_buff_lv =
          LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
      break;
    case 8:
      key_buff_lv =
          LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
      break;
    default:
      CHECK(false);
  }

  CodeGenerator code_generator(executor_);
  for (size_t i = 0; i < getKeyComponentCount(); ++i) {
    const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(
        key_buff_lv->getType()->getScalarType()->getPointerElementType(),
        key_buff_lv,
        LL_INT(i));
    const auto& inner_outer_pair = inner_outer_pairs_[i];
    const auto outer_col = inner_outer_pair.second;
    const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
    const auto val_col_var =
        dynamic_cast<const Analyzer::ColumnVar*>(inner_outer_pair.first);
    if (key_col_var && val_col_var &&
        self_join_not_covered_by_left_deep_tree(
            key_col_var,
            val_col_var,
            get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
      throw std::runtime_error(
          "Query execution failed because the query contains an unsupported self-join "
          "pattern. We suspect the join condition of the self-join requires multiple "
          "left-deep join trees, which is not supported for now. Please consider "
          "rewriting the table order in the FROM clause.");
    }
    auto key_lv = HashJoin::codegenColOrStringOper(
        outer_col, inner_outer_string_op_infos_pairs_[i].second, code_generator, co);
    const auto key_lv_ext =
        LL_BUILDER.CreateSExt(key_lv, get_int_type(key_component_width * 8, LL_CONTEXT));
    LL_BUILDER.CreateStore(key_lv_ext, key_comp_dest_lv);
  }
  return key_buff_lv;
}

llvm::Value* BaselineJoinHashTable::hashPtr(const size_t index) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
  const auto pi8_type = llvm::Type::getInt8PtrTy(LL_CONTEXT);
  return hash_ptr->getType()->isPointerTy()
             ? LL_BUILDER.CreatePointerCast(hash_ptr, pi8_type)
             : LL_BUILDER.CreateIntToPtr(hash_ptr, pi8_type);
}

#undef ROW_FUNC
#undef LL_INT
#undef LL_BUILDER
#undef LL_CONTEXT

int BaselineJoinHashTable::getInnerTableId() const noexcept {
  try {
    return getInnerTableId(inner_outer_pairs_);
  } catch (...) {
    CHECK(false);
  }
  return 0;
}

int BaselineJoinHashTable::getInnerTableRteIdx() const noexcept {
  CHECK(!inner_outer_pairs_.empty());
  const auto first_inner_col = inner_outer_pairs_.front().first;
  return first_inner_col->get_rte_idx();
}

HashType BaselineJoinHashTable::getHashType() const noexcept {
  auto hash_table = getHashTableForDevice(size_t(0));
  CHECK(hash_table);
  if (layout_override_) {
    return *layout_override_;
  } else {
    return hash_table->getLayout();
  }
}

int BaselineJoinHashTable::getInnerTableId(
    const std::vector<InnerOuter>& inner_outer_pairs) {
  CHECK(!inner_outer_pairs.empty());
  const auto first_inner_col = inner_outer_pairs.front().first;
  return first_inner_col->get_table_id();
}

std::shared_ptr<HashTable> BaselineJoinHashTable::initHashTableOnCpuFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier) {
  auto timer = DEBUG_TIMER(__func__);
  VLOG(1) << "Checking CPU hash table cache.";
  CHECK(hash_table_cache_);
  return hash_table_cache_->getItemFromCache(key, item_type, device_identifier);
}

void BaselineJoinHashTable::putHashTableOnCpuToCache(
    QueryPlanHash key,
    CacheItemType item_type,
    std::shared_ptr<HashTable> hashtable_ptr,
    DeviceIdentifier device_identifier,
    size_t hashtable_building_time) {
  CHECK(hash_table_cache_);
  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
  hash_table_cache_->putItemToCache(
      key,
      hashtable_ptr,
      item_type,
      device_identifier,
      hashtable_ptr->getHashTableBufferSize(ExecutorDeviceType::CPU),
      hashtable_building_time);
}

bool BaselineJoinHashTable::isBitwiseEq() const {
  return condition_->get_optype() == kBW_EQ;
}

ChunkKey BaselineJoinHashTable::genChunkKey(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments) const {
  std::vector<int> fragment_ids;
  std::for_each(
      fragments.cbegin(), fragments.cend(), [&fragment_ids](const auto& fragment) {
        fragment_ids.push_back(fragment.fragmentId);
      });
  return fragment_ids;
}