JoinHashTable.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "JoinHashTable.h"
18 #include "CodeGenerator.h"
19 #include "ColumnFetcher.h"
20 #include "Execute.h"
21 #include "ExpressionRewrite.h"
22 #include "HashJoinRuntime.h"
23 #include "RangeTableIndexVisitor.h"
24 #include "RuntimeFunctions.h"
25 #include "Shared/Logger.h"
26 
27 #include <future>
28 #include <numeric>
29 #include <thread>
30 
31 namespace {
32 
33 class NeedsOneToManyHash : public HashJoinFail {
34  public:
35  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
36 };
37 
38 } // namespace
39 
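// normalize_column_pair() validates that an equijoin qualifier is hash-joinable and
// orients it as an (inner, outer) pair: the column with the deeper range table index
// becomes the inner (build-side) column, and its real type is re-read from the catalog
// because the Analyzer reports inner columns of left joins as nullable.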
40 InnerOuter normalize_column_pair(const Analyzer::Expr* lhs,
41  const Analyzer::Expr* rhs,
42  const Catalog_Namespace::Catalog& cat,
43  const TemporaryTables* temporary_tables,
44  const bool is_overlaps_join) {
45  const auto& lhs_ti = lhs->get_type_info();
46  const auto& rhs_ti = rhs->get_type_info();
47  if (!is_overlaps_join) {
48  if (lhs_ti.get_type() != rhs_ti.get_type()) {
49  throw HashJoinFail("Equijoin types must be identical, found: " +
50  lhs_ti.get_type_name() + ", " + rhs_ti.get_type_name());
51  }
52  if (!lhs_ti.is_integer() && !lhs_ti.is_time() && !lhs_ti.is_string()) {
53  throw HashJoinFail("Cannot apply hash join to inner column type " +
54  lhs_ti.get_type_name());
55  }
56  }
57 
58  const auto lhs_cast = dynamic_cast<const Analyzer::UOper*>(lhs);
59  const auto rhs_cast = dynamic_cast<const Analyzer::UOper*>(rhs);
60  if (lhs_ti.is_string() && (static_cast<bool>(lhs_cast) != static_cast<bool>(rhs_cast) ||
61  (lhs_cast && lhs_cast->get_optype() != kCAST) ||
62  (rhs_cast && rhs_cast->get_optype() != kCAST))) {
63  throw HashJoinFail("Cannot use hash join for given expression");
64  }
65  const auto lhs_col =
66  lhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(lhs_cast->get_operand())
67  : dynamic_cast<const Analyzer::ColumnVar*>(lhs);
68  const auto rhs_col =
69  rhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(rhs_cast->get_operand())
70  : dynamic_cast<const Analyzer::ColumnVar*>(rhs);
71  if (!lhs_col && !rhs_col) {
72  throw HashJoinFail("Cannot use hash join for given expression");
73  }
74  const Analyzer::ColumnVar* inner_col{nullptr};
75  const Analyzer::ColumnVar* outer_col{nullptr};
76  auto outer_ti = lhs_ti;
77  auto inner_ti = rhs_ti;
78  const Analyzer::Expr* outer_expr{lhs};
79  if ((!lhs_col || (rhs_col && lhs_col->get_rte_idx() < rhs_col->get_rte_idx())) &&
80  (!rhs_col || (!lhs_col || lhs_col->get_rte_idx() < rhs_col->get_rte_idx()))) {
81  inner_col = rhs_col;
82  outer_col = lhs_col;
83  } else {
84  if (lhs_col && lhs_col->get_rte_idx() == 0) {
85  throw HashJoinFail("Cannot use hash join for given expression");
86  }
87  inner_col = lhs_col;
88  outer_col = rhs_col;
89  std::swap(outer_ti, inner_ti);
90  outer_expr = rhs;
91  }
92  if (!inner_col) {
93  throw HashJoinFail("Cannot use hash join for given expression");
94  }
95  if (!outer_col) {
96  MaxRangeTableIndexVisitor rte_idx_visitor;
97  int outer_rte_idx = rte_idx_visitor.visit(outer_expr);
98  // The inner column candidate is not actually inner; the outer
99  // expression contains columns which are at least as deep.
100  if (inner_col->get_rte_idx() <= outer_rte_idx) {
101  throw HashJoinFail("Cannot use hash join for given expression");
102  }
103  }
104  // We need to fetch the actual type information from the catalog since Analyzer
105  // always reports nullable as true for inner table columns in left joins.
106  const auto inner_col_cd = get_column_descriptor_maybe(
107  inner_col->get_column_id(), inner_col->get_table_id(), cat);
108  const auto inner_col_real_ti = get_column_type(inner_col->get_column_id(),
109  inner_col->get_table_id(),
110  inner_col_cd,
111  temporary_tables);
112  const auto& outer_col_ti =
113  !(dynamic_cast<const Analyzer::FunctionOper*>(lhs)) && outer_col
114  ? outer_col->get_type_info()
115  : outer_ti;
116  if (is_overlaps_join) {
117  if (!inner_col_real_ti.is_array()) {
118  throw HashJoinFail(
119  "Overlaps join only supported for inner columns with array type");
120  }
121  if (!(inner_col_real_ti.is_fixlen_array() && inner_col_real_ti.get_size() == 32)) {
122  throw HashJoinFail(
123  "Overlaps join only supported for 4-element double fixed length arrays");
124  }
125  if (!(outer_col_ti.get_type() == kPOINT)) {
126  throw HashJoinFail(
127  "Overlaps join only supported for geometry outer columns of type point");
128  }
129  } else {
130  if (!(inner_col_real_ti.is_integer() || inner_col_real_ti.is_time() ||
131  (inner_col_real_ti.is_string() &&
132  inner_col_real_ti.get_compression() == kENCODING_DICT))) {
133  throw HashJoinFail(
134  "Can only apply hash join to integer-like types and dictionary encoded "
135  "strings");
136  }
137  }
138  return {inner_col, outer_col ? outer_col : outer_expr};
139 }
140 
141 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
142  const Catalog_Namespace::Catalog& cat,
143  const TemporaryTables* temporary_tables) {
144  std::vector<InnerOuter> result;
145  const auto lhs_tuple_expr =
146  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_left_operand());
147  const auto rhs_tuple_expr =
148  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_right_operand());
149 
150  CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
151  if (lhs_tuple_expr) {
152  const auto& lhs_tuple = lhs_tuple_expr->getTuple();
153  const auto& rhs_tuple = rhs_tuple_expr->getTuple();
154  CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
155  for (size_t i = 0; i < lhs_tuple.size(); ++i) {
156  result.push_back(normalize_column_pair(lhs_tuple[i].get(),
157  rhs_tuple[i].get(),
158  cat,
159  temporary_tables,
160  condition->is_overlaps_oper()));
161  }
162  } else {
163  CHECK(!lhs_tuple_expr && !rhs_tuple_expr);
164  result.push_back(normalize_column_pair(condition->get_left_operand(),
165  condition->get_right_operand(),
166  cat,
167  temporary_tables,
168  condition->is_overlaps_oper()));
169  }
170 
171  return result;
172 }
173 
174 namespace {
175 
176 std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> get_cols(
177  const Analyzer::BinOper* qual_bin_oper,
178  const Catalog_Namespace::Catalog& cat,
179  const TemporaryTables* temporary_tables) {
180  const auto lhs = qual_bin_oper->get_left_operand();
181  const auto rhs = qual_bin_oper->get_right_operand();
182  return normalize_column_pair(lhs, rhs, cat, temporary_tables);
183 }
184 
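// Returns the hash entry count for the given expression range together with a bucket
// normalization factor. For DATE columns the range is bucketized (e.g. by seconds per
// day), so the effective entry count is the raw range divided by the bucket size.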
185 HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
186  ExpressionRange const& col_range,
187  bool const is_bw_eq) {
188  using EmptyRangeSize = boost::optional<size_t>;
189  auto empty_range_check = [](ExpressionRange const& col_range,
190  bool const is_bw_eq) -> EmptyRangeSize {
191  if (col_range.getIntMin() > col_range.getIntMax()) {
192  CHECK_EQ(col_range.getIntMin(), int64_t(0));
193  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
194  if (is_bw_eq) {
195  return size_t(1);
196  }
197  return size_t(0);
198  }
199  return EmptyRangeSize{};
200  };
201 
202  auto empty_range = empty_range_check(col_range, is_bw_eq);
203  if (empty_range) {
204  return {size_t(*empty_range), 1};
205  }
206 
207  int64_t bucket_normalization =
208  context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
209  CHECK_GT(bucket_normalization, 0);
210  return {size_t(col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0)),
211  bucket_normalization};
212 }
213 
214 size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
215  if (col_range.getIntMin() > col_range.getIntMax()) {
216  CHECK_EQ(col_range.getIntMin(), int64_t(0));
217  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
218  return is_bw_eq ? 1 : 0;
219  }
220  return col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0);
221 }
222 
223 } // namespace
224 
225 std::vector<std::pair<JoinHashTable::JoinHashTableCacheKey,
226  std::shared_ptr<std::vector<int32_t>>>>
227  JoinHashTable::join_hash_table_cache_;
228 std::mutex JoinHashTable::join_hash_table_cache_mutex_;
229 
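// Determines how many shards participate in the join. A non-zero count is returned only
// when both join columns are the sharding columns of their (equally sharded) tables;
// otherwise the join is treated as unsharded.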
230 size_t get_shard_count(const Analyzer::BinOper* join_condition,
231  const Executor* executor) {
232  const Analyzer::ColumnVar* inner_col{nullptr};
233  const Analyzer::Expr* outer_col{nullptr};
234  std::shared_ptr<Analyzer::BinOper> redirected_bin_oper;
235  try {
236  std::tie(inner_col, outer_col) =
237  get_cols(join_condition, *executor->getCatalog(), executor->getTemporaryTables());
238  } catch (...) {
239  return 0;
240  }
241  if (!inner_col || !outer_col) {
242  return 0;
243  }
244  return get_shard_count({inner_col, outer_col}, executor);
245 }
246 
247 namespace {
248 
249 bool shard_count_less_or_equal_device_count(const int inner_table_id,
250  const Executor* executor) {
251  const auto inner_table_info = executor->getTableInfo(inner_table_id);
252  std::unordered_set<int> device_holding_fragments;
253  auto cuda_mgr = executor->getCatalog()->getDataMgr().getCudaMgr();
254  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
255  for (const auto& fragment : inner_table_info.fragments) {
256  if (fragment.shard != -1) {
257  const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
258  if (!it_ok.second) {
259  return false;
260  }
261  }
262  }
263  return true;
264 }
265 
266 } // namespace
267 
268 size_t get_shard_count(
269  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
270  const Executor* executor) {
271  const auto inner_col = equi_pair.first;
272  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
273  if (!outer_col || inner_col->get_table_id() < 0 || outer_col->get_table_id() < 0) {
274  return 0;
275  }
276  if (outer_col->get_rte_idx()) {
277  return 0;
278  }
279  if (inner_col->get_type_info() != outer_col->get_type_info()) {
280  return 0;
281  }
282  const auto catalog = executor->getCatalog();
283  const auto inner_td = catalog->getMetadataForTable(inner_col->get_table_id());
284  CHECK(inner_td);
285  const auto outer_td = catalog->getMetadataForTable(outer_col->get_table_id());
286  CHECK(outer_td);
287  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
288  inner_td->nShards != outer_td->nShards) {
289  return 0;
290  }
291  if (!shard_count_less_or_equal_device_count(inner_td->tableId, executor)) {
292  return 0;
293  }
294  // The two columns involved must be the ones on which the tables have been sharded.
295  return (inner_td->shardedColumnId == inner_col->get_column_id() &&
296  outer_td->shardedColumnId == outer_col->get_column_id()) ||
297  (outer_td->shardedColumnId == inner_col->get_column_id() &&
298  inner_td->shardedColumnId == inner_col->get_column_id())
299  ? inner_td->nShards
300  : 0;
301 }
302 
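// Factory for one-to-one (perfect) hash join tables. Computes the expression range of
// the join key, rejects ranges that would exceed the maximum entry count, and builds the
// table per device; build failures are rethrown as HashJoinFail so the executor can fall
// back to a loop join where possible.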
303 std::shared_ptr<JoinHashTable> JoinHashTable::getInstance(
304  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
305  const std::vector<InputTableInfo>& query_infos,
306  const Data_Namespace::MemoryLevel memory_level,
307  const HashType preferred_hash_type,
308  const int device_count,
309  ColumnCacheMap& column_cache,
310  Executor* executor) {
311  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
312  const auto cols =
313  get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
314  const auto inner_col = cols.first;
315  CHECK(inner_col);
316  const auto& ti = inner_col->get_type_info();
317  auto col_range =
318  getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
319  if (col_range.getType() == ExpressionRangeType::Invalid) {
320  throw HashJoinFail(
321  "Could not compute range for the expressions involved in the equijoin");
322  }
323  if (ti.is_string()) {
324  // The nullable info must be the same as the source column.
325  const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
326  if (source_col_range.getType() == ExpressionRangeType::Invalid) {
327  throw HashJoinFail(
328  "Could not compute range for the expressions involved in the equijoin");
329  }
330  if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
331  // If the inner column expression range is empty, use the inner col range
332  CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
333  CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
334  col_range = source_col_range;
335  } else {
336  col_range = ExpressionRange::makeIntRange(
337  std::min(source_col_range.getIntMin(), col_range.getIntMin()),
338  std::max(source_col_range.getIntMax(), col_range.getIntMax()),
339  0,
340  source_col_range.hasNulls());
341  }
342  }
343  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
344  const auto max_hash_entry_count =
345  memory_level == Data_Namespace::GPU_LEVEL
346  ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
347  : static_cast<size_t>(std::numeric_limits<int32_t>::max());
348 
349  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
350  ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
351  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
352 
353  if (bucketized_entry_count > max_hash_entry_count) {
354  throw TooManyHashEntries();
355  }
356 
357  if (qual_bin_oper->get_optype() == kBW_EQ &&
358  col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
359  throw HashJoinFail("Cannot translate null value for kBW_EQ");
360  }
361  auto join_hash_table =
362  std::shared_ptr<JoinHashTable>(new JoinHashTable(qual_bin_oper,
363  inner_col,
364  query_infos,
365  memory_level,
366  preferred_hash_type,
367  col_range,
368  column_cache,
369  executor,
370  device_count));
371  try {
372  join_hash_table->reify(device_count);
373  } catch (const TableMustBeReplicated& e) {
374  // Throw a runtime error to abort the query
375  join_hash_table->freeHashBufferMemory();
376  throw std::runtime_error(e.what());
377  } catch (const HashJoinFail& e) {
378  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
379  // possible)
380  join_hash_table->freeHashBufferMemory();
381  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
382  "involved in equijoin | ") +
383  e.what());
384  } catch (const ColumnarConversionNotSupported& e) {
385  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
386  e.what());
387  } catch (const OutOfMemory& e) {
388  throw HashJoinFail(
389  std::string("Ran out of memory while building hash tables for equijoin | ") +
390  e.what());
391  } catch (const std::exception& e) {
392  throw std::runtime_error(
393  std::string("Fatal error while attempting to build hash tables for join: ") +
394  e.what());
395  }
396  return join_hash_table;
397 }
398 
399 std::pair<const int8_t*, size_t> JoinHashTable::getOneColumnFragment(
400  const Analyzer::ColumnVar& hash_col,
401  const Fragmenter_Namespace::FragmentInfo& fragment,
402  const Data_Namespace::MemoryLevel effective_mem_lvl,
403  const int device_id,
404  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
405  return ColumnFetcher::getOneColumnFragment(executor_,
406  hash_col,
407  fragment,
408  effective_mem_lvl,
409  device_id,
410  chunks_owner,
411  column_cache_);
412 }
413 
414 std::pair<const int8_t*, size_t> JoinHashTable::getAllColumnFragments(
415  const Analyzer::ColumnVar& hash_col,
416  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
417  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
418  std::lock_guard<std::mutex> linearized_multifrag_column_lock(
419  linearized_multifrag_column_mutex_);
420  if (linearized_multifrag_column_.first) {
421  return linearized_multifrag_column_;
422  }
423  const int8_t* col_buff;
424  size_t total_elem_count;
425  std::tie(col_buff, total_elem_count) = ColumnFetcher::getAllColumnFragments(
426  executor_, hash_col, fragments, chunks_owner, column_cache_);
427  linearized_multifrag_column_owner_.addColBuffer(col_buff);
428  if (!shardCount()) {
429  linearized_multifrag_column_ = {col_buff, total_elem_count};
430  }
431  return {col_buff, total_elem_count};
432 }
433 
434 bool needs_dictionary_translation(const Analyzer::ColumnVar* inner_col,
435  const Analyzer::Expr* outer_col_expr,
436  const Executor* executor) {
437  const auto catalog = executor->getCatalog();
438  CHECK(catalog);
439  const auto inner_cd = get_column_descriptor_maybe(
440  inner_col->get_column_id(), inner_col->get_table_id(), *catalog);
441  const auto& inner_ti = get_column_type(inner_col->get_column_id(),
442  inner_col->get_table_id(),
443  inner_cd,
444  executor->getTemporaryTables());
445  // Only strings may need dictionary translation.
446  if (!inner_ti.is_string()) {
447  return false;
448  }
449  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
450  CHECK(outer_col);
451  const auto outer_cd = get_column_descriptor_maybe(
452  outer_col->get_column_id(), outer_col->get_table_id(), *catalog);
453  // Don't want to deal with temporary tables for now, require translation.
454  if (!inner_cd || !outer_cd) {
455  return true;
456  }
457  const auto& outer_ti = get_column_type(outer_col->get_column_id(),
458  outer_col->get_table_id(),
459  outer_cd,
460  executor->getTemporaryTables());
461  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
462  // If the two columns don't share the dictionary, translation is needed.
463  return outer_ti.get_comp_param() != inner_ti.get_comp_param();
464 }
465 
466 std::deque<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
467  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
468  const int device_id,
469  const int device_count) {
470  std::deque<Fragmenter_Namespace::FragmentInfo> shards_for_device;
471  for (const auto& fragment : fragments) {
472  CHECK_GE(fragment.shard, 0);
473  if (fragment.shard % device_count == device_id) {
474  shards_for_device.push_back(fragment);
475  }
476  }
477  return shards_for_device;
478 }
479 
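// Builds the hash table for every device. If a one-to-one build detects duplicate keys
// (NeedsOneToManyHash), the buffers are freed and the build is retried as one-to-many.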
480 void JoinHashTable::reify(const int device_count) {
481  CHECK_LT(0, device_count);
482  const auto& catalog = *executor_->getCatalog();
483  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
484  const auto inner_col = cols.first;
485  checkHashJoinReplicationConstraint(inner_col->get_table_id());
486  const auto& query_info = getInnerQueryInfo(inner_col).info;
487  if (query_info.fragments.empty()) {
488  return;
489  }
490  if (query_info.getNumTuplesUpperBound() >
491  static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
492  throw TooManyHashEntries();
493  }
494 #ifdef HAVE_CUDA
495  gpu_hash_table_buff_.resize(device_count);
496  gpu_hash_table_err_buff_.resize(device_count);
497 #endif // HAVE_CUDA
498  std::vector<std::future<void>> init_threads;
499  const int shard_count = shardCount();
500 
501  try {
502  for (int device_id = 0; device_id < device_count; ++device_id) {
503  const auto fragments =
504  shard_count
505  ? only_shards_for_device(query_info.fragments, device_id, device_count)
506  : query_info.fragments;
507  init_threads.push_back(
508  std::async(std::launch::async,
509  hash_type_ == JoinHashTableInterface::HashType::OneToOne
510  ? &JoinHashTable::reifyOneToOneForDevice
511  : &JoinHashTable::reifyOneToManyForDevice,
512  this,
513  fragments,
514  device_id));
515  }
516  for (auto& init_thread : init_threads) {
517  init_thread.wait();
518  }
519  for (auto& init_thread : init_threads) {
520  init_thread.get();
521  }
522 
523  } catch (const NeedsOneToManyHash& e) {
524  hash_type_ = JoinHashTableInterface::HashType::OneToMany;
525  freeHashBufferMemory();
526  init_threads.clear();
527  for (int device_id = 0; device_id < device_count; ++device_id) {
528  const auto fragments =
529  shard_count
530  ? only_shards_for_device(query_info.fragments, device_id, device_count)
531  : query_info.fragments;
532 
533  init_threads.push_back(std::async(std::launch::async,
534  &JoinHashTable::reifyOneToManyForDevice,
535  this,
536  fragments,
537  device_id));
538  }
539  for (auto& init_thread : init_threads) {
540  init_thread.wait();
541  }
542  for (auto& init_thread : init_threads) {
543  init_thread.get();
544  }
545  }
546 }
547 
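// Fetches the join column for the given fragments. Multi-fragment inputs are linearized
// into a single contiguous buffer (and copied to the GPU when the build happens there);
// single fragments are fetched directly.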
548 std::pair<const int8_t*, size_t> JoinHashTable::fetchFragments(
549  const Analyzer::ColumnVar* hash_col,
550  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragment_info,
551  const Data_Namespace::MemoryLevel effective_memory_level,
552  const int device_id,
553  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
554  ThrustAllocator& dev_buff_owner) {
555  static std::mutex fragment_fetch_mutex;
556  const bool has_multi_frag = fragment_info.size() > 1;
557  const auto& catalog = *executor_->getCatalog();
558  auto& data_mgr = catalog.getDataMgr();
559  const auto& first_frag = fragment_info.front();
560  const int8_t* col_buff = nullptr;
561  size_t elem_count = 0;
562 
563  const size_t elem_width = hash_col->get_type_info().get_size();
564  if (has_multi_frag) {
565  std::tie(col_buff, elem_count) =
566  getAllColumnFragments(*hash_col, fragment_info, chunks_owner);
567  }
568 
569  {
570  std::lock_guard<std::mutex> fragment_fetch_lock(fragment_fetch_mutex);
571  if (has_multi_frag) {
572  if (effective_memory_level == Data_Namespace::GPU_LEVEL && col_buff) {
573  CHECK_NE(elem_count, size_t(0));
574  int8_t* dev_col_buff = nullptr;
575  dev_col_buff = dev_buff_owner.allocate(elem_count * elem_width);
576  copy_to_gpu(&data_mgr,
577  reinterpret_cast<CUdeviceptr>(dev_col_buff),
578  col_buff,
579  elem_count * elem_width,
580  device_id);
581  col_buff = dev_col_buff;
582  }
583  } else {
584  std::tie(col_buff, elem_count) = getOneColumnFragment(
585  *hash_col, first_frag, effective_memory_level, device_id, chunks_owner);
586  }
587  }
588  return {col_buff, elem_count};
589 }
590 
591 ChunkKey JoinHashTable::genHashTableKey(
592  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
593  const Analyzer::Expr* outer_col_expr,
594  const Analyzer::ColumnVar* inner_col) const {
595  ChunkKey hash_table_key{executor_->getCatalog()->getCurrentDB().dbId,
596  inner_col->get_table_id(),
597  inner_col->get_column_id()};
598  const auto& ti = inner_col->get_type_info();
599  if (ti.is_string()) {
600  CHECK_EQ(kENCODING_DICT, ti.get_compression());
601  size_t outer_elem_count = 0;
602  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
603  CHECK(outer_col);
604  const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
605  for (auto& frag : outer_query_info.fragments) {
606  outer_elem_count = frag.getNumTuples();
607  }
608  hash_table_key.push_back(outer_elem_count);
609  }
610  if (fragments.size() < 2) {
611  hash_table_key.push_back(fragments.front().fragmentId);
612  }
613  return hash_table_key;
614 }
615 
616 void JoinHashTable::reifyOneToOneForDevice(
617  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
618  const int device_id) {
619  const auto& catalog = *executor_->getCatalog();
620  auto& data_mgr = catalog.getDataMgr();
621  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
622  const auto inner_col = cols.first;
623  CHECK(inner_col);
624  const auto inner_cd = get_column_descriptor_maybe(
625  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
626  if (inner_cd && inner_cd->isVirtualCol) {
627  throw FailedToJoinOnVirtualColumn();
628  }
629  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
630  // Since we don't have the string dictionary payloads on the GPU, we'll build
631  // the join hash table on the CPU and transfer it to the GPU.
632  const auto effective_memory_level =
633  needs_dictionary_translation(inner_col, cols.second, executor_)
634  ? Data_Namespace::CPU_LEVEL
635  : memory_level_;
636  if (fragments.empty()) {
637  // No data in this fragment. Still need to create a hash table and initialize it
638  // properly.
639  ChunkKey empty_chunk;
640  initHashTableForDevice(
641  empty_chunk, nullptr, 0, cols, effective_memory_level, device_id);
642  }
643 
644  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
645  ThrustAllocator dev_buff_owner(&data_mgr, device_id);
646  const int8_t* col_buff = nullptr;
647  size_t elem_count = 0;
648 
649  std::tie(col_buff, elem_count) = fetchFragments(inner_col,
650  fragments,
651  effective_memory_level,
652  device_id,
653  chunks_owner,
654  dev_buff_owner);
655 
656  initHashTableForDevice(genHashTableKey(fragments, cols.second, inner_col),
657  col_buff,
658  elem_count,
659  cols,
660  effective_memory_level,
661  device_id);
662 }
663 
664 void JoinHashTable::reifyOneToManyForDevice(
665  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
666  const int device_id) {
667  const auto& catalog = *executor_->getCatalog();
668  auto& data_mgr = catalog.getDataMgr();
669  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
670  const auto inner_col = cols.first;
671  CHECK(inner_col);
672  const auto inner_cd = get_column_descriptor_maybe(
673  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
674  if (inner_cd && inner_cd->isVirtualCol) {
675  throw FailedToJoinOnVirtualColumn();
676  }
677  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
678  // Since we don't have the string dictionary payloads on the GPU, we'll build
679  // the join hash table on the CPU and transfer it to the GPU.
680  const auto effective_memory_level =
681  needs_dictionary_translation(inner_col, cols.second, executor_)
682  ? Data_Namespace::CPU_LEVEL
683  : memory_level_;
684  if (fragments.empty()) {
685  ChunkKey empty_chunk;
686  initOneToManyHashTable(
687  empty_chunk, nullptr, 0, cols, effective_memory_level, device_id);
688  return;
689  }
690 
691  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
692  ThrustAllocator dev_buff_owner(&data_mgr, device_id);
693  const int8_t* col_buff = nullptr;
694  size_t elem_count = 0;
695 
696  std::tie(col_buff, elem_count) = fetchFragments(inner_col,
697  fragments,
698  effective_memory_level,
699  device_id,
700  chunks_owner,
701  dev_buff_owner);
702 
703  initOneToManyHashTable(genHashTableKey(fragments, cols.second, inner_col),
704  col_buff,
705  elem_count,
706  cols,
707  effective_memory_level,
708  device_id);
709 }
710 
711 void JoinHashTable::checkHashJoinReplicationConstraint(const int table_id) const {
712  if (!g_cluster) {
713  return;
714  }
715  if (table_id >= 0) {
716  const auto inner_td = executor_->getCatalog()->getMetadataForTable(table_id);
717  CHECK(inner_td);
718  size_t shard_count{0};
719  shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
720  if (!shard_count && !table_is_replicated(inner_td)) {
721  throw TableMustBeReplicated(inner_td->tableName);
722  }
723  }
724 }
725 
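// Fills the one-to-one (perfect) hash table on the CPU. Each slot holds the row id of
// the matching inner row; if a slot is claimed twice, the build aborts with
// NeedsOneToManyHash so the caller can retry with a one-to-many layout.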
726 void JoinHashTable::initHashTableOnCpu(
727  const int8_t* col_buff,
728  const size_t num_elements,
729  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
730  const HashEntryInfo hash_entry_info,
731  const int32_t hash_join_invalid_val) {
732  const auto inner_col = cols.first;
733  CHECK(inner_col);
734  const auto& ti = inner_col->get_type_info();
735  if (!cpu_hash_table_buff_) {
736  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
737  hash_entry_info.getNormalizedHashEntryCount());
738  const StringDictionaryProxy* sd_inner_proxy{nullptr};
739  const StringDictionaryProxy* sd_outer_proxy{nullptr};
740  if (ti.is_string()) {
741  CHECK_EQ(kENCODING_DICT, ti.get_compression());
742  sd_inner_proxy = executor_->getStringDictionaryProxy(
743  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
744  CHECK(sd_inner_proxy);
745  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
746  CHECK(outer_col);
747  sd_outer_proxy = executor_->getStringDictionaryProxy(
748  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
749  CHECK(sd_outer_proxy);
750  }
751  int thread_count = cpu_threads();
752  std::vector<std::thread> init_cpu_buff_threads;
753  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
754  init_cpu_buff_threads.emplace_back(
755  [this, hash_entry_info, hash_join_invalid_val, thread_idx, thread_count] {
756  init_hash_join_buff(&(*cpu_hash_table_buff_)[0],
757  hash_entry_info.getNormalizedHashEntryCount(),
758  hash_join_invalid_val,
759  thread_idx,
760  thread_count);
761  });
762  }
763  for (auto& t : init_cpu_buff_threads) {
764  t.join();
765  }
766  init_cpu_buff_threads.clear();
767  int err{0};
768  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
769  init_cpu_buff_threads.emplace_back([this,
770  hash_join_invalid_val,
771  col_buff,
772  num_elements,
773  sd_inner_proxy,
774  sd_outer_proxy,
775  thread_idx,
776  thread_count,
777  &ti,
778  &err,
779  hash_entry_info] {
780  int partial_err =
781  fill_hash_join_buff_bucketized(&(*cpu_hash_table_buff_)[0],
782  hash_join_invalid_val,
783  {col_buff, num_elements},
784  {static_cast<size_t>(ti.get_size()),
785  col_range_.getIntMin(),
786  col_range_.getIntMax(),
787  inline_fixed_encoding_null_val(ti),
788  isBitwiseEq(),
789  col_range_.getIntMax() + 1,
790  get_join_column_type_kind(ti)},
791  sd_inner_proxy,
792  sd_outer_proxy,
793  thread_idx,
794  thread_count,
795  hash_entry_info.bucket_normalization);
796  __sync_val_compare_and_swap(&err, 0, partial_err);
797  });
798  }
799  for (auto& t : init_cpu_buff_threads) {
800  t.join();
801  }
802  if (err) {
803  cpu_hash_table_buff_.reset();
804  // Too many hash entries, need to retry with a 1:many table
805  throw NeedsOneToManyHash();
806  }
807  } else {
808  if (cpu_hash_table_buff_->size() > hash_entry_info.getNormalizedHashEntryCount()) {
809  // Too many hash entries, need to retry with a 1:many table
810  throw NeedsOneToManyHash();
811  }
812  }
813 }
814 
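// Fills the one-to-many hash table on the CPU. The buffer holds two sections of
// hash_entry_count slots (offsets and counts) followed by num_elements payload row ids,
// hence the 2 * getNormalizedHashEntryCount() + num_elements allocation below.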
815 void JoinHashTable::initOneToManyHashTableOnCpu(
816  const int8_t* col_buff,
817  const size_t num_elements,
818  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
819  const HashEntryInfo hash_entry_info,
820  const int32_t hash_join_invalid_val) {
821  const auto inner_col = cols.first;
822  CHECK(inner_col);
823  const auto& ti = inner_col->get_type_info();
824  if (cpu_hash_table_buff_) {
825  return;
826  }
827  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
828  2 * hash_entry_info.getNormalizedHashEntryCount() + num_elements);
829  const StringDictionaryProxy* sd_inner_proxy{nullptr};
830  const StringDictionaryProxy* sd_outer_proxy{nullptr};
831  if (ti.is_string()) {
832  CHECK_EQ(kENCODING_DICT, ti.get_compression());
833  sd_inner_proxy = executor_->getStringDictionaryProxy(
834  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
835  CHECK(sd_inner_proxy);
836  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
837  CHECK(outer_col);
838  sd_outer_proxy = executor_->getStringDictionaryProxy(
839  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
840  CHECK(sd_outer_proxy);
841  }
842  int thread_count = cpu_threads();
843  std::vector<std::future<void>> init_threads;
844  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
845  init_threads.emplace_back(std::async(std::launch::async,
846  init_hash_join_buff,
847  &(*cpu_hash_table_buff_)[0],
848  hash_entry_info.getNormalizedHashEntryCount(),
849  hash_join_invalid_val,
850  thread_idx,
851  thread_count));
852  }
853  for (auto& child : init_threads) {
854  child.wait();
855  }
856  for (auto& child : init_threads) {
857  child.get();
858  }
859 
860  if (ti.get_type() == kDATE) {
861  fill_one_to_many_hash_table_bucketized(&(*cpu_hash_table_buff_)[0],
862  hash_entry_info,
863  hash_join_invalid_val,
864  {col_buff, num_elements},
865  {static_cast<size_t>(ti.get_size()),
866  col_range_.getIntMin(),
867  col_range_.getIntMax(),
868  inline_fixed_encoding_null_val(ti),
869  isBitwiseEq(),
870  col_range_.getIntMax() + 1,
871  get_join_column_type_kind(ti)},
872  sd_inner_proxy,
873  sd_outer_proxy,
874  thread_count);
875  } else {
876  fill_one_to_many_hash_table(&(*cpu_hash_table_buff_)[0],
877  hash_entry_info,
878  hash_join_invalid_val,
879  {col_buff, num_elements},
880  {static_cast<size_t>(ti.get_size()),
881  col_range_.getIntMin(),
882  col_range_.getIntMax(),
883  inline_fixed_encoding_null_val(ti),
884  isBitwiseEq(),
885  col_range_.getIntMax() + 1,
886  get_join_column_type_kind(ti)},
887  sd_inner_proxy,
888  sd_outer_proxy,
889  thread_count);
890  }
891 }
892 
893 namespace {
894 
895 #ifdef HAVE_CUDA
896 // Number of entries per shard, rounded up.
897 size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count) {
898  CHECK_NE(size_t(0), shard_count);
899  return (total_entry_count + shard_count - 1) / shard_count;
900 }
901 #endif // HAVE_CUDA
902 
903 } // namespace
904 
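// Builds the one-to-one table for a single device. When dictionary translation forces a
// CPU build (or the query itself runs on CPU), the table is filled on the host, cached,
// and copied to the GPU if needed; otherwise it is filled directly on the device,
// sharded when applicable.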
905 void JoinHashTable::initHashTableForDevice(
906  const ChunkKey& chunk_key,
907  const int8_t* col_buff,
908  const size_t num_elements,
909  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
910  const Data_Namespace::MemoryLevel effective_memory_level,
911  const int device_id) {
912  const auto inner_col = cols.first;
913  CHECK(inner_col);
914 
915  auto hash_entry_info = get_bucketized_hash_entry_info(
916  inner_col->get_type_info(), col_range_, isBitwiseEq());
917  if (!hash_entry_info) {
918  return;
919  }
920 
921 #ifdef HAVE_CUDA
922  const auto shard_count = shardCount();
923  const size_t entries_per_shard{
924  shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
925  : 0};
926  // Even if we join on dictionary encoded strings, the memory on the GPU is still
927  // needed once the join hash table has been built on the CPU.
928  const auto catalog = executor_->getCatalog();
929  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
930  auto& data_mgr = catalog->getDataMgr();
931  if (shard_count) {
932  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
933  CHECK_GT(shards_per_device, 0u);
934  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
935  }
936  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
937  &data_mgr,
938  hash_entry_info.getNormalizedHashEntryCount() * sizeof(int32_t),
939  device_id);
940  }
941 #else
942  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
943 #endif
944 
945 #ifdef HAVE_CUDA
946  const auto& ti = inner_col->get_type_info();
947 #endif
948  const int32_t hash_join_invalid_val{-1};
949  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
950  CHECK(!chunk_key.empty());
951  initHashTableOnCpuFromCache(chunk_key, num_elements, cols);
952  {
953  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
954  initHashTableOnCpu(
955  col_buff, num_elements, cols, hash_entry_info, hash_join_invalid_val);
956  }
957  if (inner_col->get_table_id() > 0) {
958  putHashTableOnCpuToCache(chunk_key, num_elements, cols);
959  }
960  // Transfer the hash table on the GPU if we've only built it on CPU
961  // but the query runs on GPU (join on dictionary encoded columns).
962  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
963 #ifdef HAVE_CUDA
964  CHECK(ti.is_string());
965  auto& data_mgr = catalog->getDataMgr();
966  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
967 
968  copy_to_gpu(
969  &data_mgr,
970  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
971  &(*cpu_hash_table_buff_)[0],
972  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
973  device_id);
974 #else
975  CHECK(false);
976 #endif
977  }
978  } else {
979 #ifdef HAVE_CUDA
980  int err{0};
981  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
982  auto& data_mgr = catalog->getDataMgr();
983  gpu_hash_table_err_buff_[device_id] =
984  CudaAllocator::allocGpuAbstractBuffer(&data_mgr, sizeof(int), device_id);
985  auto dev_err_buff = reinterpret_cast<CUdeviceptr>(
986  gpu_hash_table_err_buff_[device_id]->getMemoryPtr());
987  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
988  init_hash_join_buff_on_device(
989  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
990  hash_entry_info.getNormalizedHashEntryCount(),
991  hash_join_invalid_val,
992  executor_->blockSize(),
993  executor_->gridSize());
994  if (chunk_key.empty()) {
995  return;
996  }
997  JoinColumn join_column{col_buff, num_elements};
998  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
999  col_range_.getIntMin(),
1000  col_range_.getIntMax(),
1001  inline_fixed_encoding_null_val(ti),
1002  isBitwiseEq(),
1003  col_range_.getIntMax() + 1,
1004  get_join_column_type_kind(ti)};
1005  if (shard_count) {
1006  CHECK_GT(device_count_, 0);
1007  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1008  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1009  fill_hash_join_buff_on_device_sharded_bucketized(
1010  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1011  hash_join_invalid_val,
1012  reinterpret_cast<int*>(dev_err_buff),
1013  join_column,
1014  type_info,
1015  shard_info,
1016  executor_->blockSize(),
1017  executor_->gridSize(),
1018  hash_entry_info.bucket_normalization);
1019  }
1020  } else {
1021  fill_hash_join_buff_on_device_bucketized(
1022  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1023  hash_join_invalid_val,
1024  reinterpret_cast<int*>(dev_err_buff),
1025  join_column,
1026  type_info,
1027  executor_->blockSize(),
1028  executor_->gridSize(),
1029  hash_entry_info.bucket_normalization);
1030  }
1031  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
1032 
1033  if (err) {
1034  throw NeedsOneToManyHash();
1035  }
1036 #else
1037  CHECK(false);
1038 #endif
1039  }
1040 }
1041 
1042 void JoinHashTable::initOneToManyHashTable(
1043  const ChunkKey& chunk_key,
1044  const int8_t* col_buff,
1045  const size_t num_elements,
1046  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
1047  const Data_Namespace::MemoryLevel effective_memory_level,
1048  const int device_id) {
1049  auto const inner_col = cols.first;
1050  CHECK(inner_col);
1051 
1052  auto hash_entry_info = get_bucketized_hash_entry_info(
1053  inner_col->get_type_info(), col_range_, isBitwiseEq());
1054 
1055 #ifdef HAVE_CUDA
1056  const auto shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
1057  const size_t entries_per_shard =
1058  (shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
1059  : 0);
1060  // Even if we join on dictionary encoded strings, the memory on the GPU is still
1061  // needed once the join hash table has been built on the CPU.
1062  if (memory_level_ == Data_Namespace::GPU_LEVEL && shard_count) {
1063  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
1064  CHECK_GT(shards_per_device, 0u);
1065  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
1066  }
1067 #else
1068  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1069 #endif
1070  if (!device_id) {
1071  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
1072  }
1073 
1074 #ifdef HAVE_CUDA
1075  const auto& ti = inner_col->get_type_info();
1076  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1077  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1078  const size_t total_count =
1079  2 * hash_entry_info.getNormalizedHashEntryCount() + num_elements;
1080  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
1081  &data_mgr, total_count * sizeof(int32_t), device_id);
1082  }
1083 #endif
1084  const int32_t hash_join_invalid_val{-1};
1085  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
1086  initHashTableOnCpuFromCache(chunk_key, num_elements, cols);
1087  {
1088  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1089  initOneToManyHashTableOnCpu(
1090  col_buff, num_elements, cols, hash_entry_info, hash_join_invalid_val);
1091  }
1092  if (inner_col->get_table_id() > 0) {
1093  putHashTableOnCpuToCache(chunk_key, num_elements, cols);
1094  }
1095  // Transfer the hash table on the GPU if we've only built it on CPU
1096  // but the query runs on GPU (join on dictionary encoded columns).
1097  // Don't transfer the buffer if there was an error since we'll bail anyway.
1098  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1099 #ifdef HAVE_CUDA
1100  CHECK(ti.is_string());
1101  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1102  copy_to_gpu(
1103  &data_mgr,
1104  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1105  &(*cpu_hash_table_buff_)[0],
1106  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1107  device_id);
1108 #else
1109  CHECK(false);
1110 #endif
1111  }
1112  } else {
1113 #ifdef HAVE_CUDA
1114  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
1115  data_mgr.getCudaMgr()->setContext(device_id);
1116  init_hash_join_buff_on_device(
1117  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1118  hash_entry_info.getNormalizedHashEntryCount(),
1119  hash_join_invalid_val,
1120  executor_->blockSize(),
1121  executor_->gridSize());
1122  JoinColumn join_column{col_buff, num_elements};
1123  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1124  col_range_.getIntMin(),
1125  col_range_.getIntMax(),
1126  inline_fixed_encoding_null_val(ti),
1127  isBitwiseEq(),
1128  col_range_.getIntMax() + 1,
1129  get_join_column_type_kind(ti)};
1130  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
1131 
1132  if (shard_count) {
1133  CHECK_GT(device_count_, 0);
1134  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1135  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1136  fill_one_to_many_hash_table_on_device_sharded(
1137  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1138  hash_entry_info,
1139  hash_join_invalid_val,
1140  join_column,
1141  type_info,
1142  shard_info,
1143  executor_->blockSize(),
1144  executor_->gridSize());
1145  }
1146  } else {
1147  if (use_bucketization) {
1148  fill_one_to_many_hash_table_on_device_bucketized(
1149  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1150  hash_entry_info,
1151  hash_join_invalid_val,
1152  join_column,
1153  type_info,
1154  executor_->blockSize(),
1155  executor_->gridSize());
1156  } else {
1157  fill_one_to_many_hash_table_on_device(
1158  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1159  hash_entry_info,
1160  hash_join_invalid_val,
1161  join_column,
1162  type_info,
1163  executor_->blockSize(),
1164  executor_->gridSize());
1165  }
1166  }
1167 #else
1168  CHECK(false);
1169 #endif
1170  }
1171 }
1172 
1173 void JoinHashTable::initHashTableOnCpuFromCache(
1174  const ChunkKey& chunk_key,
1175  const size_t num_elements,
1176  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols) {
1177  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1178  JoinHashTableCacheKey cache_key{col_range_,
1179  *cols.first,
1180  outer_col ? *outer_col : *cols.first,
1181  num_elements,
1182  chunk_key,
1183  qual_bin_oper_->get_optype()};
1184  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1185  for (const auto& kv : join_hash_table_cache_) {
1186  if (kv.first == cache_key) {
1187  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1188  cpu_hash_table_buff_ = kv.second;
1189  break;
1190  }
1191  }
1192 }
1193 
1194 void JoinHashTable::putHashTableOnCpuToCache(
1195  const ChunkKey& chunk_key,
1196  const size_t num_elements,
1197  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols) {
1198  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1199  JoinHashTableCacheKey cache_key{col_range_,
1200  *cols.first,
1201  outer_col ? *outer_col : *cols.first,
1202  num_elements,
1203  chunk_key,
1204  qual_bin_oper_->get_optype()};
1205  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1206  for (const auto& kv : join_hash_table_cache_) {
1207  if (kv.first == cache_key) {
1208  return;
1209  }
1210  }
1211  join_hash_table_cache_.emplace_back(cache_key, cpu_hash_table_buff_);
1212 }
1213 
1214 llvm::Value* JoinHashTable::codegenHashTableLoad(const size_t table_idx) {
1215  const auto hash_ptr = codegenHashTableLoad(table_idx, executor_);
1216  if (hash_ptr->getType()->isIntegerTy(64)) {
1217  return hash_ptr;
1218  }
1219  CHECK(hash_ptr->getType()->isPointerTy());
1220  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
1221  get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
1222  llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
1223 }
1224 
1225 llvm::Value* JoinHashTable::codegenHashTableLoad(const size_t table_idx,
1226  Executor* executor) {
1227  llvm::Value* hash_ptr = nullptr;
1228  const auto total_table_count =
1229  executor->plan_state_->join_info_.join_hash_tables_.size();
1230  CHECK_LT(table_idx, total_table_count);
1231  if (total_table_count > 1) {
1232  auto hash_tables_ptr =
1233  get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1234  auto hash_pptr =
1235  table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
1236  hash_tables_ptr,
1237  executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
1238  : hash_tables_ptr;
1239  hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
1240  } else {
1241  hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1242  }
1243  CHECK(hash_ptr);
1244  return hash_ptr;
1245 }
1246 
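// Assembles the argument list for the hash_join_idx runtime functions: the table
// pointer, the key cast to 64 bits, the key range, optional shard geometry, the null
// sentinel for nullable keys, and the bucket normalization for DATE keys.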
1247 std::vector<llvm::Value*> JoinHashTable::getHashJoinArgs(llvm::Value* hash_ptr,
1248  const Analyzer::Expr* key_col,
1249  const int shard_count,
1250  const CompilationOptions& co) {
1251  CodeGenerator code_generator(executor_);
1252  const auto key_lvs = code_generator.codegen(key_col, true, co);
1253  CHECK_EQ(size_t(1), key_lvs.size());
1254  auto const& key_col_ti = key_col->get_type_info();
1255  auto hash_entry_info =
1256  get_bucketized_hash_entry_info(key_col_ti, col_range_, isBitwiseEq());
1257 
1258  std::vector<llvm::Value*> hash_join_idx_args{
1259  hash_ptr,
1260  executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
1261  executor_->cgen_state_->llInt(col_range_.getIntMin()),
1262  executor_->cgen_state_->llInt(col_range_.getIntMax())};
1263  if (shard_count) {
1264  const auto expected_hash_entry_count =
1265  get_hash_entry_count(col_range_, isBitwiseEq());
1266  const auto entry_count_per_shard =
1267  (expected_hash_entry_count + shard_count - 1) / shard_count;
1268  hash_join_idx_args.push_back(
1269  executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
1270  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
1271  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
1272  }
1273  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
1274  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
1275  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1276  inline_fixed_encoding_null_val(key_col_logical_ti)));
1277  }
1278  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
1279  if (isBitwiseEq()) {
1280  if (special_date_bucketization_case) {
1281  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1282  col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
1283  } else {
1284  hash_join_idx_args.push_back(
1285  executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
1286  }
1287  }
1288 
1289  if (special_date_bucketization_case) {
1290  hash_join_idx_args.emplace_back(
1291  executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
1292  }
1293 
1294  return hash_join_idx_args;
1295 }
1296 
1297 HashJoinMatchingSet JoinHashTable::codegenMatchingSet(const CompilationOptions& co,
1298  const size_t index) {
1299  const auto cols = get_cols(
1300  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1301  auto key_col = cols.second;
1302  CHECK(key_col);
1303  auto val_col = cols.first;
1304  CHECK(val_col);
1305  auto pos_ptr = codegenHashTableLoad(index);
1306  CHECK(pos_ptr);
1307  const int shard_count = shardCount();
1308  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
1309  const int64_t sub_buff_size = getComponentBufferSize();
1310  const auto& key_col_ti = key_col->get_type_info();
1311 
1312  auto bucketize = (key_col_ti.get_type() == kDATE);
1313  return codegenMatchingSet(hash_join_idx_args,
1314  shard_count,
1315  !key_col_ti.get_notnull(),
1316  isBitwiseEq(),
1317  sub_buff_size,
1318  executor_,
1319  bucketize);
1320 }
1321 
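// Emits the probe for a one-to-many table: the slot lookup yields an offset into the
// count and payload sections (located sub_buff_size and 2 * sub_buff_size bytes past the
// offset section), producing a (row id pointer, match count, slot) triple.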
1322 HashJoinMatchingSet JoinHashTable::codegenMatchingSet(
1323  const std::vector<llvm::Value*>& hash_join_idx_args_in,
1324  const bool is_sharded,
1325  const bool col_is_nullable,
1326  const bool is_bw_eq,
1327  const int64_t sub_buff_size,
1328  Executor* executor,
1329  bool is_bucketized) {
1330  using namespace std::string_literals;
1331 
1332  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);
1333 
1334  if (is_bw_eq) {
1335  fname += "_bitwise";
1336  }
1337  if (is_sharded) {
1338  fname += "_sharded";
1339  }
1340  if (!is_bw_eq && col_is_nullable) {
1341  fname += "_nullable";
1342  }
1343 
1344  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
1345  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
1346  slot_lv, executor->cgen_state_->llInt(int64_t(0)));
1347 
1348  auto pos_ptr = hash_join_idx_args_in[0];
1349  CHECK(pos_ptr);
1350 
1351  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
1352  pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
1353  auto hash_join_idx_args = hash_join_idx_args_in;
1354  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
1355  count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));
1356 
1357  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
1358  slot_valid_lv,
1359  executor->cgen_state_->emitCall(fname, hash_join_idx_args),
1360  executor->cgen_state_->llInt(int64_t(0)));
1361  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
1362  executor->cgen_state_->ir_builder_.CreateAdd(
1363  pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
1364  llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
1365  auto rowid_ptr_i32 =
1366  executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
1367  return {rowid_ptr_i32, row_count_lv, slot_lv};
1368 }
1369 
1370 size_t JoinHashTable::offsetBufferOff() const noexcept {
1371  CHECK(hash_type_ == JoinHashTableInterface::HashType::OneToMany);
1372  return 0;
1373 }
1374 
1375 size_t JoinHashTable::countBufferOff() const noexcept {
1376  CHECK(hash_type_ == JoinHashTableInterface::HashType::OneToMany);
1377  return getComponentBufferSize();
1378 }
1379 
1380 size_t JoinHashTable::payloadBufferOff() const noexcept {
1381  CHECK(hash_type_ == JoinHashTableInterface::HashType::OneToMany);
1382  return 2 * getComponentBufferSize();
1383 }
1384 
1385 size_t JoinHashTable::getComponentBufferSize() const noexcept {
1386  return hash_entry_count_ * sizeof(int32_t);
1387 }
1388 
1389 llvm::Value* JoinHashTable::codegenSlot(const CompilationOptions& co,
1390  const size_t index) {
1391  using namespace std::string_literals;
1392 
1394  const auto cols = get_cols(
1395  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1396  auto key_col = cols.second;
1397  CHECK(key_col);
1398  auto val_col = cols.first;
1399  CHECK(val_col);
1400  CodeGenerator code_generator(executor_);
1401  const auto key_lvs = code_generator.codegen(key_col, true, co);
1402  CHECK_EQ(size_t(1), key_lvs.size());
1403  auto hash_ptr = codegenHashTableLoad(index);
1404  CHECK(hash_ptr);
1405  const int shard_count = shardCount();
1406  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);
1407 
1408  const auto& key_col_ti = key_col->get_type_info();
1409  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
1410  : "hash_join_idx"s);
1411 
1412  if (isBitwiseEq()) {
1413  fname += "_bitwise";
1414  }
1415  if (shard_count) {
1416  fname += "_sharded";
1417  }
1418 
1419  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
1420  fname += "_nullable";
1421  }
1422  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1423 }
1424 
1425 const InputTableInfo& JoinHashTable::getInnerQueryInfo(
1426  const Analyzer::ColumnVar* inner_col) const {
1427  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
1428 }
1429 
1430 const InputTableInfo& get_inner_query_info(
1431  const int inner_table_id,
1432  const std::vector<InputTableInfo>& query_infos) {
1433  ssize_t ti_idx = -1;
1434  for (size_t i = 0; i < query_infos.size(); ++i) {
1435  if (inner_table_id == query_infos[i].table_id) {
1436  ti_idx = i;
1437  break;
1438  }
1439  }
1440  CHECK_NE(ssize_t(-1), ti_idx);
1441  return query_infos[ti_idx];
1442 }
1443 
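// For sharded tables on GPU, each device owns ceil(shard_count / device_count) shards,
// so its entry count is entries_per_shard times that; otherwise every device gets the
// full entry count.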
1444 size_t get_entries_per_device(const size_t total_entries,
1445  const size_t shard_count,
1446  const size_t device_count,
1447  const Data_Namespace::MemoryLevel memory_level) {
1448  const auto entries_per_shard =
1449  shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
1450  size_t entries_per_device = entries_per_shard;
1451  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
1452  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
1453  CHECK_GT(shards_per_device, 0u);
1454  entries_per_device = entries_per_shard * shards_per_device;
1455  }
1456  return entries_per_device;
1457 }
1458 
1459 // TODO(adb): unify with BaselineJoinHashTable
1460 size_t JoinHashTable::shardCount() const {
1461  return memory_level_ == Data_Namespace::GPU_LEVEL
1462  ? get_shard_count(qual_bin_oper_.get(), executor_)
1463  : 0;
1464 }
1465 
1466 bool JoinHashTable::isBitwiseEq() const {
1467  return qual_bin_oper_->get_optype() == kBW_EQ;
1468 }
1469 
1470 void JoinHashTable::freeHashBufferMemory() {
1471 #ifdef HAVE_CUDA
1472  freeHashBufferGpuMemory();
1473 #endif
1474  freeHashBufferCpuMemory();
1475 }
1476 
1477 void JoinHashTable::freeHashBufferGpuMemory() {
1478 #ifdef HAVE_CUDA
1479  const auto& catalog = *executor_->getCatalog();
1480  auto& data_mgr = catalog.getDataMgr();
1481  for (auto& buf : gpu_hash_table_buff_) {
1482  if (buf) {
1483  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1484  buf = nullptr;
1485  }
1486  }
1487  for (auto& buf : gpu_hash_table_err_buff_) {
1488  if (buf) {
1489  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1490  buf = nullptr;
1491  }
1492  }
1493 #else
1494  CHECK(false);
1495 #endif // HAVE_CUDA
1496 }
1497 
1498 void JoinHashTable::freeHashBufferCpuMemory() {
1499  cpu_hash_table_buff_.reset();
1500 }
void fill_one_to_many_hash_table_on_device(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const size_t block_size_x, const size_t grid_size_x)
#define CHECK_EQ(x, y)
Definition: Logger.h:195
std::deque< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
void initOneToManyHashTableOnCpu(const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
void initHashTableForDevice(const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
HOST DEVICE int get_size() const
Definition: sqltypes.h:333
int get_column_id() const
Definition: Analyzer.h:194
void freeHashBufferGpuMemory()
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:81
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
int64_t getBucket() const
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
size_t getNormalizedHashEntryCount() const
#define IS_EQUIVALENCE(X)
Definition: sqldefs.h:67
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int32_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void reifyOneToManyForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
int8_t * allocate(std::ptrdiff_t num_bytes)
bool shard_count_less_or_equal_device_count(const int inner_table_id, const Executor *executor)
bool isBitwiseEq() const
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info, const size_t block_size_x, const size_t grid_size_x)
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col_expr, const Executor *executor)
ChunkKey genHashTableKey(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
void freeHashBufferMemory()
unsigned long long CUdeviceptr
Definition: nocuda.h:27
size_t countBufferOff() const noexcept override
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:323
void checkHashJoinReplicationConstraint(const int table_id) const
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:184
#define CHECK_GE(x, y)
Definition: Logger.h:200
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:840
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
static std::mutex join_hash_table_cache_mutex_
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
int64_t getIntMax() const
#define CHECK_GT(x, y)
Definition: Logger.h:199
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
void initHashTableOnCpuFromCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
std::pair< const int8_t *, size_t > getOneColumnFragment(const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:114
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
static std::shared_ptr< JoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:168
int64_t getIntMin() const
#define CHECK_NE(x, y)
Definition: Logger.h:196
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const size_t block_size_x, const size_t grid_size_x)
void initOneToManyHashTable(const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
bool is_overlaps_oper() const
Definition: Analyzer.h:433
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples (rows) currently stored by that fragment.
Definition: Fragmenter.h:79
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
size_t payloadBufferOff() const noexcept override
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
void reifyOneToOneForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
int64_t bucket_normalization
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
size_t shardCount() const
static ExpressionRange makeIntRange(const int64_t int_min, const int64_t int_max, const int64_t bucket, const bool has_nulls)
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
std::vector< llvm::Value * > codegen(const Analyzer::Expr *, const bool fetch_columns, const CompilationOptions &)
Definition: IRCodegen.cpp:25
#define CHECK_LT(x, y)
Definition: Logger.h:197
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
void fill_one_to_many_hash_table(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
bool table_is_replicated(const TableDescriptor *td)
static void freeGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, Data_Namespace::AbstractBuffer *ab)
int get_table_id() const
Definition: Analyzer.h:193
void freeHashBufferCpuMemory()
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
std::pair< const int8_t *, size_t > getAllColumnFragments(const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
void init_hash_join_buff_on_device(int32_t *buff, const int32_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
size_t getComponentBufferSize() const noexcept
void putHashTableOnCpuToCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
T visit(const Analyzer::Expr *expr) const
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:77
size_t offsetBufferOff() const noexcept override
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
void initHashTableOnCpu(const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int > ChunkKey
Definition: types.h:35
void reify(const int device_count)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
bool g_cluster
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
static std::pair< const int8_t *, size_t > getAllColumnFragments(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
int cpu_threads()
Definition: thread_count.h:23
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join)
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
size_t get_hash_entry_count(const ExpressionRange &col_range, const bool is_bw_eq)
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)
const Expr * get_right_operand() const
Definition: Analyzer.h:436
std::pair< const int8_t *, size_t > fetchFragments(const Analyzer::ColumnVar *hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ThrustAllocator &dev_buff_owner)
const Expr * get_left_operand() const
Definition: Analyzer.h:435
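
Several members indexed above (offsetBufferOff(), countBufferOff(), payloadBufferOff(), getComponentBufferSize()) refer to the layout of a one-to-many hash table: one offset per hash slot, one match count per slot, and a payload section holding the ids of the matching rows, stored back to back. The standalone toy below sketches that general layout under simplifying assumptions (one integer key column, dense keys starting at zero, no sharding, no null handling); ToyOneToManyTable and toy_build_one_to_many are hypothetical names used only for this illustration and are not part of the OmniSciDB API.

#include <cstdint>
#include <iostream>
#include <vector>

// Toy one-to-many layout: offsets, counts and payload kept as separate vectors
// here, whereas the real table concatenates them into one component buffer.
struct ToyOneToManyTable {
  std::vector<int32_t> offsets;  // first payload index for each hash slot
  std::vector<int32_t> counts;   // number of matching rows for each hash slot
  std::vector<int32_t> payload;  // row ids, grouped by slot
};

// Assumes the slot index equals the key value (dense keys starting at zero).
ToyOneToManyTable toy_build_one_to_many(const std::vector<int32_t>& keys,
                                        const int32_t entry_count) {
  ToyOneToManyTable t;
  t.counts.assign(entry_count, 0);
  for (const auto k : keys) {
    ++t.counts[k];
  }
  t.offsets.assign(entry_count, 0);
  for (int32_t i = 1; i < entry_count; ++i) {
    t.offsets[i] = t.offsets[i - 1] + t.counts[i - 1];
  }
  t.payload.resize(keys.size());
  auto cursor = t.offsets;  // running write position per slot
  for (int32_t row = 0; row < static_cast<int32_t>(keys.size()); ++row) {
    t.payload[cursor[keys[row]]++] = row;
  }
  return t;
}

int main() {
  // Inner join column for rows 0..5; key 2 occurs three times (one-to-many).
  const std::vector<int32_t> inner_col{2, 0, 2, 1, 2, 0};
  const auto table = toy_build_one_to_many(inner_col, /*entry_count=*/3);
  const int32_t probe_key = 2;
  const auto off = table.offsets[probe_key];
  const auto cnt = table.counts[probe_key];
  std::cout << "rows matching key " << probe_key << ":";
  for (int32_t i = 0; i < cnt; ++i) {
    std::cout << " " << table.payload[off + i];
  }
  std::cout << std::endl;  // prints: rows matching key 2: 0 2 4
  return 0;
}

Probing in this toy mirrors the access pattern a matching-set lookup needs: read the slot's offset and count, then scan that many payload entries.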