OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PerfectJoinHashTable.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <atomic>
20 #include <future>
21 #include <numeric>
22 #include <optional>
23 #include <thread>
24 
25 #include "Logger/Logger.h"
28 #include "QueryEngine/Execute.h"
33 
34 extern bool g_is_test_env;
35 
36 // let's only consider the CPU hashtable recycler at this moment
37 std::unique_ptr<HashtableRecycler> PerfectJoinHashTable::hash_table_cache_ =
38  std::make_unique<HashtableRecycler>(CacheItemType::PERFECT_HT,
40 std::unique_ptr<HashingSchemeRecycler> PerfectJoinHashTable::hash_table_layout_cache_ =
41  std::make_unique<HashingSchemeRecycler>();
42 
43 namespace {
44 std::pair<InnerOuter, InnerOuterStringOpInfos> get_cols(
45  const Analyzer::BinOper* qual_bin_oper,
46  const TemporaryTables* temporary_tables) {
47  const auto lhs = qual_bin_oper->get_left_operand();
48  const auto rhs = qual_bin_oper->get_right_operand();
49  return HashJoin::normalizeColumnPair(lhs, rhs, temporary_tables);
50 }
51 
53  ExpressionRange const& col_range,
54  bool const is_bw_eq) {
55  using EmptyRangeSize = std::optional<size_t>;
56  auto empty_range_check = [](ExpressionRange const& col_range,
57  bool const is_bw_eq) -> EmptyRangeSize {
58  if (col_range.getIntMin() > col_range.getIntMax()) {
59  CHECK_EQ(col_range.getIntMin(), int64_t(0));
60  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
61  if (is_bw_eq) {
62  return size_t(1);
63  }
64  return size_t(0);
65  }
66  return EmptyRangeSize{};
67  };
68 
69  auto empty_range = empty_range_check(col_range, is_bw_eq);
70  if (empty_range) {
71  return {size_t(*empty_range), 1};
72  }
73 
74  int64_t bucket_normalization =
75  context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
76  CHECK_GT(bucket_normalization, 0);
77  auto const normalized_max = col_range.getIntMax() / bucket_normalization;
78  auto const normalized_min = col_range.getIntMin() / bucket_normalization;
79  return {size_t(normalized_max - normalized_min + 1 + (is_bw_eq ? 1 : 0)),
80  bucket_normalization};
81 }
82 
83 size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
84  if (col_range.getIntMin() > col_range.getIntMax()) {
85  CHECK_EQ(col_range.getIntMin(), int64_t(0));
86  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
87  return is_bw_eq ? 1 : 0;
88  }
89  return col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0);
90 }
91 
92 } // namespace
93 
94 namespace {
95 
97  const Executor* executor) {
98  const auto inner_table_info = executor->getTableInfo(inner_table_key);
99  std::unordered_set<int> device_holding_fragments;
100  auto cuda_mgr = executor->getDataMgr()->getCudaMgr();
101  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
102  for (const auto& fragment : inner_table_info.fragments) {
103  if (fragment.shard != -1) {
104  const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
105  if (!it_ok.second) {
106  return false;
107  }
108  }
109  }
110  return true;
111 }
112 
113 } // namespace
114 
116  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
117  const Executor* executor) {
118  const auto inner_col = equi_pair.first;
119  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
120  if (!outer_col || inner_col->getColumnKey().table_id < 0 ||
121  outer_col->getColumnKey().table_id < 0) {
122  return 0;
123  }
124  if (outer_col->get_rte_idx()) {
125  return 0;
126  }
127  if (inner_col->get_type_info() != outer_col->get_type_info()) {
128  return 0;
129  }
130 
131  const auto inner_td =
132  Catalog_Namespace::get_metadata_for_table(inner_col->getTableKey());
133  CHECK(inner_td);
134  const auto outer_td =
135  Catalog_Namespace::get_metadata_for_table(outer_col->getTableKey());
136  CHECK(outer_td);
137  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
138  inner_td->nShards != outer_td->nShards) {
139  return 0;
140  }
141  if (!shard_count_less_or_equal_device_count(inner_col->getTableKey(), executor)) {
142  return 0;
143  }
144  // The two columns involved must be the ones on which the tables have been sharded on.
145  return (inner_td->shardedColumnId == inner_col->getColumnKey().column_id &&
146  outer_td->shardedColumnId == outer_col->getColumnKey().column_id) ||
147  (outer_td->shardedColumnId == inner_col->getColumnKey().column_id &&
148  inner_td->shardedColumnId == inner_col->getColumnKey().column_id)
149  ? inner_td->nShards
150  : 0;
151 }
152 
154  const shared::TableKey& inner_table_key,
155  const std::vector<InputTableInfo>& query_infos) {
156  std::optional<size_t> ti_idx;
157  for (size_t i = 0; i < query_infos.size(); ++i) {
158  if (inner_table_key == query_infos[i].table_key) {
159  ti_idx = i;
160  break;
161  }
162  }
163  CHECK(ti_idx);
164  return query_infos[*ti_idx];
165 }
166 
/// Factory that builds a perfect-hash join table for the equi-join qualifier
/// `qual_bin_oper`.
///
/// Steps visible in this listing:
///  1. normalize the qualifier into an inner/outer column pair (get_cols),
///  2. compute the integer expression range that sizes the table,
///  3. compare the resulting entry count with what `memory_level` can hold,
///  4. switch to baseline hash join (by throwing TooManyHashEntries) for an inner
///     table that is small relative to its hash value range,
///  5. construct the PerfectJoinHashTable and call reify(), translating build
///     failures into the exception types listed below.
///
/// @return the reified hash table
/// @throws HashJoinFail        range could not be computed, kBW_EQ null cannot be
///                             translated, or the build failed in a retryable way
/// @throws TooManyHashEntries  baseline hash join should be used instead
/// @throws JoinHashTableTooBig propagated from reify()
/// @throws std::runtime_error  TableMustBeReplicated or any other std::exception
///
/// NOTE(review): this listing is missing original source lines 222 and 235 (the
/// embedded numbering jumps 221->223 and 234->236); the two affected statements are
/// flagged below and must be restored from the real file before this compiles.
168 std::shared_ptr<PerfectJoinHashTable> PerfectJoinHashTable::getInstance(
169  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
170  const std::vector<InputTableInfo>& query_infos,
171  const Data_Namespace::MemoryLevel memory_level,
172  const JoinType join_type,
173  const HashType preferred_hash_type,
174  const int device_count,
175  ColumnCacheMap& column_cache,
176  Executor* executor,
177  const HashTableBuildDagMap& hashtable_build_dag_map,
178  const RegisteredQueryHint& query_hints,
179  const TableIdToNodeMap& table_id_to_node_map) {
180  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
181  const auto cols_and_string_op_infos =
182  get_cols(qual_bin_oper.get(), executor->temporary_tables_);
183  const auto& cols = cols_and_string_op_infos.first;
184  const auto& inner_outer_string_op_infos = cols_and_string_op_infos.second;
185  const auto inner_col = cols.first;
186  CHECK(inner_col);
187  const auto& ti = inner_col->get_type_info();
 // For a string inner column the initial range comes from the outer expression
 // (cols.second); for every other type it comes from the inner column itself.
188  auto col_range =
189  getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
190  if (col_range.getType() == ExpressionRangeType::Invalid) {
191  throw HashJoinFail(
192  "Could not compute range for the expressions involved in the equijoin");
193  }
 // Range of the inner (rhs) column itself; this is just col_range for non-strings.
194  const auto rhs_source_col_range =
195  ti.is_string() ? getExpressionRange(inner_col, query_infos, executor) : col_range;
196  if (ti.is_string()) {
197  // The nullable info must be the same as the source column.
198  if (rhs_source_col_range.getType() == ExpressionRangeType::Invalid) {
199  throw HashJoinFail(
200  "Could not compute range for the expressions involved in the equijoin");
201  }
202  if (rhs_source_col_range.getIntMin() > rhs_source_col_range.getIntMax()) {
203  // If the inner column expression range is empty, use the inner col range
 // (an empty range is expected to be encoded exactly as [0, -1])
204  CHECK_EQ(rhs_source_col_range.getIntMin(), int64_t(0));
205  CHECK_EQ(rhs_source_col_range.getIntMax(), int64_t(-1));
206  col_range = rhs_source_col_range;
207  } else {
 // Otherwise widen col_range to the union of the outer-expression range and the
 // inner column range, taking the has-nulls flag from the inner column. The
 // literal 0 is presumably the bucket argument — TODO confirm against
 // ExpressionRange::makeIntRange.
208  col_range = ExpressionRange::makeIntRange(
209  std::min(rhs_source_col_range.getIntMin(), col_range.getIntMin()),
210  std::max(rhs_source_col_range.getIntMax(), col_range.getIntMax()),
211  0,
212  rhs_source_col_range.hasNulls());
213  }
214  }
215  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
216  ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
217  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
 // Per-entry row-id width; handed to the capacity query below and to the constructor.
218  size_t const rowid_size = sizeof(int32_t);
219  auto const max_num_hash_entries =
220  HashJoin::getMaximumNumHashEntriesCanHold(memory_level, executor, rowid_size);
 // NOTE(review): source line 222 (the start of the statement inside this branch) is
 // absent from the listing; only the trailing argument list survives on line 223.
221  if (bucketized_entry_count > max_num_hash_entries) {
223  bucketized_entry_count, max_num_hash_entries, memory_level));
224  }
225 
226  auto const& inner_table_info =
227  get_inner_query_info(inner_col->getTableKey(), query_infos).info;
228  auto const num_inner_table_tuple = inner_table_info.getFragmentNumTuplesUpperBound();
229  // when a table is small but has too wide hash entry value range, it's better to deploy
230  // baseline hash join to save unnecessary memory space and expensive hash table
231  // initialization & building cost required to build a perfect join hash table
 // NOTE(review): source line 235 (the left-hand side of the final comparison against
 // bucketized_entry_count) is absent from the listing. The heuristic is disabled when
 // g_is_test_env is set.
