OmniSciDB  8a228a1076
JoinHashTable.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <future>
20 #include <numeric>
21 #include <thread>
22 
23 #include "Logger/Logger.h"
26 #include "QueryEngine/Execute.h"
31 
32 namespace {
33 
34 class NeedsOneToManyHash : public HashJoinFail {
35  public:
36  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
37 };
38 
39 } // namespace
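// NeedsOneToManyHash is thrown from the one-to-one build path when a key turns out to
// map to more than one inner row; reify() below catches it, frees the partial buffers,
// and rebuilds every device's table in one-to-many form.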
40 
41 InnerOuter normalize_column_pair(const Analyzer::Expr* lhs,
42  const Analyzer::Expr* rhs,
43  const Catalog_Namespace::Catalog& cat,
44  const TemporaryTables* temporary_tables,
45  const bool is_overlaps_join) {
46  const auto& lhs_ti = lhs->get_type_info();
47  const auto& rhs_ti = rhs->get_type_info();
48  if (!is_overlaps_join) {
49  if (lhs_ti.get_type() != rhs_ti.get_type()) {
50  throw HashJoinFail("Equijoin types must be identical, found: " +
51  lhs_ti.get_type_name() + ", " + rhs_ti.get_type_name());
52  }
53  if (!lhs_ti.is_integer() && !lhs_ti.is_time() && !lhs_ti.is_string() &&
54  !lhs_ti.is_decimal()) {
55  throw HashJoinFail("Cannot apply hash join to inner column type " +
56  lhs_ti.get_type_name());
57  }
58  // Decimal types should be identical.
59  if (lhs_ti.is_decimal() && (lhs_ti.get_scale() != rhs_ti.get_scale() ||
60  lhs_ti.get_precision() != rhs_ti.get_precision())) {
61  throw HashJoinFail("Equijoin with different decimal types");
62  }
63  }
64 
65  const auto lhs_cast = dynamic_cast<const Analyzer::UOper*>(lhs);
66  const auto rhs_cast = dynamic_cast<const Analyzer::UOper*>(rhs);
67  if (lhs_ti.is_string() && (static_cast<bool>(lhs_cast) != static_cast<bool>(rhs_cast) ||
68  (lhs_cast && lhs_cast->get_optype() != kCAST) ||
69  (rhs_cast && rhs_cast->get_optype() != kCAST))) {
70  throw HashJoinFail("Cannot use hash join for given expression");
71  }
72  // Casts to decimal are not supported.
73  if (lhs_ti.is_decimal() && (lhs_cast || rhs_cast)) {
74  throw HashJoinFail("Cannot use hash join for given expression");
75  }
76  const auto lhs_col =
77  lhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(lhs_cast->get_operand())
78  : dynamic_cast<const Analyzer::ColumnVar*>(lhs);
79  const auto rhs_col =
80  rhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(rhs_cast->get_operand())
81  : dynamic_cast<const Analyzer::ColumnVar*>(rhs);
82  if (!lhs_col && !rhs_col) {
83  throw HashJoinFail("Cannot use hash join for given expression");
84  }
85  const Analyzer::ColumnVar* inner_col{nullptr};
86  const Analyzer::ColumnVar* outer_col{nullptr};
87  auto outer_ti = lhs_ti;
88  auto inner_ti = rhs_ti;
89  const Analyzer::Expr* outer_expr{lhs};
90  if ((!lhs_col || (rhs_col && lhs_col->get_rte_idx() < rhs_col->get_rte_idx())) &&
91  (!rhs_col || (!lhs_col || lhs_col->get_rte_idx() < rhs_col->get_rte_idx()))) {
92  inner_col = rhs_col;
93  outer_col = lhs_col;
94  } else {
95  if (lhs_col && lhs_col->get_rte_idx() == 0) {
96  throw HashJoinFail("Cannot use hash join for given expression");
97  }
98  inner_col = lhs_col;
99  outer_col = rhs_col;
100  std::swap(outer_ti, inner_ti);
101  outer_expr = rhs;
102  }
103  if (!inner_col) {
104  throw HashJoinFail("Cannot use hash join for given expression");
105  }
106  if (!outer_col) {
107  MaxRangeTableIndexVisitor rte_idx_visitor;
108  int outer_rte_idx = rte_idx_visitor.visit(outer_expr);
109  // The inner column candidate is not actually inner; the outer
110  // expression contains columns which are at least as deep.
111  if (inner_col->get_rte_idx() <= outer_rte_idx) {
112  throw HashJoinFail("Cannot use hash join for given expression");
113  }
114  }
115  // We need to fetch the actual type information from the catalog since Analyzer
116  // always reports nullable as true for inner table columns in left joins.
117  const auto inner_col_cd = get_column_descriptor_maybe(
118  inner_col->get_column_id(), inner_col->get_table_id(), cat);
119  const auto inner_col_real_ti = get_column_type(inner_col->get_column_id(),
120  inner_col->get_table_id(),
121  inner_col_cd,
122  temporary_tables);
123  const auto& outer_col_ti =
124  !(dynamic_cast<const Analyzer::FunctionOper*>(lhs)) && outer_col
125  ? outer_col->get_type_info()
126  : outer_ti;
127  // Casts from decimal are not supported.
128  if ((inner_col_real_ti.is_decimal() || outer_col_ti.is_decimal()) &&
129  (lhs_cast || rhs_cast)) {
130  throw HashJoinFail("Cannot use hash join for given expression");
131  }
132  if (is_overlaps_join) {
133  if (!inner_col_real_ti.is_array()) {
134  throw HashJoinFail(
135  "Overlaps join only supported for inner columns with array type");
136  }
137  auto is_bounds_array = [](const auto ti) {
138  return ti.is_fixlen_array() && ti.get_size() == 32;
139  };
140  if (!is_bounds_array(inner_col_real_ti)) {
141  throw HashJoinFail(
142  "Overlaps join only supported for 4-element double fixed length arrays");
143  }
144  if (!(outer_col_ti.get_type() == kPOINT || is_bounds_array(outer_col_ti))) {
145  throw HashJoinFail(
146  "Overlaps join only supported for geometry outer columns of type point or "
147  "geometry columns with bounds");
148  }
149  } else {
150  if (!(inner_col_real_ti.is_integer() || inner_col_real_ti.is_time() ||
151  inner_col_real_ti.is_decimal() ||
152  (inner_col_real_ti.is_string() &&
153  inner_col_real_ti.get_compression() == kENCODING_DICT))) {
154  throw HashJoinFail(
155  "Can only apply hash join to integer-like types and dictionary encoded "
156  "strings");
157  }
158  }
159 
160  auto normalized_inner_col = inner_col;
161  auto normalized_outer_col = outer_col ? outer_col : outer_expr;
162 
163  const auto& normalized_inner_ti = normalized_inner_col->get_type_info();
164  const auto& normalized_outer_ti = normalized_outer_col->get_type_info();
165 
166  if (normalized_inner_ti.is_string() != normalized_outer_ti.is_string()) {
167  throw HashJoinFail(std::string("Could not build hash tables for incompatible types " +
168  normalized_inner_ti.get_type_name() + " and " +
169  normalized_outer_ti.get_type_name()));
170  }
171 
172  return {normalized_inner_col, normalized_outer_col};
173 }
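// To summarize the contract: the returned pair is (inner, outer), where the inner column
// is the one with the deeper range-table index (the side the hash table is built on) and
// the outer side may be an arbitrary expression when no outer column is available.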
174 
175 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
176  const Catalog_Namespace::Catalog& cat,
177  const TemporaryTables* temporary_tables) {
178  std::vector<InnerOuter> result;
179  const auto lhs_tuple_expr =
180  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_left_operand());
181  const auto rhs_tuple_expr =
182  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_right_operand());
183 
184  CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
185  if (lhs_tuple_expr) {
186  const auto& lhs_tuple = lhs_tuple_expr->getTuple();
187  const auto& rhs_tuple = rhs_tuple_expr->getTuple();
188  CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
189  for (size_t i = 0; i < lhs_tuple.size(); ++i) {
190  result.push_back(normalize_column_pair(lhs_tuple[i].get(),
191  rhs_tuple[i].get(),
192  cat,
193  temporary_tables,
194  condition->is_overlaps_oper()));
195  }
196  } else {
197  CHECK(!lhs_tuple_expr && !rhs_tuple_expr);
198  result.push_back(normalize_column_pair(condition->get_left_operand(),
199  condition->get_right_operand(),
200  cat,
201  temporary_tables,
202  condition->is_overlaps_oper()));
203  }
204 
205  return result;
206 }
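// A tuple-vs-tuple condition (composite equijoin key) yields one inner/outer pair per key
// component; a plain binary condition yields exactly one pair.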
207 
208 namespace {
209 
210 std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> get_cols(
211  const Analyzer::BinOper* qual_bin_oper,
212  const Catalog_Namespace::Catalog& cat,
213  const TemporaryTables* temporary_tables) {
214  const auto lhs = qual_bin_oper->get_left_operand();
215  const auto rhs = qual_bin_oper->get_right_operand();
216  return normalize_column_pair(lhs, rhs, cat, temporary_tables);
217 }
218 
219 HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
220  ExpressionRange const& col_range,
221  bool const is_bw_eq) {
222  using EmptyRangeSize = boost::optional<size_t>;
223  auto empty_range_check = [](ExpressionRange const& col_range,
224  bool const is_bw_eq) -> EmptyRangeSize {
225  if (col_range.getIntMin() > col_range.getIntMax()) {
226  CHECK_EQ(col_range.getIntMin(), int64_t(0));
227  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
228  if (is_bw_eq) {
229  return size_t(1);
230  }
231  return size_t(0);
232  }
233  return EmptyRangeSize{};
234  };
235 
236  auto empty_range = empty_range_check(col_range, is_bw_eq);
237  if (empty_range) {
238  return {size_t(*empty_range), 1};
239  }
240 
241  int64_t bucket_normalization =
242  context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
243  CHECK_GT(bucket_normalization, 0);
244  return {size_t(col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0)),
245  bucket_normalization};
246 }
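// Illustrative arithmetic (not from the source): for a DATE inner column whose range is
// [0, 863999] seconds and whose range reports a bucket of 86400 (one day), the raw entry
// count is 864000 while the normalized count is 864000 / 86400 = 10 slots. For non-DATE
// types bucket_normalization stays 1 and the two counts coincide.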
247 
248 size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
249  if (col_range.getIntMin() > col_range.getIntMax()) {
250  CHECK_EQ(col_range.getIntMin(), int64_t(0));
251  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
252  return is_bw_eq ? 1 : 0;
253  }
254  return col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0);
255 }
256 
257 } // namespace
258 
259 std::vector<std::pair<JoinHashTable::JoinHashTableCacheKey,
260  std::shared_ptr<std::vector<int32_t>>>>
261  JoinHashTable::join_hash_table_cache_;
262 std::mutex JoinHashTable::join_hash_table_cache_mutex_;
263 
264 size_t get_shard_count(const Analyzer::BinOper* join_condition,
265  const Executor* executor) {
266  const Analyzer::ColumnVar* inner_col{nullptr};
267  const Analyzer::Expr* outer_col{nullptr};
268  std::shared_ptr<Analyzer::BinOper> redirected_bin_oper;
269  try {
270  std::tie(inner_col, outer_col) =
271  get_cols(join_condition, *executor->getCatalog(), executor->getTemporaryTables());
272  } catch (...) {
273  return 0;
274  }
275  if (!inner_col || !outer_col) {
276  return 0;
277  }
278  return get_shard_count({inner_col, outer_col}, executor);
279 }
280 
281 namespace {
282 
283 bool shard_count_less_or_equal_device_count(const int inner_table_id,
284  const Executor* executor) {
285  const auto inner_table_info = executor->getTableInfo(inner_table_id);
286  std::unordered_set<int> device_holding_fragments;
287  auto cuda_mgr = executor->getCatalog()->getDataMgr().getCudaMgr();
288  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
289  for (const auto& fragment : inner_table_info.fragments) {
290  if (fragment.shard != -1) {
291  const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
292  if (!it_ok.second) {
293  return false;
294  }
295  }
296  }
297  return true;
298 }
299 
300 } // namespace
301 
302 size_t get_shard_count(
303  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
304  const Executor* executor) {
305  const auto inner_col = equi_pair.first;
306  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
307  if (!outer_col || inner_col->get_table_id() < 0 || outer_col->get_table_id() < 0) {
308  return 0;
309  }
310  if (outer_col->get_rte_idx()) {
311  return 0;
312  }
313  if (inner_col->get_type_info() != outer_col->get_type_info()) {
314  return 0;
315  }
316  const auto catalog = executor->getCatalog();
317  const auto inner_td = catalog->getMetadataForTable(inner_col->get_table_id());
318  CHECK(inner_td);
319  const auto outer_td = catalog->getMetadataForTable(outer_col->get_table_id());
320  CHECK(outer_td);
321  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
322  inner_td->nShards != outer_td->nShards) {
323  return 0;
324  }
325  if (!shard_count_less_or_equal_device_count(inner_td->tableId, executor)) {
326  return 0;
327  }
328  // The two columns involved must be the ones on which the tables have been sharded.
329  return (inner_td->shardedColumnId == inner_col->get_column_id() &&
330  outer_td->shardedColumnId == outer_col->get_column_id()) ||
331  (outer_td->shardedColumnId == inner_col->get_column_id() &&
332  inner_td->shardedColumnId == inner_col->get_column_id())
333  ? inner_td->nShards
334  : 0;
335 }
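// In short: a non-zero shard count is returned only when both tables are sharded into the
// same number of shards, the join columns are the sharding columns, and the inner table's
// shards can be spread so that no device holds fragments of more than one shard.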
336 
338 std::shared_ptr<JoinHashTable> JoinHashTable::getInstance(
339  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
340  const std::vector<InputTableInfo>& query_infos,
341  const Data_Namespace::MemoryLevel memory_level,
342  const HashType preferred_hash_type,
343  const int device_count,
344  ColumnCacheMap& column_cache,
345  Executor* executor) {
346  decltype(std::chrono::steady_clock::now()) ts1, ts2;
347  if (VLOGGING(1)) {
348  VLOG(1) << "Building perfect hash table " << getHashTypeString(preferred_hash_type)
349  << " for qual: " << qual_bin_oper->toString();
350  ts1 = std::chrono::steady_clock::now();
351  }
352  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
353  const auto cols =
354  get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
355  const auto inner_col = cols.first;
356  CHECK(inner_col);
357  const auto& ti = inner_col->get_type_info();
358  auto col_range =
359  getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
360  if (col_range.getType() == ExpressionRangeType::Invalid) {
361  throw HashJoinFail(
362  "Could not compute range for the expressions involved in the equijoin");
363  }
364  if (ti.is_string()) {
365  // The nullable info must be the same as the source column.
366  const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
367  if (source_col_range.getType() == ExpressionRangeType::Invalid) {
368  throw HashJoinFail(
369  "Could not compute range for the expressions involved in the equijoin");
370  }
371  if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
372  // If the inner column expression range is empty, use the inner col range
373  CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
374  CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
375  col_range = source_col_range;
376  } else {
377  col_range = ExpressionRange::makeIntRange(
378  std::min(source_col_range.getIntMin(), col_range.getIntMin()),
379  std::max(source_col_range.getIntMax(), col_range.getIntMax()),
380  0,
381  source_col_range.hasNulls());
382  }
383  }
384  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
385  const auto max_hash_entry_count =
386  memory_level == Data_Namespace::GPU_LEVEL
387  ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
388  : static_cast<size_t>(std::numeric_limits<int32_t>::max());
389 
390  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
391  ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
392  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
393 
394  if (bucketized_entry_count > max_hash_entry_count) {
395  throw TooManyHashEntries();
396  }
397 
398  if (qual_bin_oper->get_optype() == kBW_EQ &&
399  col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
400  throw HashJoinFail("Cannot translate null value for kBW_EQ");
401  }
402  auto join_hash_table =
403  std::shared_ptr<JoinHashTable>(new JoinHashTable(qual_bin_oper,
404  inner_col,
405  query_infos,
406  memory_level,
407  preferred_hash_type,
408  col_range,
409  column_cache,
410  executor,
411  device_count));
412  try {
413  join_hash_table->reify();
414  } catch (const TableMustBeReplicated& e) {
415  // Throw a runtime error to abort the query
416  join_hash_table->freeHashBufferMemory();
417  throw std::runtime_error(e.what());
418  } catch (const HashJoinFail& e) {
419  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
420  // possible)
421  join_hash_table->freeHashBufferMemory();
422  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
423  "involved in equijoin | ") +
424  e.what());
425  } catch (const ColumnarConversionNotSupported& e) {
426  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
427  e.what());
428  } catch (const OutOfMemory& e) {
429  throw HashJoinFail(
430  std::string("Ran out of memory while building hash tables for equijoin | ") +
431  e.what());
432  } catch (const std::exception& e) {
433  throw std::runtime_error(
434  std::string("Fatal error while attempting to build hash tables for join: ") +
435  e.what());
436  }
437  if (VLOGGING(1)) {
438  ts2 = std::chrono::steady_clock::now();
439  VLOG(1) << "Built perfect hash table "
440  << getHashTypeString(join_hash_table->getHashType()) << " in "
441  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
442  << " ms";
443  }
444  return join_hash_table;
445 }
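// A minimal sketch of how a caller might obtain a perfect hash table; the local variable
// names are hypothetical, the parameter list mirrors the signature above:
//
//   auto hash_table = JoinHashTable::getInstance(qual_bin_oper,
//                                                query_infos,
//                                                Data_Namespace::CPU_LEVEL,
//                                                JoinHashTableInterface::HashType::OneToOne,
//                                                /*device_count=*/1,
//                                                column_cache,
//                                                executor);
//
// On failure the caller receives a HashJoinFail and can fall back to a loop join.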
446 
447 bool needs_dictionary_translation(const Analyzer::ColumnVar* inner_col,
448  const Analyzer::Expr* outer_col_expr,
449  const Executor* executor) {
450  const auto catalog = executor->getCatalog();
451  CHECK(catalog);
452  const auto inner_cd = get_column_descriptor_maybe(
453  inner_col->get_column_id(), inner_col->get_table_id(), *catalog);
454  const auto& inner_ti = get_column_type(inner_col->get_column_id(),
455  inner_col->get_table_id(),
456  inner_cd,
457  executor->getTemporaryTables());
458  // Only strings may need dictionary translation.
459  if (!inner_ti.is_string()) {
460  return false;
461  }
462  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
463  CHECK(outer_col);
464  const auto outer_cd = get_column_descriptor_maybe(
465  outer_col->get_column_id(), outer_col->get_table_id(), *catalog);
466  // Don't want to deal with temporary tables for now, require translation.
467  if (!inner_cd || !outer_cd) {
468  return true;
469  }
470  const auto& outer_ti = get_column_type(outer_col->get_column_id(),
471  outer_col->get_table_id(),
472  outer_cd,
473  executor->getTemporaryTables());
474  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
475  // If the two columns don't share the dictionary, translation is needed.
476  return outer_ti.get_comp_param() != inner_ti.get_comp_param();
477 }
478 
479 std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
480  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
481  const int device_id,
482  const int device_count) {
483  std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
484  for (const auto& fragment : fragments) {
485  CHECK_GE(fragment.shard, 0);
486  if (fragment.shard % device_count == device_id) {
487  shards_for_device.push_back(fragment);
488  }
489  }
490  return shards_for_device;
491 }
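// Illustrative example (not from the source): with device_count = 2, fragments of shards
// 0, 2 and 4 are handed to device 0 while shards 1, 3 and 5 go to device 1, since a
// fragment is assigned to device (shard % device_count).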
492 
493 void JoinHashTable::reify() {
494  auto timer = DEBUG_TIMER(__func__);
495  CHECK_LT(0, device_count_);
496  const auto& catalog = *executor_->getCatalog();
497  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
498  const auto inner_col = cols.first;
499  checkHashJoinReplicationConstraint(inner_col->get_table_id());
500  const auto& query_info = getInnerQueryInfo(inner_col).info;
501  if (query_info.fragments.empty()) {
502  return;
503  }
504  if (query_info.getNumTuplesUpperBound() >
505  static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
506  throw TooManyHashEntries();
507  }
508 #ifdef HAVE_CUDA
509  gpu_hash_table_buff_.resize(device_count_);
510  gpu_hash_table_err_buff_.resize(device_count_);
511 #endif // HAVE_CUDA
512  std::vector<std::future<void>> init_threads;
513  const int shard_count = shardCount();
514 
515  try {
516  for (int device_id = 0; device_id < device_count_; ++device_id) {
517  const auto fragments =
518  shard_count
519  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
520  : query_info.fragments;
521  init_threads.push_back(
522  std::async(std::launch::async,
523  hash_type_ == JoinHashTableInterface::HashType::OneToOne
524  ? &JoinHashTable::reifyOneToOneForDevice
525  : &JoinHashTable::reifyOneToManyForDevice,
526  this,
527  fragments,
528  device_id,
529  logger::thread_id()));
530  }
531  for (auto& init_thread : init_threads) {
532  init_thread.wait();
533  }
534  for (auto& init_thread : init_threads) {
535  init_thread.get();
536  }
537 
538  } catch (const NeedsOneToManyHash& e) {
539  hash_type_ = JoinHashTableInterface::HashType::OneToMany;
540  freeHashBufferMemory();
541  init_threads.clear();
542  for (int device_id = 0; device_id < device_count_; ++device_id) {
543  const auto fragments =
544  shard_count
545  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
546  : query_info.fragments;
547 
548  init_threads.push_back(std::async(std::launch::async,
549  &JoinHashTable::reifyOneToManyForDevice,
550  this,
551  fragments,
552  device_id,
553  logger::thread_id()));
554  }
555  for (auto& init_thread : init_threads) {
556  init_thread.wait();
557  }
558  for (auto& init_thread : init_threads) {
559  init_thread.get();
560  }
561  }
562 }
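// reify() therefore makes at most two passes: an optimistic one using the preferred hash
// layout, and, if any device reports duplicate keys via NeedsOneToManyHash, a second pass
// that rebuilds every device as one-to-many.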
563 
565 #ifdef HAVE_CUDA
566  CHECK(executor_);
567  CHECK(executor_->catalog_);
568  auto& data_mgr = executor_->catalog_->getDataMgr();
569  for (auto& gpu_buffer : gpu_hash_table_buff_) {
570  if (gpu_buffer) {
571  data_mgr.free(gpu_buffer);
572  }
573  }
574  for (auto& gpu_buffer : gpu_hash_table_err_buff_) {
575  if (gpu_buffer) {
576  data_mgr.free(gpu_buffer);
577  }
578  }
579 #endif
580 }
581 
582 ChunkKey JoinHashTable::genHashTableKey(
583  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
584  const Analyzer::Expr* outer_col_expr,
585  const Analyzer::ColumnVar* inner_col) const {
586  ChunkKey hash_table_key{executor_->getCatalog()->getCurrentDB().dbId,
587  inner_col->get_table_id(),
588  inner_col->get_column_id()};
589  const auto& ti = inner_col->get_type_info();
590  if (ti.is_string()) {
591  CHECK_EQ(kENCODING_DICT, ti.get_compression());
592  size_t outer_elem_count = 0;
593  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
594  CHECK(outer_col);
595  const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
596  for (auto& frag : outer_query_info.fragments) {
597  outer_elem_count = frag.getNumTuples();
598  }
599  hash_table_key.push_back(outer_elem_count);
600  }
601  if (fragments.size() < 2) {
602  hash_table_key.push_back(fragments.front().fragmentId);
603  }
604  return hash_table_key;
605 }
606 
607 void JoinHashTable::reifyOneToOneForDevice(
608  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
609  const int device_id,
610  const logger::ThreadId parent_thread_id) {
611  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
612  const auto& catalog = *executor_->getCatalog();
613  auto& data_mgr = catalog.getDataMgr();
614  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
615  const auto inner_col = cols.first;
616  CHECK(inner_col);
617  const auto inner_cd = get_column_descriptor_maybe(
618  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
619  if (inner_cd && inner_cd->isVirtualCol) {
621  }
622  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
623  // Since we don't have the string dictionary payloads on the GPU, we'll build
624  // the join hash table on the CPU and transfer it to the GPU.
625  const auto effective_memory_level =
626  needs_dictionary_translation(inner_col, cols.second, executor_)
627  ? Data_Namespace::CPU_LEVEL
628  : memory_level_;
629  if (fragments.empty()) {
630  // No data in this fragment. Still need to create a hash table and initialize it
631  // properly.
632  ChunkKey empty_chunk;
633  initOneToOneHashTable(empty_chunk,
634  JoinColumn{nullptr, 0, 0, 0, 0},
635  cols,
636  effective_memory_level,
637  device_id);
638  return;
639  }
640 
641  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
642  std::unique_ptr<DeviceAllocator> device_allocator;
643  if (effective_memory_level == MemoryLevel::GPU_LEVEL) {
644  device_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
645  }
646  std::vector<std::shared_ptr<void>> malloc_owner;
647 
648  JoinColumn join_column = fetchJoinColumn(inner_col,
649  fragments,
650  effective_memory_level,
651  device_id,
652  chunks_owner,
653  device_allocator.get(),
654  malloc_owner,
655  executor_,
656  &column_cache_);
657 
658  initOneToOneHashTable(genHashTableKey(fragments, cols.second, inner_col),
659  join_column,
660  cols,
661  effective_memory_level,
662  device_id);
663 }
664 
665 void JoinHashTable::reifyOneToManyForDevice(
666  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
667  const int device_id,
668  const logger::ThreadId parent_thread_id) {
669  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
670  const auto& catalog = *executor_->getCatalog();
671  auto& data_mgr = catalog.getDataMgr();
672  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
673  const auto inner_col = cols.first;
674  CHECK(inner_col);
675  const auto inner_cd = get_column_descriptor_maybe(
676  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
677  if (inner_cd && inner_cd->isVirtualCol) {
679  }
680  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
681  // Since we don't have the string dictionary payloads on the GPU, we'll build
682  // the join hash table on the CPU and transfer it to the GPU.
683  const auto effective_memory_level =
684  needs_dictionary_translation(inner_col, cols.second, executor_)
685  ? Data_Namespace::CPU_LEVEL
686  : memory_level_;
687  if (fragments.empty()) {
688  ChunkKey empty_chunk;
689  initOneToManyHashTable(empty_chunk,
690  JoinColumn{nullptr, 0, 0, 0, 0},
691  cols,
692  effective_memory_level,
693  device_id);
694  return;
695  }
696 
697  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
698  std::unique_ptr<DeviceAllocator> device_allocator;
699  if (effective_memory_level == MemoryLevel::GPU_LEVEL) {
700  device_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
701  }
702  std::vector<std::shared_ptr<void>> malloc_owner;
703 
704  JoinColumn join_column = fetchJoinColumn(inner_col,
705  fragments,
706  effective_memory_level,
707  device_id,
708  chunks_owner,
709  device_allocator.get(),
710  malloc_owner,
711  executor_,
712  &column_cache_);
713 
714  initOneToManyHashTable(genHashTableKey(fragments, cols.second, inner_col),
715  join_column,
716  cols,
717  effective_memory_level,
718  device_id);
719 }
720 
721 void JoinHashTable::checkHashJoinReplicationConstraint(const int table_id) {
722  if (!g_cluster) {
723  return;
724  }
725  if (table_id >= 0) {
726  const auto inner_td = executor_->getCatalog()->getMetadataForTable(table_id);
727  CHECK(inner_td);
728  size_t shard_count{0};
729  shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
730  if (!shard_count && !table_is_replicated(inner_td)) {
731  throw TableMustBeReplicated(inner_td->tableName);
732  }
733  }
734 }
735 
736 void JoinHashTable::initOneToOneHashTableOnCpu(
737  const JoinColumn& join_column,
738  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
739  const HashEntryInfo hash_entry_info,
740  const int32_t hash_join_invalid_val) {
741  auto timer = DEBUG_TIMER(__func__);
742  const auto inner_col = cols.first;
743  CHECK(inner_col);
744  const auto& ti = inner_col->get_type_info();
745  if (!cpu_hash_table_buff_) {
746  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
747  hash_entry_info.getNormalizedHashEntryCount());
748  const StringDictionaryProxy* sd_inner_proxy{nullptr};
749  const StringDictionaryProxy* sd_outer_proxy{nullptr};
750  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
751  if (ti.is_string() &&
752  (outer_col && !(inner_col->get_comp_param() == outer_col->get_comp_param()))) {
753  CHECK_EQ(kENCODING_DICT, ti.get_compression());
754  sd_inner_proxy = executor_->getStringDictionaryProxy(
755  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
756  CHECK(sd_inner_proxy);
757  CHECK(outer_col);
758  sd_outer_proxy = executor_->getStringDictionaryProxy(
759  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
760  CHECK(sd_outer_proxy);
761  }
762  int thread_count = cpu_threads();
763  std::vector<std::thread> init_cpu_buff_threads;
764  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
765  init_cpu_buff_threads.emplace_back(
766  [this, hash_entry_info, hash_join_invalid_val, thread_idx, thread_count] {
767  init_hash_join_buff(&(*cpu_hash_table_buff_)[0],
768  hash_entry_info.getNormalizedHashEntryCount(),
769  hash_join_invalid_val,
770  thread_idx,
771  thread_count);
772  });
773  }
774  for (auto& t : init_cpu_buff_threads) {
775  t.join();
776  }
777  init_cpu_buff_threads.clear();
778  int err{0};
779  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
780  init_cpu_buff_threads.emplace_back([this,
781  hash_join_invalid_val,
782  &join_column,
783  sd_inner_proxy,
784  sd_outer_proxy,
785  thread_idx,
786  thread_count,
787  &ti,
788  &err,
789  hash_entry_info] {
790  int partial_err =
791  fill_hash_join_buff_bucketized(&(*cpu_hash_table_buff_)[0],
792  hash_join_invalid_val,
793  join_column,
794  {static_cast<size_t>(ti.get_size()),
795  col_range_.getIntMin(),
796  col_range_.getIntMax(),
798  isBitwiseEq(),
799  col_range_.getIntMax() + 1,
801  sd_inner_proxy,
802  sd_outer_proxy,
803  thread_idx,
804  thread_count,
805  hash_entry_info.bucket_normalization);
806  __sync_val_compare_and_swap(&err, 0, partial_err);
807  });
808  }
809  for (auto& t : init_cpu_buff_threads) {
810  t.join();
811  }
812  if (err) {
813  cpu_hash_table_buff_.reset();
814  // Too many hash entries, need to retry with a 1:many table
815  throw NeedsOneToManyHash();
816  }
817  } else {
818  if (cpu_hash_table_buff_->size() > hash_entry_info.getNormalizedHashEntryCount()) {
819  // Too many hash entries, need to retry with a 1:many table
820  throw NeedsOneToManyHash();
821  }
822  }
823 }
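// Layout note: the one-to-one table is a flat array of int32_t with exactly
// getNormalizedHashEntryCount() slots; each slot holds the row id of the single matching
// inner row, or hash_join_invalid_val (-1) when the key is absent.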
824 
825 void JoinHashTable::initOneToManyHashTableOnCpu(
826  const JoinColumn& join_column,
827  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
828  const HashEntryInfo hash_entry_info,
829  const int32_t hash_join_invalid_val) {
830  auto timer = DEBUG_TIMER(__func__);
831  const auto inner_col = cols.first;
832  CHECK(inner_col);
833  const auto& ti = inner_col->get_type_info();
834  if (cpu_hash_table_buff_) {
835  return;
836  }
837  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
838  2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems);
839  const StringDictionaryProxy* sd_inner_proxy{nullptr};
840  const StringDictionaryProxy* sd_outer_proxy{nullptr};
841  if (ti.is_string()) {
842  CHECK_EQ(kENCODING_DICT, ti.get_compression());
843  sd_inner_proxy = executor_->getStringDictionaryProxy(
844  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
845  CHECK(sd_inner_proxy);
846  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
847  CHECK(outer_col);
848  sd_outer_proxy = executor_->getStringDictionaryProxy(
849  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
850  CHECK(sd_outer_proxy);
851  }
852  int thread_count = cpu_threads();
853  std::vector<std::future<void>> init_threads;
854  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
855  init_threads.emplace_back(std::async(std::launch::async,
857  &(*cpu_hash_table_buff_)[0],
858  hash_entry_info.getNormalizedHashEntryCount(),
859  hash_join_invalid_val,
860  thread_idx,
861  thread_count));
862  }
863  for (auto& child : init_threads) {
864  child.wait();
865  }
866  for (auto& child : init_threads) {
867  child.get();
868  }
869 
870  if (ti.get_type() == kDATE) {
871  fill_one_to_many_hash_table_bucketized(&(*cpu_hash_table_buff_)[0],
872  hash_entry_info,
873  hash_join_invalid_val,
874  join_column,
875  {static_cast<size_t>(ti.get_size()),
876  col_range_.getIntMin(),
877  col_range_.getIntMax(),
879  isBitwiseEq(),
880  col_range_.getIntMax() + 1,
882  sd_inner_proxy,
883  sd_outer_proxy,
884  thread_count);
885  } else {
886  fill_one_to_many_hash_table(&(*cpu_hash_table_buff_)[0],
887  hash_entry_info,
888  hash_join_invalid_val,
889  join_column,
890  {static_cast<size_t>(ti.get_size()),
891  col_range_.getIntMin(),
892  col_range_.getIntMax(),
894  isBitwiseEq(),
895  col_range_.getIntMax() + 1,
897  sd_inner_proxy,
898  sd_outer_proxy,
899  thread_count);
900  }
901 }
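// Layout note: the one-to-many buffer packs three consecutive int32_t sections, which is
// why it is sized 2 * getNormalizedHashEntryCount() + num_elems:
//
//   [ offsets: one per hash entry ][ counts: one per hash entry ][ row ids: payload ]
//
// offsetBufferOff()/countBufferOff()/payloadBufferOff() below expose these section
// boundaries to the code generator.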
902 
903 namespace {
904 
905 #ifdef HAVE_CUDA
906 // Number of entries per shard, rounded up.
907 size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count) {
908  CHECK_NE(size_t(0), shard_count);
909  return (total_entry_count + shard_count - 1) / shard_count;
910 }
911 #endif // HAVE_CUDA
912 
913 } // namespace
914 
915 void JoinHashTable::initOneToOneHashTable(
916  const ChunkKey& chunk_key,
917  const JoinColumn& join_column,
918  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
919  const Data_Namespace::MemoryLevel effective_memory_level,
920  const int device_id) {
921  auto timer = DEBUG_TIMER(__func__);
922  const auto inner_col = cols.first;
923  CHECK(inner_col);
924 
925  auto hash_entry_info = get_bucketized_hash_entry_info(
926  inner_col->get_type_info(), col_range_, isBitwiseEq());
927  if (!hash_entry_info) {
928  return;
929  }
930 
931 #ifdef HAVE_CUDA
932  const auto shard_count = shardCount();
933  const size_t entries_per_shard{
934  shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
935  : 0};
936  // Even if we join on dictionary encoded strings, the memory on the GPU is still
937  // needed once the join hash table has been built on the CPU.
938  const auto catalog = executor_->getCatalog();
939  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
940  auto& data_mgr = catalog->getDataMgr();
941  if (shard_count) {
942  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
943  CHECK_GT(shards_per_device, 0u);
944  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
945  }
946  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
947  &data_mgr,
948  hash_entry_info.getNormalizedHashEntryCount() * sizeof(int32_t),
949  device_id);
950  }
951 #else
952  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
953 #endif
954  if (!device_id) {
955  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
956  }
957 
958 #ifdef HAVE_CUDA
959  const auto& ti = inner_col->get_type_info();
960 #endif
961  const int32_t hash_join_invalid_val{-1};
962  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
963  CHECK(!chunk_key.empty());
964  initHashTableOnCpuFromCache(chunk_key, join_column.num_elems, cols);
965  {
966  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
967  initOneToOneHashTableOnCpu(
968  join_column, cols, hash_entry_info, hash_join_invalid_val);
969  }
970  if (inner_col->get_table_id() > 0) {
971  putHashTableOnCpuToCache(chunk_key, join_column.num_elems, cols);
972  }
973  // Transfer the hash table to the GPU if we've only built it on CPU
974  // but the query runs on GPU (join on dictionary encoded columns).
975  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
976 #ifdef HAVE_CUDA
977  CHECK(ti.is_string());
978  auto& data_mgr = catalog->getDataMgr();
979  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
980 
981  copy_to_gpu(
982  &data_mgr,
983  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
984  &(*cpu_hash_table_buff_)[0],
985  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
986  device_id);
987 #else
988  CHECK(false);
989 #endif
990  }
991  } else {
992 #ifdef HAVE_CUDA
993  int err{0};
994  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
995  auto& data_mgr = catalog->getDataMgr();
996  gpu_hash_table_err_buff_[device_id] =
997  CudaAllocator::allocGpuAbstractBuffer(&data_mgr, sizeof(int), device_id);
998  auto dev_err_buff = reinterpret_cast<CUdeviceptr>(
999  gpu_hash_table_err_buff_[device_id]->getMemoryPtr());
1000  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
1001  init_hash_join_buff_on_device(
1002  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1003  hash_entry_info.getNormalizedHashEntryCount(),
1004  hash_join_invalid_val,
1005  executor_->blockSize(),
1006  executor_->gridSize());
1007  if (chunk_key.empty()) {
1008  return;
1009  }
1010  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1011  col_range_.getIntMin(),
1012  col_range_.getIntMax(),
1014  isBitwiseEq(),
1015  col_range_.getIntMax() + 1,
1017  if (shard_count) {
1018  CHECK_GT(device_count_, 0);
1019  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1020  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1022  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1023  hash_join_invalid_val,
1024  reinterpret_cast<int*>(dev_err_buff),
1025  join_column,
1026  type_info,
1027  shard_info,
1028  executor_->blockSize(),
1029  executor_->gridSize(),
1030  hash_entry_info.bucket_normalization);
1031  }
1032  } else {
1034  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1035  hash_join_invalid_val,
1036  reinterpret_cast<int*>(dev_err_buff),
1037  join_column,
1038  type_info,
1039  executor_->blockSize(),
1040  executor_->gridSize(),
1041  hash_entry_info.bucket_normalization);
1042  }
1043  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
1044 
1045  if (err) {
1046  throw NeedsOneToManyHash();
1047  }
1048 #else
1049  CHECK(false);
1050 #endif
1051  }
1052 }
1053 
1054 void JoinHashTable::initOneToManyHashTable(
1055  const ChunkKey& chunk_key,
1056  const JoinColumn& join_column,
1057  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
1058  const Data_Namespace::MemoryLevel effective_memory_level,
1059  const int device_id) {
1060  auto timer = DEBUG_TIMER(__func__);
1061  auto const inner_col = cols.first;
1062  CHECK(inner_col);
1063 
1064  auto hash_entry_info = get_bucketized_hash_entry_info(
1065  inner_col->get_type_info(), col_range_, isBitwiseEq());
1066 
1067 #ifdef HAVE_CUDA
1068  const auto shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
1069  const size_t entries_per_shard =
1070  (shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
1071  : 0);
1072  // Even if we join on dictionary encoded strings, the memory on the GPU is still
1073  // needed once the join hash table has been built on the CPU.
1074  if (memory_level_ == Data_Namespace::GPU_LEVEL && shard_count) {
1075  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
1076  CHECK_GT(shards_per_device, 0u);
1077  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
1078  }
1079 #else
1080  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1081 #endif
1082  if (!device_id) {
1083  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
1084  }
1085 
1086 #ifdef HAVE_CUDA
1087  const auto& ti = inner_col->get_type_info();
1088  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1089  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1090  const size_t total_count =
1091  2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems;
1092  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
1093  &data_mgr, total_count * sizeof(int32_t), device_id);
1094  }
1095 #endif
1096  const int32_t hash_join_invalid_val{-1};
1097  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
1098  initHashTableOnCpuFromCache(chunk_key, join_column.num_elems, cols);
1099  {
1100  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1101  initOneToManyHashTableOnCpu(
1102  join_column, cols, hash_entry_info, hash_join_invalid_val);
1103  }
1104  if (inner_col->get_table_id() > 0) {
1105  putHashTableOnCpuToCache(chunk_key, join_column.num_elems, cols);
1106  }
1107  // Transfer the hash table to the GPU if we've only built it on CPU
1108  // but the query runs on GPU (join on dictionary encoded columns).
1109  // Don't transfer the buffer if there was an error since we'll bail anyway.
1110  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1111 #ifdef HAVE_CUDA
1112  CHECK(ti.is_string());
1113  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1114  copy_to_gpu(
1115  &data_mgr,
1116  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1117  &(*cpu_hash_table_buff_)[0],
1118  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1119  device_id);
1120 #else
1121  CHECK(false);
1122 #endif
1123  }
1124  } else {
1125 #ifdef HAVE_CUDA
1126  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
1127  data_mgr.getCudaMgr()->setContext(device_id);
1128  init_hash_join_buff_on_device(
1129  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1130  hash_entry_info.getNormalizedHashEntryCount(),
1131  hash_join_invalid_val,
1132  executor_->blockSize(),
1133  executor_->gridSize());
1134  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1135  col_range_.getIntMin(),
1136  col_range_.getIntMax(),
1138  isBitwiseEq(),
1139  col_range_.getIntMax() + 1,
1141  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
1142 
1143  if (shard_count) {
1144  CHECK_GT(device_count_, 0);
1145  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1146  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1147  fill_one_to_many_hash_table_on_device_sharded(
1148  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1149  hash_entry_info,
1150  hash_join_invalid_val,
1151  join_column,
1152  type_info,
1153  shard_info,
1154  executor_->blockSize(),
1155  executor_->gridSize());
1156  }
1157  } else {
1158  if (use_bucketization) {
1160  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1161  hash_entry_info,
1162  hash_join_invalid_val,
1163  join_column,
1164  type_info,
1165  executor_->blockSize(),
1166  executor_->gridSize());
1167  } else {
1168  fill_one_to_many_hash_table_on_device(
1169  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1170  hash_entry_info,
1171  hash_join_invalid_val,
1172  join_column,
1173  type_info,
1174  executor_->blockSize(),
1175  executor_->gridSize());
1176  }
1177  }
1178 #else
1179  CHECK(false);
1180 #endif
1181  }
1182 }
1183 
1184 void JoinHashTable::initHashTableOnCpuFromCache(
1185  const ChunkKey& chunk_key,
1186  const size_t num_elements,
1187  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols) {
1188  auto timer = DEBUG_TIMER(__func__);
1189  CHECK_GE(chunk_key.size(), size_t(2));
1190  if (chunk_key[1] < 0) {
1191  // Do not cache hash tables over intermediate results
1192  return;
1193  }
1194  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1195  JoinHashTableCacheKey cache_key{col_range_,
1196  *cols.first,
1197  outer_col ? *outer_col : *cols.first,
1198  num_elements,
1199  chunk_key,
1200  qual_bin_oper_->get_optype()};
1201  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1202  for (const auto& kv : join_hash_table_cache_) {
1203  if (kv.first == cache_key) {
1204  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1205  cpu_hash_table_buff_ = kv.second;
1206  break;
1207  }
1208  }
1209 }
1210 
1211 void JoinHashTable::putHashTableOnCpuToCache(
1212  const ChunkKey& chunk_key,
1213  const size_t num_elements,
1214  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols) {
1215  CHECK_GE(chunk_key.size(), size_t(2));
1216  if (chunk_key[1] < 0) {
1217  // Do not cache hash tables over intermediate results
1218  return;
1219  }
1220  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1221  JoinHashTableCacheKey cache_key{col_range_,
1222  *cols.first,
1223  outer_col ? *outer_col : *cols.first,
1224  num_elements,
1225  chunk_key,
1226  qual_bin_oper_->get_optype()};
1227  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1228  for (const auto& kv : join_hash_table_cache_) {
1229  if (kv.first == cache_key) {
1230  return;
1231  }
1232  }
1233  join_hash_table_cache_.emplace_back(cache_key, cpu_hash_table_buff_);
1234 }
1235 
1236 llvm::Value* JoinHashTable::codegenHashTableLoad(const size_t table_idx) {
1237  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1238  const auto hash_ptr = codegenHashTableLoad(table_idx, executor_);
1239  if (hash_ptr->getType()->isIntegerTy(64)) {
1240  return hash_ptr;
1241  }
1242  CHECK(hash_ptr->getType()->isPointerTy());
1243  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
1244  get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
1245  llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
1246 }
1247 
1248 llvm::Value* JoinHashTable::codegenHashTableLoad(const size_t table_idx,
1249  Executor* executor) {
1250  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
1251  llvm::Value* hash_ptr = nullptr;
1252  const auto total_table_count =
1253  executor->plan_state_->join_info_.join_hash_tables_.size();
1254  CHECK_LT(table_idx, total_table_count);
1255  if (total_table_count > 1) {
1256  auto hash_tables_ptr =
1257  get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1258  auto hash_pptr =
1259  table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
1260  hash_tables_ptr,
1261  executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
1262  : hash_tables_ptr;
1263  hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
1264  } else {
1265  hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1266  }
1267  CHECK(hash_ptr);
1268  return hash_ptr;
1269 }
1270 
1271 std::vector<llvm::Value*> JoinHashTable::getHashJoinArgs(llvm::Value* hash_ptr,
1272  const Analyzer::Expr* key_col,
1273  const int shard_count,
1274  const CompilationOptions& co) {
1275  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1276  CodeGenerator code_generator(executor_);
1277  const auto key_lvs = code_generator.codegen(key_col, true, co);
1278  CHECK_EQ(size_t(1), key_lvs.size());
1279  auto const& key_col_ti = key_col->get_type_info();
1280  auto hash_entry_info =
1281  get_bucketized_hash_entry_info(key_col_ti, col_range_, isBitwiseEq());
1282 
1283  std::vector<llvm::Value*> hash_join_idx_args{
1284  hash_ptr,
1285  executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
1286  executor_->cgen_state_->llInt(col_range_.getIntMin()),
1287  executor_->cgen_state_->llInt(col_range_.getIntMax())};
1288  if (shard_count) {
1289  const auto expected_hash_entry_count =
1290  get_hash_entry_count(col_range_, isBitwiseEq());
1291  const auto entry_count_per_shard =
1292  (expected_hash_entry_count + shard_count - 1) / shard_count;
1293  hash_join_idx_args.push_back(
1294  executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
1295  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
1296  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
1297  }
1298  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
1299  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
1300  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1301  inline_fixed_encoding_null_val(key_col_logical_ti)));
1302  }
1303  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
1304  if (isBitwiseEq()) {
1305  if (special_date_bucketization_case) {
1306  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1307  col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
1308  } else {
1309  hash_join_idx_args.push_back(
1310  executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
1311  }
1312  }
1313 
1314  if (special_date_bucketization_case) {
1315  hash_join_idx_args.emplace_back(
1316  executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
1317  }
1318 
1319  return hash_join_idx_args;
1320 }
1321 
1322 HashJoinMatchingSet JoinHashTable::codegenMatchingSet(const CompilationOptions& co,
1323  const size_t index) {
1324  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1325  const auto cols = get_cols(
1326  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1327  auto key_col = cols.second;
1328  CHECK(key_col);
1329  auto val_col = cols.first;
1330  CHECK(val_col);
1331  auto pos_ptr = codegenHashTableLoad(index);
1332  CHECK(pos_ptr);
1333  const int shard_count = shardCount();
1334  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
1335  const int64_t sub_buff_size = getComponentBufferSize();
1336  const auto& key_col_ti = key_col->get_type_info();
1337 
1338  auto bucketize = (key_col_ti.get_type() == kDATE);
1339  return codegenMatchingSet(hash_join_idx_args,
1340  shard_count,
1341  !key_col_ti.get_notnull(),
1342  isBitwiseEq(),
1343  sub_buff_size,
1344  executor_,
1345  bucketize);
1346 }
1347 
1348 HashJoinMatchingSet JoinHashTable::codegenMatchingSet(
1349  const std::vector<llvm::Value*>& hash_join_idx_args_in,
1350  const bool is_sharded,
1351  const bool col_is_nullable,
1352  const bool is_bw_eq,
1353  const int64_t sub_buff_size,
1354  Executor* executor,
1355  bool is_bucketized) {
1356  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
1357  using namespace std::string_literals;
1358 
1359  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);
1360 
1361  if (is_bw_eq) {
1362  fname += "_bitwise";
1363  }
1364  if (is_sharded) {
1365  fname += "_sharded";
1366  }
1367  if (!is_bw_eq && col_is_nullable) {
1368  fname += "_nullable";
1369  }
1370 
1371  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
1372  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
1373  slot_lv, executor->cgen_state_->llInt(int64_t(0)));
1374 
1375  auto pos_ptr = hash_join_idx_args_in[0];
1376  CHECK(pos_ptr);
1377 
1378  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
1379  pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
1380  auto hash_join_idx_args = hash_join_idx_args_in;
1381  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
1382  count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));
1383 
1384  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
1385  slot_valid_lv,
1386  executor->cgen_state_->emitCall(fname, hash_join_idx_args),
1387  executor->cgen_state_->llInt(int64_t(0)));
1388  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
1389  executor->cgen_state_->ir_builder_.CreateAdd(
1390  pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
1391  llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
1392  auto rowid_ptr_i32 =
1393  executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
1394  return {rowid_ptr_i32, row_count_lv, slot_lv};
1395 }
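// The emitted matching set is the triple (rowid_ptr_i32, row_count_lv, slot_lv): a pointer
// into the payload section at the first matching row id, the number of matching rows (zero
// when the slot lookup failed), and the raw slot index itself.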
1396 
1397 size_t JoinHashTable::offsetBufferOff() const noexcept {
1398  return 0;
1399 }
1400 
1401 size_t JoinHashTable::countBufferOff() const noexcept {
1402  return getComponentBufferSize();
1403 }
1404 
1405 size_t JoinHashTable::payloadBufferOff() const noexcept {
1406  return 2 * getComponentBufferSize();
1407 }
1408 
1409 size_t JoinHashTable::getComponentBufferSize() const noexcept {
1410  if (hash_type_ == JoinHashTableInterface::HashType::OneToMany) {
1411  return hash_entry_count_ * sizeof(int32_t);
1412  } else {
1413  return 0;
1414  }
1415 }
1416 
1417 int64_t JoinHashTable::getJoinHashBuffer(const ExecutorDeviceType device_type,
1418  const int device_id) const noexcept {
1419  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1420  return 0;
1421  }
1422 #ifdef HAVE_CUDA
1423  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1424  if (device_type == ExecutorDeviceType::CPU) {
1425  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1426  } else {
1427  return gpu_hash_table_buff_[device_id]
1428  ? reinterpret_cast<CUdeviceptr>(
1429  gpu_hash_table_buff_[device_id]->getMemoryPtr())
1430  : reinterpret_cast<CUdeviceptr>(nullptr);
1431  }
1432 #else
1433  CHECK(device_type == ExecutorDeviceType::CPU);
1434  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1435 #endif
1436 }
1437 
1438 size_t JoinHashTable::getJoinHashBufferSize(const ExecutorDeviceType device_type,
1439  const int device_id) const noexcept {
1440  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1441  return 0;
1442  }
1443 #ifdef HAVE_CUDA
1444  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1445  if (device_type == ExecutorDeviceType::CPU) {
1446  return cpu_hash_table_buff_->size() *
1447  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1448  } else {
1449  return gpu_hash_table_buff_[device_id]
1450  ? gpu_hash_table_buff_[device_id]->reservedSize()
1451  : 0;
1452  }
1453 #else
1454  CHECK(device_type == ExecutorDeviceType::CPU);
1455  return cpu_hash_table_buff_->size() *
1456  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1457 #endif
1458 }
1459 
1460 std::string JoinHashTable::toString(const ExecutorDeviceType device_type,
1461  const int device_id,
1462  bool raw) const {
1463  auto buffer = getJoinHashBuffer(device_type, device_id);
1464  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1465 #ifdef HAVE_CUDA
1466  std::unique_ptr<int8_t[]> buffer_copy;
1467  if (device_type == ExecutorDeviceType::GPU) {
1468  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1469 
1470  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1471  buffer_copy.get(),
1472  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1473  buffer_size,
1474  device_id);
1475  }
1476  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1477 #else
1478  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1479 #endif // HAVE_CUDA
1480  auto ptr2 = ptr1 + offsetBufferOff();
1481  auto ptr3 = ptr1 + countBufferOff();
1482  auto ptr4 = ptr1 + payloadBufferOff();
1483  return JoinHashTableInterface::toString("perfect",
1484  getHashTypeString(hash_type_),
1485  0,
1486  0,
1487  hash_entry_count_,
1488  ptr1,
1489  ptr2,
1490  ptr3,
1491  ptr4,
1492  buffer_size,
1493  raw);
1494 }
1495 
1496 std::set<DecodedJoinHashBufferEntry> JoinHashTable::toSet(
1497  const ExecutorDeviceType device_type,
1498  const int device_id) const {
1499  auto buffer = getJoinHashBuffer(device_type, device_id);
1500  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1501 #ifdef HAVE_CUDA
1502  std::unique_ptr<int8_t[]> buffer_copy;
1503  if (device_type == ExecutorDeviceType::GPU) {
1504  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1505 
1506  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1507  buffer_copy.get(),
1508  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1509  buffer_size,
1510  device_id);
1511  }
1512  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1513 #else
1514  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1515 #endif // HAVE_CUDA
1516  auto ptr2 = ptr1 + offsetBufferOff();
1517  auto ptr3 = ptr1 + countBufferOff();
1518  auto ptr4 = ptr1 + payloadBufferOff();
1519  return JoinHashTableInterface::toSet(
1520  0, 0, hash_entry_count_, ptr1, ptr2, ptr3, ptr4, buffer_size);
1521 }
1522 
1523 llvm::Value* JoinHashTable::codegenSlot(const CompilationOptions& co,
1524  const size_t index) {
1525  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1526  using namespace std::string_literals;
1527 
1529  const auto cols = get_cols(
1530  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1531  auto key_col = cols.second;
1532  CHECK(key_col);
1533  auto val_col = cols.first;
1534  CHECK(val_col);
1535  CodeGenerator code_generator(executor_);
1536  const auto key_lvs = code_generator.codegen(key_col, true, co);
1537  CHECK_EQ(size_t(1), key_lvs.size());
1538  auto hash_ptr = codegenHashTableLoad(index);
1539  CHECK(hash_ptr);
1540  const int shard_count = shardCount();
1541  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);
1542 
1543  const auto& key_col_ti = key_col->get_type_info();
1544  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
1545  : "hash_join_idx"s);
1546 
1547  if (isBitwiseEq()) {
1548  fname += "_bitwise";
1549  }
1550  if (shard_count) {
1551  fname += "_sharded";
1552  }
1553 
1554  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
1555  fname += "_nullable";
1556  }
1557  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1558 }
1559 
1560 const InputTableInfo& JoinHashTable::getInnerQueryInfo(
1561  const Analyzer::ColumnVar* inner_col) const {
1562  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
1563 }
1564 
1565 const InputTableInfo& get_inner_query_info(
1566  const int inner_table_id,
1567  const std::vector<InputTableInfo>& query_infos) {
1568  ssize_t ti_idx = -1;
1569  for (size_t i = 0; i < query_infos.size(); ++i) {
1570  if (inner_table_id == query_infos[i].table_id) {
1571  ti_idx = i;
1572  break;
1573  }
1574  }
1575  CHECK_NE(ssize_t(-1), ti_idx);
1576  return query_infos[ti_idx];
1577 }
1578 
1579 size_t get_entries_per_device(const size_t total_entries,
1580  const size_t shard_count,
1581  const size_t device_count,
1582  const Data_Namespace::MemoryLevel memory_level) {
1583  const auto entries_per_shard =
1584  shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
1585  size_t entries_per_device = entries_per_shard;
1586  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
1587  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
1588  CHECK_GT(shards_per_device, 0u);
1589  entries_per_device = entries_per_shard * shards_per_device;
1590  }
1591  return entries_per_device;
1592 }
1593 
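// Worked example for get_entries_per_device, with illustrative numbers:
// total_entries = 1000, shard_count = 6, device_count = 4, GPU_LEVEL:
//
//   entries_per_shard  = (1000 + 6 - 1) / 6 = 167   (ceiling division)
//   shards_per_device  = (6 + 4 - 1) / 4    = 2
//   entries_per_device = 167 * 2            = 334
//
// On CPU_LEVEL, or when shard_count is 0, the per-shard (or total) entry count is
// returned unchanged.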
1594 // TODO(adb): unify with BaselineJoinHashTable
1595 size_t JoinHashTable::shardCount() const {
1596  return memory_level_ == Data_Namespace::GPU_LEVEL
1597  ? get_shard_count(qual_bin_oper_.get(), executor_)
1598  : 0;
1599 }
1600 
1601 bool JoinHashTable::isBitwiseEq() const {
1602  return qual_bin_oper_->get_optype() == kBW_EQ;
1603 }
1604 
1605 void JoinHashTable::freeHashBufferMemory() {
1606 #ifdef HAVE_CUDA
1607  freeHashBufferGpuMemory();
1608 #endif
1609  freeHashBufferCpuMemory();
1610 }
1611 
1612 void JoinHashTable::freeHashBufferGpuMemory() {
1613 #ifdef HAVE_CUDA
1614  const auto& catalog = *executor_->getCatalog();
1615  auto& data_mgr = catalog.getDataMgr();
1616  for (auto& buf : gpu_hash_table_buff_) {
1617  if (buf) {
1618  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1619  buf = nullptr;
1620  }
1621  }
1622  for (auto& buf : gpu_hash_table_err_buff_) {
1623  if (buf) {
1624  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1625  buf = nullptr;
1626  }
1627  }
1628 #else
1629  CHECK(false);
1630 #endif // HAVE_CUDA
1631 }
1632 
1633 void JoinHashTable::freeHashBufferCpuMemory() {
1634  cpu_hash_table_buff_.reset();
1635 }