OmniSciDB  a987f07e93
PerfectJoinHashTable.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <atomic>
20 #include <future>
21 #include <numeric>
22 #include <thread>
23 
24 #include "Logger/Logger.h"
27 #include "QueryEngine/Execute.h"
32 
33 // for now, only consider the CPU hashtable recycler
34 std::unique_ptr<HashtableRecycler> PerfectJoinHashTable::hash_table_cache_ =
35  std::make_unique<HashtableRecycler>(CacheItemType::PERFECT_HT,
37 std::unique_ptr<HashingSchemeRecycler> PerfectJoinHashTable::hash_table_layout_cache_ =
38  std::make_unique<HashingSchemeRecycler>();
39 
40 namespace {
41 std::pair<InnerOuter, InnerOuterStringOpInfos> get_cols(
42  const Analyzer::BinOper* qual_bin_oper,
43  const Catalog_Namespace::Catalog& cat,
44  const TemporaryTables* temporary_tables) {
45  const auto lhs = qual_bin_oper->get_left_operand();
46  const auto rhs = qual_bin_oper->get_right_operand();
47  return HashJoin::normalizeColumnPair(lhs, rhs, cat, temporary_tables);
48 }
49 
50 BucketizedHashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
51  ExpressionRange const& col_range,
52  bool const is_bw_eq) {
53  using EmptyRangeSize = boost::optional<size_t>;
54  auto empty_range_check = [](ExpressionRange const& col_range,
55  bool const is_bw_eq) -> EmptyRangeSize {
56  if (col_range.getIntMin() > col_range.getIntMax()) {
57  CHECK_EQ(col_range.getIntMin(), int64_t(0));
58  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
59  if (is_bw_eq) {
60  return size_t(1);
61  }
62  return size_t(0);
63  }
64  return EmptyRangeSize{};
65  };
66 
67  auto empty_range = empty_range_check(col_range, is_bw_eq);
68  if (empty_range) {
69  return {size_t(*empty_range), 1};
70  }
71 
72  int64_t bucket_normalization =
73  context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
74  CHECK_GT(bucket_normalization, 0);
75  auto const normalized_max = col_range.getIntMax() / bucket_normalization;
76  auto const normalized_min = col_range.getIntMin() / bucket_normalization;
77  return {size_t(normalized_max - normalized_min + 1 + (is_bw_eq ? 1 : 0)),
78  bucket_normalization};
79 }
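// Illustrative example (not from the original source): for a DATE inner column whose
// col_range is [0, 2'592'000] in epoch seconds (30 days) with a day-sized bucket
// (getBucket() == 86'400), bucket_normalization is 86'400 and the entry count is
// 2'592'000 / 86'400 - 0 + 1 = 31 slots, plus one extra slot when the comparison is
// kBW_EQ to hold the translated NULL key; for non-DATE types bucket_normalization is 1.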
80 
81 size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
82  if (col_range.getIntMin() > col_range.getIntMax()) {
83  CHECK_EQ(col_range.getIntMin(), int64_t(0));
84  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
85  return is_bw_eq ? 1 : 0;
86  }
87  return col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0);
88 }
89 
90 } // namespace
91 
92 namespace {
93 
94 bool shard_count_less_or_equal_device_count(const int inner_table_id,
95  const Executor* executor) {
96  const auto inner_table_info = executor->getTableInfo(inner_table_id);
97  std::unordered_set<int> device_holding_fragments;
98  auto cuda_mgr = executor->getDataMgr()->getCudaMgr();
99  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
100  for (const auto& fragment : inner_table_info.fragments) {
101  if (fragment.shard != -1) {
102  const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
103  if (!it_ok.second) {
104  return false;
105  }
106  }
107  }
108  return true;
109 }
110 
111 } // namespace
112 
113 size_t get_shard_count(
114  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
115  const Executor* executor) {
116  const auto inner_col = equi_pair.first;
117  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
118  if (!outer_col || inner_col->get_table_id() < 0 || outer_col->get_table_id() < 0) {
119  return 0;
120  }
121  if (outer_col->get_rte_idx()) {
122  return 0;
123  }
124  if (inner_col->get_type_info() != outer_col->get_type_info()) {
125  return 0;
126  }
127  const auto catalog = executor->getCatalog();
128  const auto inner_td = catalog->getMetadataForTable(inner_col->get_table_id());
129  CHECK(inner_td);
130  const auto outer_td = catalog->getMetadataForTable(outer_col->get_table_id());
131  CHECK(outer_td);
132  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
133  inner_td->nShards != outer_td->nShards) {
134  return 0;
135  }
136  if (!shard_count_less_or_equal_device_count(inner_td->tableId, executor)) {
137  return 0;
138  }
139  // The two columns involved must be the ones on which the tables have been sharded.
140  return (inner_td->shardedColumnId == inner_col->get_column_id() &&
141  outer_td->shardedColumnId == outer_col->get_column_id()) ||
142  (outer_td->shardedColumnId == inner_col->get_column_id() &&
143  inner_td->shardedColumnId == inner_col->get_column_id())
144  ? inner_td->nShards
145  : 0;
146 }
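// Illustrative note: a sharded hash join is only attempted when both tables are sharded
// on the join columns with the same shard count and no device would own more than one
// shard; e.g. with nShards == 4 and 4 GPUs, each device builds a hash table over its own
// shard. Otherwise this returns 0 and the regular (unsharded) per-device path is used.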
147 
148 //! Make hash table from an in-flight SQL query's parse tree etc.
149 std::shared_ptr<PerfectJoinHashTable> PerfectJoinHashTable::getInstance(
150  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
151  const std::vector<InputTableInfo>& query_infos,
152  const Data_Namespace::MemoryLevel memory_level,
153  const JoinType join_type,
154  const HashType preferred_hash_type,
155  const int device_count,
156  ColumnCacheMap& column_cache,
157  Executor* executor,
158  const HashTableBuildDagMap& hashtable_build_dag_map,
159  const RegisteredQueryHint& query_hints,
160  const TableIdToNodeMap& table_id_to_node_map) {
161  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
162  const auto cols_and_string_op_infos =
163  get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
164  const auto& cols = cols_and_string_op_infos.first;
165  const auto& inner_outer_string_op_infos = cols_and_string_op_infos.second;
166  const auto inner_col = cols.first;
167  CHECK(inner_col);
168  const auto& ti = inner_col->get_type_info();
169  auto col_range =
170  getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
171  if (col_range.getType() == ExpressionRangeType::Invalid) {
172  throw HashJoinFail(
173  "Could not compute range for the expressions involved in the equijoin");
174  }
175  const auto rhs_source_col_range =
176  ti.is_string() ? getExpressionRange(inner_col, query_infos, executor) : col_range;
177  if (ti.is_string()) {
178  // The nullable info must be the same as the source column.
179  if (rhs_source_col_range.getType() == ExpressionRangeType::Invalid) {
180  throw HashJoinFail(
181  "Could not compute range for the expressions involved in the equijoin");
182  }
183  if (rhs_source_col_range.getIntMin() > rhs_source_col_range.getIntMax()) {
184  // If the inner column expression range is empty, use the inner col range
185  CHECK_EQ(rhs_source_col_range.getIntMin(), int64_t(0));
186  CHECK_EQ(rhs_source_col_range.getIntMax(), int64_t(-1));
187  col_range = rhs_source_col_range;
188  } else {
189  col_range = ExpressionRange::makeIntRange(
190  std::min(rhs_source_col_range.getIntMin(), col_range.getIntMin()),
191  std::max(rhs_source_col_range.getIntMax(), col_range.getIntMax()),
192  0,
193  rhs_source_col_range.hasNulls());
194  }
195  }
196 
197  // We can't allocate more than 2GB of contiguous memory on GPU, and each entry is 4 bytes.
198  const auto max_hash_entry_count =
199  memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
200  ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
201  : static_cast<size_t>(std::numeric_limits<int32_t>::max());
202 
203  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
204  ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
205  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
206  if (bucketized_entry_count > max_hash_entry_count) {
207  throw TooManyHashEntries();
208  }
209 
210  if (qual_bin_oper->get_optype() == kBW_EQ &&
211  col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
212  throw HashJoinFail("Cannot translate null value for kBW_EQ");
213  }
214  decltype(std::chrono::steady_clock::now()) ts1, ts2;
215  if (VLOGGING(1)) {
216  ts1 = std::chrono::steady_clock::now();
217  }
218 
219  auto join_hash_table = std::shared_ptr<PerfectJoinHashTable>(
220  new PerfectJoinHashTable(qual_bin_oper,
221  inner_col,
222  query_infos,
223  memory_level,
224  join_type,
225  preferred_hash_type,
226  col_range,
227  rhs_source_col_range,
228  bucketized_entry_count_info,
229  column_cache,
230  executor,
231  device_count,
232  query_hints,
233  hashtable_build_dag_map,
234  table_id_to_node_map,
235  inner_outer_string_op_infos));
236  try {
237  join_hash_table->reify();
238  } catch (const TableMustBeReplicated& e) {
239  // Throw a runtime error to abort the query
240  join_hash_table->freeHashBufferMemory();
241  throw std::runtime_error(e.what());
242  } catch (const HashJoinFail& e) {
243  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
244  // possible)
245  join_hash_table->freeHashBufferMemory();
246  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
247  "involved in equijoin | ") +
248  e.what());
249  } catch (const ColumnarConversionNotSupported& e) {
250  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
251  e.what());
252  } catch (const OutOfMemory& e) {
253  throw HashJoinFail(
254  std::string("Ran out of memory while building hash tables for equijoin | ") +
255  e.what());
256  } catch (const JoinHashTableTooBig& e) {
257  throw e;
258  } catch (const std::exception& e) {
259  throw std::runtime_error(
260  std::string("Fatal error while attempting to build hash tables for join: ") +
261  e.what());
262  }
263  if (VLOGGING(1)) {
264  ts2 = std::chrono::steady_clock::now();
265  VLOG(1) << "Built perfect hash table "
266  << getHashTypeString(join_hash_table->getHashType()) << " in "
267  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
268  << " ms";
269  }
270  return join_hash_table;
271 }
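// Hypothetical usage sketch (call-site variable names are illustrative, not from this
// file): a caller such as the executor would build the table for an equijoin qual as
//   auto join_hash_table = PerfectJoinHashTable::getInstance(qual_bin_oper, query_infos,
//       Data_Namespace::CPU_LEVEL, JoinType::INNER, HashType::OneToOne, device_count,
//       column_cache, executor, hashtable_build_dag_map, query_hints,
//       table_id_to_node_map);
// and fall back to a loop join if HashJoinFail is thrown.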
272 
273 bool needs_dictionary_translation(
274  const InnerOuter& inner_outer_col_pair,
275  const InnerOuterStringOpInfos& inner_outer_string_op_infos,
276  const Executor* executor) {
277  if (inner_outer_string_op_infos.first.size() ||
278  inner_outer_string_op_infos.second.size()) {
279  return true;
280  }
281  auto inner_col = inner_outer_col_pair.first;
282  auto outer_col_expr = inner_outer_col_pair.second;
283  const auto catalog = executor->getCatalog();
284  CHECK(catalog);
285  const auto inner_cd = get_column_descriptor_maybe(
286  inner_col->get_column_id(), inner_col->get_table_id(), *catalog);
287  const auto& inner_ti = get_column_type(inner_col->get_column_id(),
288  inner_col->get_table_id(),
289  inner_cd,
290  executor->getTemporaryTables());
291  // Only strings may need dictionary translation.
292  if (!inner_ti.is_string()) {
293  return false;
294  }
295  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
296  CHECK(outer_col);
297  const auto outer_cd = get_column_descriptor_maybe(
298  outer_col->get_column_id(), outer_col->get_table_id(), *catalog);
299  // Don't want to deal with temporary tables for now, require translation.
300  if (!inner_cd || !outer_cd) {
301  return true;
302  }
303  const auto& outer_ti = get_column_type(outer_col->get_column_id(),
304  outer_col->get_table_id(),
305  outer_cd,
306  executor->getTemporaryTables());
307  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
308  // If the two columns don't share the dictionary, translation is needed.
309  if (outer_ti.get_comp_param() != inner_ti.get_comp_param()) {
310  return true;
311  }
312  const auto inner_str_dict_proxy =
313  executor->getStringDictionaryProxy(inner_col->get_comp_param(), true);
314  CHECK(inner_str_dict_proxy);
315  const auto outer_str_dict_proxy =
316  executor->getStringDictionaryProxy(outer_col->get_comp_param(), true);
317  CHECK(outer_str_dict_proxy);
318 
319  return *inner_str_dict_proxy != *outer_str_dict_proxy;
320 }
321 
322 std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
323  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
324  const int device_id,
325  const int device_count) {
326  std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
327  for (const auto& fragment : fragments) {
328  CHECK_GE(fragment.shard, 0);
329  if (fragment.shard % device_count == device_id) {
330  shards_for_device.push_back(fragment);
331  }
332  }
333  return shards_for_device;
334 }
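// Illustrative example: with device_count == 2, fragments whose shard ids are
// {0, 1, 2, 3} are split so that device 0 receives shards {0, 2} and device 1 receives
// shards {1, 3} (shard % device_count == device_id).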
335 
336 bool PerfectJoinHashTable::isOneToOneHashPossible(
337  const std::vector<ColumnsForDevice>& columns_per_device) const {
338  CHECK(!inner_outer_pairs_.empty());
339  const auto& rhs_col_ti = inner_outer_pairs_.front().first->get_type_info();
340  const auto max_unique_hash_input_entries =
341  get_bucketized_hash_entry_info(
342  rhs_col_ti, rhs_source_col_range_, qual_bin_oper_->get_optype() == kBW_EQ)
343  .getNormalizedHashEntryCount();
345  for (const auto& device_columns : columns_per_device) {
346  CHECK(!device_columns.join_columns.empty());
347  const auto rhs_join_col_num_entries = device_columns.join_columns.front().num_elems;
348  if (rhs_join_col_num_entries > max_unique_hash_input_entries) {
349  VLOG(1) << "Skipping attempt to build perfect hash one-to-one table as number of "
350  "rhs column entries ("
351  << rhs_join_col_num_entries << ") exceeds range for rhs join column ("
352  << max_unique_hash_input_entries << ").";
353  return false;
354  }
355  }
356  return true;
357 }
358 
359 void PerfectJoinHashTable::reify() {
360  auto timer = DEBUG_TIMER(__func__);
362  auto catalog = const_cast<Catalog_Namespace::Catalog*>(executor_->getCatalog());
363  const auto cols =
364  get_cols(qual_bin_oper_.get(), *catalog, executor_->temporary_tables_).first;
365  const auto inner_col = cols.first;
366  HashJoin::checkHashJoinReplicationConstraint(
367  inner_col->get_table_id(),
368  get_shard_count(qual_bin_oper_.get(), executor_),
369  executor_);
370  const auto& query_info = getInnerQueryInfo(inner_col).info;
371  if (query_info.fragments.empty()) {
372  return;
373  }
374  if (query_info.getNumTuplesUpperBound() >
375  static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
376  throw TooManyHashEntries();
377  }
378  std::vector<std::future<void>> init_threads;
379  const int shard_count = shardCount();
380 
381  inner_outer_pairs_.push_back(cols);
382  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
383  // Todo(todd): Clean up the fact that we store the inner outer column pairs as a vector,
384  // even though only one is ever valid for perfect hash layout. Either move to 1 or keep
385  // the vector but move it to the HashTable parent class
388 
389  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
390  std::vector<ColumnsForDevice> columns_per_device;
391  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
392 
393  auto data_mgr = executor_->getDataMgr();
394  // check for an existing cached hash table here before fetching columns;
395  // if one is available, skip the rest of the logic and copy it to GPU if necessary.
396  // A few things to consider:
397  // 1. is the table sharded? --> use per-device logic:
398  // each device may load a different set of fragments, so their cache keys
399  // differ accordingly
400  // 2. otherwise, each device has the same hash table built from "all" fragments
401  // and their cache keys are identical (but we keep a per-device cache key vector);
402  // for CPU we consider the device count to be one, while
403  // for GPU each device builds its own hash table, or we build a single hash table on
404  // CPU and then copy it to each device
405  // 3. is the cache key unavailable? --> use an alternative cache key
406 
407  // retrieve fragment lists and chunk key per device
408  std::vector<ChunkKey> chunk_key_per_device;
409  auto outer_col =
410  dynamic_cast<const Analyzer::ColumnVar*>(inner_outer_pairs_.front().second);
411  for (int device_id = 0; device_id < device_count_; ++device_id) {
412  fragments_per_device.emplace_back(
413  shard_count
414  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
415  : query_info.fragments);
416  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
417  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
418  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
419  }
420  const auto chunk_key =
421  genChunkKey(fragments_per_device[device_id], outer_col, inner_col);
422  chunk_key_per_device.emplace_back(std::move(chunk_key));
423  }
424 
425  // try to extract cache key for hash table and its relevant info
426  auto hashtable_access_path_info =
427  HashtableRecycler::getHashtableAccessPathInfo(inner_outer_pairs_,
428  {inner_outer_string_op_infos_},
429  qual_bin_oper_->get_optype(),
430  join_type_,
431  hashtable_build_dag_map_,
432  device_count_,
433  shard_count,
434  fragments_per_device,
435  executor_);
436  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
437  hashtable_cache_meta_info_ = hashtable_access_path_info.meta_info;
438  table_keys_ = hashtable_access_path_info.table_keys;
439 
440  if (table_keys_.empty()) {
441  // the actual chunks fetched per device can be different but they constitute the same
442  // table in the same db, so we can exploit this to create an alternative table key
443  table_keys_ = DataRecyclerUtil::getAlternativeTableKeys(
444  chunk_key_per_device,
445  executor_->getCatalog()->getDatabaseId(),
446  getInnerTableId());
447  }
448  CHECK(!table_keys_.empty());
449 
450  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
451  getInnerTableId() > 0) {
452  // sometimes we cannot retrieve the query plan dag, so try to recycle the cache
453  // with the old-fashioned cache key if we deal with a hashtable of a non-temporary table
454  for (int device_id = 0; device_id < device_count_; ++device_id) {
455  const auto num_tuples = std::accumulate(
456  fragments_per_device[device_id].begin(),
457  fragments_per_device[device_id].end(),
458  size_t(0),
459  [](size_t sum, const auto& fragment) { return sum + fragment.getNumTuples(); });
461  inner_col,
462  outer_col ? outer_col : inner_col,
464  chunk_key_per_device[device_id],
465  num_tuples,
466  qual_bin_oper_->get_optype(),
467  join_type_};
468  hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
469  }
470  }
471 
472  // register a mapping between the cache key and its input table info for per-table cache
473  // invalidation if we have a valid cache key for "all" devices (otherwise, we skip using the
474  // cached hash table for safety)
475  const bool invalid_cache_key =
476  HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_);
477  if (!invalid_cache_key) {
478  if (!shard_count) {
479  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_.front(),
480  table_keys_);
481  } else {
482  std::for_each(hashtable_cache_key_.cbegin(),
483  hashtable_cache_key_.cend(),
484  [this](QueryPlanHash key) {
485  hash_table_cache_->addQueryPlanDagForTableKeys(key, table_keys_);
486  });
487  }
488  }
489 
490  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
491 
492  // Assume we will need one-to-many if we have a string operation, as these tend
493  // to be cardinality-reducing operations, i.e. |S(t)| < |t|
494  // Todo(todd): Ostensibly only string ops on the rhs/inner expression cause rhs dups and
495  // so we may be too conservative here, but validate
496 
497  const bool has_string_ops = inner_outer_string_op_infos_.first.size() ||
498  inner_outer_string_op_infos_.second.size();
499 
500  // Also check if the number of entries per column exceeds the rhs join hash table
501  // range, and skip trying to build a One-to-One hash table if so. There is a slight edge
502  // case where this can be overly pessimistic: the non-null values may all be
503  // unique while there are multiple null values. However, we currently don't have the metadata
504  // to track null counts (only column nullability from the ddl and null existence from
505  // the encoded data), and this is probably too much of an edge case to worry about for
506  // now given the general performance benefits of skipping 1:1 if we are fairly confident
507  // it is doomed up front
508 
509  // Now check if the number of entries per column exceeds the rhs join hash table
510  // range, and skip trying to build a One-to-One hash table if so
511  if (hash_type_ == HashType::OneToOne &&
512  (has_string_ops || !isOneToOneHashPossible(columns_per_device))) {
513  hash_type_ = HashType::OneToMany;
514  }
515 
516  // todo (yoonmin) : support dictionary proxy cache for join including string op(s)
517  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
518  // construct string dictionary proxies if necessary
519  std::unique_lock<std::mutex> str_proxy_translation_lock(str_proxy_translation_mutex_);
520  if (needs_dict_translation_ && !str_proxy_translation_map_) {
521  CHECK_GE(inner_outer_pairs_.size(), 1UL);
522  auto const copied_col_range = col_range_;
523  str_proxy_translation_map_ =
524  HashJoin::translateInnerToOuterStrDictProxies(inner_outer_pairs_.front(),
525  inner_outer_string_op_infos_,
526  col_range_,
527  executor_);
528  // update hash entry info if necessary
529  if (!(col_range_ == copied_col_range)) {
530  hash_entry_info_ = get_bucketized_hash_entry_info(
531  inner_col->get_type_info(), col_range_, isBitwiseEq());
532  }
533  }
534  }
535 
536  auto allow_hashtable_recycling =
537  HashtableRecycler::isSafeToCacheHashtable(table_id_to_node_map_,
538  needs_dict_translation_,
539  {inner_outer_string_op_infos_},
540  inner_col->get_table_id());
541  bool has_invalid_cached_hash_table = false;
542  if (effective_memory_level == Data_Namespace::CPU_LEVEL &&
543  HashJoin::canAccessHashTable(
544  allow_hashtable_recycling, invalid_cache_key, join_type_)) {
545  // build a hash table on CPU, and we have a chance to recycle the cached one if
546  // available
547  for (int device_id = 0; device_id < device_count_; ++device_id) {
548  auto hash_table =
549  initHashTableOnCpuFromCache(hashtable_cache_key_[device_id],
552  if (hash_table) {
553  hash_tables_for_device_[device_id] = hash_table;
554  hash_type_ = hash_table->getLayout();
555  } else {
556  has_invalid_cached_hash_table = true;
557  break;
558  }
559  }
560 
561  if (has_invalid_cached_hash_table) {
562  hash_tables_for_device_.clear();
563  hash_tables_for_device_.resize(device_count_);
564  } else {
565  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
566 #ifdef HAVE_CUDA
567  for (int device_id = 0; device_id < device_count_; ++device_id) {
568  auto cpu_hash_table = std::dynamic_pointer_cast<PerfectHashTable>(
569  hash_tables_for_device_[device_id]);
570  copyCpuHashTableToGpu(cpu_hash_table, device_id, data_mgr);
571  }
572 #else
573  UNREACHABLE();
574 #endif
575  }
576  return;
577  }
578  }
579 
580  // we have no cached hash table for this qual
581  // so, start building the hash table by fetching columns for devices
582  for (int device_id = 0; device_id < device_count_; ++device_id) {
583  columns_per_device.emplace_back(
584  fetchColumnsForDevice(fragments_per_device[device_id],
585  device_id,
586  memory_level_ == Data_Namespace::GPU_LEVEL
587  ? dev_buff_owners[device_id].get()
588  : nullptr,
589  *catalog));
590  }
591 
592  try {
593  for (int device_id = 0; device_id < device_count_; ++device_id) {
594  const auto chunk_key = genChunkKey(fragments_per_device[device_id],
595  inner_outer_pairs_.front().second,
596  inner_outer_pairs_.front().first);
597  init_threads.push_back(std::async(std::launch::async,
598  &PerfectJoinHashTable::reifyForDevice,
599  this,
600  chunk_key,
601  columns_per_device[device_id],
602  hash_type_,
603  device_id,
604  logger::thread_local_ids()));
605  }
606  for (auto& init_thread : init_threads) {
607  init_thread.wait();
608  }
609  for (auto& init_thread : init_threads) {
610  init_thread.get();
611  }
612  } catch (const NeedsOneToManyHash& e) {
613  VLOG(1) << "RHS/Inner hash join values detected to not be unique, falling back to "
614  "One-to-Many hash layout.";
618  init_threads.clear();
619  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
620  CHECK_EQ(dev_buff_owners.size(), size_t(device_count_));
621  }
622  CHECK_EQ(columns_per_device.size(), size_t(device_count_));
623  for (int device_id = 0; device_id < device_count_; ++device_id) {
624  const auto chunk_key = genChunkKey(fragments_per_device[device_id],
625  inner_outer_pairs_.front().second,
626  inner_outer_pairs_.front().first);
627  init_threads.push_back(std::async(std::launch::async,
628  &PerfectJoinHashTable::reifyForDevice,
629  this,
630  chunk_key,
631  columns_per_device[device_id],
632  hash_type_,
633  device_id,
634  logger::thread_local_ids()));
635  }
636  for (auto& init_thread : init_threads) {
637  init_thread.wait();
638  }
639  for (auto& init_thread : init_threads) {
640  init_thread.get();
641  }
642  }
643 }
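// Note: to summarize the flow above, reify() (1) gathers fragments and chunk keys per
// device, (2) derives a hash table cache key, (3) tries to recycle a cached CPU hash
// table (copying it to each GPU when the join runs on GPU), and only if that fails
// (4) fetches the join columns and builds the table per device, falling back from
// OneToOne to OneToMany when duplicate rhs keys are detected.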
644 
645 Data_Namespace::MemoryLevel PerfectJoinHashTable::getEffectiveMemoryLevel(
646  const std::vector<InnerOuter>& inner_outer_pairs) const {
647  if (needs_dictionary_translation(
648  inner_outer_pairs.front(), inner_outer_string_op_infos_, executor_)) {
649  needs_dict_translation_ = true;
650  return Data_Namespace::CPU_LEVEL;
651  }
652  return memory_level_;
653 }
654 
655 ColumnsForDevice PerfectJoinHashTable::fetchColumnsForDevice(
656  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
657  const int device_id,
658  DeviceAllocator* dev_buff_owner,
659  const Catalog_Namespace::Catalog& catalog) {
660  std::vector<JoinColumn> join_columns;
661  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
662  std::vector<JoinColumnTypeInfo> join_column_types;
663  std::vector<JoinBucketInfo> join_bucket_info;
664  std::vector<std::shared_ptr<void>> malloc_owner;
665  const auto effective_memory_level =
666  getEffectiveMemoryLevel(inner_outer_pairs_);
667  for (const auto& inner_outer_pair : inner_outer_pairs_) {
668  const auto inner_col = inner_outer_pair.first;
669  const auto inner_cd = get_column_descriptor_maybe(
670  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
671  if (inner_cd && inner_cd->isVirtualCol) {
672  throw FailedToJoinOnVirtualColumn();
673  }
674  join_columns.emplace_back(fetchJoinColumn(inner_col,
675  fragments,
676  effective_memory_level,
677  device_id,
678  chunks_owner,
679  dev_buff_owner,
680  malloc_owner,
681  executor_,
682  &column_cache_));
683  const auto& ti = inner_col->get_type_info();
684  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
685  0,
686  0,
687  inline_fixed_encoding_null_val(ti),
688  isBitwiseEq(),
689  0,
690  get_join_column_type_kind(ti)});
691  }
692  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
693 }
694 
695 void PerfectJoinHashTable::reifyForDevice(
696  const ChunkKey& chunk_key,
697  const ColumnsForDevice& columns_for_device,
698  const HashType layout,
699  const int device_id,
700  const logger::ThreadLocalIds parent_thread_local_ids) {
701  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
702  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
703  const auto effective_memory_level =
704  getEffectiveMemoryLevel(inner_outer_pairs_);
705 
706  CHECK_EQ(columns_for_device.join_columns.size(), size_t(1));
707  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
708  auto& join_column = columns_for_device.join_columns.front();
709  if (layout == HashType::OneToOne) {
710  const auto err = initHashTableForDevice(chunk_key,
711  join_column,
712  inner_outer_pairs_.front(),
713  layout,
714 
715  effective_memory_level,
716  device_id);
717  if (err) {
718  throw NeedsOneToManyHash();
719  }
720  } else {
721  const auto err = initHashTableForDevice(chunk_key,
722  join_column,
723  inner_outer_pairs_.front(),
724  HashType::OneToMany,
725  effective_memory_level,
726  device_id);
727  if (err) {
728  throw std::runtime_error("Unexpected error building one to many hash table: " +
729  std::to_string(err));
730  }
731  }
732 }
733 
734 int PerfectJoinHashTable::initHashTableForDevice(
735  const ChunkKey& chunk_key,
736  const JoinColumn& join_column,
737  const InnerOuter& cols,
738  const HashType layout,
739  const Data_Namespace::MemoryLevel effective_memory_level,
740  const int device_id) {
741  auto timer = DEBUG_TIMER(__func__);
742  const auto inner_col = cols.first;
743  CHECK(inner_col);
745  // we check whether the layout is OneToOne because we always start building a hash
746  // table with the OneToOne layout
747  VLOG(1) << "Stop building a hash table based on a column " << inner_col->toString()
748  << ": it is from an empty table";
749  return 0;
750  }
751 #ifndef HAVE_CUDA
752  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
753 #endif
754  int err{0};
755  const int32_t hash_join_invalid_val{-1};
756  auto hashtable_layout = layout;
757  auto allow_hashtable_recycling =
761  inner_col->get_table_id());
762  if (allow_hashtable_recycling) {
763  auto cached_hashtable_layout_type = hash_table_layout_cache_->getItemFromCache(
764  hashtable_cache_key_[device_id],
767  {});
768  if (cached_hashtable_layout_type) {
769  hash_type_ = *cached_hashtable_layout_type;
770  hashtable_layout = hash_type_;
771  }
772  }
773  const auto entry_count = getNormalizedHashEntryCount();
774  const auto hash_table_entry_count = hashtable_layout == HashType::OneToOne
775  ? entry_count
776  : 2 * entry_count + join_column.num_elems;
777  const auto hash_table_size = hash_table_entry_count * sizeof(int32_t);
781  }
782  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
783  CHECK(!chunk_key.empty());
784  std::shared_ptr<PerfectHashTable> hash_table{nullptr};
785  decltype(std::chrono::steady_clock::now()) ts1, ts2;
786  ts1 = std::chrono::steady_clock::now();
787  {
788  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
789  PerfectJoinHashTableBuilder builder;
790  if (hashtable_layout == HashType::OneToOne) {
791  builder.initOneToOneHashTableOnCpu(join_column,
792  col_range_,
793  isBitwiseEq(),
794  cols,
795  str_proxy_translation_map_,
796  join_type_,
797  hashtable_layout,
798  hash_entry_info_,
799  hash_join_invalid_val,
800  executor_);
801  hash_table = builder.getHashTable();
802  } else {
803  builder.initOneToManyHashTableOnCpu(join_column,
804  col_range_,
805  isBitwiseEq(),
806  cols,
807  str_proxy_translation_map_,
808  join_type_,
809  hash_entry_info_,
810  hash_join_invalid_val,
811  executor_);
812  hash_table = builder.getHashTable();
813  }
814  ts2 = std::chrono::steady_clock::now();
815  auto build_time =
816  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
817  hash_table->setHashEntryInfo(hash_entry_info_);
818  hash_table->setColumnNumElems(join_column.num_elems);
819  if (allow_hashtable_recycling && hash_table) {
820  // add ht-related items to cache iff we have a valid hashtable
821  hash_table_layout_cache_->putItemToCache(hashtable_cache_key_[device_id],
822  hashtable_layout,
825  0,
826  0,
827  {});
830  hash_table,
832  build_time);
833  }
834  }
835  // Transfer the hash table to the GPU if we've only built it on CPU
836  // but the query runs on GPU (join on dictionary encoded columns).
837  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
838 #ifdef HAVE_CUDA
839  const auto& ti = inner_col->get_type_info();
840  CHECK(ti.is_string());
841  auto data_mgr = executor_->getDataMgr();
842  copyCpuHashTableToGpu(hash_table, device_id, data_mgr);
843 #else
844  UNREACHABLE();
845 #endif
846  } else {
847  CHECK(hash_table);
848  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
849  hash_tables_for_device_[device_id] = hash_table;
850  }
851  } else {
852 #ifdef HAVE_CUDA
853  PerfectJoinHashTableBuilder builder;
854  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
855  builder.allocateDeviceMemory(join_column,
856  hashtable_layout,
857  hash_entry_info_,
858  shardCount(),
859  device_id,
860  device_count_,
861  executor_);
862  builder.initHashTableOnGpu(chunk_key,
863  join_column,
864  col_range_,
865  isBitwiseEq(),
866  cols,
867  join_type_,
868  hashtable_layout,
869  hash_entry_info_,
870  shardCount(),
871  hash_join_invalid_val,
872  device_id,
873  device_count_,
874  executor_);
875  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
876  hash_tables_for_device_[device_id] = builder.getHashTable();
877  if (!err && allow_hashtable_recycling && hash_tables_for_device_[device_id]) {
878  // add layout to cache iff we have a valid hashtable
879  hash_table_layout_cache_->putItemToCache(
880  hashtable_cache_key_[device_id],
881  hash_tables_for_device_[device_id]->getLayout(),
884  0,
885  0,
886  {});
887  }
888 #else
889  UNREACHABLE();
890 #endif
891  }
892  return err;
893 }
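// Sizing note (derived from the code above): a OneToOne table allocates one int32_t
// slot per normalized hash entry, while a OneToMany table allocates
// 2 * entry_count + join_column.num_elems slots (offsets, counts, then row payload).
// For example, an entry count of 1M and 3M rhs rows needs (2 * 1M + 3M) * 4 bytes,
// i.e. roughly 20 MB.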
894 
895 ChunkKey PerfectJoinHashTable::genChunkKey(
896  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
897  const Analyzer::Expr* outer_col_expr,
898  const Analyzer::ColumnVar* inner_col) const {
899  ChunkKey chunk_key{executor_->getCatalog()->getCurrentDB().dbId,
900  inner_col->get_table_id(),
901  inner_col->get_column_id()};
902  const auto& ti = inner_col->get_type_info();
903  std::for_each(fragments.cbegin(), fragments.cend(), [&chunk_key](const auto& fragment) {
904  // collect all frag ids to correctly generate the cache key for a cached hash table
905  chunk_key.push_back(fragment.fragmentId);
906  });
907  if (ti.is_string()) {
908  CHECK_EQ(kENCODING_DICT, ti.get_compression());
909  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
910  CHECK(outer_col);
911  const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
912  size_t outer_elem_count =
913  std::accumulate(outer_query_info.fragments.begin(),
914  outer_query_info.fragments.end(),
915  size_t(0),
916  [&chunk_key](size_t sum, const auto& fragment) {
917  chunk_key.push_back(fragment.fragmentId);
918  return sum + fragment.getNumTuples();
919  });
920  chunk_key.push_back(outer_elem_count);
921  }
922 
923  return chunk_key;
924 }
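// Example (illustrative values): for db 1, inner table 5, inner column 3 with fragments
// {0, 1}, the key is {1, 5, 3, 0, 1}; for a dictionary-encoded string join the outer
// fragment ids and the total outer row count are appended as well, so two builds over
// different outer data get distinct cache keys.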
925 
926 std::shared_ptr<PerfectHashTable> PerfectJoinHashTable::initHashTableOnCpuFromCache(
927  QueryPlanHash key,
928  CacheItemType item_type,
929  DeviceIdentifier device_identifier) {
931  auto timer = DEBUG_TIMER(__func__);
932  VLOG(1) << "Checking CPU hash table cache.";
933  auto hashtable_ptr =
934  hash_table_cache_->getItemFromCache(key, item_type, device_identifier);
935  if (hashtable_ptr) {
936  return std::dynamic_pointer_cast<PerfectHashTable>(hashtable_ptr);
937  }
938  return nullptr;
939 }
940 
941 void PerfectJoinHashTable::putHashTableOnCpuToCache(
942  QueryPlanHash key,
943  CacheItemType item_type,
944  std::shared_ptr<PerfectHashTable> hashtable_ptr,
945  DeviceIdentifier device_identifier,
946  size_t hashtable_building_time) {
948  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
949  hash_table_cache_->putItemToCache(
950  key,
951  hashtable_ptr,
952  item_type,
953  device_identifier,
954  hashtable_ptr->getHashTableBufferSize(ExecutorDeviceType::CPU),
955  hashtable_building_time);
956 }
957 
958 llvm::Value* PerfectJoinHashTable::codegenHashTableLoad(const size_t table_idx) {
959  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
960  const auto hash_ptr = HashJoin::codegenHashTableLoad(table_idx, executor_);
961  if (hash_ptr->getType()->isIntegerTy(64)) {
962  return hash_ptr;
963  }
964  CHECK(hash_ptr->getType()->isPointerTy());
965  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
966  get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
967  llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
968 }
969 
970 std::vector<llvm::Value*> PerfectJoinHashTable::getHashJoinArgs(
971  llvm::Value* hash_ptr,
972  llvm::Value* key_lv,
973  const Analyzer::Expr* key_col,
974  const int shard_count,
975  const CompilationOptions& co) {
976  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
977  CodeGenerator code_generator(executor_);
978  CHECK(key_lv);
979  // Todo(todd): Fix below, it's gross (but didn't want to redo the plumbing yet)
980  // const auto key_lv = key_lvs.size() && key_lvs[0]
981  // ? key_lvs[0]
982  // : code_generator.codegen(key_col, true, co)[0];
983  auto const& key_col_ti = key_col->get_type_info();
984 
985  std::vector<llvm::Value*> hash_join_idx_args{
986  hash_ptr,
987  executor_->cgen_state_->castToTypeIn(key_lv, 64),
988  executor_->cgen_state_->llInt(col_range_.getIntMin()),
989  executor_->cgen_state_->llInt(col_range_.getIntMax())};
990  if (shard_count) {
991  const auto expected_hash_entry_count =
992  get_hash_entry_count(col_range_, isBitwiseEq());
993  const auto entry_count_per_shard =
994  (expected_hash_entry_count + shard_count - 1) / shard_count;
995  hash_join_idx_args.push_back(
996  executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
997  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
998  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
999  }
1000  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
1001  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
1002  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1003  inline_fixed_encoding_null_val(key_col_logical_ti)));
1004  }
1005  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
1006  if (isBitwiseEq()) {
1007  if (special_date_bucketization_case) {
1008  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1010  } else {
1011  hash_join_idx_args.push_back(
1012  executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
1013  }
1014  }
1015 
1016  if (special_date_bucketization_case) {
1017  hash_join_idx_args.emplace_back(
1018  executor_->cgen_state_->llInt(hash_entry_info_.bucket_normalization));
1019  }
1020 
1021  return hash_join_idx_args;
1022 }
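// Argument order note (derived from the pushes above): the runtime hash_join_idx
// helpers receive {hash_ptr, key, min, max}, then {entries_per_shard, shard_count,
// device_count} when sharded, then the inlined null value when the key is nullable or
// the comparison is bitwise-eq, then the translated-null sentinel for bitwise-eq, and
// finally the bucket normalization for DATE keys.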
1023 
1024 HashJoinMatchingSet PerfectJoinHashTable::codegenMatchingSet(const CompilationOptions& co,
1025  const size_t index) {
1026  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1027  const auto cols =
1028  get_cols(
1029  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_)
1030  .first;
1031  auto key_col = cols.second;
1032  CHECK(key_col);
1033  auto val_col = cols.first;
1034  CHECK(val_col);
1035  auto pos_ptr = codegenHashTableLoad(index);
1036  CHECK(pos_ptr);
1037  const int shard_count = shardCount();
1038  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
1039  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
1040  if (key_col_var && val_col_var &&
1041  self_join_not_covered_by_left_deep_tree(
1042  key_col_var,
1043  val_col_var,
1044  get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
1045  throw std::runtime_error(
1046  "Query execution failed because the query contains an unsupported self-join "
1047  "pattern. We suspect the query requires multiple left-deep join trees due to "
1048  "the "
1049  "join condition of the self-join, which is not supported for now. Please consider "
1050  "rewriting the table order in the "
1051  "FROM clause.");
1052  }
1053  CodeGenerator code_generator(executor_);
1054 
1055  auto key_lv = HashJoin::codegenColOrStringOper(
1056  key_col, inner_outer_string_op_infos_.second, code_generator, co);
1057 
1058  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_lv, key_col, shard_count, co);
1059  const int64_t sub_buff_size = getComponentBufferSize();
1060  const auto& key_col_ti = key_col->get_type_info();
1061 
1062  auto bucketize = (key_col_ti.get_type() == kDATE);
1063  return HashJoin::codegenMatchingSet(hash_join_idx_args,
1064  shard_count,
1065  !key_col_ti.get_notnull(),
1066  isBitwiseEq(),
1067  sub_buff_size,
1068  executor_,
1069  bucketize);
1070 }
1071 
1072 size_t PerfectJoinHashTable::offsetBufferOff() const noexcept {
1073  return 0;
1074 }
1075 
1076 size_t PerfectJoinHashTable::countBufferOff() const noexcept {
1077  return getComponentBufferSize();
1078 }
1079 
1080 size_t PerfectJoinHashTable::payloadBufferOff() const noexcept {
1081  return 2 * getComponentBufferSize();
1082 }
1083 
1084 size_t PerfectJoinHashTable::getComponentBufferSize() const noexcept {
1085  if (hash_tables_for_device_.empty()) {
1086  return 0;
1087  }
1088  auto hash_table = hash_tables_for_device_.front();
1089  if (hash_table && hash_table->getLayout() == HashType::OneToMany) {
1090  return hash_table->getEntryCount() * sizeof(int32_t);
1091  } else {
1092  return 0;
1093  }
1094 }
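// Layout note (derived from the offsets above): a OneToMany perfect hash table is laid
// out as three consecutive int32_t arrays -- offsets at byte 0, counts at
// getComponentBufferSize(), and the row-id payload at 2 * getComponentBufferSize() --
// while a OneToOne table is a single slot array and the component size is 0.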
1095 
1096 HashTable* PerfectJoinHashTable::getHashTableForDevice(const size_t device_id) const {
1097  CHECK_LT(device_id, hash_tables_for_device_.size());
1098  return hash_tables_for_device_[device_id].get();
1099 }
1100 
1101 void PerfectJoinHashTable::copyCpuHashTableToGpu(
1102  std::shared_ptr<PerfectHashTable>& cpu_hash_table,
1103  const int device_id,
1104  Data_Namespace::DataMgr* data_mgr) {
1106  CHECK(data_mgr);
1107  CHECK(cpu_hash_table);
1108 
1109  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1110  PerfectJoinHashTableBuilder gpu_builder;
1111  gpu_builder.allocateDeviceMemory(cpu_hash_table->getColumnNumElems(),
1112  cpu_hash_table->getLayout(),
1113  cpu_hash_table->getHashEntryInfo(),
1114  shardCount(),
1115  device_id,
1116  device_count_,
1117  executor_);
1118 
1119  std::shared_ptr<PerfectHashTable> gpu_hash_table = gpu_builder.getHashTable();
1120  CHECK(gpu_hash_table);
1121  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
1122  CHECK(gpu_buffer_ptr);
1123 
1124  CHECK_LE(cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
1125  gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));
1126 
1127  auto device_allocator = std::make_unique<CudaAllocator>(
1128  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1129  device_allocator->copyToDevice(
1130  gpu_buffer_ptr,
1131  cpu_hash_table->getCpuBuffer(),
1132  cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU));
1133  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1134  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
1135 }
1136 
1137 std::string PerfectJoinHashTable::toString(const ExecutorDeviceType device_type,
1138  const int device_id,
1139  bool raw) const {
1140  auto buffer = getJoinHashBuffer(device_type, device_id);
1141  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1142  auto hash_table = getHashTableForDevice(device_id);
1143 #ifdef HAVE_CUDA
1144  std::unique_ptr<int8_t[]> buffer_copy;
1145  if (device_type == ExecutorDeviceType::GPU) {
1146  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1147 
1148  auto data_mgr = executor_->getDataMgr();
1149  auto device_allocator = std::make_unique<CudaAllocator>(
1150  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1151  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1152  }
1153  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1154 #else
1155  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1156 #endif // HAVE_CUDA
1157  auto ptr2 = ptr1 + offsetBufferOff();
1158  auto ptr3 = ptr1 + countBufferOff();
1159  auto ptr4 = ptr1 + payloadBufferOff();
1160  return HashTable::toString("perfect",
1162  0,
1163  0,
1164  hash_table ? hash_table->getEntryCount() : 0,
1165  ptr1,
1166  ptr2,
1167  ptr3,
1168  ptr4,
1169  buffer_size,
1170  raw);
1171 }
1172 
1173 std::set<DecodedJoinHashBufferEntry> PerfectJoinHashTable::toSet(
1174  const ExecutorDeviceType device_type,
1175  const int device_id) const {
1176  auto buffer = getJoinHashBuffer(device_type, device_id);
1177  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1178  auto hash_table = getHashTableForDevice(device_id);
1179 #ifdef HAVE_CUDA
1180  std::unique_ptr<int8_t[]> buffer_copy;
1181  if (device_type == ExecutorDeviceType::GPU) {
1182  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1183 
1184  auto data_mgr = executor_->getDataMgr();
1185  auto device_allocator = std::make_unique<CudaAllocator>(
1186  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1187  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1188  }
1189  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1190 #else
1191  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1192 #endif // HAVE_CUDA
1193  auto ptr2 = ptr1 + offsetBufferOff();
1194  auto ptr3 = ptr1 + countBufferOff();
1195  auto ptr4 = ptr1 + payloadBufferOff();
1196  return HashTable::toSet(0,
1197  0,
1198  hash_table ? hash_table->getEntryCount() : 0,
1199  ptr1,
1200  ptr2,
1201  ptr3,
1202  ptr4,
1203  buffer_size);
1204 }
1205 
1206 llvm::Value* PerfectJoinHashTable::codegenSlot(const CompilationOptions& co,
1207  const size_t index) {
1208  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1209  using namespace std::string_literals;
1210 
1212  const auto cols_and_string_op_infos = get_cols(
1213  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1214  const auto& cols = cols_and_string_op_infos.first;
1215  const auto& inner_outer_string_op_infos = cols_and_string_op_infos.second;
1216  auto key_col = cols.second;
1217  CHECK(key_col);
1218  auto val_col = cols.first;
1219  CHECK(val_col);
1220  CodeGenerator code_generator(executor_);
1221  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
1222  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
1223  if (key_col_var && val_col_var &&
1224  self_join_not_covered_by_left_deep_tree(
1225  key_col_var,
1226  val_col_var,
1227  get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
1228  throw std::runtime_error(
1229  "Query execution failed because the query contains not supported self-join "
1230  "pattern. We suspect the query requires multiple left-deep join tree due to "
1231  "the join condition of the self-join and is not supported for now. Please "
1232  "consider chaning the table order in the FROM clause.");
1233  }
1234 
1235  auto key_lv = HashJoin::codegenColOrStringOper(
1236  key_col, inner_outer_string_op_infos.second, code_generator, co);
1237 
1238  // CHECK_EQ(size_t(1), key_lvs.size());
1239  auto hash_ptr = codegenHashTableLoad(index);
1240  CHECK(hash_ptr);
1241  const int shard_count = shardCount();
1242  const auto hash_join_idx_args =
1243  getHashJoinArgs(hash_ptr, key_lv, key_col, shard_count, co);
1244 
1245  const auto& key_col_ti = key_col->get_type_info();
1246  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
1247  : "hash_join_idx"s);
1248 
1249  if (isBitwiseEq()) {
1250  fname += "_bitwise";
1251  }
1252  if (shard_count) {
1253  fname += "_sharded";
1254  }
1255 
1256  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
1257  fname += "_nullable";
1258  }
1259  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1260 }
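// Naming note (derived from the code above): the emitted runtime helper is one of
// hash_join_idx or bucketized_hash_join_idx (DATE keys), with optional _bitwise,
// _sharded, and _nullable suffixes appended in that order, e.g.
// "bucketized_hash_join_idx_nullable" for a nullable DATE key without sharding.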
1261 
1262 const InputTableInfo& PerfectJoinHashTable::getInnerQueryInfo(
1263  const Analyzer::ColumnVar* inner_col) const {
1264  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
1265 }
1266 
1267 const InputTableInfo& get_inner_query_info(
1268  const int inner_table_id,
1269  const std::vector<InputTableInfo>& query_infos) {
1270  std::optional<size_t> ti_idx;
1271  for (size_t i = 0; i < query_infos.size(); ++i) {
1272  if (inner_table_id == query_infos[i].table_id) {
1273  ti_idx = i;
1274  break;
1275  }
1276  }
1277  CHECK(ti_idx);
1278  return query_infos[*ti_idx];
1279 }
1280 
1281 size_t get_entries_per_device(const size_t total_entries,
1282  const size_t shard_count,
1283  const size_t device_count,
1284  const Data_Namespace::MemoryLevel memory_level) {
1285  const auto entries_per_shard =
1286  shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
1287  size_t entries_per_device = entries_per_shard;
1288  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
1289  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
1290  CHECK_GT(shards_per_device, 0u);
1291  entries_per_device = entries_per_shard * shards_per_device;
1292  }
1293  return entries_per_device;
1294 }
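// Worked example: total_entries = 1'000'000 with shard_count = 4 gives 250'000 entries
// per shard; on GPU with device_count = 2 each device owns ceil(4 / 2) = 2 shards, so
// entries_per_device = 500'000, while on CPU (or unsharded) it stays at the per-shard
// (or total) figure.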
1295 
1296 size_t PerfectJoinHashTable::shardCount() const {
1297  return memory_level_ == Data_Namespace::GPU_LEVEL
1298  ? get_shard_count(qual_bin_oper_.get(), executor_)
1299  : 0;
1300 }
1301 
1302 bool PerfectJoinHashTable::isBitwiseEq() const {
1303  return qual_bin_oper_->get_optype() == kBW_EQ;
1304 }