232  auto const deploy_baseline_join =
233  !g_is_test_env &&
234  num_inner_table_tuple < g_num_tuple_threshold_switch_to_baseline &&
236  bucketized_entry_count;
237  if (deploy_baseline_join) {
238  std::ostringstream oss;
239  oss << "Switch to baseline hash join: a join column has too wide hash value range "
240  "when comparing the actual # rows";
241  oss << "(# hash entries: " << bucketized_entry_count
242  << ", # rows: " << num_inner_table_tuple << ")";
243  throw TooManyHashEntries(oss.str());
244  }
245 
 // A kBW_EQ join is rejected when the range already reaches INT64_MAX (presumably
 // because the null value is mapped just past the range maximum — TODO confirm).
246  if (qual_bin_oper->get_optype() == kBW_EQ &&
247  col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
248  throw HashJoinFail("Cannot translate null value for kBW_EQ");
249  }
 // Build-time measurement is only taken when verbose logging level 1 is enabled.
250  decltype(std::chrono::steady_clock::now()) ts1, ts2;
251  if (VLOGGING(1)) {
252  ts1 = std::chrono::steady_clock::now();
253  }
 // The caller's preferred layout is used unless a query hint forces OneToMany.
254  auto hash_type = preferred_hash_type;
255  if (query_hints.force_one_to_many_hash_join) {
256  LOG(INFO) << "A user's query hint forced the join operation to use OneToMany hash "
257  "join layout";
258  hash_type = HashType::OneToMany;
259  }
260  auto join_hash_table = std::shared_ptr<PerfectJoinHashTable>(
261  new PerfectJoinHashTable(qual_bin_oper,
262  inner_col,
263  query_infos,
264  memory_level,
265  join_type,
266  hash_type,
267  col_range,
268  rhs_source_col_range,
269  bucketized_entry_count_info,
270  column_cache,
271  executor,
272  device_count,
273  query_hints,
274  hashtable_build_dag_map,
275  table_id_to_node_map,
276  rowid_size,
277  inner_outer_string_op_infos));
 // NOTE(review): only the first two handlers below call freeHashBufferMemory() before
 // throwing; confirm the remaining handlers can rely on the shared_ptr's destructor.
