OmniSciDB eb3a3d0a03
PerfectJoinHashTable.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/JoinHashTable/PerfectJoinHashTable.h"

#include <atomic>
#include <future>
#include <numeric>
#include <thread>

#include "Logger/Logger.h"
#include "QueryEngine/Execute.h"

namespace {

InnerOuter get_cols(const Analyzer::BinOper* qual_bin_oper,
                    const Catalog_Namespace::Catalog& cat,
                    const TemporaryTables* temporary_tables) {
  const auto lhs = qual_bin_oper->get_left_operand();
  const auto rhs = qual_bin_oper->get_right_operand();
  return HashJoin::normalizeColumnPair(lhs, rhs, cat, temporary_tables);
}

HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
                                             ExpressionRange const& col_range,
                                             bool const is_bw_eq) {
  using EmptyRangeSize = boost::optional<size_t>;
  auto empty_range_check = [](ExpressionRange const& col_range,
                              bool const is_bw_eq) -> EmptyRangeSize {
    if (col_range.getIntMin() > col_range.getIntMax()) {
      CHECK_EQ(col_range.getIntMin(), int64_t(0));
      CHECK_EQ(col_range.getIntMax(), int64_t(-1));
      if (is_bw_eq) {
        return size_t(1);
      }
      return size_t(0);
    }
    return EmptyRangeSize{};
  };

  auto empty_range = empty_range_check(col_range, is_bw_eq);
  if (empty_range) {
    return {size_t(*empty_range), 1};
  }

  int64_t bucket_normalization =
      context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
  CHECK_GT(bucket_normalization, 0);
  return {size_t(col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0)),
          bucket_normalization};
}
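
// Worked example of the arithmetic above: an integer key column with
// col_range [100, 1099] needs 1099 - 100 + 1 = 1000 slots, plus one trailing
// slot for the translated null key when the qual is kBW_EQ. For kDATE columns
// the range is in epoch seconds but values sit on day boundaries, so
// col_range.getBucket() (86400 for day granularity) is carried along as
// bucket_normalization and getNormalizedHashEntryCount() scales the raw
// count down by that factor.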

size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
  if (col_range.getIntMin() > col_range.getIntMax()) {
    CHECK_EQ(col_range.getIntMin(), int64_t(0));
    CHECK_EQ(col_range.getIntMax(), int64_t(-1));
    return is_bw_eq ? 1 : 0;
  }
  return col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0);
}

}  // namespace

namespace {

bool shard_count_less_or_equal_device_count(const int inner_table_id,
                                            const Executor* executor) {
  const auto inner_table_info = executor->getTableInfo(inner_table_id);
  std::unordered_set<int> device_holding_fragments;
  auto cuda_mgr = executor->getDataMgr()->getCudaMgr();
  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
  for (const auto& fragment : inner_table_info.fragments) {
    if (fragment.shard != -1) {
      const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
      if (!it_ok.second) {
        return false;
      }
    }
  }
  return true;
}

}  // namespace

size_t get_shard_count(
    std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
    const Executor* executor) {
  const auto inner_col = equi_pair.first;
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
  if (!outer_col || inner_col->get_table_id() < 0 || outer_col->get_table_id() < 0) {
    return 0;
  }
  if (outer_col->get_rte_idx()) {
    return 0;
  }
  if (inner_col->get_type_info() != outer_col->get_type_info()) {
    return 0;
  }
  const auto catalog = executor->getCatalog();
  const auto inner_td = catalog->getMetadataForTable(inner_col->get_table_id());
  CHECK(inner_td);
  const auto outer_td = catalog->getMetadataForTable(outer_col->get_table_id());
  CHECK(outer_td);
  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
      inner_td->nShards != outer_td->nShards) {
    return 0;
  }
  if (!shard_count_less_or_equal_device_count(inner_td->tableId, executor)) {
    return 0;
  }
  // The two columns involved must be the ones on which the tables have been sharded.
  return (inner_td->shardedColumnId == inner_col->get_column_id() &&
          outer_td->shardedColumnId == outer_col->get_column_id()) ||
                 (outer_td->shardedColumnId == inner_col->get_column_id() &&
                  inner_td->shardedColumnId == inner_col->get_column_id())
             ? inner_td->nShards
             : 0;
}
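
// Example: two tables each sharded 8 ways on their join columns yield a
// shard count of 8 here, letting each device build only the shards it owns;
// any mismatch in shard count or sharding column degrades to 0, i.e. the
// unsharded code path.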

//! Make hash table from a join condition.
std::shared_ptr<PerfectJoinHashTable> PerfectJoinHashTable::getInstance(
    const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const JoinType join_type,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor) {
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  if (VLOGGING(1)) {
    VLOG(1) << "Building perfect hash table " << getHashTypeString(preferred_hash_type)
            << " for qual: " << qual_bin_oper->toString();
    ts1 = std::chrono::steady_clock::now();
  }
  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
  const auto cols =
      get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
  const auto inner_col = cols.first;
  CHECK(inner_col);
  const auto& ti = inner_col->get_type_info();
  auto col_range =
      getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
  if (col_range.getType() == ExpressionRangeType::Invalid) {
    throw HashJoinFail(
        "Could not compute range for the expressions involved in the equijoin");
  }
  if (ti.is_string()) {
    // The nullable info must be the same as the source column.
    const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
    if (source_col_range.getType() == ExpressionRangeType::Invalid) {
      throw HashJoinFail(
          "Could not compute range for the expressions involved in the equijoin");
    }
    if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
      // If the inner column expression range is empty, use the inner col range
      CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
      CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
      col_range = source_col_range;
    } else {
      col_range = ExpressionRange::makeIntRange(
          std::min(source_col_range.getIntMin(), col_range.getIntMin()),
          std::max(source_col_range.getIntMax(), col_range.getIntMax()),
          0,
          source_col_range.hasNulls());
    }
  }
  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
  const auto max_hash_entry_count =
      memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
          ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
          : static_cast<size_t>(std::numeric_limits<int32_t>::max());

  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
      ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();

  if (bucketized_entry_count > max_hash_entry_count) {
    throw TooManyHashEntries();
  }

  if (qual_bin_oper->get_optype() == kBW_EQ &&
      col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
    throw HashJoinFail("Cannot translate null value for kBW_EQ");
  }
  auto join_hash_table =
      std::shared_ptr<PerfectJoinHashTable>(new PerfectJoinHashTable(qual_bin_oper,
                                                                     inner_col,
                                                                     query_infos,
                                                                     memory_level,
                                                                     join_type,
                                                                     preferred_hash_type,
                                                                     col_range,
                                                                     column_cache,
                                                                     executor,
                                                                     device_count));
  try {
    join_hash_table->reify();
  } catch (const TableMustBeReplicated& e) {
    // Throw a runtime error to abort the query
    join_hash_table->freeHashBufferMemory();
    throw std::runtime_error(e.what());
  } catch (const HashJoinFail& e) {
    // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
    // possible)
    join_hash_table->freeHashBufferMemory();
    throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
                                   "involved in equijoin | ") +
                       e.what());
  } catch (const ColumnarConversionNotSupported& e) {
    throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
                       e.what());
  } catch (const OutOfMemory& e) {
    throw HashJoinFail(
        std::string("Ran out of memory while building hash tables for equijoin | ") +
        e.what());
  } catch (const std::exception& e) {
    throw std::runtime_error(
        std::string("Fatal error while attempting to build hash tables for join: ") +
        e.what());
  }
  if (VLOGGING(1)) {
    ts2 = std::chrono::steady_clock::now();
    VLOG(1) << "Built perfect hash table "
            << getHashTypeString(join_hash_table->getHashType()) << " in "
            << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
            << " ms";
  }
  return join_hash_table;
}
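
// Error handling above, in brief: NeedsOneToManyHash is caught inside
// reify() and retried with a one-to-many layout, HashJoinFail propagates so
// the executor can fall back to a loop join where possible, and replication
// violations or unknown exceptions abort the query outright.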

bool needs_dictionary_translation(const Analyzer::ColumnVar* inner_col,
                                  const Analyzer::Expr* outer_col_expr,
                                  const Executor* executor) {
  const auto catalog = executor->getCatalog();
  CHECK(catalog);
  const auto inner_cd = get_column_descriptor_maybe(
      inner_col->get_column_id(), inner_col->get_table_id(), *catalog);
  const auto& inner_ti = get_column_type(inner_col->get_column_id(),
                                         inner_col->get_table_id(),
                                         inner_cd,
                                         executor->getTemporaryTables());
  // Only strings may need dictionary translation.
  if (!inner_ti.is_string()) {
    return false;
  }
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
  CHECK(outer_col);
  const auto outer_cd = get_column_descriptor_maybe(
      outer_col->get_column_id(), outer_col->get_table_id(), *catalog);
  // Don't want to deal with temporary tables for now, require translation.
  if (!inner_cd || !outer_cd) {
    return true;
  }
  const auto& outer_ti = get_column_type(outer_col->get_column_id(),
                                         outer_col->get_table_id(),
                                         outer_cd,
                                         executor->getTemporaryTables());
  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
  // If the two columns don't share the dictionary, translation is needed.
  return outer_ti.get_comp_param() != inner_ti.get_comp_param();
}
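
// Two dictionary-encoded TEXT columns are directly comparable by their
// integer string ids only when they share a dictionary (equal comp_param);
// otherwise the inner ids must first be translated into the outer
// dictionary's id space, which is why getEffectiveMemoryLevel() below forces
// such builds onto the CPU.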

std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    const int device_count) {
  std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
  for (const auto& fragment : fragments) {
    CHECK_GE(fragment.shard, 0);
    if (fragment.shard % device_count == device_id) {
      shards_for_device.push_back(fragment);
    }
  }
  return shards_for_device;
}
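
// Shards map to devices round-robin by shard id: with device_count = 2,
// fragments for shards {0, 1, 2, 3} split into {0, 2} on device 0 and
// {1, 3} on device 1.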

void PerfectJoinHashTable::reify() {
  auto timer = DEBUG_TIMER(__func__);
  CHECK_LT(0, device_count_);
  auto catalog = const_cast<Catalog_Namespace::Catalog*>(executor_->getCatalog());
  const auto cols =
      get_cols(qual_bin_oper_.get(), *catalog, executor_->temporary_tables_);
  const auto inner_col = cols.first;
  checkHashJoinReplicationConstraint(
      inner_col->get_table_id(),
      get_shard_count(qual_bin_oper_.get(), executor_),
      executor_);
  const auto& query_info = getInnerQueryInfo(inner_col).info;
  if (query_info.fragments.empty()) {
    return;
  }
  if (query_info.getNumTuplesUpperBound() >
      static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw TooManyHashEntries();
  }
  std::vector<std::future<void>> init_threads;
  const int shard_count = shardCount();

  inner_outer_pairs_.push_back(cols);
  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));

  std::vector<ColumnsForDevice> columns_per_device;
  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
  try {
    auto data_mgr = executor_->getDataMgr();
    if (memory_level_ == Data_Namespace::GPU_LEVEL) {
      for (int device_id = 0; device_id < device_count_; ++device_id) {
        dev_buff_owners.emplace_back(
            std::make_unique<CudaAllocator>(data_mgr, device_id));
      }
    }
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto fragments =
          shard_count
              ? only_shards_for_device(query_info.fragments, device_id, device_count_)
              : query_info.fragments;
      const auto columns_for_device =
          fetchColumnsForDevice(fragments,
                                device_id,
                                memory_level_ == Data_Namespace::GPU_LEVEL
                                    ? dev_buff_owners[device_id].get()
                                    : nullptr,
                                *catalog);
      columns_per_device.push_back(columns_for_device);
      const auto hash_table_key = genHashTableKey(
          fragments, inner_outer_pairs_.front().second, inner_outer_pairs_.front().first);
      init_threads.push_back(std::async(std::launch::async,
                                        &PerfectJoinHashTable::reifyForDevice,
                                        this,
                                        hash_table_key,
                                        columns_per_device[device_id],
                                        hash_type_,
                                        device_id,
                                        logger::thread_id()));
    }
    for (auto& init_thread : init_threads) {
      init_thread.wait();
    }
    for (auto& init_thread : init_threads) {
      init_thread.get();
    }

  } catch (const NeedsOneToManyHash& e) {
    hash_type_ = HashType::OneToMany;
    freeHashBufferMemory();
    init_threads.clear();
    if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
      CHECK_EQ(dev_buff_owners.size(), size_t(device_count_));
    }
    CHECK_EQ(columns_per_device.size(), size_t(device_count_));
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto fragments =
          shard_count
              ? only_shards_for_device(query_info.fragments, device_id, device_count_)
              : query_info.fragments;
      const auto hash_table_key = genHashTableKey(
          fragments, inner_outer_pairs_.front().second, inner_outer_pairs_.front().first);
      init_threads.push_back(std::async(std::launch::async,
                                        &PerfectJoinHashTable::reifyForDevice,
                                        this,
                                        hash_table_key,
                                        columns_per_device[device_id],
                                        hash_type_,
                                        device_id,
                                        logger::thread_id()));
    }
    for (auto& init_thread : init_threads) {
      init_thread.wait();
    }
    for (auto& init_thread : init_threads) {
      init_thread.get();
    }
  }
}

Data_Namespace::MemoryLevel PerfectJoinHashTable::getEffectiveMemoryLevel(
    const std::vector<InnerOuter>& inner_outer_pairs) const {
  for (const auto& inner_outer_pair : inner_outer_pairs) {
    if (needs_dictionary_translation(
            inner_outer_pair.first, inner_outer_pair.second, executor_)) {
      return Data_Namespace::CPU_LEVEL;
    }
  }
  return memory_level_;
}

ColumnsForDevice PerfectJoinHashTable::fetchColumnsForDevice(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    DeviceAllocator* dev_buff_owner,
    const Catalog_Namespace::Catalog& catalog) {
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);

  std::vector<JoinColumn> join_columns;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<JoinColumnTypeInfo> join_column_types;
  std::vector<JoinBucketInfo> join_bucket_info;
  std::vector<std::shared_ptr<void>> malloc_owner;
  for (const auto& inner_outer_pair : inner_outer_pairs_) {
    const auto inner_col = inner_outer_pair.first;
    const auto inner_cd = get_column_descriptor_maybe(
        inner_col->get_column_id(), inner_col->get_table_id(), catalog);
    if (inner_cd && inner_cd->isVirtualCol) {
      throw FailedToJoinOnVirtualColumn();
    }
    join_columns.emplace_back(fetchJoinColumn(inner_col,
                                              fragments,
                                              effective_memory_level,
                                              device_id,
                                              chunks_owner,
                                              dev_buff_owner,
                                              malloc_owner,
                                              executor_,
                                              &column_cache_));
    const auto& ti = inner_col->get_type_info();
    join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
                                                      0,
                                                      0,
                                                      inline_fixed_encoding_null_val(ti),
                                                      isBitwiseEq(),
                                                      0,
                                                      get_join_column_type_kind(ti)});
  }
  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
}

void PerfectJoinHashTable::reifyForDevice(const ChunkKey& hash_table_key,
                                          const ColumnsForDevice& columns_for_device,
                                          const HashType layout,
                                          const int device_id,
                                          const logger::ThreadId parent_thread_id) {
  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);

  CHECK_EQ(columns_for_device.join_columns.size(), size_t(1));
  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
  auto& join_column = columns_for_device.join_columns.front();
  if (layout == HashType::OneToOne) {
    const auto err = initHashTableForDevice(hash_table_key,
                                            join_column,
                                            inner_outer_pairs_.front(),
                                            layout,
                                            effective_memory_level,
                                            device_id);
    if (err) {
      throw NeedsOneToManyHash();
    }
  } else {
    const auto err = initHashTableForDevice(hash_table_key,
                                            join_column,
                                            inner_outer_pairs_.front(),
                                            HashType::OneToMany,
                                            effective_memory_level,
                                            device_id);
    if (err) {
      throw std::runtime_error("Unexpected error building one to many hash table: " +
                               std::to_string(err));
    }
  }
}

int PerfectJoinHashTable::initHashTableForDevice(
    const ChunkKey& chunk_key,
    const JoinColumn& join_column,
    const InnerOuter& cols,
    const HashType layout,
    const Data_Namespace::MemoryLevel effective_memory_level,
    const int device_id) {
  auto timer = DEBUG_TIMER(__func__);
  const auto inner_col = cols.first;
  CHECK(inner_col);

  auto hash_entry_info = get_bucketized_hash_entry_info(
      inner_col->get_type_info(), col_range_, isBitwiseEq());
  if (!hash_entry_info && layout == HashType::OneToOne) {
    // TODO: what is this for?
    return 0;
  }
#ifndef HAVE_CUDA
  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
#endif
  int err{0};
  const int32_t hash_join_invalid_val{-1};
  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
    CHECK(!chunk_key.empty());

    auto hash_table = initHashTableOnCpuFromCache(chunk_key, join_column.num_elems, cols);
    {
      std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
      if (!hash_table) {
        PerfectJoinHashTableBuilder builder;
        if (layout == HashType::OneToOne) {
          builder.initOneToOneHashTableOnCpu(join_column,
                                             col_range_,
                                             isBitwiseEq(),
                                             cols,
                                             join_type_,
                                             layout,
                                             hash_entry_info,
                                             hash_join_invalid_val,
                                             executor_);
          hash_table = builder.getHashTable();
        } else {
          builder.initOneToManyHashTableOnCpu(join_column,
                                              col_range_,
                                              isBitwiseEq(),
                                              cols,
                                              hash_entry_info,
                                              hash_join_invalid_val,
                                              executor_);
          hash_table = builder.getHashTable();
        }
      } else {
        if (layout == HashType::OneToOne &&
            hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU) >
                hash_entry_info.getNormalizedHashEntryCount() * sizeof(int32_t)) {
          // TODO: can this ever happen?
          // Too many hash entries, need to retry with a 1:many table
          throw NeedsOneToManyHash();
        }
      }
    }
    if (inner_col->get_table_id() > 0) {
      putHashTableOnCpuToCache(chunk_key, join_column.num_elems, hash_table, cols);
    }
    // Transfer the hash table to the GPU if we've only built it on CPU
    // but the query runs on GPU (join on dictionary encoded columns).
    if (memory_level_ == Data_Namespace::GPU_LEVEL) {
#ifdef HAVE_CUDA
      const auto& ti = inner_col->get_type_info();
      CHECK(ti.is_string());
      auto data_mgr = executor_->getDataMgr();
      std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);

      PerfectJoinHashTableBuilder gpu_builder;
      gpu_builder.allocateDeviceMemory(join_column,
                                       hash_table->getLayout(),
                                       hash_entry_info,
                                       shardCount(),
                                       device_id,
                                       device_count_,
                                       executor_);
      std::shared_ptr<PerfectHashTable> gpu_hash_table = gpu_builder.getHashTable();
      CHECK(gpu_hash_table);
      auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
      CHECK(gpu_buffer_ptr);

      CHECK(hash_table);
      // GPU size returns reserved size
      CHECK_LE(hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
               gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));
      copy_to_gpu(data_mgr,
                  reinterpret_cast<CUdeviceptr>(gpu_buffer_ptr),
                  hash_table->getCpuBuffer(),
                  hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
                  device_id);
      CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
      hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
#else
      UNREACHABLE();
#endif
    } else {
      CHECK(hash_table);
      CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
      hash_tables_for_device_[device_id] = hash_table;
    }
  } else {
#ifdef HAVE_CUDA
    PerfectJoinHashTableBuilder builder;
    CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
    builder.allocateDeviceMemory(join_column,
                                 layout,
                                 hash_entry_info,
                                 shardCount(),
                                 device_id,
                                 device_count_,
                                 executor_);
    builder.initHashTableOnGpu(chunk_key,
                               join_column,
                               col_range_,
                               isBitwiseEq(),
                               cols,
                               join_type_,
                               layout,
                               hash_entry_info,
                               shardCount(),
                               hash_join_invalid_val,
                               device_id,
                               device_count_,
                               executor_);
    CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
    hash_tables_for_device_[device_id] = builder.getHashTable();
#else
    UNREACHABLE();
#endif
  }

  return err;
}
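
// Division of labor above: when the effective memory level is CPU (always
// the case when dictionary translation is needed), the table is built once
// on the host, cached by chunk key, and copied verbatim to each GPU that
// needs it; when it is GPU, the builder allocates and fills device memory
// directly and nothing is cached.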

ChunkKey PerfectJoinHashTable::genHashTableKey(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const Analyzer::Expr* outer_col_expr,
    const Analyzer::ColumnVar* inner_col) const {
  ChunkKey hash_table_key{executor_->getCatalog()->getCurrentDB().dbId,
                          inner_col->get_table_id(),
                          inner_col->get_column_id()};
  const auto& ti = inner_col->get_type_info();
  if (ti.is_string()) {
    CHECK_EQ(kENCODING_DICT, ti.get_compression());
    size_t outer_elem_count = 0;
    const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
    CHECK(outer_col);
    const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
    for (auto& frag : outer_query_info.fragments) {
      outer_elem_count = frag.getNumTuples();
    }
    hash_table_key.push_back(outer_elem_count);
  }
  if (fragments.size() < 2) {
    hash_table_key.push_back(fragments.front().fragmentId);
  }
  return hash_table_key;
}
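
// The resulting cache key is {db_id, table_id, column_id}, extended with the
// outer element count for dictionary-encoded strings (whose payload depends
// on the outer side) and with the fragment id when the inner side consists
// of a single fragment.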

std::shared_ptr<PerfectHashTable> PerfectJoinHashTable::initHashTableOnCpuFromCache(
    const ChunkKey& chunk_key,
    const size_t num_elements,
    const InnerOuter& cols) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK_GE(chunk_key.size(), size_t(2));
  if (chunk_key[1] < 0) {
    // Do not cache hash tables over intermediate results
    return nullptr;
  }
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
  JoinHashTableCacheKey cache_key{col_range_,
                                  *cols.first,
                                  outer_col ? *outer_col : *cols.first,
                                  num_elements,
                                  chunk_key,
                                  qual_bin_oper_->get_optype(),
                                  join_type_};
  auto hash_table_opt = (hash_table_cache_->get(cache_key));
  return hash_table_opt ? *hash_table_opt : nullptr;
}
673 
675  const size_t num_elements,
676  HashTableCacheValue hash_table,
677  const InnerOuter& cols) {
678  CHECK_GE(chunk_key.size(), size_t(2));
679  if (chunk_key[1] < 0) {
680  // Do not cache hash tables over intermediate results
681  return;
682  }
683  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
685  *cols.first,
686  outer_col ? *outer_col : *cols.first,
687  num_elements,
688  chunk_key,
689  qual_bin_oper_->get_optype(),
690  join_type_};
692  CHECK(hash_table && !hash_table->getGpuBuffer());
693  hash_table_cache_->insert(cache_key, hash_table);
694 }

llvm::Value* PerfectJoinHashTable::codegenHashTableLoad(const size_t table_idx) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  const auto hash_ptr = HashJoin::codegenHashTableLoad(table_idx, executor_);
  if (hash_ptr->getType()->isIntegerTy(64)) {
    return hash_ptr;
  }
  CHECK(hash_ptr->getType()->isPointerTy());
  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
      get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
      llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
}

std::vector<llvm::Value*> PerfectJoinHashTable::getHashJoinArgs(
    llvm::Value* hash_ptr,
    const Analyzer::Expr* key_col,
    const int shard_count,
    const CompilationOptions& co) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  CodeGenerator code_generator(executor_);
  const auto key_lvs = code_generator.codegen(key_col, true, co);
  CHECK_EQ(size_t(1), key_lvs.size());
  auto const& key_col_ti = key_col->get_type_info();
  auto hash_entry_info =
      get_bucketized_hash_entry_info(key_col_ti, col_range_, isBitwiseEq());

  std::vector<llvm::Value*> hash_join_idx_args{
      hash_ptr,
      executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
      executor_->cgen_state_->llInt(col_range_.getIntMin()),
      executor_->cgen_state_->llInt(col_range_.getIntMax())};
  if (shard_count) {
    const auto expected_hash_entry_count =
        get_hash_entry_count(col_range_, isBitwiseEq());
    const auto entry_count_per_shard =
        (expected_hash_entry_count + shard_count - 1) / shard_count;
    hash_join_idx_args.push_back(
        executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
  }
  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
        inline_fixed_encoding_null_val(key_col_logical_ti)));
  }
  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
  if (isBitwiseEq()) {
    if (special_date_bucketization_case) {
      hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
          col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
    } else {
      hash_join_idx_args.push_back(
          executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
    }
  }

  if (special_date_bucketization_case) {
    hash_join_idx_args.emplace_back(
        executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
  }

  return hash_join_idx_args;
}
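
// The argument list mirrors the runtime hash_join_idx* helpers: a 64-bit
// table address, the key widened to i64, and the key range [min, max];
// sharded variants append entries-per-shard, shard count, and device count,
// nullable variants append the inlined null sentinel, and the kDATE variant
// appends the bucket normalization divisor. The perfect-hash probe itself
// reduces to slot = key - min, optionally divided by the bucket size.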

HashJoinMatchingSet PerfectJoinHashTable::codegenMatchingSet(const CompilationOptions& co,
                                                             const size_t index) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  const auto cols = get_cols(
      qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
  auto key_col = cols.second;
  CHECK(key_col);
  auto val_col = cols.first;
  CHECK(val_col);
  auto pos_ptr = codegenHashTableLoad(index);
  CHECK(pos_ptr);
  const int shard_count = shardCount();
  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
  if (key_col_var && val_col_var &&
      self_join_not_covered_by_left_deep_tree(
          key_col_var,
          val_col_var,
          get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
    throw std::runtime_error(
        "Query execution failed because the query contains an unsupported self-join "
        "pattern. The self-join condition may require multiple left-deep join trees, "
        "which is not supported for now. Please consider rewriting the table order in "
        "the FROM clause.");
  }
  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
  const int64_t sub_buff_size = getComponentBufferSize();
  const auto& key_col_ti = key_col->get_type_info();

  auto bucketize = (key_col_ti.get_type() == kDATE);
  return HashJoin::codegenMatchingSet(hash_join_idx_args,
                                      shard_count,
                                      !key_col_ti.get_notnull(),
                                      isBitwiseEq(),
                                      sub_buff_size,
                                      executor_,
                                      bucketize);
}

size_t PerfectJoinHashTable::offsetBufferOff() const noexcept {
  return 0;
}

size_t PerfectJoinHashTable::countBufferOff() const noexcept {
  return getComponentBufferSize();
}

size_t PerfectJoinHashTable::payloadBufferOff() const noexcept {
  return 2 * getComponentBufferSize();
}

size_t PerfectJoinHashTable::getComponentBufferSize() const noexcept {
  if (hash_tables_for_device_.empty()) {
    return 0;
  }
  auto hash_table = hash_tables_for_device_.front();
  if (hash_table && hash_table->getLayout() == HashType::OneToMany) {
    return hash_table->getEntryCount() * sizeof(int32_t);
  } else {
    return 0;
  }
}
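
// Buffer layout implied by the three offsets above for a one-to-many table,
// with each component holding entry_count int32 values:
//
//   [ offsets | counts | payload ]
//     offsetBufferOff() == 0
//     countBufferOff() == getComponentBufferSize()
//     payloadBufferOff() == 2 * getComponentBufferSize()
//
// For a one-to-one table getComponentBufferSize() returns 0 and the buffer
// is a single array of slots.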

HashTable* PerfectJoinHashTable::getHashTableForDevice(const size_t device_id) const {
  CHECK_LT(device_id, hash_tables_for_device_.size());
  return hash_tables_for_device_[device_id].get();
}

std::string PerfectJoinHashTable::toString(const ExecutorDeviceType device_type,
                                           const int device_id,
                                           bool raw) const {
  auto buffer = getJoinHashBuffer(device_type, device_id);
  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
  auto hash_table = getHashTableForDevice(device_id);
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);

    copy_from_gpu(executor_->getDataMgr(),
                  buffer_copy.get(),
                  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
                  buffer_size,
                  device_id);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
#else
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
#endif  // HAVE_CUDA
  auto ptr2 = ptr1 + offsetBufferOff();
  auto ptr3 = ptr1 + countBufferOff();
  auto ptr4 = ptr1 + payloadBufferOff();
  return HashTable::toString("perfect",
                             getHashTypeString(getHashType()),
                             0,
                             0,
                             hash_table ? hash_table->getEntryCount() : 0,
                             ptr1,
                             ptr2,
                             ptr3,
                             ptr4,
                             buffer_size,
                             raw);
}

std::set<DecodedJoinHashBufferEntry> PerfectJoinHashTable::toSet(
    const ExecutorDeviceType device_type,
    const int device_id) const {
  auto buffer = getJoinHashBuffer(device_type, device_id);
  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
  auto hash_table = getHashTableForDevice(device_id);
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);

    copy_from_gpu(executor_->getDataMgr(),
                  buffer_copy.get(),
                  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
                  buffer_size,
                  device_id);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
#else
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
#endif  // HAVE_CUDA
  auto ptr2 = ptr1 + offsetBufferOff();
  auto ptr3 = ptr1 + countBufferOff();
  auto ptr4 = ptr1 + payloadBufferOff();
  return HashTable::toSet(0,
                          0,
                          hash_table ? hash_table->getEntryCount() : 0,
                          ptr1,
                          ptr2,
                          ptr3,
                          ptr4,
                          buffer_size);
}

llvm::Value* PerfectJoinHashTable::codegenSlot(const CompilationOptions& co,
                                               const size_t index) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  using namespace std::string_literals;

  CHECK(getHashType() == HashType::OneToOne);
  const auto cols = get_cols(
      qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
  auto key_col = cols.second;
  CHECK(key_col);
  auto val_col = cols.first;
  CHECK(val_col);
  CodeGenerator code_generator(executor_);
  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
  if (key_col_var && val_col_var &&
      self_join_not_covered_by_left_deep_tree(
          key_col_var,
          val_col_var,
          get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
    throw std::runtime_error(
        "Query execution failed because the query contains an unsupported self-join "
        "pattern. The self-join condition may require multiple left-deep join trees, "
        "which is not supported for now. Please consider rewriting the table order in "
        "the FROM clause.");
  }
  const auto key_lvs = code_generator.codegen(key_col, true, co);
  CHECK_EQ(size_t(1), key_lvs.size());
  auto hash_ptr = codegenHashTableLoad(index);
  CHECK(hash_ptr);
  const int shard_count = shardCount();
  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);

  const auto& key_col_ti = key_col->get_type_info();
  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
                                                     : "hash_join_idx"s);

  if (isBitwiseEq()) {
    fname += "_bitwise";
  }
  if (shard_count) {
    fname += "_sharded";
  }

  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
    fname += "_nullable";
  }
  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
}

const InputTableInfo& PerfectJoinHashTable::getInnerQueryInfo(
    const Analyzer::ColumnVar* inner_col) const {
  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
}

const InputTableInfo& get_inner_query_info(
    const int inner_table_id,
    const std::vector<InputTableInfo>& query_infos) {
  std::optional<size_t> ti_idx;
  for (size_t i = 0; i < query_infos.size(); ++i) {
    if (inner_table_id == query_infos[i].table_id) {
      ti_idx = i;
      break;
    }
  }
  CHECK(ti_idx);
  return query_infos[*ti_idx];
}

size_t get_entries_per_device(const size_t total_entries,
                              const size_t shard_count,
                              const size_t device_count,
                              const Data_Namespace::MemoryLevel memory_level) {
  const auto entries_per_shard =
      shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
  size_t entries_per_device = entries_per_shard;
  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
    const auto shards_per_device = (shard_count + device_count - 1) / device_count;
    CHECK_GT(shards_per_device, 0u);
    entries_per_device = entries_per_shard * shards_per_device;
  }
  return entries_per_device;
}
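
// Worked example: total_entries = 1000, shard_count = 8, device_count = 4 at
// GPU level gives entries_per_shard = (1000 + 7) / 8 = 125 and
// shards_per_device = (8 + 3) / 4 = 2, so each device sizes for
// 125 * 2 = 250 entries; with shard_count == 0 every device gets all 1000.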

size_t PerfectJoinHashTable::shardCount() const {
  return memory_level_ == Data_Namespace::GPU_LEVEL
             ? get_shard_count(qual_bin_oper_.get(), executor_)
             : 0;
}

bool PerfectJoinHashTable::isBitwiseEq() const {
  return qual_bin_oper_->get_optype() == kBW_EQ;
}