OmniSciDB fe05a0c208
PerfectJoinHashTable.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/JoinHashTable/PerfectJoinHashTable.h"

#include <atomic>
#include <future>
#include <numeric>
#include <thread>

#include "Logger/Logger.h"
#include "QueryEngine/Execute.h"

namespace {

InnerOuter get_cols(const Analyzer::BinOper* qual_bin_oper,
                    const Catalog_Namespace::Catalog& cat,
                    const TemporaryTables* temporary_tables) {
  const auto lhs = qual_bin_oper->get_left_operand();
  const auto rhs = qual_bin_oper->get_right_operand();
  return normalize_column_pair(lhs, rhs, cat, temporary_tables);
}

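// Computes the number of hash entries implied by the column range, plus the bucket
// normalization factor used for DATE columns; an empty range yields zero entries
// (one when bitwise equality needs a slot for the translated NULL key).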
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
                                             ExpressionRange const& col_range,
                                             bool const is_bw_eq) {
  using EmptyRangeSize = boost::optional<size_t>;
  auto empty_range_check = [](ExpressionRange const& col_range,
                              bool const is_bw_eq) -> EmptyRangeSize {
    if (col_range.getIntMin() > col_range.getIntMax()) {
      CHECK_EQ(col_range.getIntMin(), int64_t(0));
      CHECK_EQ(col_range.getIntMax(), int64_t(-1));
      if (is_bw_eq) {
        return size_t(1);
      }
      return size_t(0);
    }
    return EmptyRangeSize{};
  };

  auto empty_range = empty_range_check(col_range, is_bw_eq);
  if (empty_range) {
    return {size_t(*empty_range), 1};
  }

  int64_t bucket_normalization =
      context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
  CHECK_GT(bucket_normalization, 0);
  return {size_t(col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0)),
          bucket_normalization};
}

size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
  if (col_range.getIntMin() > col_range.getIntMax()) {
    CHECK_EQ(col_range.getIntMin(), int64_t(0));
    CHECK_EQ(col_range.getIntMax(), int64_t(-1));
    return is_bw_eq ? 1 : 0;
  }
  return col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0);
}

}  // namespace

namespace {

bool shard_count_less_or_equal_device_count(const int inner_table_id,
                                            const Executor* executor) {
  const auto inner_table_info = executor->getTableInfo(inner_table_id);
  std::unordered_set<int> device_holding_fragments;
  auto cuda_mgr = executor->getCatalog()->getDataMgr().getCudaMgr();
  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
  for (const auto& fragment : inner_table_info.fragments) {
    if (fragment.shard != -1) {
      const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
      if (!it_ok.second) {
        return false;
      }
    }
  }
  return true;
}

}  // namespace

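// A join qualifier can use the sharded fast path only when both sides are physical
// tables sharded on the join columns with matching shard counts; the helper below
// returns that shard count, or 0 when the requirements are not met.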
size_t get_shard_count(
    std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
    const Executor* executor) {
  const auto inner_col = equi_pair.first;
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
  if (!outer_col || inner_col->get_table_id() < 0 || outer_col->get_table_id() < 0) {
    return 0;
  }
  if (outer_col->get_rte_idx()) {
    return 0;
  }
  if (inner_col->get_type_info() != outer_col->get_type_info()) {
    return 0;
  }
  const auto catalog = executor->getCatalog();
  const auto inner_td = catalog->getMetadataForTable(inner_col->get_table_id());
  CHECK(inner_td);
  const auto outer_td = catalog->getMetadataForTable(outer_col->get_table_id());
  CHECK(outer_td);
  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
      inner_td->nShards != outer_td->nShards) {
    return 0;
  }
  if (!shard_count_less_or_equal_device_count(inner_td->tableId, executor)) {
    return 0;
  }
  // The two columns involved must be the ones on which the tables have been sharded.
  return (inner_td->shardedColumnId == inner_col->get_column_id() &&
          outer_td->shardedColumnId == outer_col->get_column_id()) ||
                 (outer_td->shardedColumnId == inner_col->get_column_id() &&
                  inner_td->shardedColumnId == inner_col->get_column_id())
             ? inner_td->nShards
             : 0;
}

//! Make hash table from an in-flight SQL query's parse tree etc.
std::shared_ptr<PerfectJoinHashTable> PerfectJoinHashTable::getInstance(
    const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor) {
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  if (VLOGGING(1)) {
    VLOG(1) << "Building perfect hash table " << getHashTypeString(preferred_hash_type)
            << " for qual: " << qual_bin_oper->toString();
    ts1 = std::chrono::steady_clock::now();
  }
  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
  const auto cols =
      get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
  const auto inner_col = cols.first;
  CHECK(inner_col);
  const auto& ti = inner_col->get_type_info();
  auto col_range =
      getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
  if (col_range.getType() == ExpressionRangeType::Invalid) {
    throw HashJoinFail(
        "Could not compute range for the expressions involved in the equijoin");
  }
  if (ti.is_string()) {
    // The nullable info must be the same as the source column.
    const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
    if (source_col_range.getType() == ExpressionRangeType::Invalid) {
      throw HashJoinFail(
          "Could not compute range for the expressions involved in the equijoin");
    }
    if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
      // If the inner column expression range is empty, use the inner col range
      CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
      CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
      col_range = source_col_range;
    } else {
      col_range = ExpressionRange::makeIntRange(
          std::min(source_col_range.getIntMin(), col_range.getIntMin()),
          std::max(source_col_range.getIntMax(), col_range.getIntMax()),
          0,
          source_col_range.hasNulls());
    }
  }
  // We can't allocate more than 2GB of contiguous memory on GPU and each entry is 4 bytes.
  const auto max_hash_entry_count =
      memory_level == Data_Namespace::GPU_LEVEL
          ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
          : static_cast<size_t>(std::numeric_limits<int32_t>::max());

  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
      ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();

  if (bucketized_entry_count > max_hash_entry_count) {
    throw TooManyHashEntries();
  }

  if (qual_bin_oper->get_optype() == kBW_EQ &&
      col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
    throw HashJoinFail("Cannot translate null value for kBW_EQ");
  }
  auto join_hash_table =
      std::shared_ptr<PerfectJoinHashTable>(new PerfectJoinHashTable(qual_bin_oper,
                                                                     inner_col,
                                                                     query_infos,
                                                                     memory_level,
                                                                     preferred_hash_type,
                                                                     col_range,
                                                                     column_cache,
                                                                     executor,
                                                                     device_count));
  try {
    join_hash_table->reify();
  } catch (const TableMustBeReplicated& e) {
    // Throw a runtime error to abort the query
    join_hash_table->freeHashBufferMemory();
    throw std::runtime_error(e.what());
  } catch (const HashJoinFail& e) {
    // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
    // possible)
    join_hash_table->freeHashBufferMemory();
    throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
                                   "involved in equijoin | ") +
                       e.what());
  } catch (const ColumnarConversionNotSupported& e) {
    throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
                       e.what());
  } catch (const OutOfMemory& e) {
    throw HashJoinFail(
        std::string("Ran out of memory while building hash tables for equijoin | ") +
        e.what());
  } catch (const std::exception& e) {
    throw std::runtime_error(
        std::string("Fatal error while attempting to build hash tables for join: ") +
        e.what());
  }
  if (VLOGGING(1)) {
    ts2 = std::chrono::steady_clock::now();
    VLOG(1) << "Built perfect hash table "
            << getHashTypeString(join_hash_table->getHashType()) << " in "
            << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
            << " ms";
  }
  return join_hash_table;
}

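// True when a dictionary-encoded string join cannot reuse the inner dictionary
// directly, i.e. the two sides do not share a string dictionary (or one side is a
// temporary table); the hash table must then be built on CPU with translation.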
bool needs_dictionary_translation(const Analyzer::ColumnVar* inner_col,
                                  const Analyzer::Expr* outer_col_expr,
                                  const Executor* executor) {
  const auto catalog = executor->getCatalog();
  CHECK(catalog);
  const auto inner_cd = get_column_descriptor_maybe(
      inner_col->get_column_id(), inner_col->get_table_id(), *catalog);
  const auto& inner_ti = get_column_type(inner_col->get_column_id(),
                                         inner_col->get_table_id(),
                                         inner_cd,
                                         executor->getTemporaryTables());
  // Only strings may need dictionary translation.
  if (!inner_ti.is_string()) {
    return false;
  }
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
  CHECK(outer_col);
  const auto outer_cd = get_column_descriptor_maybe(
      outer_col->get_column_id(), outer_col->get_table_id(), *catalog);
  // Don't want to deal with temporary tables for now, require translation.
  if (!inner_cd || !outer_cd) {
    return true;
  }
  const auto& outer_ti = get_column_type(outer_col->get_column_id(),
                                         outer_col->get_table_id(),
                                         outer_cd,
                                         executor->getTemporaryTables());
  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
  // If the two columns don't share the dictionary, translation is needed.
  return outer_ti.get_comp_param() != inner_ti.get_comp_param();
}

std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    const int device_count) {
  std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
  for (const auto& fragment : fragments) {
    CHECK_GE(fragment.shard, 0);
    if (fragment.shard % device_count == device_id) {
      shards_for_device.push_back(fragment);
    }
  }
  return shards_for_device;
}

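// Drives the build: fetches the inner join column for every device and spawns one
// async build per device; if a one-to-one build reports it needs a one-to-many
// layout, the build is retried for all devices with HashType::OneToMany.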
void PerfectJoinHashTable::reify() {
  auto timer = DEBUG_TIMER(__func__);
  catalog_ = const_cast<Catalog_Namespace::Catalog*>(executor_->getCatalog());
  const auto cols =
      get_cols(qual_bin_oper_.get(), *catalog_, executor_->temporary_tables_);
  const auto inner_col = cols.first;
  checkHashJoinReplicationConstraint(
      inner_col->get_table_id(),
      get_shard_count(qual_bin_oper_.get(), executor_),
      executor_);
  const auto& query_info = getInnerQueryInfo(inner_col).info;
  if (query_info.fragments.empty()) {
    return;
  }
  if (query_info.getNumTuplesUpperBound() >
      static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw TooManyHashEntries();
  }
  std::vector<std::future<void>> init_threads;
  const int shard_count = shardCount();

  inner_outer_pairs_.push_back(cols);
  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));

  std::vector<ColumnsForDevice> columns_per_device;
  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
  try {
    auto& data_mgr = catalog_->getDataMgr();
    if (memory_level_ == Data_Namespace::GPU_LEVEL) {
      for (int device_id = 0; device_id < device_count_; ++device_id) {
        dev_buff_owners.emplace_back(
            std::make_unique<CudaAllocator>(&data_mgr, device_id));
      }
    }
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto fragments =
          shard_count
              ? only_shards_for_device(query_info.fragments, device_id, device_count_)
              : query_info.fragments;
      const auto columns_for_device =
          fetchColumnsForDevice(fragments,
                                device_id,
                                memory_level_ == Data_Namespace::GPU_LEVEL
                                    ? dev_buff_owners[device_id].get()
                                    : nullptr);
      columns_per_device.push_back(columns_for_device);
      const auto hash_table_key = genHashTableKey(
          fragments, inner_outer_pairs_.front().second, inner_outer_pairs_.front().first);
      init_threads.push_back(std::async(std::launch::async,
                                        &PerfectJoinHashTable::reifyForDevice,
                                        this,
                                        hash_table_key,
                                        columns_per_device[device_id],
                                        hash_type_,
                                        device_id,
                                        logger::thread_id()));
    }
    for (auto& init_thread : init_threads) {
      init_thread.wait();
    }
    for (auto& init_thread : init_threads) {
      init_thread.get();
    }

  } catch (const NeedsOneToManyHash& e) {
    hash_type_ = HashType::OneToMany;
    freeHashBufferMemory();
    init_threads.clear();
    if (memory_level_ == Data_Namespace::GPU_LEVEL) {
      CHECK_EQ(dev_buff_owners.size(), size_t(device_count_));
    }
    CHECK_EQ(columns_per_device.size(), size_t(device_count_));
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto fragments =
          shard_count
              ? only_shards_for_device(query_info.fragments, device_id, device_count_)
              : query_info.fragments;
      const auto hash_table_key = genHashTableKey(
          fragments, inner_outer_pairs_.front().second, inner_outer_pairs_.front().first);
      init_threads.push_back(std::async(std::launch::async,
                                        &PerfectJoinHashTable::reifyForDevice,
                                        this,
                                        hash_table_key,
                                        columns_per_device[device_id],
                                        hash_type_,
                                        device_id,
                                        logger::thread_id()));
    }
    for (auto& init_thread : init_threads) {
      init_thread.wait();
    }
    for (auto& init_thread : init_threads) {
      init_thread.get();
    }
  }
}

Data_Namespace::MemoryLevel PerfectJoinHashTable::getEffectiveMemoryLevel(
    const std::vector<InnerOuter>& inner_outer_pairs) const {
  for (const auto& inner_outer_pair : inner_outer_pairs) {
    if (needs_dictionary_translation(
            inner_outer_pair.first, inner_outer_pair.second, executor_)) {
      return Data_Namespace::CPU_LEVEL;
    }
  }
  return memory_level_;
}

ColumnsForDevice PerfectJoinHashTable::fetchColumnsForDevice(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    DeviceAllocator* dev_buff_owner) {
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);

  std::vector<JoinColumn> join_columns;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<JoinColumnTypeInfo> join_column_types;
  std::vector<JoinBucketInfo> join_bucket_info;
  std::vector<std::shared_ptr<void>> malloc_owner;
  for (const auto& inner_outer_pair : inner_outer_pairs_) {
    const auto inner_col = inner_outer_pair.first;
    const auto inner_cd = get_column_descriptor_maybe(
        inner_col->get_column_id(), inner_col->get_table_id(), *catalog_);
    if (inner_cd && inner_cd->isVirtualCol) {
      throw FailedToJoinOnVirtualColumn();
    }
    join_columns.emplace_back(fetchJoinColumn(inner_col,
                                              fragments,
                                              effective_memory_level,
                                              device_id,
                                              chunks_owner,
                                              dev_buff_owner,
                                              malloc_owner,
                                              executor_,
                                              &column_cache_));
    const auto& ti = inner_col->get_type_info();
    join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
                                                      0,
                                                      0,
                                                      inline_fixed_encoding_null_val(ti),
                                                      isBitwiseEq(),
                                                      0,
                                                      get_join_column_type_kind(ti)});
  }
  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
}

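// Per-device build worker. A one-to-one build that fails (e.g. duplicate keys) throws
// NeedsOneToManyHash so reify() can rebuild every device with a one-to-many layout.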
void PerfectJoinHashTable::reifyForDevice(const ChunkKey& hash_table_key,
                                          const ColumnsForDevice& columns_for_device,
                                          const HashType layout,
                                          const int device_id,
                                          const logger::ThreadId parent_thread_id) {
  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);

  CHECK_EQ(columns_for_device.join_columns.size(), size_t(1));
  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
  auto& join_column = columns_for_device.join_columns.front();
  if (layout == HashType::OneToOne) {
    const auto err = initHashTableForDevice(hash_table_key,
                                            join_column,
                                            inner_outer_pairs_.front(),
                                            HashType::OneToOne,
                                            effective_memory_level,
                                            device_id);
    if (err) {
      throw NeedsOneToManyHash();
    }
  } else {
    const auto err = initHashTableForDevice(hash_table_key,
                                            join_column,
                                            inner_outer_pairs_.front(),
                                            HashType::OneToMany,
                                            effective_memory_level,
                                            device_id);
    if (err) {
      throw std::runtime_error("Unexpected error building one to many hash table: " +
                               std::to_string(err));
    }
  }
}

int PerfectJoinHashTable::initHashTableForDevice(
    const ChunkKey& chunk_key,
    const JoinColumn& join_column,
    const InnerOuter& cols,
    const HashType layout,
    const Data_Namespace::MemoryLevel effective_memory_level,
    const int device_id) {
  auto timer = DEBUG_TIMER(__func__);
  const auto inner_col = cols.first;
  CHECK(inner_col);

  auto hash_entry_info = get_bucketized_hash_entry_info(
      inner_col->get_type_info(), col_range_, isBitwiseEq());
  if (!hash_entry_info && layout == HashType::OneToOne) {
    // TODO: what is this for?
    return 0;
  }
#ifndef HAVE_CUDA
  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
#endif
  int err{0};
  const int32_t hash_join_invalid_val{-1};
  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
    CHECK(!chunk_key.empty());

    auto hash_table = initHashTableOnCpuFromCache(chunk_key, join_column.num_elems, cols);
    {
      std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
      if (!hash_table) {
        PerfectJoinHashTableBuilder builder(executor_->catalog_);
        if (layout == HashType::OneToOne) {
          builder.initOneToOneHashTableOnCpu(join_column,
                                             col_range_,
                                             isBitwiseEq(),
                                             cols,
                                             hash_entry_info,
                                             hash_join_invalid_val,
                                             executor_);
          hash_table = builder.getHashTable();
        } else {
          builder.initOneToManyHashTableOnCpu(join_column,
                                              col_range_,
                                              isBitwiseEq(),
                                              cols,
                                              hash_entry_info,
                                              hash_join_invalid_val,
                                              executor_);
          hash_table = builder.getHashTable();
        }
      } else {
        if (layout == HashType::OneToOne &&
            hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU) >
                hash_entry_info.getNormalizedHashEntryCount() * sizeof(int32_t)) {
          // TODO: can this ever happen?
          // Too many hash entries, need to retry with a 1:many table
          throw NeedsOneToManyHash();
        }
      }
    }
    if (inner_col->get_table_id() > 0) {
      putHashTableOnCpuToCache(chunk_key, join_column.num_elems, hash_table, cols);
    }
    // Transfer the hash table on the GPU if we've only built it on CPU
    // but the query runs on GPU (join on dictionary encoded columns).
    if (memory_level_ == Data_Namespace::GPU_LEVEL) {
#ifdef HAVE_CUDA
      const auto& ti = inner_col->get_type_info();
      CHECK(ti.is_string());
      auto catalog = executor_->getCatalog();
      CHECK(catalog);
      auto& data_mgr = catalog->getDataMgr();
      std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);

      PerfectJoinHashTableBuilder gpu_builder(executor_->catalog_);
      gpu_builder.allocateDeviceMemory(join_column,
                                       hash_table->getLayout(),
                                       hash_entry_info,
                                       shardCount(),
                                       device_id,
                                       device_count_);
      std::shared_ptr<PerfectHashTable> gpu_hash_table = gpu_builder.getHashTable();
      CHECK(gpu_hash_table);
      auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
      CHECK(gpu_buffer_ptr);

      CHECK(hash_table);
      // GPU size returns reserved size
      CHECK_LE(hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
               gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));
      copy_to_gpu(&data_mgr,
                  reinterpret_cast<CUdeviceptr>(gpu_buffer_ptr),
                  hash_table->getCpuBuffer(),
                  hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
                  device_id);
      CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
      hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
#else
      UNREACHABLE();
#endif
    } else {
      CHECK(hash_table);
      CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
      hash_tables_for_device_[device_id] = hash_table;
    }
  } else {
#ifdef HAVE_CUDA
    PerfectJoinHashTableBuilder builder(executor_->catalog_);
    CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
    builder.allocateDeviceMemory(
        join_column, layout, hash_entry_info, shardCount(), device_id, device_count_);
    builder.initHashTableOnGpu(chunk_key,
                               join_column,
                               col_range_,
                               isBitwiseEq(),
                               cols,
                               layout,
                               hash_entry_info,
                               shardCount(),
                               hash_join_invalid_val,
                               device_id,
                               device_count_,
                               executor_);
    CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
    hash_tables_for_device_[device_id] = builder.getHashTable();
#else
    UNREACHABLE();
#endif
  }

  return err;
}

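// Cache/identity key for a per-device hash table: db id, inner table and column ids,
// plus the outer element count for dictionary-encoded string joins and the fragment id
// when the inner table has a single fragment.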
ChunkKey PerfectJoinHashTable::genHashTableKey(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const Analyzer::Expr* outer_col_expr,
    const Analyzer::ColumnVar* inner_col) const {
  ChunkKey hash_table_key{executor_->getCatalog()->getCurrentDB().dbId,
                          inner_col->get_table_id(),
                          inner_col->get_column_id()};
  const auto& ti = inner_col->get_type_info();
  if (ti.is_string()) {
    CHECK_EQ(kENCODING_DICT, ti.get_compression());
    size_t outer_elem_count = 0;
    const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
    CHECK(outer_col);
    const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
    for (auto& frag : outer_query_info.fragments) {
      outer_elem_count = frag.getNumTuples();
    }
    hash_table_key.push_back(outer_elem_count);
  }
  if (fragments.size() < 2) {
    hash_table_key.push_back(fragments.front().fragmentId);
  }
  return hash_table_key;
}

std::shared_ptr<PerfectHashTable> PerfectJoinHashTable::initHashTableOnCpuFromCache(
    const ChunkKey& chunk_key,
    const size_t num_elements,
    const InnerOuter& cols) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK_GE(chunk_key.size(), size_t(2));
  if (chunk_key[1] < 0) {
    // Do not cache hash tables over intermediate results
    return nullptr;
  }
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
  JoinHashTableCacheKey cache_key{col_range_,
                                  *cols.first,
                                  outer_col ? *outer_col : *cols.first,
                                  num_elements,
                                  chunk_key,
                                  qual_bin_oper_->get_optype()};
  auto hash_table_opt = (hash_table_cache_->get(cache_key));
  return hash_table_opt ? *hash_table_opt : nullptr;
}

void PerfectJoinHashTable::putHashTableOnCpuToCache(const ChunkKey& chunk_key,
                                                    const size_t num_elements,
                                                    HashTableCacheValue hash_table,
                                                    const InnerOuter& cols) {
  CHECK_GE(chunk_key.size(), size_t(2));
  if (chunk_key[1] < 0) {
    // Do not cache hash tables over intermediate results
    return;
  }
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
  JoinHashTableCacheKey cache_key{col_range_,
                                  *cols.first,
                                  outer_col ? *outer_col : *cols.first,
                                  num_elements,
                                  chunk_key,
                                  qual_bin_oper_->get_optype()};
  CHECK(hash_table && !hash_table->getGpuBuffer());
  hash_table_cache_->insert(cache_key, hash_table);
}

llvm::Value* PerfectJoinHashTable::codegenHashTableLoad(const size_t table_idx) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  const auto hash_ptr = HashJoin::codegenHashTableLoad(table_idx, executor_);
  if (hash_ptr->getType()->isIntegerTy(64)) {
    return hash_ptr;
  }
  CHECK(hash_ptr->getType()->isPointerTy());
  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
      get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
      llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
}

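// Builds the argument list for the hash_join_idx* runtime functions: hash table
// pointer, 64-bit key, key range, then optional shard, null-sentinel and DATE
// bucket-normalization arguments depending on the join flavor.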
std::vector<llvm::Value*> PerfectJoinHashTable::getHashJoinArgs(
    llvm::Value* hash_ptr,
    const Analyzer::Expr* key_col,
    const int shard_count,
    const CompilationOptions& co) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  CodeGenerator code_generator(executor_);
  const auto key_lvs = code_generator.codegen(key_col, true, co);
  CHECK_EQ(size_t(1), key_lvs.size());
  auto const& key_col_ti = key_col->get_type_info();
  auto hash_entry_info =
      get_bucketized_hash_entry_info(key_col_ti, col_range_, isBitwiseEq());

  std::vector<llvm::Value*> hash_join_idx_args{
      hash_ptr,
      executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
      executor_->cgen_state_->llInt(col_range_.getIntMin()),
      executor_->cgen_state_->llInt(col_range_.getIntMax())};
  if (shard_count) {
    const auto expected_hash_entry_count =
        get_hash_entry_count(col_range_, isBitwiseEq());
    const auto entry_count_per_shard =
        (expected_hash_entry_count + shard_count - 1) / shard_count;
    hash_join_idx_args.push_back(
        executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
  }
  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
        inline_fixed_encoding_null_val(key_col_logical_ti)));
  }
  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
  if (isBitwiseEq()) {
    if (special_date_bucketization_case) {
      hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
          col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
    } else {
      hash_join_idx_args.push_back(
          executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
    }
  }

  if (special_date_bucketization_case) {
    hash_join_idx_args.emplace_back(
        executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
  }

  return hash_join_idx_args;
}

HashJoinMatchingSet PerfectJoinHashTable::codegenMatchingSet(const CompilationOptions& co,
                                                             const size_t index) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  const auto cols = get_cols(
      qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
  auto key_col = cols.second;
  CHECK(key_col);
  auto val_col = cols.first;
  CHECK(val_col);
  auto pos_ptr = codegenHashTableLoad(index);
  CHECK(pos_ptr);
  const int shard_count = shardCount();
  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
  if (key_col_var && val_col_var &&
      self_join_not_covered_by_left_deep_tree(
          key_col_var,
          val_col_var,
          get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
    throw std::runtime_error(
        "Query execution failed because the query contains a self-join pattern that is "
        "not supported. We suspect the query requires multiple left-deep join trees due "
        "to the join condition of the self-join. Please consider rewriting the table "
        "order in the FROM clause.");
  }
  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
  const int64_t sub_buff_size = getComponentBufferSize();
  const auto& key_col_ti = key_col->get_type_info();

  auto bucketize = (key_col_ti.get_type() == kDATE);
  return HashJoin::codegenMatchingSet(hash_join_idx_args,
                                      shard_count,
                                      !key_col_ti.get_notnull(),
                                      isBitwiseEq(),
                                      sub_buff_size,
                                      executor_,
                                      bucketize);
}

size_t PerfectJoinHashTable::offsetBufferOff() const noexcept {
  return 0;
}

size_t PerfectJoinHashTable::countBufferOff() const noexcept {
  return getComponentBufferSize();
}

size_t PerfectJoinHashTable::payloadBufferOff() const noexcept {
  return 2 * getComponentBufferSize();
}

size_t PerfectJoinHashTable::getComponentBufferSize() const noexcept {
  if (hash_tables_for_device_.empty()) {
    return 0;
  }
  auto hash_table = hash_tables_for_device_.front();
  if (hash_table && hash_table->getLayout() == HashType::OneToMany) {
    return hash_table->getEntryCount() * sizeof(int32_t);
  } else {
    return 0;
  }
}

HashTable* PerfectJoinHashTable::getHashTableForDevice(const size_t device_id) const {
  CHECK_LT(device_id, hash_tables_for_device_.size());
  return hash_tables_for_device_[device_id].get();
}

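// Debugging helpers: decode the hash table buffer (copied back from the GPU when
// needed) into a human-readable string or a set of entries.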
std::string PerfectJoinHashTable::toString(const ExecutorDeviceType device_type,
                                           const int device_id,
                                           bool raw) const {
  auto buffer = getJoinHashBuffer(device_type, device_id);
  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
  auto hash_table = getHashTableForDevice(device_id);
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);

    copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
                  buffer_copy.get(),
                  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
                  buffer_size,
                  device_id);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
#else
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
#endif  // HAVE_CUDA
  auto ptr2 = ptr1 + offsetBufferOff();
  auto ptr3 = ptr1 + countBufferOff();
  auto ptr4 = ptr1 + payloadBufferOff();
  return HashTable::toString("perfect",
                             getHashTypeString(hash_type_),
                             0,
                             0,
                             hash_table ? hash_table->getEntryCount() : 0,
                             ptr1,
                             ptr2,
                             ptr3,
                             ptr4,
                             buffer_size,
                             raw);
}

std::set<DecodedJoinHashBufferEntry> PerfectJoinHashTable::toSet(
    const ExecutorDeviceType device_type,
    const int device_id) const {
  auto buffer = getJoinHashBuffer(device_type, device_id);
  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
  auto hash_table = getHashTableForDevice(device_id);
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);

    copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
                  buffer_copy.get(),
                  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
                  buffer_size,
                  device_id);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
#else
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
#endif  // HAVE_CUDA
  auto ptr2 = ptr1 + offsetBufferOff();
  auto ptr3 = ptr1 + countBufferOff();
  auto ptr4 = ptr1 + payloadBufferOff();
  return HashTable::toSet(0,
                          0,
                          hash_table ? hash_table->getEntryCount() : 0,
                          ptr1,
                          ptr2,
                          ptr3,
                          ptr4,
                          buffer_size);
}

llvm::Value* PerfectJoinHashTable::codegenSlot(const CompilationOptions& co,
                                               const size_t index) {
  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
  using namespace std::string_literals;

  const auto cols = get_cols(
      qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
  auto key_col = cols.second;
  CHECK(key_col);
  auto val_col = cols.first;
  CHECK(val_col);
  CodeGenerator code_generator(executor_);
  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
  if (key_col_var && val_col_var &&
      self_join_not_covered_by_left_deep_tree(
          key_col_var,
          val_col_var,
          get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
    throw std::runtime_error(
        "Query execution failed because the query contains a self-join pattern that is "
        "not supported. We suspect the query requires multiple left-deep join trees due "
        "to the join condition of the self-join. Please consider rewriting the table "
        "order in the FROM clause.");
  }
  const auto key_lvs = code_generator.codegen(key_col, true, co);
  CHECK_EQ(size_t(1), key_lvs.size());
  auto hash_ptr = codegenHashTableLoad(index);
  CHECK(hash_ptr);
  const int shard_count = shardCount();
  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);

  const auto& key_col_ti = key_col->get_type_info();
  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
                                                     : "hash_join_idx"s);

  if (isBitwiseEq()) {
    fname += "_bitwise";
  }
  if (shard_count) {
    fname += "_sharded";
  }

  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
    fname += "_nullable";
  }
  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
}

const InputTableInfo& PerfectJoinHashTable::getInnerQueryInfo(
    const Analyzer::ColumnVar* inner_col) const {
  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
}

const InputTableInfo& get_inner_query_info(
    const int inner_table_id,
    const std::vector<InputTableInfo>& query_infos) {
  std::optional<size_t> ti_idx;
  for (size_t i = 0; i < query_infos.size(); ++i) {
    if (inner_table_id == query_infos[i].table_id) {
      ti_idx = i;
      break;
    }
  }
  CHECK(ti_idx);
  return query_infos[*ti_idx];
}

size_t get_entries_per_device(const size_t total_entries,
                              const size_t shard_count,
                              const size_t device_count,
                              const Data_Namespace::MemoryLevel memory_level) {
  const auto entries_per_shard =
      shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
  size_t entries_per_device = entries_per_shard;
  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
    const auto shards_per_device = (shard_count + device_count - 1) / device_count;
    CHECK_GT(shards_per_device, 0u);
    entries_per_device = entries_per_shard * shards_per_device;
  }
  return entries_per_device;
}

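// Sharding is only exploited for GPU builds; a CPU build always uses a single,
// unsharded hash table.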
size_t PerfectJoinHashTable::shardCount() const {
  return memory_level_ == Data_Namespace::GPU_LEVEL
             ? get_shard_count(qual_bin_oper_.get(), executor_)
             : 0;
}

bool PerfectJoinHashTable::isBitwiseEq() const {
  return qual_bin_oper_->get_optype() == kBW_EQ;
}