OmniSciDB  91042dcc5b
PerfectJoinHashTable.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/JoinHashTable/PerfectJoinHashTable.h"

#include <atomic>
#include <future>
#include <numeric>
#include <thread>

#include "Logger/Logger.h"
#include "QueryEngine/CodeGenerator.h"
#include "QueryEngine/ColumnFetcher.h"
#include "QueryEngine/Execute.h"
#include "QueryEngine/JoinHashTable/Builders/PerfectHashTableBuilder.h"

// let's only consider the CPU hashtable recycler at this moment
std::unique_ptr<HashtableRecycler> PerfectJoinHashTable::hash_table_cache_ =
    std::make_unique<HashtableRecycler>(CacheItemType::PERFECT_HT,
                                        DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
std::unique_ptr<HashingSchemeRecycler> PerfectJoinHashTable::hash_table_layout_cache_ =
    std::make_unique<HashingSchemeRecycler>();

namespace {

InnerOuter get_cols(const Analyzer::BinOper* qual_bin_oper,
                    const Catalog_Namespace::Catalog& cat,
                    const TemporaryTables* temporary_tables) {
  const auto lhs = qual_bin_oper->get_left_operand();
  const auto rhs = qual_bin_oper->get_right_operand();
  return HashJoin::normalizeColumnPair(lhs, rhs, cat, temporary_tables);
}

HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
                                             ExpressionRange const& col_range,
                                             bool const is_bw_eq) {
  using EmptyRangeSize = boost::optional<size_t>;
  auto empty_range_check = [](ExpressionRange const& col_range,
                              bool const is_bw_eq) -> EmptyRangeSize {
    if (col_range.getIntMin() > col_range.getIntMax()) {
      CHECK_EQ(col_range.getIntMin(), int64_t(0));
      CHECK_EQ(col_range.getIntMax(), int64_t(-1));
      if (is_bw_eq) {
        return size_t(1);
      }
      return size_t(0);
    }
    return EmptyRangeSize{};
  };

  auto empty_range = empty_range_check(col_range, is_bw_eq);
  if (empty_range) {
    return {size_t(*empty_range), 1};
  }

  int64_t bucket_normalization =
      context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
  CHECK_GT(bucket_normalization, 0);
  return {size_t(col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0)),
          bucket_normalization};
}

size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
  if (col_range.getIntMin() > col_range.getIntMax()) {
    CHECK_EQ(col_range.getIntMin(), int64_t(0));
    CHECK_EQ(col_range.getIntMax(), int64_t(-1));
    return is_bw_eq ? 1 : 0;
  }
  return col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0);
}

}  // namespace

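// A sketch of the sizing arithmetic above, with hypothetical values and under
// the assumption that HashEntryInfo::getNormalizedHashEntryCount() divides the
// raw count by the bucket size: an equi-join on an integer column with range
// [10, 19] needs 10 slots, kBW_EQ adds one slot for NULL, and DATE keys are
// normalized by the day bucket. Guarded out since it is not part of this file.
#if 0
#include <cstddef>
#include <cstdint>

std::size_t example_normalized_entry_count(int64_t min_val,
                                           int64_t max_val,
                                           bool is_bw_eq,
                                           int64_t bucket_normalization) {
  // Raw count as in get_hash_entry_count(), then the bucket division.
  const auto raw =
      static_cast<std::size_t>(max_val - min_val + 1 + (is_bw_eq ? 1 : 0));
  return raw / static_cast<std::size_t>(bucket_normalization);
}
// example_normalized_entry_count(10, 19, false, 1)        == 10
// example_normalized_entry_count(0, 863999, false, 86400) == 10 (DATE, in seconds)
#endif
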
namespace {

bool shard_count_less_or_equal_device_count(const int inner_table_id,
                                            const Executor* executor) {
  const auto inner_table_info = executor->getTableInfo(inner_table_id);
  std::unordered_set<int> device_holding_fragments;
  auto cuda_mgr = executor->getDataMgr()->getCudaMgr();
  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
  for (const auto& fragment : inner_table_info.fragments) {
    if (fragment.shard != -1) {
      const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
      if (!it_ok.second) {
        return false;
      }
    }
  }
  return true;
}

}  // namespace

size_t get_shard_count(
    std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
    const Executor* executor) {
  const auto inner_col = equi_pair.first;
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
  if (!outer_col || inner_col->get_table_id() < 0 || outer_col->get_table_id() < 0) {
    return 0;
  }
  if (outer_col->get_rte_idx()) {
    return 0;
  }
  if (inner_col->get_type_info() != outer_col->get_type_info()) {
    return 0;
  }
  const auto catalog = executor->getCatalog();
  const auto inner_td = catalog->getMetadataForTable(inner_col->get_table_id());
  CHECK(inner_td);
  const auto outer_td = catalog->getMetadataForTable(outer_col->get_table_id());
  CHECK(outer_td);
  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
      inner_td->nShards != outer_td->nShards) {
    return 0;
  }
  if (!shard_count_less_or_equal_device_count(inner_td->tableId, executor)) {
    return 0;
  }
  // The two columns involved must be the ones on which the tables have been sharded.
  return (inner_td->shardedColumnId == inner_col->get_column_id() &&
          outer_td->shardedColumnId == outer_col->get_column_id()) ||
                 (outer_td->shardedColumnId == inner_col->get_column_id() &&
                  inner_td->shardedColumnId == inner_col->get_column_id())
             ? inner_td->nShards
             : 0;
}

//! Make hash table from an in-flight SQL query's parse tree etc.
std::shared_ptr<PerfectJoinHashTable> PerfectJoinHashTable::getInstance(
    const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const JoinType join_type,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    const TableIdToNodeMap& table_id_to_node_map) {
  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
  const auto cols =
      get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
  const auto inner_col = cols.first;
  CHECK(inner_col);
  const auto& ti = inner_col->get_type_info();
  auto col_range =
      getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
  if (col_range.getType() == ExpressionRangeType::Invalid) {
    throw HashJoinFail(
        "Could not compute range for the expressions involved in the equijoin");
  }
  if (ti.is_string()) {
    // The nullable info must be the same as the source column.
    const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
    if (source_col_range.getType() == ExpressionRangeType::Invalid) {
      throw HashJoinFail(
          "Could not compute range for the expressions involved in the equijoin");
    }
    if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
      // If the inner column expression range is empty, use the inner col range
      CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
      CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
      col_range = source_col_range;
    } else {
      col_range = ExpressionRange::makeIntRange(
          std::min(source_col_range.getIntMin(), col_range.getIntMin()),
          std::max(source_col_range.getIntMax(), col_range.getIntMax()),
          0,
          source_col_range.hasNulls());
    }
  }
  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
  const auto max_hash_entry_count =
      memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
          ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
          : static_cast<size_t>(std::numeric_limits<int32_t>::max());

  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
      ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();

  if (bucketized_entry_count > max_hash_entry_count) {
    throw TooManyHashEntries();
  }

  if (qual_bin_oper->get_optype() == kBW_EQ &&
      col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
    throw HashJoinFail("Cannot translate null value for kBW_EQ");
  }
  std::vector<InnerOuter> inner_outer_pairs;
  inner_outer_pairs.emplace_back(inner_col, cols.second);
  auto hashtable_access_path_info =
      HashtableRecycler::getHashtableAccessPathInfo(inner_outer_pairs,
                                                    qual_bin_oper->get_optype(),
                                                    join_type,
                                                    hashtable_build_dag_map,
                                                    executor);
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  if (VLOGGING(1)) {
    ts1 = std::chrono::steady_clock::now();
  }
  auto join_hash_table = std::shared_ptr<PerfectJoinHashTable>(
      new PerfectJoinHashTable(qual_bin_oper,
                               inner_col,
                               query_infos,
                               memory_level,
                               join_type,
                               preferred_hash_type,
                               col_range,
                               column_cache,
                               executor,
                               device_count,
                               hashtable_access_path_info,
                               table_id_to_node_map));
  try {
    join_hash_table->reify();
  } catch (const TableMustBeReplicated& e) {
    // Throw a runtime error to abort the query
    join_hash_table->freeHashBufferMemory();
    throw std::runtime_error(e.what());
  } catch (const HashJoinFail& e) {
    // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
    // possible)
    join_hash_table->freeHashBufferMemory();
    throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
                                   "involved in equijoin | ") +
                       e.what());
  } catch (const ColumnarConversionNotSupported& e) {
    throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
                       e.what());
  } catch (const OutOfMemory& e) {
    throw HashJoinFail(
        std::string("Ran out of memory while building hash tables for equijoin | ") +
        e.what());
  } catch (const std::exception& e) {
    throw std::runtime_error(
        std::string("Fatal error while attempting to build hash tables for join: ") +
        e.what());
  }
  if (VLOGGING(1)) {
    ts2 = std::chrono::steady_clock::now();
    VLOG(1) << "Built perfect hash table "
            << getHashTypeString(join_hash_table->getHashType()) << " in "
            << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
            << " ms";
  }
  return join_hash_table;
}
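
// A hypothetical call site for getInstance() (the surrounding variable names
// are assumptions, not taken from this file): building a one-to-one perfect
// hash table on CPU for an inner equi-join. Guarded out since it is only a
// usage sketch.
#if 0
auto join_hash_table = PerfectJoinHashTable::getInstance(qual_bin_oper,
                                                         query_infos,
                                                         Data_Namespace::CPU_LEVEL,
                                                         JoinType::INNER,
                                                         HashType::OneToOne,
                                                         /*device_count=*/1,
                                                         column_cache,
                                                         executor,
                                                         hashtable_build_dag_map,
                                                         table_id_to_node_map);
#endif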

bool needs_dictionary_translation(const Analyzer::ColumnVar* inner_col,
                                  const Analyzer::Expr* outer_col_expr,
                                  const Executor* executor) {
  const auto catalog = executor->getCatalog();
  CHECK(catalog);
  const auto inner_cd = get_column_descriptor_maybe(
      inner_col->get_column_id(), inner_col->get_table_id(), *catalog);
  const auto& inner_ti = get_column_type(inner_col->get_column_id(),
                                         inner_col->get_table_id(),
                                         inner_cd,
                                         executor->getTemporaryTables());
  // Only strings may need dictionary translation.
  if (!inner_ti.is_string()) {
    return false;
  }
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
  CHECK(outer_col);
  const auto outer_cd = get_column_descriptor_maybe(
      outer_col->get_column_id(), outer_col->get_table_id(), *catalog);
  // Don't want to deal with temporary tables for now, require translation.
  if (!inner_cd || !outer_cd) {
    return true;
  }
  const auto& outer_ti = get_column_type(outer_col->get_column_id(),
                                         outer_col->get_table_id(),
                                         outer_cd,
                                         executor->getTemporaryTables());
  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
  // If the two columns don't share the dictionary, translation is needed.
  if (outer_ti.get_comp_param() != inner_ti.get_comp_param()) {
    return true;
  }
  const auto inner_str_dict_proxy =
      executor->getStringDictionaryProxy(inner_col->get_comp_param(), true);
  CHECK(inner_str_dict_proxy);
  const auto outer_str_dict_proxy =
      executor->getStringDictionaryProxy(outer_col->get_comp_param(), true);
  CHECK(outer_str_dict_proxy);

  return *inner_str_dict_proxy != *outer_str_dict_proxy;
}

std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    const int device_count) {
  std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
  for (const auto& fragment : fragments) {
    CHECK_GE(fragment.shard, 0);
    if (fragment.shard % device_count == device_id) {
      shards_for_device.push_back(fragment);
    }
  }
  return shards_for_device;
}
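
// A sketch of the round-robin shard placement assumed by the modulo test
// above: with 4 shards on 2 devices, device 0 is assigned shards {0, 2} and
// device 1 shards {1, 3}. Self-contained and guarded out of the build.
#if 0
#include <vector>

std::vector<int> example_shards_for_device(const int num_shards,
                                           const int device_id,
                                           const int device_count) {
  std::vector<int> shards;
  for (int shard = 0; shard < num_shards; ++shard) {
    if (shard % device_count == device_id) {  // same predicate as above
      shards.push_back(shard);
    }
  }
  return shards;
}
#endif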

void PerfectJoinHashTable::reify() {
  auto timer = DEBUG_TIMER(__func__);
  CHECK_LT(0, device_count_);
  auto catalog = const_cast<Catalog_Namespace::Catalog*>(executor_->getCatalog());
  const auto cols =
      get_cols(qual_bin_oper_.get(), *catalog, executor_->temporary_tables_);
  const auto inner_col = cols.first;
  HashJoin::checkHashJoinReplicationConstraint(
      inner_col->get_table_id(),
      get_shard_count(qual_bin_oper_.get(), executor_),
      executor_);
  const auto& query_info = getInnerQueryInfo(inner_col).info;
  if (query_info.fragments.empty()) {
    return;
  }
  if (query_info.getNumTuplesUpperBound() >
      static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw TooManyHashEntries();
  }
  std::vector<std::future<void>> init_threads;
  const int shard_count = shardCount();

  inner_outer_pairs_.push_back(cols);
  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));

  std::vector<ColumnsForDevice> columns_per_device;
  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;

  try {
    auto data_mgr = executor_->getDataMgr();
    if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
      for (int device_id = 0; device_id < device_count_; ++device_id) {
        dev_buff_owners.emplace_back(
            std::make_unique<CudaAllocator>(data_mgr, device_id));
      }
    }
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto fragments =
          shard_count
              ? only_shards_for_device(query_info.fragments, device_id, device_count_)
              : query_info.fragments;
      const auto columns_for_device =
          fetchColumnsForDevice(fragments,
                                device_id,
                                memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
                                    ? dev_buff_owners[device_id].get()
                                    : nullptr,
                                *catalog);
      columns_per_device.push_back(columns_for_device);
      const auto chunk_key = genChunkKey(
          fragments, inner_outer_pairs_.front().second, inner_outer_pairs_.front().first);
      auto table_keys = table_keys_;
      if (device_id == 0 && hashtable_cache_key_ == EMPTY_HASHED_PLAN_DAG_KEY &&
          getInnerTableId() > 0) {
        // Sometimes we cannot retrieve the query plan DAG, so fall back to the
        // old-fashioned cache key, as long as the hash table is not built on a
        // temporary table.
        auto outer_col =
            dynamic_cast<const Analyzer::ColumnVar*>(inner_outer_pairs_.front().second);
        AlternativeCacheKeyForPerfectHashJoin cache_key{
            col_range_,
            inner_col,
            outer_col ? outer_col : inner_col,
            chunk_key,
            columns_per_device[device_id].join_columns.front().num_elems,
            qual_bin_oper_->get_optype(),
            join_type_};
        hashtable_cache_key_ = HashtableRecycler::getAlternativeCacheKey(cache_key);
        std::vector<int> alternative_table_key{chunk_key[0], chunk_key[1]};
        CHECK(!alternative_table_key.empty());
        table_keys = std::unordered_set<size_t>{boost::hash_value(alternative_table_key)};
      }
      hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_, table_keys);

      init_threads.push_back(std::async(std::launch::async,
                                        &PerfectJoinHashTable::reifyForDevice,
                                        this,
                                        chunk_key,
                                        columns_per_device[device_id],
                                        hash_type_,
                                        device_id,
                                        logger::thread_id()));
    }
    for (auto& init_thread : init_threads) {
      init_thread.wait();
    }
    for (auto& init_thread : init_threads) {
      init_thread.get();
    }
  } catch (const NeedsOneToManyHash& e) {
    hash_type_ = HashType::OneToMany;
    freeHashBufferMemory();
    init_threads.clear();
    if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
      CHECK_EQ(dev_buff_owners.size(), size_t(device_count_));
    }
    CHECK_EQ(columns_per_device.size(), size_t(device_count_));
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto fragments =
          shard_count
              ? only_shards_for_device(query_info.fragments, device_id, device_count_)
              : query_info.fragments;
      const auto chunk_key = genChunkKey(
          fragments, inner_outer_pairs_.front().second, inner_outer_pairs_.front().first);
      init_threads.push_back(std::async(std::launch::async,
                                        &PerfectJoinHashTable::reifyForDevice,
                                        this,
                                        chunk_key,
                                        columns_per_device[device_id],
                                        hash_type_,
                                        device_id,
                                        logger::thread_id()));
    }
    for (auto& init_thread : init_threads) {
      init_thread.wait();
    }
    for (auto& init_thread : init_threads) {
      init_thread.get();
    }
  }
}

Data_Namespace::MemoryLevel PerfectJoinHashTable::getEffectiveMemoryLevel(
    const std::vector<InnerOuter>& inner_outer_pairs) const {
  for (const auto& inner_outer_pair : inner_outer_pairs) {
    if (needs_dictionary_translation(
            inner_outer_pair.first, inner_outer_pair.second, executor_)) {
      needs_dict_translation_ = true;
      return Data_Namespace::CPU_LEVEL;
    }
  }
  return memory_level_;
}
457 
459  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
460  const int device_id,
461  DeviceAllocator* dev_buff_owner,
462  const Catalog_Namespace::Catalog& catalog) {
463  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
464  std::vector<JoinColumn> join_columns;
465  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
466  std::vector<JoinColumnTypeInfo> join_column_types;
467  std::vector<JoinBucketInfo> join_bucket_info;
468  std::vector<std::shared_ptr<void>> malloc_owner;
469  for (const auto& inner_outer_pair : inner_outer_pairs_) {
470  const auto inner_col = inner_outer_pair.first;
471  const auto inner_cd = get_column_descriptor_maybe(
472  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
473  if (inner_cd && inner_cd->isVirtualCol) {
475  }
476  join_columns.emplace_back(fetchJoinColumn(inner_col,
477  fragments,
478  effective_memory_level,
479  device_id,
480  chunks_owner,
481  dev_buff_owner,
482  malloc_owner,
483  executor_,
484  &column_cache_));
485  const auto& ti = inner_col->get_type_info();
486  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
487  0,
488  0,
490  isBitwiseEq(),
491  0,
493  }
494  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
495 }

void PerfectJoinHashTable::reifyForDevice(const ChunkKey& chunk_key,
                                          const ColumnsForDevice& columns_for_device,
                                          const HashType layout,
                                          const int device_id,
                                          const logger::ThreadId parent_thread_id) {
  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);

  CHECK_EQ(columns_for_device.join_columns.size(), size_t(1));
  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
  auto& join_column = columns_for_device.join_columns.front();
  if (layout == HashType::OneToOne) {
    const auto err = initHashTableForDevice(chunk_key,
                                            join_column,
                                            inner_outer_pairs_.front(),
                                            layout,
                                            effective_memory_level,
                                            device_id);
    if (err) {
      throw NeedsOneToManyHash();
    }
  } else {
    const auto err = initHashTableForDevice(chunk_key,
                                            join_column,
                                            inner_outer_pairs_.front(),
                                            HashType::OneToMany,
                                            effective_memory_level,
                                            device_id);
    if (err) {
      throw std::runtime_error("Unexpected error building one to many hash table: " +
                               std::to_string(err));
    }
  }
}

int PerfectJoinHashTable::initHashTableForDevice(
    const ChunkKey& chunk_key,
    const JoinColumn& join_column,
    const InnerOuter& cols,
    const HashType layout,
    const Data_Namespace::MemoryLevel effective_memory_level,
    const int device_id) {
  auto timer = DEBUG_TIMER(__func__);
  const auto inner_col = cols.first;
  CHECK(inner_col);

  auto hash_entry_info = get_bucketized_hash_entry_info(
      inner_col->get_type_info(), col_range_, isBitwiseEq());
  if (!hash_entry_info && layout == HashType::OneToOne) {
    // TODO: what is this for?
    return 0;
  }

#ifndef HAVE_CUDA
  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
#endif
  int err{0};
  const int32_t hash_join_invalid_val{-1};
  auto hashtable_layout = layout;
  auto allow_hashtable_recycling = HashtableRecycler::isSafeToCacheHashtable(
      table_id_to_node_map_, needs_dict_translation_, inner_col->get_table_id());
  if (allow_hashtable_recycling) {
    auto cached_hashtable_layout_type = hash_table_layout_cache_->getItemFromCache(
        hashtable_cache_key_,
        CacheItemType::HT_HASHING_SCHEME,
        DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
        {});
    if (cached_hashtable_layout_type) {
      hash_type_ = *cached_hashtable_layout_type;
      hashtable_layout = hash_type_;
    }
  }
  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
    CHECK(!chunk_key.empty());
    std::shared_ptr<PerfectHashTable> hash_table{nullptr};
    if (allow_hashtable_recycling) {
      hash_table = initHashTableOnCpuFromCache(hashtable_cache_key_,
                                               CacheItemType::PERFECT_HT,
                                               DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
    }
    if (!hash_table) {
      std::unique_lock<std::mutex> str_proxy_translation_lock(
          str_proxy_translation_mutex_);
      // It's not ideal to populate the str dict proxy translation map in the per-device
      // init func, but currently with the hash table cache lookup (above) at this level,
      // if we do the translation in PerfectJoinHashTable::reify, we don't know if the
      // hash table is cached and so needlessly compute a potentially expensive proxy
      // translation even if we have the hash table already cached. Todo(todd/yoonmin):
      // Hoist cache lookup to PerfectJoinHashTable::reify and then move this proxy
      // translation to that level as well, conditioned on the hash table not being
      // cached.
      if (!str_proxy_translation_map_) {
        CHECK_GE(inner_outer_pairs_.size(), 1UL);
        str_proxy_translation_map_ = HashJoin::translateInnerToOuterStrDictProxies(
            inner_outer_pairs_.front(), executor_);
        CHECK(str_proxy_translation_map_);
      }
    }
    decltype(std::chrono::steady_clock::now()) ts1, ts2;
    ts1 = std::chrono::steady_clock::now();
    {
      std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
      if (!hash_table) {
        // Try to get the hash table from the cache again: when building for
        // multiple devices, every device except the first to take
        // cpu_hash_table_buff_lock should now find the hash table cached by
        // the first device to run.
        hash_table = initHashTableOnCpuFromCache(hashtable_cache_key_,
                                                 CacheItemType::PERFECT_HT,
                                                 DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
        if (!hash_table) {
          PerfectJoinHashTableBuilder builder;
          if (hashtable_layout == HashType::OneToOne) {
            builder.initOneToOneHashTableOnCpu(join_column,
                                               col_range_,
                                               isBitwiseEq(),
                                               cols,
                                               str_proxy_translation_map_.get(),
                                               join_type_,
                                               hashtable_layout,
                                               hash_entry_info,
                                               hash_join_invalid_val,
                                               executor_);
            hash_table = builder.getHashTable();
          } else {
            builder.initOneToManyHashTableOnCpu(join_column,
                                                col_range_,
                                                isBitwiseEq(),
                                                cols,
                                                str_proxy_translation_map_.get(),
                                                hash_entry_info,
                                                hash_join_invalid_val,
                                                executor_);
            hash_table = builder.getHashTable();
          }
          ts2 = std::chrono::steady_clock::now();
          auto build_time =
              std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
          if (allow_hashtable_recycling && hash_table) {
            // add ht-related items to the cache iff we have a valid hashtable
            hash_table_layout_cache_->putItemToCache(
                hashtable_cache_key_,
                hashtable_layout,
                CacheItemType::HT_HASHING_SCHEME,
                DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
                0,
                0,
                {});
            putHashTableOnCpuToCache(hashtable_cache_key_,
                                     CacheItemType::PERFECT_HT,
                                     hash_table,
                                     DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
                                     build_time);
          }
        }
      }
    }
    // Transfer the hash table to the GPU if we've only built it on CPU
    // but the query runs on GPU (join on dictionary encoded columns).
    if (memory_level_ == Data_Namespace::GPU_LEVEL) {
#ifdef HAVE_CUDA
      const auto& ti = inner_col->get_type_info();
      CHECK(ti.is_string());
      auto data_mgr = executor_->getDataMgr();
      std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);

      PerfectJoinHashTableBuilder gpu_builder;
      gpu_builder.allocateDeviceMemory(join_column,
                                       hash_table->getLayout(),
                                       hash_entry_info,
                                       shardCount(),
                                       device_id,
                                       device_count_,
                                       executor_);
      std::shared_ptr<PerfectHashTable> gpu_hash_table = gpu_builder.getHashTable();
      CHECK(gpu_hash_table);
      auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
      CHECK(gpu_buffer_ptr);

      CHECK(hash_table);
      // GPU size returns reserved size
      CHECK_LE(hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
               gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));

      auto device_allocator = data_mgr->createGpuAllocator(device_id);
      device_allocator->copyToDevice(
          gpu_buffer_ptr,
          hash_table->getCpuBuffer(),
          hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU));
      CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
      hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
#else
      UNREACHABLE();
#endif
    } else {
      CHECK(hash_table);
      CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
      hash_tables_for_device_[device_id] = hash_table;
    }
  } else {
#ifdef HAVE_CUDA
    PerfectJoinHashTableBuilder builder;
    CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
    builder.allocateDeviceMemory(join_column,
                                 hashtable_layout,
                                 hash_entry_info,
                                 shardCount(),
                                 device_id,
                                 device_count_,
                                 executor_);
    builder.initHashTableOnGpu(chunk_key,
                               join_column,
                               col_range_,
                               isBitwiseEq(),
                               cols,
                               join_type_,
                               hashtable_layout,
                               hash_entry_info,
                               shardCount(),
                               hash_join_invalid_val,
                               device_id,
                               device_count_,
                               executor_);
    CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
    hash_tables_for_device_[device_id] = builder.getHashTable();
    if (!err && allow_hashtable_recycling && hash_tables_for_device_[device_id]) {
      // add the layout to the cache iff we have a valid hashtable
      hash_table_layout_cache_->putItemToCache(
          hashtable_cache_key_,
          hash_tables_for_device_[device_id]->getLayout(),
          CacheItemType::HT_HASHING_SCHEME,
          DataRecyclerUtil::CPU_DEVICE_IDENTIFIER,
          0,
          0,
          {});
    }
#else
    UNREACHABLE();
#endif
  }
  return err;
}

ChunkKey PerfectJoinHashTable::genChunkKey(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const Analyzer::Expr* outer_col_expr,
    const Analyzer::ColumnVar* inner_col) const {
  ChunkKey chunk_key{executor_->getCatalog()->getCurrentDB().dbId,
                     inner_col->get_table_id(),
                     inner_col->get_column_id()};
  const auto& ti = inner_col->get_type_info();
  if (ti.is_string()) {
    CHECK_EQ(kENCODING_DICT, ti.get_compression());
    size_t outer_elem_count = 0;
    const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
    CHECK(outer_col);
    const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
    for (auto& frag : outer_query_info.fragments) {
      outer_elem_count += frag.getNumTuples();
    }
    chunk_key.push_back(outer_elem_count);
  }
  if (fragments.size() < 2) {
    chunk_key.push_back(fragments.front().fragmentId);
  }
  return chunk_key;
}
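
// A sketch of the chunk key shapes produced above (all values hypothetical):
// dictionary-encoded string joins append the outer element count, and
// single-fragment inputs append the fragment id. Guarded out of the build.
#if 0
#include <vector>

using ExampleChunkKey = std::vector<int>;
// String join, one fragment: {db_id, table_id, column_id, outer_elem_count, frag_id}
ExampleChunkKey string_key{1, 7, 3, 100000, 0};
// Integer join, many fragments: {db_id, table_id, column_id}
ExampleChunkKey int_key{1, 7, 3};
#endif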

std::shared_ptr<PerfectHashTable> PerfectJoinHashTable::initHashTableOnCpuFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier) {
  CHECK(hash_table_cache_);
  auto timer = DEBUG_TIMER(__func__);
  VLOG(1) << "Checking CPU hash table cache.";
  auto hashtable_ptr =
      hash_table_cache_->getItemFromCache(key, item_type, device_identifier);
  if (hashtable_ptr) {
    return std::dynamic_pointer_cast<PerfectHashTable>(hashtable_ptr);
  }
  return nullptr;
}

void PerfectJoinHashTable::putHashTableOnCpuToCache(
    QueryPlanHash key,
    CacheItemType item_type,
    std::shared_ptr<PerfectHashTable> hashtable_ptr,
    DeviceIdentifier device_identifier,
    size_t hashtable_building_time) {
  CHECK(hash_table_cache_);
  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
  hash_table_cache_->putItemToCache(
      key,
      hashtable_ptr,
      item_type,
      device_identifier,
      hashtable_ptr->getHashTableBufferSize(ExecutorDeviceType::CPU),
      hashtable_building_time);
}

llvm::Value* PerfectJoinHashTable::codegenHashTableLoad(const size_t table_idx) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  const auto hash_ptr = HashJoin::codegenHashTableLoad(table_idx, executor_);
  if (hash_ptr->getType()->isIntegerTy(64)) {
    return hash_ptr;
  }
  CHECK(hash_ptr->getType()->isPointerTy());
  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
      get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
      llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
}

std::vector<llvm::Value*> PerfectJoinHashTable::getHashJoinArgs(
    llvm::Value* hash_ptr,
    const Analyzer::Expr* key_col,
    const int shard_count,
    const CompilationOptions& co) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  CodeGenerator code_generator(executor_);
  const auto key_lvs = code_generator.codegen(key_col, true, co);
  CHECK_EQ(size_t(1), key_lvs.size());
  auto const& key_col_ti = key_col->get_type_info();
  auto hash_entry_info =
      get_bucketized_hash_entry_info(key_col_ti, col_range_, isBitwiseEq());

  std::vector<llvm::Value*> hash_join_idx_args{
      hash_ptr,
      executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
      executor_->cgen_state_->llInt(col_range_.getIntMin()),
      executor_->cgen_state_->llInt(col_range_.getIntMax())};
  if (shard_count) {
    const auto expected_hash_entry_count =
        get_hash_entry_count(col_range_, isBitwiseEq());
    const auto entry_count_per_shard =
        (expected_hash_entry_count + shard_count - 1) / shard_count;
    hash_join_idx_args.push_back(
        executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
  }
  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
        inline_fixed_encoding_null_val(key_col_logical_ti)));
  }
  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
  if (isBitwiseEq()) {
    if (special_date_bucketization_case) {
      hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
          col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
    } else {
      hash_join_idx_args.push_back(
          executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
    }
  }

  if (special_date_bucketization_case) {
    hash_join_idx_args.emplace_back(
        executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
  }

  return hash_join_idx_args;
}
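
// A simplified sketch of the slot computation the generated (bucketized)
// hash_join_idx call performs with the arguments assembled above, ignoring
// sharding and null handling; the function name here is hypothetical.
#if 0
#include <cstdint>

int64_t example_hash_join_idx(const int64_t key,
                              const int64_t min_key,
                              const int64_t bucket_normalization) {
  // For non-DATE keys bucket_normalization is 1, so the slot is key - min_key.
  return key / bucket_normalization - min_key / bucket_normalization;
}
// example_hash_join_idx(42, 10, 1) == 32: key 42 lands in slot 32.
#endif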

HashJoinMatchingSet PerfectJoinHashTable::codegenMatchingSet(const CompilationOptions& co,
                                                             const size_t index) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  const auto cols = get_cols(
      qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
  auto key_col = cols.second;
  CHECK(key_col);
  auto val_col = cols.first;
  CHECK(val_col);
  auto pos_ptr = codegenHashTableLoad(index);
  CHECK(pos_ptr);
  const int shard_count = shardCount();
  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
  if (key_col_var && val_col_var &&
      self_join_not_covered_by_left_deep_tree(
          key_col_var,
          val_col_var,
          get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
    throw std::runtime_error(
        "Query execution failed because the query contains an unsupported self-join "
        "pattern. We suspect the query requires multiple left-deep join trees due to "
        "the join condition of the self-join. Please consider rewriting the table "
        "order in the FROM clause.");
  }
  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
  const int64_t sub_buff_size = getComponentBufferSize();
  const auto& key_col_ti = key_col->get_type_info();

  auto bucketize = (key_col_ti.get_type() == kDATE);
  return HashJoin::codegenMatchingSet(hash_join_idx_args,
                                      shard_count,
                                      !key_col_ti.get_notnull(),
                                      isBitwiseEq(),
                                      sub_buff_size,
                                      executor_,
                                      bucketize);
}

size_t PerfectJoinHashTable::offsetBufferOff() const noexcept {
  return 0;
}

size_t PerfectJoinHashTable::countBufferOff() const noexcept {
  return getComponentBufferSize();
}

size_t PerfectJoinHashTable::payloadBufferOff() const noexcept {
  return 2 * getComponentBufferSize();
}

size_t PerfectJoinHashTable::getComponentBufferSize() const noexcept {
  if (hash_tables_for_device_.empty()) {
    return 0;
  }
  auto hash_table = hash_tables_for_device_.front();
  if (hash_table && hash_table->getLayout() == HashType::OneToMany) {
    return hash_table->getEntryCount() * sizeof(int32_t);
  } else {
    return 0;
  }
}
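
// A sketch of the one-to-many buffer layout implied by offsetBufferOff(),
// countBufferOff(), and payloadBufferOff() above: three consecutive sections
// [offsets | counts | payload], where the offset and count sections each hold
// entry_count int32 values, i.e. one component buffer each. The struct name
// is hypothetical; guarded out of the build.
#if 0
#include <cstddef>
#include <cstdint>

struct ExampleOneToManyLayout {
  std::size_t entry_count;
  std::size_t component_size() const { return entry_count * sizeof(int32_t); }
  std::size_t offsets_off() const { return 0; }
  std::size_t counts_off() const { return component_size(); }
  std::size_t payload_off() const { return 2 * component_size(); }
};
#endif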

HashTable* PerfectJoinHashTable::getHashTableForDevice(const size_t device_id) const {
  CHECK_LT(device_id, hash_tables_for_device_.size());
  return hash_tables_for_device_[device_id].get();
}

std::string PerfectJoinHashTable::toString(const ExecutorDeviceType device_type,
                                           const int device_id,
                                           bool raw) const {
  auto buffer = getJoinHashBuffer(device_type, device_id);
  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
  auto hash_table = getHashTableForDevice(device_id);
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);

    auto data_mgr = executor_->getDataMgr();
    auto device_allocator = data_mgr->createGpuAllocator(device_id);
    device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
#else
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
#endif  // HAVE_CUDA
  auto ptr2 = ptr1 + offsetBufferOff();
  auto ptr3 = ptr1 + countBufferOff();
  auto ptr4 = ptr1 + payloadBufferOff();
  return HashTable::toString("perfect",
                             getHashTypeString(hash_type_),
                             0,
                             0,
                             hash_table ? hash_table->getEntryCount() : 0,
                             ptr1,
                             ptr2,
                             ptr3,
                             ptr4,
                             buffer_size,
                             raw);
}

std::set<DecodedJoinHashBufferEntry> PerfectJoinHashTable::toSet(
    const ExecutorDeviceType device_type,
    const int device_id) const {
  auto buffer = getJoinHashBuffer(device_type, device_id);
  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
  auto hash_table = getHashTableForDevice(device_id);
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);

    auto data_mgr = executor_->getDataMgr();
    auto device_allocator = data_mgr->createGpuAllocator(device_id);
    device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
#else
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
#endif  // HAVE_CUDA
  auto ptr2 = ptr1 + offsetBufferOff();
  auto ptr3 = ptr1 + countBufferOff();
  auto ptr4 = ptr1 + payloadBufferOff();
  return HashTable::toSet(0,
                          0,
                          hash_table ? hash_table->getEntryCount() : 0,
                          ptr1,
                          ptr2,
                          ptr3,
                          ptr4,
                          buffer_size);
}

llvm::Value* PerfectJoinHashTable::codegenSlot(const CompilationOptions& co,
                                               const size_t index) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  using namespace std::string_literals;

  CHECK(getHashType() == HashType::OneToOne);
  const auto cols = get_cols(
      qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
  auto key_col = cols.second;
  CHECK(key_col);
  auto val_col = cols.first;
  CHECK(val_col);
  CodeGenerator code_generator(executor_);
  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
  if (key_col_var && val_col_var &&
      self_join_not_covered_by_left_deep_tree(
          key_col_var,
          val_col_var,
          get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
    throw std::runtime_error(
        "Query execution failed because the query contains an unsupported self-join "
        "pattern. We suspect the query requires multiple left-deep join trees due to "
        "the join condition of the self-join. Please consider rewriting the table "
        "order in the FROM clause.");
  }
  const auto key_lvs = code_generator.codegen(key_col, true, co);
  CHECK_EQ(size_t(1), key_lvs.size());
  auto hash_ptr = codegenHashTableLoad(index);
  CHECK(hash_ptr);
  const int shard_count = shardCount();
  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);

  const auto& key_col_ti = key_col->get_type_info();
  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
                                                     : "hash_join_idx"s);

  if (isBitwiseEq()) {
    fname += "_bitwise";
  }
  if (shard_count) {
    fname += "_sharded";
  }

  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
    fname += "_nullable";
  }
  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
}

const InputTableInfo& PerfectJoinHashTable::getInnerQueryInfo(
    const Analyzer::ColumnVar* inner_col) const {
  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
}

const InputTableInfo& get_inner_query_info(
    const int inner_table_id,
    const std::vector<InputTableInfo>& query_infos) {
  std::optional<size_t> ti_idx;
  for (size_t i = 0; i < query_infos.size(); ++i) {
    if (inner_table_id == query_infos[i].table_id) {
      ti_idx = i;
      break;
    }
  }
  CHECK(ti_idx);
  return query_infos[*ti_idx];
}

size_t get_entries_per_device(const size_t total_entries,
                              const size_t shard_count,
                              const size_t device_count,
                              const Data_Namespace::MemoryLevel memory_level) {
  const auto entries_per_shard =
      shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
  size_t entries_per_device = entries_per_shard;
  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
    const auto shards_per_device = (shard_count + device_count - 1) / device_count;
    CHECK_GT(shards_per_device, 0u);
    entries_per_device = entries_per_shard * shards_per_device;
  }
  return entries_per_device;
}
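
// Worked example of the arithmetic above (hypothetical numbers): 1000 total
// entries across 6 shards on 4 GPUs gives ceil(1000 / 6) = 167 entries per
// shard and ceil(6 / 4) = 2 shards per device, so 334 entries per device.
#if 0
// get_entries_per_device(1000, 6, 4, Data_Namespace::GPU_LEVEL)
//   entries_per_shard  = (1000 + 6 - 1) / 6 = 167
//   shards_per_device  = (6 + 4 - 1) / 4    = 2
//   entries_per_device = 167 * 2            = 334
#endif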

size_t PerfectJoinHashTable::shardCount() const {
  return memory_level_ == Data_Namespace::GPU_LEVEL
             ? get_shard_count(qual_bin_oper_.get(), executor_)
             : 0;
}

bool PerfectJoinHashTable::isBitwiseEq() const {
  return qual_bin_oper_->get_optype() == kBW_EQ;
}