278  try {
279  join_hash_table->reify();
280  } catch (const TableMustBeReplicated& e) {
281  // Throw a runtime error to abort the query
282  join_hash_table->freeHashBufferMemory();
283  throw std::runtime_error(e.what());
284  } catch (const HashJoinFail& e) {
285  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
286  // possible)
287  join_hash_table->freeHashBufferMemory();
288  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
289  "involved in equijoin | ") +
290  e.what());
291  } catch (const ColumnarConversionNotSupported& e) {
292  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
293  e.what());
294  } catch (const OutOfMemory& e) {
295  throw HashJoinFail(
296  std::string("Ran out of memory while building hash tables for equijoin | ") +
297  e.what());
298  } catch (const JoinHashTableTooBig& e) {
 // NOTE(review): `throw e;` throws a copy typed as JoinHashTableTooBig; a bare
 // `throw;` would rethrow the original exception object — confirm intent.
299  throw e;
300  } catch (const std::exception& e) {
301  throw std::runtime_error(
302  std::string("Fatal error while attempting to build hash tables for join: ") +
303  e.what());
304  }
305  if (VLOGGING(1)) {
306  ts2 = std::chrono::steady_clock::now();
307  VLOG(1) << "Built perfect hash table "
308  << getHashTypeString(join_hash_table->getHashType()) << " in "
309  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
310  << " ms";
311  }
312  return join_hash_table;
313 }
314 
316  const InnerOuter& inner_outer_col_pair,
317  const InnerOuterStringOpInfos& inner_outer_string_op_infos,
318  const Executor* executor) {
319  if (inner_outer_string_op_infos.first.size() ||
320  inner_outer_string_op_infos.second.size()) {
321  return true;
322  }
323  auto inner_col = inner_outer_col_pair.first;
324  auto outer_col_expr = inner_outer_col_pair.second;
325  const auto inner_cd = get_column_descriptor_maybe(inner_col->getColumnKey());
326  const auto& inner_col_key = inner_col->getColumnKey();
327  const auto& inner_ti = get_column_type(inner_col_key.column_id,
328  inner_col_key.table_id,
329  inner_cd,
330  executor->getTemporaryTables());
331  // Only strings may need dictionary translation.
332  if (!inner_ti.is_string()) {
333  return false;
334  }
335  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
336  CHECK(outer_col);
337  const auto outer_cd = get_column_descriptor_maybe(outer_col->getColumnKey());
338  // Don't want to deal with temporary tables for now, require translation.
339  if (!inner_cd || !outer_cd) {
340  return true;
341  }
342  const auto& outer_col_key = outer_col->getColumnKey();
343  const auto& outer_ti = get_column_type(outer_col_key.column_id,
344  outer_col_key.table_id,
345  outer_cd,
346  executor->getTemporaryTables());
347  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
348  // If the two columns don't share the dictionary, translation is needed.
349  if (outer_ti.getStringDictKey() != inner_ti.getStringDictKey()) {
350  return true;
351  }
352  const auto inner_str_dict_proxy =
353  executor->getStringDictionaryProxy(inner_ti.getStringDictKey(), true);
354  CHECK(inner_str_dict_proxy);
355  const auto outer_str_dict_proxy =
356  executor->getStringDictionaryProxy(outer_ti.getStringDictKey(), true);
357  CHECK(outer_str_dict_proxy);
358 
359  return *inner_str_dict_proxy != *outer_str_dict_proxy;
360 }
361 
362 std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
363  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
364  const int device_id,
365  const int device_count) {
366  std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
367  for (const auto& fragment : fragments) {
368  CHECK_GE(fragment.shard, 0);
369  if (fragment.shard % device_count == device_id) {
370  shards_for_device.push_back(fragment);
371  }
372  }
373  return shards_for_device;
374 }
375 
377  const std::vector<ColumnsForDevice>& columns_per_device) const {
378  CHECK(!inner_outer_pairs_.empty());
379  const auto& rhs_col_ti = inner_outer_pairs_.front().first->get_type_info();
380  const auto max_unique_hash_input_entries =
382  rhs_col_ti, rhs_source_col_range_, qual_bin_oper_->get_optype() == kBW_EQ)
385  for (const auto& device_columns : columns_per_device) {
386  CHECK(!device_columns.join_columns.empty());
387  const auto rhs_join_col_num_entries = device_columns.join_columns.front().num_elems;
388  if (rhs_join_col_num_entries > max_unique_hash_input_entries) {
389  VLOG(1) << "Skipping attempt to build perfect hash one-to-one table as number of "
390  "rhs column entries ("
391  << rhs_join_col_num_entries << ") exceeds range for rhs join column ("
392  << max_unique_hash_input_entries << ").";
393  return false;
394  }
395  }
396  return true;
397 }
398 
400  auto timer = DEBUG_TIMER(__func__);
402  const auto cols = get_cols(qual_bin_oper_.get(), executor_->temporary_tables_).first;
403  const auto inner_col = cols.first;
405  inner_col->getTableKey(),
407  executor_);
408  const auto& query_info = getInnerQueryInfo(inner_col).info;
409  if (query_info.fragments.empty()) {
410  return;
411  }
412  if (query_info.getNumTuplesUpperBound() > HashJoin::MAX_NUM_HASH_ENTRIES) {
413  throw TooManyHashEntries();
414  }
415  std::vector<std::future<void>> init_threads;
416  const int shard_count = shardCount();
417 
418  inner_outer_pairs_.push_back(cols);
419  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
420  // Todo(todd): Clean up the fact that we store the inner outer column pairs as a vector,
421  // even though only one is ever valid for perfect hash layout. Either move to 1 or keep
422  // the vector but move it to the HashTable parent class
425 
426  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
427  std::vector<ColumnsForDevice> columns_per_device;
428  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
429 
430  auto data_mgr = executor_->getDataMgr();
431  // check the existence of cached hash table here before fetching columns
432  // if available, skip the rest of logic and copy it to GPU if necessary
433  // there are few considerable things:
434  // 1. if table is sharded? --> deploy per-device logic
435  // here, each device may load different set of fragments, so their cache keys are
436  // different accordingly
437  // 2. otherwise, each device has the same hash table built from "all" fragments
438  // and their cache keys are the same (but we stick to per-device cache key vector)
439  // here, for CPU, we consider its # device to be one
440  // for GPU, each device builds its own hash table, or we build a single hash table on
441  // CPU and then copy it to each device
442  // 3. if cache key is not available? --> use alternative cache key
443 
444  // retrieve fragment lists and chunk key per device
445  std::vector<ChunkKey> chunk_key_per_device;
446  auto outer_col =
447  dynamic_cast<const Analyzer::ColumnVar*>(inner_outer_pairs_.front().second);
448  for (int device_id = 0; device_id < device_count_; ++device_id) {
449  fragments_per_device.emplace_back(
450  shard_count
451  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
452  : query_info.fragments);
454  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
455  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
456  }
457  const auto chunk_key =
458  genChunkKey(fragments_per_device[device_id], outer_col, inner_col);
459  chunk_key_per_device.emplace_back(std::move(chunk_key));
460  }
461 
462  // try to extract cache key for hash table and its relevant info
463  auto hashtable_access_path_info =
466  qual_bin_oper_->get_optype(),
467  join_type_,
470  shard_count,
471  fragments_per_device,
472  executor_);
473  hashtable_cache_key_ = hashtable_access_path_info.hashed_query_plan_dag;
474  hashtable_cache_meta_info_ = hashtable_access_path_info.meta_info;
475  table_keys_ = hashtable_access_path_info.table_keys;
476 
477  if (table_keys_.empty()) {
478  // the actual chunks fetched per device can be different but they constitute the same
479  // table in the same db, so we can exploit this to create an alternative table key
480  const auto& inner_table_key = getInnerTableId();
481  table_keys_ =
482  DataRecyclerUtil::getAlternativeTableKeys(chunk_key_per_device, inner_table_key);
483  }
484  CHECK(!table_keys_.empty());
485 
486  if (HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_) &&
487  getInnerTableId().table_id > 0) {
488  // sometimes we cannot retrieve the query plan dag, so try to use the recycler cache
489  // with the old-fashioned cache key if we deal with a hashtable of a non-temporary table
490  for (int device_id = 0; device_id < device_count_; ++device_id) {
491  const auto num_tuples = std::accumulate(
492  fragments_per_device[device_id].begin(),
493  fragments_per_device[device_id].end(),
494  size_t(0),
495  [](size_t sum, const auto& fragment) { return sum + fragment.getNumTuples(); });
497  inner_col,
498  outer_col ? outer_col : inner_col,
500  chunk_key_per_device[device_id],
501  num_tuples,
502  qual_bin_oper_->get_optype(),
503  join_type_};
504  hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
505  }
506  }
507 
508  // register a mapping between cache key and its input table info for per-table cache
509  // invalidation if we have valid cache key for "all" devices (otherwise, we skip to use
510  // cached hash table for safety)
511  auto allow_hashtable_recycling =
513  needs_dict_translation_,
515  inner_col->getTableKey());
516  const bool invalid_cache_key =
517  HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_);
518  if (!invalid_cache_key && allow_hashtable_recycling) {
519  if (!shard_count) {
520  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_.front(),
521  table_keys_);
522  } else {
523  std::for_each(hashtable_cache_key_.cbegin(),
524  hashtable_cache_key_.cend(),
525  [this](QueryPlanHash key) {
526  hash_table_cache_->addQueryPlanDagForTableKeys(key, table_keys_);
527  });
528  }
529  auto found_cached_one_to_many_layout = std::any_of(
530  hashtable_cache_key_.cbegin(),
531  hashtable_cache_key_.cend(),
532  [](QueryPlanHash cache_key) {
533  auto cached_hashtable_layout_type = hash_table_layout_cache_->getItemFromCache(
534  cache_key,
537  {});
538  return cached_hashtable_layout_type &&
539  *cached_hashtable_layout_type == HashType::OneToMany;
540  });
541  if (found_cached_one_to_many_layout) {
542  // we need to sync hash_type for all devices
544  }
545  }
546 
547  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
548 
549  // Assume we will need one-to-many if we have a string operation, as these tend
550  // to be cardinality-reducing operations, i.e. |S(t)| < |t|
551  // Todo(todd): Ostensibly only string ops on the rhs/inner expression cause rhs dups and
552  // so we may be too conservative here, but validate
553 
554  const bool has_string_ops = inner_outer_string_op_infos_.first.size() ||
555  inner_outer_string_op_infos_.second.size();
556 
557  // Also check if on the number of entries per column exceeds the rhs join hash table
558  // range, and skip trying to build a One-to-One hash table if so. There is a slight edge
559  // case where this can be overly pessimistic, and that is if the non-null values are all
560  // unique, but there are multiple null values, but we currently don't have the metadata
561  // to track null counts (only column nullability from the ddl and null existence from
562  // the encoded data), and this is probably too much of an edge case to worry about for
563  // now given the general performance benefits of skipping 1:1 if we are fairly confident
564  // it is doomed up front
565 
566  // Now check if on the number of entries per column exceeds the rhs join hash table
567  // range, and skip trying to build a One-to-One hash table if so
569  (has_string_ops || !isOneToOneHashPossible(columns_per_device))) {
571  }
572 
573  // todo (yoonmin) : support dictionary proxy cache for join including string op(s)
574  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
575  // construct string dictionary proxies if necessary
576  std::unique_lock<std::mutex> str_proxy_translation_lock(str_proxy_translation_mutex_);
577  if (needs_dict_translation_ && !str_proxy_translation_map_) {
578  CHECK_GE(inner_outer_pairs_.size(), 1UL);
579  auto const copied_col_range = col_range_;
583  col_range_,
584  executor_);
585  // update hash entry info if necessary
586  if (!(col_range_ == copied_col_range)) {
588  inner_col->get_type_info(), col_range_, isBitwiseEq());
589  }
590  }
591  }
592  bool has_invalid_cached_hash_table = false;
593  if (effective_memory_level == Data_Namespace::CPU_LEVEL &&
595  allow_hashtable_recycling, invalid_cache_key, join_type_)) {
596  // build a hash table on CPU, and we have a chance to recycle the cached one if
597  // available
598  for (int device_id = 0; device_id < device_count_; ++device_id) {
599  auto hash_table =
600  initHashTableOnCpuFromCache(hashtable_cache_key_[device_id],
603  if (hash_table) {
604  hash_tables_for_device_[device_id] = hash_table;
605  hash_type_ = hash_table->getLayout();
606  } else {
607  has_invalid_cached_hash_table = true;
608  break;
609  }
610  }
611 
612  if (has_invalid_cached_hash_table) {
613  hash_tables_for_device_.clear();
614  hash_tables_for_device_.resize(device_count_);
615  } else {
617 #ifdef HAVE_CUDA
618  for (int device_id = 0; device_id < device_count_; ++device_id) {
619  auto cpu_hash_table = std::dynamic_pointer_cast<PerfectHashTable>(
620  hash_tables_for_device_[device_id]);
621  copyCpuHashTableToGpu(cpu_hash_table,
622  cpu_hash_table->getHashTableEntryInfo(),
623  device_id,
624  data_mgr);
625  }
626 #else
627  UNREACHABLE();
628 #endif
629  }
630  return;
631  }
632  }
633 
634  // we have no cached hash table for this qual
635  // so, start building the hash table by fetching columns for devices
636  for (int device_id = 0; device_id < device_count_; ++device_id) {
637  columns_per_device.emplace_back(
638  fetchColumnsForDevice(fragments_per_device[device_id],
639  device_id,
641  ? dev_buff_owners[device_id].get()
642  : nullptr));
643  }
644 
645  try {
646  for (int device_id = 0; device_id < device_count_; ++device_id) {
647  const auto chunk_key = genChunkKey(fragments_per_device[device_id],
648  inner_outer_pairs_.front().second,
649  inner_outer_pairs_.front().first);
650  init_threads.push_back(std::async(std::launch::async,
652  this,
653  chunk_key,
654  columns_per_device[device_id],
655  hash_type_,
656  device_id,
658  }
659  for (auto& init_thread : init_threads) {
660  init_thread.wait();
661  }
662  for (auto& init_thread : init_threads) {
663  init_thread.get();
664  }
665  } catch (const NeedsOneToManyHash& e) {
666  VLOG(1) << "RHS/Inner hash join values detected to not be unique, falling back to "
667  "One-to-Many hash layout.";
670  init_threads.clear();
672  CHECK_EQ(dev_buff_owners.size(), size_t(device_count_));
673  }
674  CHECK_EQ(columns_per_device.size(), size_t(device_count_));
675  for (int device_id = 0; device_id < device_count_; ++device_id) {
676  const auto chunk_key = genChunkKey(fragments_per_device[device_id],
677  inner_outer_pairs_.front().second,
678  inner_outer_pairs_.front().first);
679  init_threads.push_back(std::async(std::launch::async,
681  this,
682  chunk_key,
683  columns_per_device[device_id],
684  hash_type_,
685  device_id,
687  }
688  for (auto& init_thread : init_threads) {
689  init_thread.wait();
690  }
691  for (auto& init_thread : init_threads) {
692  init_thread.get();
693  }
694  }
695  for (int device_id = 0; device_id < device_count_; ++device_id) {
696  auto const cache_key = hashtable_cache_key_[device_id];
697  auto const hash_table_ptr = hash_tables_for_device_[device_id];
698  if (hash_table_ptr) {
699  hash_table_layout_cache_->putItemToCache(cache_key,
700  hash_table_ptr->getLayout(),
703  0,
704  0,
705  {});
706  }
707  }
708 }
709 
711  const std::vector<InnerOuter>& inner_outer_pairs) const {
713  inner_outer_pairs.front(), inner_outer_string_op_infos_, executor_)) {
716  }
717  return memory_level_;
718 }
719 
721  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
722  const int device_id,
723  DeviceAllocator* dev_buff_owner) {
724  std::vector<JoinColumn> join_columns;
725  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
726  std::vector<JoinColumnTypeInfo> join_column_types;
727  std::vector<JoinBucketInfo> join_bucket_info;
728  std::vector<std::shared_ptr<void>> malloc_owner;
729  const auto effective_memory_level =
731  for (const auto& inner_outer_pair : inner_outer_pairs_) {
732  const auto inner_col = inner_outer_pair.first;
733  const auto inner_cd = get_column_descriptor_maybe(inner_col->getColumnKey());
734  if (inner_cd && inner_cd->isVirtualCol) {
736  }
737  join_columns.emplace_back(fetchJoinColumn(inner_col,
738  fragments,
739  effective_memory_level,
740  device_id,
741  chunks_owner,
742  dev_buff_owner,
743  malloc_owner,
744  executor_,
745  &column_cache_));
746  const auto& ti = inner_col->get_type_info();
747  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
748  0,
749  0,
751  isBitwiseEq(),
752  0,
754  }
755  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
756 }
757 
759  const ChunkKey& chunk_key,
760  const ColumnsForDevice& columns_for_device,
761  const HashType layout,
762  const int device_id,
763  const logger::ThreadLocalIds parent_thread_local_ids) {
764  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
765  DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
766  const auto effective_memory_level =
768 
769  CHECK_EQ(columns_for_device.join_columns.size(), size_t(1));
770  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
771  auto& join_column = columns_for_device.join_columns.front();
772  if (layout == HashType::OneToOne) {
773  const auto err = initHashTableForDevice(chunk_key,
774  join_column,
775  inner_outer_pairs_.front(),
776  layout,
777  effective_memory_level,
778  device_id);
779  if (err) {
780  throw NeedsOneToManyHash();
781  }
782  } else {
783  const auto err = initHashTableForDevice(chunk_key,
784  join_column,
785  inner_outer_pairs_.front(),
787  effective_memory_level,
788  device_id);
789  if (err) {
790  throw std::runtime_error("Unexpected error building one to many hash table: " +
791  std::to_string(err));
792  }
793  }
794 }
795 
797  const ChunkKey& chunk_key,
798  const JoinColumn& join_column,
799  const InnerOuter& cols,
800  const HashType layout,
801  const Data_Namespace::MemoryLevel effective_memory_level,
802  const int device_id) {
803  auto timer = DEBUG_TIMER(__func__);
804  const auto inner_col = cols.first;
805  CHECK(inner_col);
806 #ifndef HAVE_CUDA
807  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
808 #endif
809  int err{0};
810  const int32_t hash_join_invalid_val{-1};
811  auto hashtable_layout = layout;
812  auto allow_hashtable_recycling =
816  inner_col->getTableKey());
817  PerfectHashTableEntryInfo hash_table_entry_info(
819  join_column.num_elems,
820  rowid_size_,
821  hashtable_layout,
823  auto const hash_table_size = hash_table_entry_info.computeHashTableSize();
825  hash_table_size > query_hints_.max_join_hash_table_size) {
827  }
829  hash_table_size > executor_->maxGpuSlabSize()) {
830  throw JoinHashTableTooBig(hash_table_size, executor_->maxGpuSlabSize());
831  }
832  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
833  CHECK(!chunk_key.empty());
834  std::shared_ptr<PerfectHashTable> hash_table{nullptr};
835  decltype(std::chrono::steady_clock::now()) ts1, ts2;
836  ts1 = std::chrono::steady_clock::now();
837  {
838  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
840  if (hashtable_layout == HashType::OneToOne) {
841  builder.initOneToOneHashTableOnCpu(join_column,
842  col_range_,
843  isBitwiseEq(),
844  cols,
846  join_type_,
848  hash_table_entry_info,
849  hash_join_invalid_val,
850  executor_);
851  hash_table = builder.getHashTable();
852  } else {
853  builder.initOneToManyHashTableOnCpu(join_column,
854  col_range_,
855  isBitwiseEq(),
856  cols,
858  join_type_,
860  hash_table_entry_info,
861  hash_join_invalid_val,
862  executor_);
863  hash_table = builder.getHashTable();
864  }
865  ts2 = std::chrono::steady_clock::now();
866  auto build_time =
867  std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
868  hash_table->setHashEntryInfo(hash_entry_info_);
869  hash_table->setColumnNumElems(join_column.num_elems);
870  if (allow_hashtable_recycling && hash_table &&
871  hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU) > 0) {
874  hash_table,
876  build_time);
877  }
878  }
879  // Transfer the hash table on the GPU if we've only built it on CPU
880  // but the query runs on GPU (join on dictionary encoded columns).
882 #ifdef HAVE_CUDA
883  const auto& ti = inner_col->get_type_info();
884  CHECK(ti.is_string());
885  auto data_mgr = executor_->getDataMgr();
886  copyCpuHashTableToGpu(hash_table, hash_table_entry_info, device_id, data_mgr);
887 #else
888  UNREACHABLE();
889 #endif
890  } else {
891  CHECK(hash_table);
892  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
893  hash_tables_for_device_[device_id] = hash_table;
894  }
895  } else {
896 #ifdef HAVE_CUDA
898  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
900  hash_table_entry_info,
901  shardCount(),
902  device_id,
904  executor_);
905  builder.initHashTableOnGpu(chunk_key,
906  join_column,
907  col_range_,
908  isBitwiseEq(),
909  cols,
910  join_type_,
912  hash_table_entry_info,
913  shardCount(),
914  hash_join_invalid_val,
915  device_id,
917  executor_);
918  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
919  hash_tables_for_device_[device_id] = builder.getHashTable();
920 #else
921  UNREACHABLE();
922 #endif
923  }
924  return err;
925 }
926 
// PerfectJoinHashTable::genChunkKey -- builds the ChunkKey used to identify
// the hash table built over `inner_col` for the given set of fragments.
// NOTE(review): the opening line of the signature (original line 927) is
// missing from this listing; name and return type are taken from the
// declaration index at the end of the page.
//
// fragments       inner-side fragments whose ids are appended to the key
// outer_col_expr  outer-side join expression; only inspected when the inner
//                 column is a string column
// inner_col       inner-side join column; supplies db/table/column ids
// Returns the assembled key.
928  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
929  const Analyzer::Expr* outer_col_expr,
930  const Analyzer::ColumnVar* inner_col) const {
931  const auto& column_key = inner_col->getColumnKey();
// The key starts as {db id, table id, column id} of the inner column.
932  ChunkKey chunk_key{column_key.db_id, column_key.table_id, column_key.column_id};
933  const auto& ti = inner_col->get_type_info();
934  std::for_each(fragments.cbegin(), fragments.cend(), [&chunk_key](const auto& fragment) {
935  // collect all fragment ids so the cache key of a cached hash table is generated correctly
936  chunk_key.push_back(fragment.fragmentId);
937  });
// String columns must be dictionary encoded; for them the key additionally
// carries every outer-side fragment id and the outer side's total tuple count.
938  if (ti.is_string()) {
939  CHECK_EQ(kENCODING_DICT, ti.get_compression());
940  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
941  CHECK(outer_col);
942  const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
943  size_t outer_elem_count =
944  std::accumulate(outer_query_info.fragments.begin(),
945  outer_query_info.fragments.end(),
946  size_t(0),
947  [&chunk_key](size_t sum, const auto& fragment) {
948  chunk_key.push_back(fragment.fragmentId);
949  return sum + fragment.getNumTuples();
950  });
// NOTE(review): the declaration index lists ChunkKey as std::vector<int>, so
// this push_back narrows a size_t tuple count to int -- confirm that is intended.
951  chunk_key.push_back(outer_elem_count);
952  }
953 
954  return chunk_key;
955 }
956 
// Looks up a previously built perfect hash table in the static
// hash_table_cache_ recycler.
//
// key, item_type, device_identifier are forwarded unchanged to
// getItemFromCache(). Returns the cached item downcast to PerfectHashTable,
// or nullptr when the cache returns nothing (the dynamic_pointer_cast also
// yields nullptr if the cached item is not a PerfectHashTable).
// NOTE(review): original line 961, the first statement of the body, is
// missing from this listing.
957 std::shared_ptr<PerfectHashTable> PerfectJoinHashTable::initHashTableOnCpuFromCache(
958  QueryPlanHash key,
959  CacheItemType item_type,
960  DeviceIdentifier device_identifier) {
962  auto timer = DEBUG_TIMER(__func__);
963  VLOG(1) << "Checking CPU hash table cache.";
964  auto hashtable_ptr =
965  hash_table_cache_->getItemFromCache(key, item_type, device_identifier);
966  if (hashtable_ptr) {
967  return std::dynamic_pointer_cast<PerfectHashTable>(hashtable_ptr);
968  }
969  return nullptr;
970 }
971 
// PerfectJoinHashTable::putHashTableOnCpuToCache -- stores a hash table in
// the static hash_table_cache_ recycler.
// NOTE(review): the signature's opening line (original 972) and the first
// body statement (original 978) are missing from this listing; the name is
// taken from the declaration index.
//
// key                      cache key under which the table is stored
// item_type                cache item category forwarded to the recycler
// hashtable_ptr            table to cache; must be non-null and must not
//                          own a GPU buffer (enforced by the CHECK below)
// device_identifier        forwarded to the recycler
// hashtable_building_time  forwarded to the recycler together with the
//                          table's CPU buffer size
973  QueryPlanHash key,
974  CacheItemType item_type,
975  std::shared_ptr<PerfectHashTable> hashtable_ptr,
976  DeviceIdentifier device_identifier,
977  size_t hashtable_building_time) {
979  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
980  hash_table_cache_->putItemToCache(
981  key,
982  hashtable_ptr,
983  item_type,
984  device_identifier,
985  hashtable_ptr->getHashTableBufferSize(ExecutorDeviceType::CPU),
986  hashtable_building_time);
987 }
988 
989 llvm::Value* PerfectJoinHashTable::codegenHashTableLoad(const size_t table_idx) {
990  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
991  const auto hash_ptr = HashJoin::codegenHashTableLoad(table_idx, executor_);
992  if (hash_ptr->getType()->isIntegerTy(64)) {
993  return hash_ptr;
994  }
995  CHECK(hash_ptr->getType()->isPointerTy());
996  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
997  get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
998  llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
999 }
1000 
// Assembles the argument vector that codegenSlot() and codegenMatchingSet()
// pass on to the generated hash-join lookup.
//
// Layout of the returned vector, in order:
//   1. hash_ptr
//   2. key_lv cast to 64 bits
//   3. col_range_ minimum, col_range_ maximum
//   4. only when shard_count != 0: entries per shard, shard_count, device_count_
//   5. only when the key column is nullable or the join is bitwise-eq:
//      the fixed-encoding null sentinel of the key's logical type
//   6. only for bitwise-eq: one extra value (see the branch below)
//   7. only for kDATE keys: hash_entry_info_.bucket_normalization
//
// hash_ptr     hash table handle
// key_lv       generated key value; must be non-null
// key_col      key expression, used for its type info only
// shard_count  zero disables the sharding arguments
// co           not used by the lines visible in this listing
//
// NOTE(review): original lines 1023 and 1040 are missing from this listing,
// which truncates the initializer of expected_hash_entry_count and the
// argument of the kDATE bitwise-eq llInt() call.
1001 std::vector<llvm::Value*> PerfectJoinHashTable::getHashJoinArgs(
1002  llvm::Value* hash_ptr,
1003  llvm::Value* key_lv,
1004  const Analyzer::Expr* key_col,
1005  const int shard_count,
1006  const CompilationOptions& co) {
1007  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
// code_generator is only referenced by the commented-out code below.
1008  CodeGenerator code_generator(executor_);
1009  CHECK(key_lv);
1010  // Todo(todd): Fix below, it's gross (but didn't want to redo the plumbing yet)
1011  // const auto key_lv = key_lvs.size() && key_lvs[0]
1012  // ? key_lvs[0]
1013  // : code_generator.codegen(key_col, true, co)[0];
1014  auto const& key_col_ti = key_col->get_type_info();
1015 
1016  std::vector<llvm::Value*> hash_join_idx_args{
1017  hash_ptr,
1018  executor_->cgen_state_->castToTypeIn(key_lv, 64),
1019  executor_->cgen_state_->llInt(col_range_.getIntMin()),
1020  executor_->cgen_state_->llInt(col_range_.getIntMax())};
1021  if (shard_count) {
// (initializer on the missing line 1023)
1022  const auto expected_hash_entry_count =
// Ceiling division: entries are spread over shards, rounding up.
1024  const auto entry_count_per_shard =
1025  (expected_hash_entry_count + shard_count - 1) / shard_count;
1026  hash_join_idx_args.push_back(
1027  executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
1028  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
1029  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
1030  }
1031  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
1032  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
1033  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1034  inline_fixed_encoding_null_val(key_col_logical_ti)));
1035  }
1036  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
1037  if (isBitwiseEq()) {
1038  if (special_date_bucketization_case) {
// (argument on the missing line 1040)
1039  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1041  } else {
// Non-date bitwise-eq: the extra value is one past the range maximum.
1042  hash_join_idx_args.push_back(
1043  executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
1044  }
1045  }
1046 
1047  if (special_date_bucketization_case) {
1048  hash_join_idx_args.emplace_back(
1049  executor_->cgen_state_->llInt(hash_entry_info_.bucket_normalization));
1050  }
1051 
1052  return hash_join_idx_args;
1053 }
1054 
// PerfectJoinHashTable::codegenMatchingSet -- generates the code that finds
// the set of matching rows for the join key and returns the result of
// HashJoin::codegenMatchingSet.
// NOTE(review): the signature's opening line (original 1055, which declares
// the `co` parameter used below) and original line 1069 (the start of the
// condition continued on lines 1070-1072) are missing from this listing; the
// name and return type are taken from the declaration index.
//
// index  hash table index forwarded to codegenHashTableLoad()
// Throws std::runtime_error when the condition spanning lines 1068-1072 holds
// (reported to the user as an unsupported self-join pattern).
1056  const size_t index) {
1057  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1058  const auto cols = get_cols(qual_bin_oper_.get(), executor_->temporary_tables_).first;
// In the normalized pair, .second is used as the key side and .first as the
// value side.
1059  auto key_col = cols.second;
1060  CHECK(key_col);
1061  auto val_col = cols.first;
1062  CHECK(val_col);
1063  auto pos_ptr = codegenHashTableLoad(index);
1064  CHECK(pos_ptr);
1065  const int shard_count = shardCount();
1066  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
1067  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
// The check only applies when both sides are plain column references.
1068  if (key_col_var && val_col_var &&
1070  key_col_var,
1071  val_col_var,
1072  get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
1073  throw std::runtime_error(
1074  "Query execution fails because the query contains not supported self-join "
1075  "pattern. We suspect the query requires multiple left-deep join tree due to "
1076  "the "
1077  "join condition of the self-join and is not supported for now. Please consider "
1078  "rewriting table order in "
1079  "FROM clause.");
1080  }
1081  CodeGenerator code_generator(executor_);
1082 
1083  auto key_lv = HashJoin::codegenColOrStringOper(
1084  key_col, inner_outer_string_op_infos_.second, code_generator, co);
1085 
1086  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_lv, key_col, shard_count, co);
1087  const int64_t sub_buff_size = getComponentBufferSize();
1088  const auto& key_col_ti = key_col->get_type_info();
1089 
// kDATE keys take the bucketized lookup path.
1090  auto bucketize = (key_col_ti.get_type() == kDATE);
1091  return HashJoin::codegenMatchingSet(hash_join_idx_args,
1092  shard_count,
1093  !key_col_ti.get_notnull(),
1094  isBitwiseEq(),
1095  sub_buff_size,
1096  executor_,
1097  bucketize);
1098 }
1099 
1100 size_t PerfectJoinHashTable::offsetBufferOff() const noexcept {
1101  return 0;
1102 }
1103 
1104 size_t PerfectJoinHashTable::countBufferOff() const noexcept {
1105  return getComponentBufferSize();
1106 }
1107 
// PerfectJoinHashTable::payloadBufferOff -- byte offset added to the join
// hash buffer's base pointer to reach the payload section: two component
// buffers past the start.
// NOTE(review): the signature line (original 1108) is missing from this
// listing; the name is taken from the declaration index.
1109  return 2 * getComponentBufferSize();
1110 }
1111 
// PerfectJoinHashTable::getComponentBufferSize -- size in bytes of one
// component (section) of the hash buffer.
// NOTE(review): the signature line (original 1112) is missing from this
// listing; the name is taken from the declaration index.
//
// Returns entry count * sizeof(int32_t) when the first per-device table
// exists and uses the OneToMany layout; returns 0 when there are no tables,
// the first slot is null, or the layout is anything else.
1113  if (hash_tables_for_device_.empty()) {
1114  return 0;
1115  }
// Only the first device's table is consulted.
1116  auto hash_table = hash_tables_for_device_.front();
1117  if (hash_table && hash_table->getLayout() == HashType::OneToMany) {
1118  return hash_table->getEntryCount() * sizeof(int32_t);
1119  } else {
1120  return 0;
1121  }
1122 }
1123 
// PerfectJoinHashTable::getHashTableForDevice -- returns a non-owning pointer
// to the hash table stored for `device_id`; the pointer is null when that
// slot holds no table. `device_id` must be a valid index into
// hash_tables_for_device_ (enforced by the CHECK_LT).
// NOTE(review): the signature line (original 1124) is missing from this
// listing; the name is taken from the declaration index.
1125  CHECK_LT(device_id, hash_tables_for_device_.size());
1126  return hash_tables_for_device_[device_id].get();
1127 }
1128 
// PerfectJoinHashTable::copyCpuHashTableToGpu -- creates a GPU-resident hash
// table, fills it with the contents of an already built CPU hash table, and
// installs it as the table for `device_id`.
// NOTE(review): the signature's opening line (original 1129), the first body
// statement (original 1134) and the line that begins the call continued on
// lines 1141-1145 (original 1140) are missing from this listing; the name is
// taken from the declaration index.
//
// cpu_hash_table         source table; must be non-null
// hash_table_entry_info  forwarded to the GPU builder
// device_id              target device; must index hash_tables_for_device_
// data_mgr               must be non-null; used to construct the allocator
1130  std::shared_ptr<PerfectHashTable>& cpu_hash_table,
1131  const PerfectHashTableEntryInfo hash_table_entry_info,
1132  const int device_id,
1133  Data_Namespace::DataMgr* data_mgr) {
1135  CHECK(data_mgr);
1136  CHECK(cpu_hash_table);
1137 
// The rest of the function runs while holding cpu_hash_table_buff_mutex_.
1138  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1139  PerfectJoinHashTableBuilder gpu_builder;
// NOTE(review): these look like the trailing arguments of
// allocateDeviceMemory() as listed in the declaration index -- confirm
// against the full file.
1141  hash_table_entry_info,
1142  shardCount(),
1143  device_id,
1144  device_count_,
1145  executor_);
1146 
1147  std::shared_ptr<PerfectHashTable> gpu_hash_table = gpu_builder.getHashTable();
1148  CHECK(gpu_hash_table);
1149  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
// The copy is skipped when the new table has no GPU buffer.
1150  if (gpu_buffer_ptr) {
1151  auto device_allocator = std::make_unique<CudaAllocator>(
1152  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1153  device_allocator->copyToDevice(
1154  gpu_buffer_ptr,
1155  cpu_hash_table->getCpuBuffer(),
1156  cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU));
1157  }
1158  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
1159  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
1160 }
1161 
// PerfectJoinHashTable::toString -- decodes the join hash table of one
// device into a human-readable string via HashTable::toString.
// NOTE(review): the signature's opening line (original 1162, which declares
// `device_type`) and original line 1189 are missing from this listing; per
// the HashTable::toString declaration in the index, the missing argument is
// the layout_type string. The name is taken from the declaration index.
//
// device_id  device whose buffer is decoded
// raw        forwarded to HashTable::toString
// Returns "EMPTY" when the device has no join hash buffer.
1163  const int device_id,
1164  bool raw) const {
1165  auto buffer = getJoinHashBuffer(device_type, device_id);
1166  if (!buffer) {
1167  return "EMPTY";
1168  }
1169  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1170  auto hash_table = getHashTableForDevice(device_id);
1171 #ifdef HAVE_CUDA
// For a GPU table the buffer is first copied into a host-side array, and
// decoding reads that copy instead of the device pointer.
1172  std::unique_ptr<int8_t[]> buffer_copy;
1173  if (device_type == ExecutorDeviceType::GPU) {
1174  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1175 
1176  auto data_mgr = executor_->getDataMgr();
1177  auto device_allocator = std::make_unique<CudaAllocator>(
1178  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1179  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1180  }
1181  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1182 #else
1183  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1184 #endif // HAVE_CUDA
// ptr2/ptr3/ptr4 mark the start of the offset, count and payload sections.
1185  auto ptr2 = ptr1 + offsetBufferOff();
1186  auto ptr3 = ptr1 + countBufferOff();
1187  auto ptr4 = ptr1 + payloadBufferOff();
// Key component count and width are passed as 0; the entry count is 0 when
// the device slot holds no table.
1188  return HashTable::toString("perfect",
1190  0,
1191  0,
1192  hash_table ? hash_table->getEntryCount() : 0,
1193  ptr1,
1194  ptr2,
1195  ptr3,
1196  ptr4,
1197  buffer_size,
1198  raw);
1199 }
1200 
1201 std::set<DecodedJoinHashBufferEntry> PerfectJoinHashTable::toSet(
1202  const ExecutorDeviceType device_type,
1203  const int device_id) const {
1204  auto buffer = getJoinHashBuffer(device_type, device_id);
1205  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1206  auto hash_table = getHashTableForDevice(device_id);
1207 #ifdef HAVE_CUDA
1208  std::unique_ptr<int8_t[]> buffer_copy;
1209  if (device_type == ExecutorDeviceType::GPU) {
1210  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1211 
1212  auto data_mgr = executor_->getDataMgr();
1213  auto device_allocator = std::make_unique<CudaAllocator>(
1214  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1215  device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1216  }
1217  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1218 #else
1219  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1220 #endif // HAVE_CUDA
1221  auto ptr2 = ptr1 + offsetBufferOff();
1222  auto ptr3 = ptr1 + countBufferOff();
1223  auto ptr4 = ptr1 + payloadBufferOff();
1224  return HashTable::toSet(0,
1225  0,
1226  hash_table ? hash_table->getEntryCount() : 0,
1227  ptr1,
1228  ptr2,
1229  ptr3,
1230  ptr4,
1231  buffer_size);
1232 }
1233 
// PerfectJoinHashTable::codegenSlot -- generates the call to the runtime
// hash-join lookup function for the join key and returns the value produced
// by that call.
// NOTE(review): the signature's opening line (original 1234, which declares
// the `co` parameter used below), original line 1239, and original line 1252
// (the start of the condition continued on lines 1253-1255) are missing from
// this listing; the name and return type are taken from the declaration index.
//
// index  hash table index forwarded to codegenHashTableLoad()
// Throws std::runtime_error when the condition spanning lines 1251-1255 holds
// (reported to the user as an unsupported self-join pattern).
1235  const size_t index) {
1236  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1237  using namespace std::string_literals;
1238 
1240  const auto cols_and_string_op_infos =
1241  get_cols(qual_bin_oper_.get(), executor_->temporary_tables_);
1242  const auto& cols = cols_and_string_op_infos.first;
1243  const auto& inner_outer_string_op_infos = cols_and_string_op_infos.second;
// In the normalized pair, .second is used as the key side and .first as the
// value side.
1244  auto key_col = cols.second;
1245  CHECK(key_col);
1246  auto val_col = cols.first;
1247  CHECK(val_col);
1248  CodeGenerator code_generator(executor_);
1249  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
1250  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
// The check only applies when both sides are plain column references.
1251  if (key_col_var && val_col_var &&
1253  key_col_var,
1254  val_col_var,
1255  get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
1256  throw std::runtime_error(
1257  "Query execution failed because the query contains not supported self-join "
1258  "pattern. We suspect the query requires multiple left-deep join tree due to "
1259  "the join condition of the self-join and is not supported for now. Please "
1260  "consider chaning the table order in the FROM clause.");
1261  }
1262 
1263  auto key_lv = HashJoin::codegenColOrStringOper(
1264  key_col, inner_outer_string_op_infos.second, code_generator, co);
1265 
1266  // CHECK_EQ(size_t(1), key_lvs.size());
1267  auto hash_ptr = codegenHashTableLoad(index);
1268  CHECK(hash_ptr);
1269  const int shard_count = shardCount();
1270  const auto hash_join_idx_args =
1271  getHashJoinArgs(hash_ptr, key_lv, key_col, shard_count, co);
1272 
// The runtime function name is assembled from a base name plus suffixes:
// base "bucketized_hash_join_idx" for kDATE keys, otherwise "hash_join_idx";
// then "_bitwise" for bitwise-eq joins, "_sharded" when sharded, and
// "_nullable" for nullable keys that are not bitwise-eq.
1273  const auto& key_col_ti = key_col->get_type_info();
1274  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
1275  : "hash_join_idx"s);
1276 
1277  if (isBitwiseEq()) {
1278  fname += "_bitwise";
1279  }
1280  if (shard_count) {
1281  fname += "_sharded";
1282  }
1283 
1284  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
1285  fname += "_nullable";
1286  }
1287  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1288 }
1289 
// PerfectJoinHashTable::getInnerQueryInfo -- returns the entry of
// query_infos_ that get_inner_query_info() selects for the table key of
// `inner_col`.
// NOTE(review): the signature's opening line (original 1290) is missing from
// this listing; the name and return type (const InputTableInfo&) are taken
// from the declaration index.
1291  const Analyzer::ColumnVar* inner_col) const {
1292  return get_inner_query_info(inner_col->getTableKey(), query_infos_);
1293 }
1294 
1295 size_t get_entries_per_device(const size_t total_entries,
1296  const size_t shard_count,
1297  const size_t device_count,
1298  const Data_Namespace::MemoryLevel memory_level) {
1299  const auto entries_per_shard =
1300  shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
1301  size_t entries_per_device = entries_per_shard;
1302  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
1303  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
1304  CHECK_GT(shards_per_device, 0u);
1305  entries_per_device = entries_per_shard * shards_per_device;
1306  }
1307  return entries_per_device;
1308 }
1309 
// NOTE(review): only the tail of this definition survives in the listing
// (original lines 1310-1312, including the signature, are missing). What
// remains is the zero fallback of a conditional expression. Presumably this
// is PerfectJoinHashTable::shardCount() -- confirm against the full file.
1313  : 0;
1314 }
1315 
// PerfectJoinHashTable::isBitwiseEq -- true when the operator type of the
// join qualifier (qual_bin_oper_) is kBW_EQ.
// NOTE(review): the signature line (original 1316) is missing from this
// listing; the name is taken from the declaration index.
1317  return qual_bin_oper_->get_optype() == kBW_EQ;
1318 }
llvm::Value * codegenHashTableLoad(const size_t table_idx)
BucketizedHashEntryInfo hash_entry_info_
int64_t getIntMin() const
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
size_t DeviceIdentifier
Definition: DataRecycler.h:129
size_t get_hash_entry_count(const ExpressionRange &col_range, const bool is_bw_eq)
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
size_t g_num_tuple_threshold_switch_to_baseline
Definition: Execute.cpp:106
JoinType
Definition: sqldefs.h:174
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
ExpressionRange rhs_source_col_range_
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:259
bool self_join_not_covered_by_left_deep_tree(const Analyzer::ColumnVar *key_side, const Analyzer::ColumnVar *val_side, const int max_rte_covered)
static bool isInvalidHashTableCacheKey(const std::vector< QueryPlanHash > &cache_keys)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:105
size_t getNormalizedHashEntryCount() const
#define IS_EQUIVALENCE(X)
Definition: sqldefs.h:69
static bool canAccessHashTable(bool allow_hash_table_recycling, bool invalid_cache_key, JoinType join_type)
Definition: HashJoin.cpp:1049
const Data_Namespace::MemoryLevel memory_level_
size_t getComponentBufferSize() const noexcept override
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
size_t num_elems
PerfectJoinHashTable(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const ExpressionRange &col_range, const ExpressionRange &rhs_source_col_range, const BucketizedHashEntryInfo hash_entry_info, ColumnCacheMap &column_cache, Executor *executor, const int device_count, const RegisteredQueryHint &query_hints, const HashTableBuildDagMap &hashtable_build_dag_map, const TableIdToNodeMap &table_id_to_node_map, const size_t rowid_size, const InnerOuterStringOpInfos &inner_outer_string_op_infos={})
std::mutex str_proxy_translation_mutex_
ChunkKey genChunkKey(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
#define LOG(tag)
Definition: Logger.h:285
static void checkHashJoinReplicationConstraint(const shared::TableKey &table_key, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:796
const TableIdToNodeMap table_id_to_node_map_
const Expr * get_right_operand() const
Definition: Analyzer.h:456
size_t offsetBufferOff() const noexcept override
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
Definition: HashJoin.cpp:60
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:377
#define UNREACHABLE()
Definition: Logger.h:338
const TableDescriptor * get_metadata_for_table(const ::shared::TableKey &table_key, bool populate_fragmenter)
bool shard_count_less_or_equal_device_count(const shared::TableKey &inner_table_key, const Executor *executor)
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:254
#define CHECK_GE(x, y)
Definition: Logger.h:306
HashTableBuildDagMap hashtable_build_dag_map_
Data_Namespace::MemoryLevel get_effective_memory_level(const Data_Namespace::MemoryLevel memory_level, const bool needs_dict_translation)
const InputTableInfo & get_inner_query_info(const shared::TableKey &inner_table_key, const std::vector< InputTableInfo > &query_infos)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1470
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:417
size_t payloadBufferOff() const noexcept override
ColumnsForDevice fetchColumnsForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner)
InnerOuter get_cols(const Analyzer::BinOper *qual_bin_oper, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:1075
std::shared_ptr< PerfectHashTable > initHashTableOnCpuFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
bool needs_dictionary_translation(const std::vector< InnerOuter > &inner_outer_pairs, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs, const Executor *executor)
void allocateDeviceMemory(BucketizedHashEntryInfo hash_entry_info, PerfectHashTableEntryInfo hash_table_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
static llvm::Value * codegenColOrStringOper(const Analyzer::Expr *col_or_string_oper, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, CodeGenerator &code_generator, const CompilationOptions &co)
Definition: HashJoin.cpp:564
void freeHashBufferMemory()
Definition: HashJoin.h:337
#define CHECK_GT(x, y)
Definition: Logger.h:305
const int get_max_rte_scan_table(std::unordered_map< int, llvm::Value * > &scan_idx_to_hash_pos)
size_t max_join_hash_table_size
Definition: QueryHint.h:358
HashType getHashType() const noexcept override
ExecutorDeviceType
std::string to_string(char const *&&v)
BucketizedHashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
HashtableCacheMetaInfo hashtable_cache_meta_info_
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
static std::unique_ptr< HashtableRecycler > hash_table_cache_
bool g_is_test_env
Definition: Execute.cpp:149
size_t g_ratio_num_hash_entry_to_num_tuple_switch_to_baseline
Definition: Execute.cpp:107
future< Result > async(Fn &&fn, Args &&...args)
shared::TableKey getInnerTableId() const noexcept override
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:168
int64_t bucket_normalization
CacheItemType
Definition: DataRecycler.h:38
const ColumnDescriptor * get_column_descriptor_maybe(const shared::ColumnKey &column_key)
Definition: Execute.h:241
void reifyForDevice(const ChunkKey &hash_table_key, const ColumnsForDevice &columns_for_device, const HashType layout, const int device_id, const logger::ThreadLocalIds)
std::vector< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
static size_t getMaximumNumHashEntriesCanHold(MemoryLevel memory_level, const Executor *executor, size_t rowid_size) noexcept
Definition: HashJoin.cpp:1056
static std::string generateTooManyHashEntriesErrMsg(size_t num_entries, size_t threshold, MemoryLevel memory_level)
Definition: HashJoin.h:165
bool isOneToOneHashPossible(const std::vector< ColumnsForDevice > &columns_per_device) const
static constexpr size_t MAX_NUM_HASH_ENTRIES
Definition: HashJoin.h:136
int8_t * getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:314
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
static std::unordered_set< size_t > getAlternativeTableKeys(const std::vector< ChunkKey > &chunk_keys, const shared::TableKey &inner_table_key)
Definition: DataRecycler.h:154
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
bool hasNulls() const
#define AUTOMATIC_IR_METADATA(CGENSTATE)
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:79
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, llvm::Value *key_lvs, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
std::vector< InnerOuter > inner_outer_pairs_
static ExpressionRange makeIntRange(const int64_t int_min, const int64_t int_max, const int64_t bucket, const bool has_nulls)
static const StringDictionaryProxy::IdMap * translateInnerToOuterStrDictProxies(const InnerOuter &cols, const InnerOuterStringOpInfos &inner_outer_string_op_infos, ExpressionRange &old_col_range, const Executor *executor)
Definition: HashJoin.cpp:423
static std::unique_ptr< HashingSchemeRecycler > hash_table_layout_cache_
std::unique_ptr< PerfectHashTable > getHashTable()
#define VLOGGING(n)
Definition: Logger.h:289
std::unordered_map< shared::TableKey, const RelAlgNode * > TableIdToNodeMap
#define CHECK_LT(x, y)
Definition: Logger.h:303
static QueryPlanHash getAlternativeCacheKey(AlternativeCacheKeyForPerfectHashJoin &info)
Definition: sqltypes.h:80
const std::vector< InputTableInfo > & query_infos_
const shared::ColumnKey & getColumnKey() const
Definition: Analyzer.h:198
void putHashTableOnCpuToCache(QueryPlanHash key, CacheItemType item_type, std::shared_ptr< PerfectHashTable > hashtable_ptr, DeviceIdentifier device_identifier, size_t hashtable_building_time)
static std::string getHashTypeString(HashType ht) noexcept
Definition: HashJoin.h:179
std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
Definition: HashJoin.h:300
size_t getNormalizedHashEntryCount() const
static std::string toString(const std::string &type, const std::string &layout_type, size_t key_component_count, size_t key_component_width, size_t entry_count, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size, bool raw=false)
Decode hash table into a human-readable string.
Definition: HashTable.cpp:226
ColumnCacheMap & column_cache_
LocalIdsScopeGuard setNewThreadId() const
Definition: Logger.cpp:538
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
bool isHintRegistered(const QueryHint hint) const
Definition: QueryHint.h:383
RegisteredQueryHint query_hints_
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
size_t QueryPlanHash
static std::shared_ptr< PerfectJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hints, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query's parse tree etc.
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
const InnerOuterStringOpInfos inner_outer_string_op_infos_
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
int64_t getBucket() const
std::pair< std::vector< StringOps_Namespace::StringOpInfo >, std::vector< StringOps_Namespace::StringOpInfo >> InnerOuterStringOpInfos
Definition: HashJoin.h:107
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
static std::pair< InnerOuter, InnerOuterStringOpInfos > normalizeColumnPair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const TemporaryTables *temporary_tables, const bool is_bbox_intersect=false)
Definition: HashJoin.cpp:822
Definition: sqldefs.h:30
static bool isSafeToCacheHashtable(const TableIdToNodeMap &table_id_to_node_map, bool need_dict_translation, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_info_pairs, const shared::TableKey &table_key)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
size_t countBufferOff() const noexcept override
const Expr * get_left_operand() const
Definition: Analyzer.h:455
static DecodedJoinHashBufferSet toSet(size_t key_component_count, size_t key_component_width, size_t entry_count, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size)
Decode hash table into a std::set for easy inspection and validation.
Definition: HashTable.cpp:139
bool any_of(std::vector< Analyzer::Expr * > const &target_exprs)
int initHashTableForDevice(const ChunkKey &chunk_key, const JoinColumn &join_column, const InnerOuter &cols, const HashType layout, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
void copyCpuHashTableToGpu(std::shared_ptr< PerfectHashTable > &cpu_hash_table, const PerfectHashTableEntryInfo hash_table_entry_info, const int device_id, Data_Namespace::DataMgr *data_mgr)
std::vector< QueryPlanHash > hashtable_cache_key_
ThreadId thread_id_
Definition: Logger.h:138
HashTable * getHashTableForDevice(const size_t device_id) const
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:1084
static HashtableAccessPathInfo getHashtableAccessPathInfo(const std::vector< InnerOuter > &inner_outer_pairs, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs, const SQLOps op_type, const JoinType join_type, const HashTableBuildDagMap &hashtable_build_dag_map, int device_count, int shard_count, const std::vector< std::vector< Fragmenter_Namespace::FragmentInfo >> &frags_for_device, Executor *executor)
std::set< DecodedJoinHashBufferEntry > toSet(const ExecutorDeviceType device_type, const int device_id) const override
HashType
Definition: HashTable.h:19
ThreadLocalIds thread_local_ids()
Definition: Logger.cpp:880
const StringDictionaryProxy::IdMap * str_proxy_translation_map_
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:110
std::unordered_set< size_t > table_keys_
#define VLOG(n)
Definition: Logger.h:388
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const InnerOuter &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const JoinType join_type, const BucketizedHashEntryInfo hash_entry_info, const PerfectHashTableEntryInfo hash_table_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
bool force_one_to_many_hash_join
Definition: QueryHint.h:360
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const JoinType join_type, const BucketizedHashEntryInfo hash_entry_info, const PerfectHashTableEntryInfo hash_table_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
shared::TableKey getTableKey() const
Definition: Analyzer.h:199
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
bool isBitwiseEq() const override