OmniSciDB 340b00dbf6
JoinHashTable.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <atomic>
20 #include <future>
21 #include <numeric>
22 #include <thread>
23 
24 #include "Logger/Logger.h"
27 #include "QueryEngine/Execute.h"
32 
33 namespace {
34 
35 class NeedsOneToManyHash : public HashJoinFail {
36  public:
37  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
38 };
39 
40 } // namespace
41 
42 InnerOuter normalize_column_pair(const Analyzer::Expr* lhs,
43  const Analyzer::Expr* rhs,
44  const Catalog_Namespace::Catalog& cat,
45  const TemporaryTables* temporary_tables,
46  const bool is_overlaps_join) {
47  const auto& lhs_ti = lhs->get_type_info();
48  const auto& rhs_ti = rhs->get_type_info();
49  if (!is_overlaps_join) {
50  if (lhs_ti.get_type() != rhs_ti.get_type()) {
51  throw HashJoinFail("Equijoin types must be identical, found: " +
52  lhs_ti.get_type_name() + ", " + rhs_ti.get_type_name());
53  }
54  if (!lhs_ti.is_integer() && !lhs_ti.is_time() && !lhs_ti.is_string() &&
55  !lhs_ti.is_decimal()) {
56  throw HashJoinFail("Cannot apply hash join to inner column type " +
57  lhs_ti.get_type_name());
58  }
59  // Decimal types should be identical.
60  if (lhs_ti.is_decimal() && (lhs_ti.get_scale() != rhs_ti.get_scale() ||
61  lhs_ti.get_precision() != rhs_ti.get_precision())) {
62  throw HashJoinFail("Equijoin with different decimal types");
63  }
64  }
65 
66  const auto lhs_cast = dynamic_cast<const Analyzer::UOper*>(lhs);
67  const auto rhs_cast = dynamic_cast<const Analyzer::UOper*>(rhs);
68  if (lhs_ti.is_string() && (static_cast<bool>(lhs_cast) != static_cast<bool>(rhs_cast) ||
69  (lhs_cast && lhs_cast->get_optype() != kCAST) ||
70  (rhs_cast && rhs_cast->get_optype() != kCAST))) {
71  throw HashJoinFail("Cannot use hash join for given expression");
72  }
73  // Casts to decimal are not supported.
74  if (lhs_ti.is_decimal() && (lhs_cast || rhs_cast)) {
75  throw HashJoinFail("Cannot use hash join for given expression");
76  }
77  const auto lhs_col =
78  lhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(lhs_cast->get_operand())
79  : dynamic_cast<const Analyzer::ColumnVar*>(lhs);
80  const auto rhs_col =
81  rhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(rhs_cast->get_operand())
82  : dynamic_cast<const Analyzer::ColumnVar*>(rhs);
83  if (!lhs_col && !rhs_col) {
84  throw HashJoinFail("Cannot use hash join for given expression");
85  }
86  const Analyzer::ColumnVar* inner_col{nullptr};
87  const Analyzer::ColumnVar* outer_col{nullptr};
88  auto outer_ti = lhs_ti;
89  auto inner_ti = rhs_ti;
90  const Analyzer::Expr* outer_expr{lhs};
91  if ((!lhs_col || (rhs_col && lhs_col->get_rte_idx() < rhs_col->get_rte_idx())) &&
92  (!rhs_col || (!lhs_col || lhs_col->get_rte_idx() < rhs_col->get_rte_idx()))) {
93  inner_col = rhs_col;
94  outer_col = lhs_col;
95  } else {
96  if (lhs_col && lhs_col->get_rte_idx() == 0) {
97  throw HashJoinFail("Cannot use hash join for given expression");
98  }
99  inner_col = lhs_col;
100  outer_col = rhs_col;
101  std::swap(outer_ti, inner_ti);
102  outer_expr = rhs;
103  }
104  if (!inner_col) {
105  throw HashJoinFail("Cannot use hash join for given expression");
106  }
107  if (!outer_col) {
108  MaxRangeTableIndexVisitor rte_idx_visitor;
109  int outer_rte_idx = rte_idx_visitor.visit(outer_expr);
110  // The inner column candidate is not actually inner; the outer
111  // expression contains columns which are at least as deep.
112  if (inner_col->get_rte_idx() <= outer_rte_idx) {
113  throw HashJoinFail("Cannot use hash join for given expression");
114  }
115  }
116  // We need to fetch the actual type information from the catalog since Analyzer
117  // always reports nullable as true for inner table columns in left joins.
118  const auto inner_col_cd = get_column_descriptor_maybe(
119  inner_col->get_column_id(), inner_col->get_table_id(), cat);
120  const auto inner_col_real_ti = get_column_type(inner_col->get_column_id(),
121  inner_col->get_table_id(),
122  inner_col_cd,
123  temporary_tables);
124  const auto& outer_col_ti =
125  !(dynamic_cast<const Analyzer::FunctionOper*>(lhs)) && outer_col
126  ? outer_col->get_type_info()
127  : outer_ti;
128  // Casts from decimal are not supported.
129  if ((inner_col_real_ti.is_decimal() || outer_col_ti.is_decimal()) &&
130  (lhs_cast || rhs_cast)) {
131  throw HashJoinFail("Cannot use hash join for given expression");
132  }
133  if (is_overlaps_join) {
134  if (!inner_col_real_ti.is_array()) {
135  throw HashJoinFail(
136  "Overlaps join only supported for inner columns with array type");
137  }
138  auto is_bounds_array = [](const auto ti) {
139  return ti.is_fixlen_array() && ti.get_size() == 32;
140  };
141  if (!is_bounds_array(inner_col_real_ti)) {
142  throw HashJoinFail(
143  "Overlaps join only supported for 4-element double fixed length arrays");
144  }
145  if (!(outer_col_ti.get_type() == kPOINT || is_bounds_array(outer_col_ti))) {
146  throw HashJoinFail(
147  "Overlaps join only supported for geometry outer columns of type point or "
148  "geometry columns with bounds");
149  }
150  } else {
151  if (!(inner_col_real_ti.is_integer() || inner_col_real_ti.is_time() ||
152  inner_col_real_ti.is_decimal() ||
153  (inner_col_real_ti.is_string() &&
154  inner_col_real_ti.get_compression() == kENCODING_DICT))) {
155  throw HashJoinFail(
156  "Can only apply hash join to integer-like types and dictionary encoded "
157  "strings");
158  }
159  }
160 
161  auto normalized_inner_col = inner_col;
162  auto normalized_outer_col = outer_col ? outer_col : outer_expr;
163 
164  const auto& normalized_inner_ti = normalized_inner_col->get_type_info();
165  const auto& normalized_outer_ti = normalized_outer_col->get_type_info();
166 
167  if (normalized_inner_ti.is_string() != normalized_outer_ti.is_string()) {
168  throw HashJoinFail(std::string("Could not build hash tables for incompatible types " +
169  normalized_inner_ti.get_type_name() + " and " +
170  normalized_outer_ti.get_type_name()));
171  }
172 
173  return {normalized_inner_col, normalized_outer_col};
174 }
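// Note on the returned pair: the inner column is the operand whose range-table
// index is larger (the deeper side of the join, used to build the hash table),
// while the outer expression is the probe side. If neither operand resolves to
// a usable inner column, HashJoinFail is thrown and the caller retries with a
// join loop where possible.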
175 
176 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
177  const Catalog_Namespace::Catalog& cat,
178  const TemporaryTables* temporary_tables) {
179  std::vector<InnerOuter> result;
180  const auto lhs_tuple_expr =
181  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_left_operand());
182  const auto rhs_tuple_expr =
183  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_right_operand());
184 
185  CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
186  if (lhs_tuple_expr) {
187  const auto& lhs_tuple = lhs_tuple_expr->getTuple();
188  const auto& rhs_tuple = rhs_tuple_expr->getTuple();
189  CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
190  for (size_t i = 0; i < lhs_tuple.size(); ++i) {
191  result.push_back(normalize_column_pair(lhs_tuple[i].get(),
192  rhs_tuple[i].get(),
193  cat,
194  temporary_tables,
195  condition->is_overlaps_oper()));
196  }
197  } else {
198  CHECK(!lhs_tuple_expr && !rhs_tuple_expr);
199  result.push_back(normalize_column_pair(condition->get_left_operand(),
200  condition->get_right_operand(),
201  cat,
202  temporary_tables,
203  condition->is_overlaps_oper()));
204  }
205 
206  return result;
207 }
208 
209 namespace {
210 
211 std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> get_cols(
212  const Analyzer::BinOper* qual_bin_oper,
213  const Catalog_Namespace::Catalog& cat,
214  const TemporaryTables* temporary_tables) {
215  const auto lhs = qual_bin_oper->get_left_operand();
216  const auto rhs = qual_bin_oper->get_right_operand();
217  return normalize_column_pair(lhs, rhs, cat, temporary_tables);
218 }
219 
220 HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
221  ExpressionRange const& col_range,
222  bool const is_bw_eq) {
223  using EmptyRangeSize = boost::optional<size_t>;
224  auto empty_range_check = [](ExpressionRange const& col_range,
225  bool const is_bw_eq) -> EmptyRangeSize {
226  if (col_range.getIntMin() > col_range.getIntMax()) {
227  CHECK_EQ(col_range.getIntMin(), int64_t(0));
228  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
229  if (is_bw_eq) {
230  return size_t(1);
231  }
232  return size_t(0);
233  }
234  return EmptyRangeSize{};
235  };
236 
237  auto empty_range = empty_range_check(col_range, is_bw_eq);
238  if (empty_range) {
239  return {size_t(*empty_range), 1};
240  }
241 
242  int64_t bucket_normalization =
243  context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
244  CHECK_GT(bucket_normalization, 0);
245  return {size_t(col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0)),
246  bucket_normalization};
247 }
248 
249 size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
250  if (col_range.getIntMin() > col_range.getIntMax()) {
251  CHECK_EQ(col_range.getIntMin(), int64_t(0));
252  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
253  return is_bw_eq ? 1 : 0;
254  }
255  return col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0);
256 }
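// Illustration of the sizing logic above: for a column range [10, 19] the
// perfect hash table needs 19 - 10 + 1 = 10 slots, plus one extra slot when
// is_bw_eq is true so that the translated NULL key has a place to live. For
// kDATE columns, get_bucketized_hash_entry_info also carries the range's
// bucket size (col_range.getBucket()) as bucket_normalization, which is later
// used to normalize the entry count and the generated probing code.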
257 
258 } // namespace
259 
260 std::vector<std::pair<JoinHashTable::JoinHashTableCacheKey,
261  std::shared_ptr<std::vector<int32_t>>>>
262  JoinHashTable::join_hash_table_cache_;
263 std::mutex JoinHashTable::join_hash_table_cache_mutex_;
264 
265 size_t get_shard_count(const Analyzer::BinOper* join_condition,
266  const Executor* executor) {
267  const Analyzer::ColumnVar* inner_col{nullptr};
268  const Analyzer::Expr* outer_col{nullptr};
269  std::shared_ptr<Analyzer::BinOper> redirected_bin_oper;
270  try {
271  std::tie(inner_col, outer_col) =
272  get_cols(join_condition, *executor->getCatalog(), executor->getTemporaryTables());
273  } catch (...) {
274  return 0;
275  }
276  if (!inner_col || !outer_col) {
277  return 0;
278  }
279  return get_shard_count({inner_col, outer_col}, executor);
280 }
281 
282 namespace {
283 
284 bool shard_count_less_or_equal_device_count(const int inner_table_id,
285  const Executor* executor) {
286  const auto inner_table_info = executor->getTableInfo(inner_table_id);
287  std::unordered_set<int> device_holding_fragments;
288  auto cuda_mgr = executor->getCatalog()->getDataMgr().getCudaMgr();
289  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
290  for (const auto& fragment : inner_table_info.fragments) {
291  if (fragment.shard != -1) {
292  const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
293  if (!it_ok.second) {
294  return false;
295  }
296  }
297  }
298  return true;
299 }
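// In other words, the sharded layout is only used when each device ends up
// holding at most one sharded fragment: the first time two fragments map to
// the same device slot (fragment.shard % device_count collides), this helper
// returns false.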
300 
301 } // namespace
302 
303 size_t get_shard_count(
304  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
305  const Executor* executor) {
306  const auto inner_col = equi_pair.first;
307  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
308  if (!outer_col || inner_col->get_table_id() < 0 || outer_col->get_table_id() < 0) {
309  return 0;
310  }
311  if (outer_col->get_rte_idx()) {
312  return 0;
313  }
314  if (inner_col->get_type_info() != outer_col->get_type_info()) {
315  return 0;
316  }
317  const auto catalog = executor->getCatalog();
318  const auto inner_td = catalog->getMetadataForTable(inner_col->get_table_id());
319  CHECK(inner_td);
320  const auto outer_td = catalog->getMetadataForTable(outer_col->get_table_id());
321  CHECK(outer_td);
322  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
323  inner_td->nShards != outer_td->nShards) {
324  return 0;
325  }
326  if (!shard_count_less_or_equal_device_count(inner_td->tableId, executor)) {
327  return 0;
328  }
329  // The two columns involved must be the ones on which the tables have been sharded.
330  return (inner_td->shardedColumnId == inner_col->get_column_id() &&
331  outer_td->shardedColumnId == outer_col->get_column_id()) ||
332  (outer_td->shardedColumnId == inner_col->get_column_id() &&
333  inner_td->shardedColumnId == inner_col->get_column_id())
334  ? inner_td->nShards
335  : 0;
336 }
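// Summary of the conditions above: a non-zero shard count is returned only for
// an equijoin between two physical tables that are sharded into the same
// number of shards, with at most one shard per device, and where the join is
// on the sharding columns themselves. Any other case returns 0 and the
// unsharded hash table layout is used instead.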
337 
339 std::shared_ptr<JoinHashTable> JoinHashTable::getInstance(
340  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
341  const std::vector<InputTableInfo>& query_infos,
342  const Data_Namespace::MemoryLevel memory_level,
343  const HashType preferred_hash_type,
344  const int device_count,
345  ColumnCacheMap& column_cache,
346  Executor* executor) {
347  decltype(std::chrono::steady_clock::now()) ts1, ts2;
348  if (VLOGGING(1)) {
349  VLOG(1) << "Building perfect hash table " << getHashTypeString(preferred_hash_type)
350  << " for qual: " << qual_bin_oper->toString();
351  ts1 = std::chrono::steady_clock::now();
352  }
353  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
354  const auto cols =
355  get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
356  const auto inner_col = cols.first;
357  CHECK(inner_col);
358  const auto& ti = inner_col->get_type_info();
359  auto col_range =
360  getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
361  if (col_range.getType() == ExpressionRangeType::Invalid) {
362  throw HashJoinFail(
363  "Could not compute range for the expressions involved in the equijoin");
364  }
365  if (ti.is_string()) {
366  // The nullable info must be the same as the source column.
367  const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
368  if (source_col_range.getType() == ExpressionRangeType::Invalid) {
369  throw HashJoinFail(
370  "Could not compute range for the expressions involved in the equijoin");
371  }
372  if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
373  // If the inner column expression range is empty, use the inner col range
374  CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
375  CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
376  col_range = source_col_range;
377  } else {
378  col_range = ExpressionRange::makeIntRange(
379  std::min(source_col_range.getIntMin(), col_range.getIntMin()),
380  std::max(source_col_range.getIntMax(), col_range.getIntMax()),
381  0,
382  source_col_range.hasNulls());
383  }
384  }
385  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
386  const auto max_hash_entry_count =
387  memory_level == Data_Namespace::GPU_LEVEL
388  ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
389  : static_cast<size_t>(std::numeric_limits<int32_t>::max());
390 
391  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
392  ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
393  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
394 
395  if (bucketized_entry_count > max_hash_entry_count) {
396  throw TooManyHashEntries();
397  }
398 
399  if (qual_bin_oper->get_optype() == kBW_EQ &&
400  col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
401  throw HashJoinFail("Cannot translate null value for kBW_EQ");
402  }
403  auto join_hash_table =
404  std::shared_ptr<JoinHashTable>(new JoinHashTable(qual_bin_oper,
405  inner_col,
406  query_infos,
407  memory_level,
408  preferred_hash_type,
409  col_range,
410  column_cache,
411  executor,
412  device_count));
413  try {
414  join_hash_table->reify();
415  } catch (const TableMustBeReplicated& e) {
416  // Throw a runtime error to abort the query
417  join_hash_table->freeHashBufferMemory();
418  throw std::runtime_error(e.what());
419  } catch (const HashJoinFail& e) {
420  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
421  // possible)
422  join_hash_table->freeHashBufferMemory();
423  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
424  "involved in equijoin | ") +
425  e.what());
426  } catch (const ColumnarConversionNotSupported& e) {
427  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
428  e.what());
429  } catch (const OutOfMemory& e) {
430  throw HashJoinFail(
431  std::string("Ran out of memory while building hash tables for equijoin | ") +
432  e.what());
433  } catch (const std::exception& e) {
434  throw std::runtime_error(
435  std::string("Fatal error while attempting to build hash tables for join: ") +
436  e.what());
437  }
438  if (VLOGGING(1)) {
439  ts2 = std::chrono::steady_clock::now();
440  VLOG(1) << "Built perfect hash table "
441  << getHashTypeString(join_hash_table->getHashType()) << " in "
442  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
443  << " ms";
444  }
445  return join_hash_table;
446 }
447 
448 bool needs_dictionary_translation(const Analyzer::ColumnVar* inner_col,
449  const Analyzer::Expr* outer_col_expr,
450  const Executor* executor) {
451  const auto catalog = executor->getCatalog();
452  CHECK(catalog);
453  const auto inner_cd = get_column_descriptor_maybe(
454  inner_col->get_column_id(), inner_col->get_table_id(), *catalog);
455  const auto& inner_ti = get_column_type(inner_col->get_column_id(),
456  inner_col->get_table_id(),
457  inner_cd,
458  executor->getTemporaryTables());
459  // Only strings may need dictionary translation.
460  if (!inner_ti.is_string()) {
461  return false;
462  }
463  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
464  CHECK(outer_col);
465  const auto outer_cd = get_column_descriptor_maybe(
466  outer_col->get_column_id(), outer_col->get_table_id(), *catalog);
467  // Don't want to deal with temporary tables for now, require translation.
468  if (!inner_cd || !outer_cd) {
469  return true;
470  }
471  const auto& outer_ti = get_column_type(outer_col->get_column_id(),
472  outer_col->get_table_id(),
473  outer_cd,
474  executor->getTemporaryTables());
475  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
476  // If the two columns don't share the dictionary, translation is needed.
477  return outer_ti.get_comp_param() != inner_ti.get_comp_param();
478 }
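// Dictionary translation is needed when the two dictionary-encoded string
// columns do not share the same dictionary (different comp_param) or when
// either side comes from a temporary table (no column descriptor). When it is
// needed, the hash table is built at CPU level so the dictionary payloads are
// available, and is copied to the GPU afterwards if required.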
479 
480 std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
481  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
482  const int device_id,
483  const int device_count) {
484  std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
485  for (const auto& fragment : fragments) {
486  CHECK_GE(fragment.shard, 0);
487  if (fragment.shard % device_count == device_id) {
488  shards_for_device.push_back(fragment);
489  }
490  }
491  return shards_for_device;
492 }
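// Shards are distributed round-robin over devices: device d is responsible for
// every fragment whose shard id satisfies shard % device_count == d.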
493 
494 void JoinHashTable::reify() {
495  auto timer = DEBUG_TIMER(__func__);
497  const auto& catalog = *executor_->getCatalog();
498  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
499  const auto inner_col = cols.first;
500  checkHashJoinReplicationConstraint(inner_col->get_table_id());
501  const auto& query_info = getInnerQueryInfo(inner_col).info;
502  if (query_info.fragments.empty()) {
503  return;
504  }
505  if (query_info.getNumTuplesUpperBound() >
506  static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
507  throw TooManyHashEntries();
508  }
509 #ifdef HAVE_CUDA
510  gpu_hash_table_buff_.resize(device_count_);
511  gpu_hash_table_err_buff_.resize(device_count_);
512 #endif // HAVE_CUDA
513  std::vector<std::future<void>> init_threads;
514  const int shard_count = shardCount();
515 
516  try {
517  for (int device_id = 0; device_id < device_count_; ++device_id) {
518  const auto fragments =
519  shard_count
520  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
521  : query_info.fragments;
522  init_threads.push_back(
523  std::async(std::launch::async,
527  this,
528  fragments,
529  device_id,
530  logger::thread_id()));
531  }
532  for (auto& init_thread : init_threads) {
533  init_thread.wait();
534  }
535  for (auto& init_thread : init_threads) {
536  init_thread.get();
537  }
538 
539  } catch (const NeedsOneToManyHash& e) {
542  init_threads.clear();
543  for (int device_id = 0; device_id < device_count_; ++device_id) {
544  const auto fragments =
545  shard_count
546  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
547  : query_info.fragments;
548 
549  init_threads.push_back(std::async(std::launch::async,
551  this,
552  fragments,
553  device_id,
554  logger::thread_id()));
555  }
556  for (auto& init_thread : init_threads) {
557  init_thread.wait();
558  }
559  for (auto& init_thread : init_threads) {
560  init_thread.get();
561  }
562  }
563 }
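// reify() first tries to build a one-to-one (perfect) hash table on every
// device in parallel. If any worker cannot establish a 1:1 correspondence for
// the join keys it throws NeedsOneToManyHash, and the per-device work is
// restarted to rebuild the table with the one-to-many layout instead.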
564 
565 JoinHashTable::~JoinHashTable() {
566 #ifdef HAVE_CUDA
567  CHECK(executor_);
568  CHECK(executor_->catalog_);
569  auto& data_mgr = executor_->catalog_->getDataMgr();
570  for (auto& gpu_buffer : gpu_hash_table_buff_) {
571  if (gpu_buffer) {
572  data_mgr.free(gpu_buffer);
573  }
574  }
575  for (auto& gpu_buffer : gpu_hash_table_err_buff_) {
576  if (gpu_buffer) {
577  data_mgr.free(gpu_buffer);
578  }
579  }
580 #endif
581 }
582 
584  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
585  const Analyzer::Expr* outer_col_expr,
586  const Analyzer::ColumnVar* inner_col) const {
587  ChunkKey hash_table_key{executor_->getCatalog()->getCurrentDB().dbId,
588  inner_col->get_table_id(),
589  inner_col->get_column_id()};
590  const auto& ti = inner_col->get_type_info();
591  if (ti.is_string()) {
592  CHECK_EQ(kENCODING_DICT, ti.get_compression());
593  size_t outer_elem_count = 0;
594  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
595  CHECK(outer_col);
596  const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
597  for (auto& frag : outer_query_info.fragments) {
598  outer_elem_count = frag.getNumTuples();
599  }
600  hash_table_key.push_back(outer_elem_count);
601  }
602  if (fragments.size() < 2) {
603  hash_table_key.push_back(fragments.front().fragmentId);
604  }
605  return hash_table_key;
606 }
607 
608 void JoinHashTable::reifyOneToOneForDevice(
609  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
610  const int device_id,
611  const logger::ThreadId parent_thread_id) {
612  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
613  const auto& catalog = *executor_->getCatalog();
614  auto& data_mgr = catalog.getDataMgr();
615  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
616  const auto inner_col = cols.first;
617  CHECK(inner_col);
618  const auto inner_cd = get_column_descriptor_maybe(
619  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
620  if (inner_cd && inner_cd->isVirtualCol) {
622  }
623  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
624  // Since we don't have the string dictionary payloads on the GPU, we'll build
625  // the join hash table on the CPU and transfer it to the GPU.
626  const auto effective_memory_level =
627  needs_dictionary_translation(inner_col, cols.second, executor_)
628  ? Data_Namespace::CPU_LEVEL
629  : memory_level_;
630  if (fragments.empty()) {
631  // No data in these fragments. Still need to create a hash table and initialize it
632  // properly.
633  ChunkKey empty_chunk;
634  initOneToOneHashTable(empty_chunk,
635  JoinColumn{nullptr, 0, 0, 0, 0},
636  cols,
637  effective_memory_level,
638  device_id);
639  return;
640  }
641 
642  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
643  std::unique_ptr<DeviceAllocator> device_allocator;
644  if (effective_memory_level == MemoryLevel::GPU_LEVEL) {
645  device_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
646  }
647  std::vector<std::shared_ptr<void>> malloc_owner;
648 
649  JoinColumn join_column = fetchJoinColumn(inner_col,
650  fragments,
651  effective_memory_level,
652  device_id,
653  chunks_owner,
654  device_allocator.get(),
655  malloc_owner,
656  executor_,
657  &column_cache_);
658 
659  initOneToOneHashTable(genHashTableKey(fragments, cols.second, inner_col),
660  join_column,
661  cols,
662  effective_memory_level,
663  device_id);
664 }
665 
666 void JoinHashTable::reifyOneToManyForDevice(
667  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
668  const int device_id,
669  const logger::ThreadId parent_thread_id) {
670  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
671  const auto& catalog = *executor_->getCatalog();
672  auto& data_mgr = catalog.getDataMgr();
673  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
674  const auto inner_col = cols.first;
675  CHECK(inner_col);
676  const auto inner_cd = get_column_descriptor_maybe(
677  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
678  if (inner_cd && inner_cd->isVirtualCol) {
680  }
681  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
682  // Since we don't have the string dictionary payloads on the GPU, we'll build
683  // the join hash table on the CPU and transfer it to the GPU.
684  const auto effective_memory_level =
685  needs_dictionary_translation(inner_col, cols.second, executor_)
686  ? Data_Namespace::CPU_LEVEL
687  : memory_level_;
688  if (fragments.empty()) {
689  ChunkKey empty_chunk;
690  initOneToManyHashTable(empty_chunk,
691  JoinColumn{nullptr, 0, 0, 0, 0},
692  cols,
693  effective_memory_level,
694  device_id);
695  return;
696  }
697 
698  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
699  std::unique_ptr<DeviceAllocator> device_allocator;
700  if (effective_memory_level == MemoryLevel::GPU_LEVEL) {
701  device_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
702  }
703  std::vector<std::shared_ptr<void>> malloc_owner;
704 
705  JoinColumn join_column = fetchJoinColumn(inner_col,
706  fragments,
707  effective_memory_level,
708  device_id,
709  chunks_owner,
710  device_allocator.get(),
711  malloc_owner,
712  executor_,
713  &column_cache_);
714 
715  initOneToManyHashTable(genHashTableKey(fragments, cols.second, inner_col),
716  join_column,
717  cols,
718  effective_memory_level,
719  device_id);
720 }
721 
723  if (!g_cluster) {
724  return;
725  }
726  if (table_id >= 0) {
727  const auto inner_td = executor_->getCatalog()->getMetadataForTable(table_id);
728  CHECK(inner_td);
729  size_t shard_count{0};
730  shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
731  if (!shard_count && !table_is_replicated(inner_td)) {
732  throw TableMustBeReplicated(inner_td->tableName);
733  }
734  }
735 }
736 
738  const JoinColumn& join_column,
739  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
740  const HashEntryInfo hash_entry_info,
741  const int32_t hash_join_invalid_val) {
742  auto timer = DEBUG_TIMER(__func__);
743  const auto inner_col = cols.first;
744  CHECK(inner_col);
745  const auto& ti = inner_col->get_type_info();
746  if (!cpu_hash_table_buff_) {
747  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
748  hash_entry_info.getNormalizedHashEntryCount());
749  const StringDictionaryProxy* sd_inner_proxy{nullptr};
750  const StringDictionaryProxy* sd_outer_proxy{nullptr};
751  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
752  if (ti.is_string() &&
753  (outer_col && !(inner_col->get_comp_param() == outer_col->get_comp_param()))) {
754  CHECK_EQ(kENCODING_DICT, ti.get_compression());
755  sd_inner_proxy = executor_->getStringDictionaryProxy(
756  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
757  CHECK(sd_inner_proxy);
758  CHECK(outer_col);
759  sd_outer_proxy = executor_->getStringDictionaryProxy(
760  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
761  CHECK(sd_outer_proxy);
762  }
763  int thread_count = cpu_threads();
764  std::vector<std::thread> init_cpu_buff_threads;
765  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
766  init_cpu_buff_threads.emplace_back(
767  [this, hash_entry_info, hash_join_invalid_val, thread_idx, thread_count] {
769  hash_entry_info.getNormalizedHashEntryCount(),
770  hash_join_invalid_val,
771  thread_idx,
772  thread_count);
773  });
774  }
775  for (auto& t : init_cpu_buff_threads) {
776  t.join();
777  }
778  init_cpu_buff_threads.clear();
779  std::atomic<int> err{0};
780  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
781  init_cpu_buff_threads.emplace_back([this,
782  hash_join_invalid_val,
783  &join_column,
784  sd_inner_proxy,
785  sd_outer_proxy,
786  thread_idx,
787  thread_count,
788  &ti,
789  &err,
790  hash_entry_info] {
791  int partial_err =
793  hash_join_invalid_val,
794  join_column,
795  {static_cast<size_t>(ti.get_size()),
799  isBitwiseEq(),
800  col_range_.getIntMax() + 1,
802  sd_inner_proxy,
803  sd_outer_proxy,
804  thread_idx,
805  thread_count,
806  hash_entry_info.bucket_normalization);
807  int zero{0};
808  err.compare_exchange_strong(zero, partial_err);
809  });
810  }
811  for (auto& t : init_cpu_buff_threads) {
812  t.join();
813  }
814  if (err) {
815  cpu_hash_table_buff_.reset();
816  // Too many hash entries, need to retry with a 1:many table
817  throw NeedsOneToManyHash();
818  }
819  } else {
820  if (cpu_hash_table_buff_->size() > hash_entry_info.getNormalizedHashEntryCount()) {
821  // Too many hash entries, need to retry with a 1:many table
822  throw NeedsOneToManyHash();
823  }
824  }
825 }
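// A non-zero partial_err from any fill thread above means the 1:1 layout could
// not be kept (a slot would have to hold more than one row), so the CPU buffer
// is discarded and NeedsOneToManyHash propagates up to reify(), which rebuilds
// the table with the one-to-many layout.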
826 
828  const JoinColumn& join_column,
829  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
830  const HashEntryInfo hash_entry_info,
831  const int32_t hash_join_invalid_val) {
832  auto timer = DEBUG_TIMER(__func__);
833  const auto inner_col = cols.first;
834  CHECK(inner_col);
835  const auto& ti = inner_col->get_type_info();
836  if (cpu_hash_table_buff_) {
837  return;
838  }
839  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
840  2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems);
841  const StringDictionaryProxy* sd_inner_proxy{nullptr};
842  const StringDictionaryProxy* sd_outer_proxy{nullptr};
843  if (ti.is_string()) {
844  CHECK_EQ(kENCODING_DICT, ti.get_compression());
845  sd_inner_proxy = executor_->getStringDictionaryProxy(
846  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
847  CHECK(sd_inner_proxy);
848  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
849  CHECK(outer_col);
850  sd_outer_proxy = executor_->getStringDictionaryProxy(
851  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
852  CHECK(sd_outer_proxy);
853  }
854  int thread_count = cpu_threads();
855  std::vector<std::future<void>> init_threads;
856  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
857  init_threads.emplace_back(std::async(std::launch::async,
859  &(*cpu_hash_table_buff_)[0],
860  hash_entry_info.getNormalizedHashEntryCount(),
861  hash_join_invalid_val,
862  thread_idx,
863  thread_count));
864  }
865  for (auto& child : init_threads) {
866  child.wait();
867  }
868  for (auto& child : init_threads) {
869  child.get();
870  }
871 
872  if (ti.get_type() == kDATE) {
874  hash_entry_info,
875  hash_join_invalid_val,
876  join_column,
877  {static_cast<size_t>(ti.get_size()),
881  isBitwiseEq(),
882  col_range_.getIntMax() + 1,
884  sd_inner_proxy,
885  sd_outer_proxy,
886  thread_count);
887  } else {
889  hash_entry_info,
890  hash_join_invalid_val,
891  join_column,
892  {static_cast<size_t>(ti.get_size()),
896  isBitwiseEq(),
897  col_range_.getIntMax() + 1,
899  sd_inner_proxy,
900  sd_outer_proxy,
901  thread_count);
902  }
903 }
904 
905 namespace {
906 
907 #ifdef HAVE_CUDA
908 // Number of entries per shard, rounded up.
909 size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count) {
910  CHECK_NE(size_t(0), shard_count);
911  return (total_entry_count + shard_count - 1) / shard_count;
912 }
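// Plain ceiling division, e.g. 10 total entries over 3 shards gives 4 entries
// per shard.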
913 #endif // HAVE_CUDA
914 
915 } // namespace
916 
917 void JoinHashTable::initOneToOneHashTable(
918  const ChunkKey& chunk_key,
919  const JoinColumn& join_column,
920  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
921  const Data_Namespace::MemoryLevel effective_memory_level,
922  const int device_id) {
923  auto timer = DEBUG_TIMER(__func__);
924  const auto inner_col = cols.first;
925  CHECK(inner_col);
926 
927  auto hash_entry_info = get_bucketized_hash_entry_info(
928  inner_col->get_type_info(), col_range_, isBitwiseEq());
929  if (!hash_entry_info) {
930  return;
931  }
932 
933 #ifdef HAVE_CUDA
934  const auto shard_count = shardCount();
935  const size_t entries_per_shard{
936  shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
937  : 0};
938  // Even if we join on dictionary encoded strings, the memory on the GPU is still
939  // needed once the join hash table has been built on the CPU.
940  const auto catalog = executor_->getCatalog();
941  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
942  auto& data_mgr = catalog->getDataMgr();
943  if (shard_count) {
944  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
945  CHECK_GT(shards_per_device, 0u);
946  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
947  }
948  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
949  &data_mgr,
950  hash_entry_info.getNormalizedHashEntryCount() * sizeof(int32_t),
951  device_id);
952  }
953 #else
954  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
955 #endif
956  if (!device_id) {
957  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
958  }
959 
960 #ifdef HAVE_CUDA
961  const auto& ti = inner_col->get_type_info();
962 #endif
963  const int32_t hash_join_invalid_val{-1};
964  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
965  CHECK(!chunk_key.empty());
966  initHashTableOnCpuFromCache(chunk_key, join_column.num_elems, cols);
967  {
968  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
970  join_column, cols, hash_entry_info, hash_join_invalid_val);
971  }
972  if (inner_col->get_table_id() > 0) {
973  putHashTableOnCpuToCache(chunk_key, join_column.num_elems, cols);
974  }
975  // Transfer the hash table on the GPU if we've only built it on CPU
976  // but the query runs on GPU (join on dictionary encoded columns).
977  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
978 #ifdef HAVE_CUDA
979  CHECK(ti.is_string());
980  auto& data_mgr = catalog->getDataMgr();
981  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
982 
983  copy_to_gpu(
984  &data_mgr,
985  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
986  &(*cpu_hash_table_buff_)[0],
987  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
988  device_id);
989 #else
990  CHECK(false);
991 #endif
992  }
993  } else {
994 #ifdef HAVE_CUDA
995  int err{0};
996  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
997  auto& data_mgr = catalog->getDataMgr();
998  gpu_hash_table_err_buff_[device_id] =
999  CudaAllocator::allocGpuAbstractBuffer(&data_mgr, sizeof(int), device_id);
1000  auto dev_err_buff = reinterpret_cast<CUdeviceptr>(
1001  gpu_hash_table_err_buff_[device_id]->getMemoryPtr());
1002  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
1004  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1005  hash_entry_info.getNormalizedHashEntryCount(),
1006  hash_join_invalid_val,
1007  executor_->blockSize(),
1008  executor_->gridSize());
1009  if (chunk_key.empty()) {
1010  return;
1011  }
1012  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1016  isBitwiseEq(),
1017  col_range_.getIntMax() + 1,
1019  if (shard_count) {
1020  CHECK_GT(device_count_, 0);
1021  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1022  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1024  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1025  hash_join_invalid_val,
1026  reinterpret_cast<int*>(dev_err_buff),
1027  join_column,
1028  type_info,
1029  shard_info,
1030  executor_->blockSize(),
1031  executor_->gridSize(),
1032  hash_entry_info.bucket_normalization);
1033  }
1034  } else {
1036  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1037  hash_join_invalid_val,
1038  reinterpret_cast<int*>(dev_err_buff),
1039  join_column,
1040  type_info,
1041  executor_->blockSize(),
1042  executor_->gridSize(),
1043  hash_entry_info.bucket_normalization);
1044  }
1045  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
1046 
1047  if (err) {
1048  throw NeedsOneToManyHash();
1049  }
1050 #else
1051  CHECK(false);
1052 #endif
1053  }
1054 }
1055 
1056 void JoinHashTable::initOneToManyHashTable(
1057  const ChunkKey& chunk_key,
1058  const JoinColumn& join_column,
1059  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
1060  const Data_Namespace::MemoryLevel effective_memory_level,
1061  const int device_id) {
1062  auto timer = DEBUG_TIMER(__func__);
1063  auto const inner_col = cols.first;
1064  CHECK(inner_col);
1065 
1066  auto hash_entry_info = get_bucketized_hash_entry_info(
1067  inner_col->get_type_info(), col_range_, isBitwiseEq());
1068 
1069 #ifdef HAVE_CUDA
1070  const auto shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
1071  const size_t entries_per_shard =
1072  (shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
1073  : 0);
1074  // Even if we join on dictionary encoded strings, the memory on the GPU is still
1075  // needed once the join hash table has been built on the CPU.
1076  if (memory_level_ == Data_Namespace::GPU_LEVEL && shard_count) {
1077  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
1078  CHECK_GT(shards_per_device, 0u);
1079  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
1080  }
1081 #else
1082  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1083 #endif
1084  if (!device_id) {
1085  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
1086  }
1087 
1088 #ifdef HAVE_CUDA
1089  const auto& ti = inner_col->get_type_info();
1090  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1091  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1092  const size_t total_count =
1093  2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems;
1094  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
1095  &data_mgr, total_count * sizeof(int32_t), device_id);
1096  }
1097 #endif
1098  const int32_t hash_join_invalid_val{-1};
1099  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
1100  initHashTableOnCpuFromCache(chunk_key, join_column.num_elems, cols);
1101  {
1102  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1104  join_column, cols, hash_entry_info, hash_join_invalid_val);
1105  }
1106  if (inner_col->get_table_id() > 0) {
1107  putHashTableOnCpuToCache(chunk_key, join_column.num_elems, cols);
1108  }
1109  // Transfer the hash table on the GPU if we've only built it on CPU
1110  // but the query runs on GPU (join on dictionary encoded columns).
1111  // Don't transfer the buffer if there was an error since we'll bail anyway.
1112  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1113 #ifdef HAVE_CUDA
1114  CHECK(ti.is_string());
1115  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1116  copy_to_gpu(
1117  &data_mgr,
1118  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1119  &(*cpu_hash_table_buff_)[0],
1120  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1121  device_id);
1122 #else
1123  CHECK(false);
1124 #endif
1125  }
1126  } else {
1127 #ifdef HAVE_CUDA
1128  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
1129  data_mgr.getCudaMgr()->setContext(device_id);
1131  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1132  hash_entry_info.getNormalizedHashEntryCount(),
1133  hash_join_invalid_val,
1134  executor_->blockSize(),
1135  executor_->gridSize());
1136  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1140  isBitwiseEq(),
1141  col_range_.getIntMax() + 1,
1143  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
1144 
1145  if (shard_count) {
1146  CHECK_GT(device_count_, 0);
1147  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1148  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1149  fill_one_to_many_hash_table_on_device_sharded(
1150  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1151  hash_entry_info,
1152  hash_join_invalid_val,
1153  join_column,
1154  type_info,
1155  shard_info,
1156  executor_->blockSize(),
1157  executor_->gridSize());
1158  }
1159  } else {
1160  if (use_bucketization) {
1162  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1163  hash_entry_info,
1164  hash_join_invalid_val,
1165  join_column,
1166  type_info,
1167  executor_->blockSize(),
1168  executor_->gridSize());
1169  } else {
1170  fill_one_to_many_hash_table_on_device(
1171  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1172  hash_entry_info,
1173  hash_join_invalid_val,
1174  join_column,
1175  type_info,
1176  executor_->blockSize(),
1177  executor_->gridSize());
1178  }
1179  }
1180 #else
1181  CHECK(false);
1182 #endif
1183  }
1184 }
1185 
1187  const ChunkKey& chunk_key,
1188  const size_t num_elements,
1189  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols) {
1190  auto timer = DEBUG_TIMER(__func__);
1191  CHECK_GE(chunk_key.size(), size_t(2));
1192  if (chunk_key[1] < 0) {
1193  // Do not cache hash tables over intermediate results
1194  return;
1195  }
1196  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1198  *cols.first,
1199  outer_col ? *outer_col : *cols.first,
1200  num_elements,
1201  chunk_key,
1202  qual_bin_oper_->get_optype()};
1203  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1204  for (const auto& kv : join_hash_table_cache_) {
1205  if (kv.first == cache_key) {
1206  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1207  cpu_hash_table_buff_ = kv.second;
1208  break;
1209  }
1210  }
1211 }
1212 
1214  const ChunkKey& chunk_key,
1215  const size_t num_elements,
1216  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols) {
1217  CHECK_GE(chunk_key.size(), size_t(2));
1218  if (chunk_key[1] < 0) {
1219  // Do not cache hash tables over intermediate results
1220  return;
1221  }
1222  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1224  *cols.first,
1225  outer_col ? *outer_col : *cols.first,
1226  num_elements,
1227  chunk_key,
1228  qual_bin_oper_->get_optype()};
1229  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1230  for (const auto& kv : join_hash_table_cache_) {
1231  if (kv.first == cache_key) {
1232  return;
1233  }
1234  }
1235  join_hash_table_cache_.emplace_back(cache_key, cpu_hash_table_buff_);
1236 }
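// The CPU hash table cache above is keyed (among other fields) by the inner
// and outer columns, the number of elements, the chunk key and the join
// operator type, and is guarded by join_hash_table_cache_mutex_. Hash tables
// built over intermediate results (negative table ids) are never cached.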
1237 
1238 llvm::Value* JoinHashTable::codegenHashTableLoad(const size_t table_idx) {
1239  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1240  const auto hash_ptr = codegenHashTableLoad(table_idx, executor_);
1241  if (hash_ptr->getType()->isIntegerTy(64)) {
1242  return hash_ptr;
1243  }
1244  CHECK(hash_ptr->getType()->isPointerTy());
1245  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
1246  get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
1247  llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
1248 }
1249 
1250 llvm::Value* JoinHashTable::codegenHashTableLoad(const size_t table_idx,
1251  Executor* executor) {
1252  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
1253  llvm::Value* hash_ptr = nullptr;
1254  const auto total_table_count =
1255  executor->plan_state_->join_info_.join_hash_tables_.size();
1256  CHECK_LT(table_idx, total_table_count);
1257  if (total_table_count > 1) {
1258  auto hash_tables_ptr =
1259  get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1260  auto hash_pptr =
1261  table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
1262  hash_tables_ptr,
1263  executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
1264  : hash_tables_ptr;
1265  hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
1266  } else {
1267  hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1268  }
1269  CHECK(hash_ptr);
1270  return hash_ptr;
1271 }
1272 
1273 std::vector<llvm::Value*> JoinHashTable::getHashJoinArgs(llvm::Value* hash_ptr,
1274  const Analyzer::Expr* key_col,
1275  const int shard_count,
1276  const CompilationOptions& co) {
1277  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1278  CodeGenerator code_generator(executor_);
1279  const auto key_lvs = code_generator.codegen(key_col, true, co);
1280  CHECK_EQ(size_t(1), key_lvs.size());
1281  auto const& key_col_ti = key_col->get_type_info();
1282  auto hash_entry_info =
1283  get_bucketized_hash_entry_info(key_col_ti, col_range_, isBitwiseEq());
1284 
1285  std::vector<llvm::Value*> hash_join_idx_args{
1286  hash_ptr,
1287  executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
1288  executor_->cgen_state_->llInt(col_range_.getIntMin()),
1289  executor_->cgen_state_->llInt(col_range_.getIntMax())};
1290  if (shard_count) {
1291  const auto expected_hash_entry_count =
1292  get_hash_entry_count(col_range_, isBitwiseEq());
1293  const auto entry_count_per_shard =
1294  (expected_hash_entry_count + shard_count - 1) / shard_count;
1295  hash_join_idx_args.push_back(
1296  executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
1297  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
1298  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
1299  }
1300  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
1301  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
1302  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1303  inline_fixed_encoding_null_val(key_col_logical_ti)));
1304  }
1305  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
1306  if (isBitwiseEq()) {
1307  if (special_date_bucketization_case) {
1308  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1309  col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
1310  } else {
1311  hash_join_idx_args.push_back(
1312  executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
1313  }
1314  }
1315 
1316  if (special_date_bucketization_case) {
1317  hash_join_idx_args.emplace_back(
1318  executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
1319  }
1320 
1321  return hash_join_idx_args;
1322 }
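// Argument layout produced above, in order: the hash table pointer, the join
// key widened to 64 bit, and the column range min/max. Sharded joins append
// the per-shard entry count, the shard count and the device count; nullable or
// bitwise-equal keys append the inline null value (and, for kBW_EQ, the slot
// used for the translated NULL); kDATE columns append the bucket
// normalization. The hash_join_idx_* runtime helpers consume the arguments in
// this order.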
1323 
1324 HashJoinMatchingSet JoinHashTable::codegenMatchingSet(const CompilationOptions& co,
1325  const size_t index) {
1326  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1327  const auto cols = get_cols(
1328  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1329  auto key_col = cols.second;
1330  CHECK(key_col);
1331  auto val_col = cols.first;
1332  CHECK(val_col);
1333  auto pos_ptr = codegenHashTableLoad(index);
1334  CHECK(pos_ptr);
1335  const int shard_count = shardCount();
1336  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
1337  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
1338  if (key_col_var && val_col_var &&
1340  key_col_var,
1341  val_col_var,
1342  get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
1343  throw std::runtime_error(
1344  "Query execution fails because the query contains not supported self-join "
1345  "pattern. We suspect the query requires multiple left-deep join tree due to the "
1346  "join condition of the self-join and is not supported for now. Please consider "
1347  "rewriting table order in "
1348  "FROM clause.");
1349  }
1350  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
1351  const int64_t sub_buff_size = getComponentBufferSize();
1352  const auto& key_col_ti = key_col->get_type_info();
1353 
1354  auto bucketize = (key_col_ti.get_type() == kDATE);
1355  return codegenMatchingSet(hash_join_idx_args,
1356  shard_count,
1357  !key_col_ti.get_notnull(),
1358  isBitwiseEq(),
1359  sub_buff_size,
1360  executor_,
1361  bucketize);
1362 }
1363 
1364 HashJoinMatchingSet JoinHashTable::codegenMatchingSet(
1365  const std::vector<llvm::Value*>& hash_join_idx_args_in,
1366  const bool is_sharded,
1367  const bool col_is_nullable,
1368  const bool is_bw_eq,
1369  const int64_t sub_buff_size,
1370  Executor* executor,
1371  bool is_bucketized) {
1372  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
1373  using namespace std::string_literals;
1374 
1375  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);
1376 
1377  if (is_bw_eq) {
1378  fname += "_bitwise";
1379  }
1380  if (is_sharded) {
1381  fname += "_sharded";
1382  }
1383  if (!is_bw_eq && col_is_nullable) {
1384  fname += "_nullable";
1385  }
1386 
1387  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
1388  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
1389  slot_lv, executor->cgen_state_->llInt(int64_t(0)));
1390 
1391  auto pos_ptr = hash_join_idx_args_in[0];
1392  CHECK(pos_ptr);
1393 
1394  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
1395  pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
1396  auto hash_join_idx_args = hash_join_idx_args_in;
1397  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
1398  count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));
1399 
1400  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
1401  slot_valid_lv,
1402  executor->cgen_state_->emitCall(fname, hash_join_idx_args),
1403  executor->cgen_state_->llInt(int64_t(0)));
1404  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
1405  executor->cgen_state_->ir_builder_.CreateAdd(
1406  pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
1407  llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
1408  auto rowid_ptr_i32 =
1409  executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
1410  return {rowid_ptr_i32, row_count_lv, slot_lv};
1411 }
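// For the one-to-many layout the table is three consecutive regions of
// sub_buff_size bytes each: slot offsets, per-slot match counts, and the
// row-id payload. The code generated above probes the offset region for the
// slot, reads the matching count from the second region, and returns a pointer
// into the payload region, which is exactly the layout exposed by
// offsetBufferOff(), countBufferOff() and payloadBufferOff() below.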
1412 
1413 size_t JoinHashTable::offsetBufferOff() const noexcept {
1414  return 0;
1415 }
1416 
1417 size_t JoinHashTable::countBufferOff() const noexcept {
1418  return getComponentBufferSize();
1419 }
1420 
1421 size_t JoinHashTable::payloadBufferOff() const noexcept {
1422  return 2 * getComponentBufferSize();
1423 }
1424 
1425 size_t JoinHashTable::getComponentBufferSize() const noexcept {
1427  return hash_entry_count_ * sizeof(int32_t);
1428  } else {
1429  return 0;
1430  }
1431 }
1432 
1433 int64_t JoinHashTable::getJoinHashBuffer(const ExecutorDeviceType device_type,
1434  const int device_id) const noexcept {
1435  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1436  return 0;
1437  }
1438 #ifdef HAVE_CUDA
1439  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1440  if (device_type == ExecutorDeviceType::CPU) {
1441  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1442  } else {
1443  return gpu_hash_table_buff_[device_id]
1444  ? reinterpret_cast<CUdeviceptr>(
1445  gpu_hash_table_buff_[device_id]->getMemoryPtr())
1446  : reinterpret_cast<CUdeviceptr>(nullptr);
1447  }
1448 #else
1449  CHECK(device_type == ExecutorDeviceType::CPU);
1450  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1451 #endif
1452 }
1453 
1454 size_t JoinHashTable::getJoinHashBufferSize(const ExecutorDeviceType device_type,
1455  const int device_id) const noexcept {
1456  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1457  return 0;
1458  }
1459 #ifdef HAVE_CUDA
1460  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1461  if (device_type == ExecutorDeviceType::CPU) {
1462  return cpu_hash_table_buff_->size() *
1463  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1464  } else {
1465  return gpu_hash_table_buff_[device_id]
1466  ? gpu_hash_table_buff_[device_id]->reservedSize()
1467  : 0;
1468  }
1469 #else
1470  CHECK(device_type == ExecutorDeviceType::CPU);
1471  return cpu_hash_table_buff_->size() *
1472  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1473 #endif
1474 }
1475 
1476 std::string JoinHashTable::toString(const ExecutorDeviceType device_type,
1477  const int device_id,
1478  bool raw) const {
1479  auto buffer = getJoinHashBuffer(device_type, device_id);
1480  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1481 #ifdef HAVE_CUDA
1482  std::unique_ptr<int8_t[]> buffer_copy;
1483  if (device_type == ExecutorDeviceType::GPU) {
1484  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1485 
1486  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1487  buffer_copy.get(),
1488  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1489  buffer_size,
1490  device_id);
1491  }
1492  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1493 #else
1494  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1495 #endif // HAVE_CUDA
1496  auto ptr2 = ptr1 + offsetBufferOff();
1497  auto ptr3 = ptr1 + countBufferOff();
1498  auto ptr4 = ptr1 + payloadBufferOff();
1499  return JoinHashTableInterface::toString("perfect",
1501  0,
1502  0,
1504  ptr1,
1505  ptr2,
1506  ptr3,
1507  ptr4,
1508  buffer_size,
1509  raw);
1510 }
1511 
1512 std::set<DecodedJoinHashBufferEntry> JoinHashTable::toSet(
1513  const ExecutorDeviceType device_type,
1514  const int device_id) const {
1515  auto buffer = getJoinHashBuffer(device_type, device_id);
1516  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1517 #ifdef HAVE_CUDA
1518  std::unique_ptr<int8_t[]> buffer_copy;
1519  if (device_type == ExecutorDeviceType::GPU) {
1520  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1521 
1522  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1523  buffer_copy.get(),
1524  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1525  buffer_size,
1526  device_id);
1527  }
1528  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1529 #else
1530  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1531 #endif // HAVE_CUDA
1532  auto ptr2 = ptr1 + offsetBufferOff();
1533  auto ptr3 = ptr1 + countBufferOff();
1534  auto ptr4 = ptr1 + payloadBufferOff();
1536  0, 0, hash_entry_count_, ptr1, ptr2, ptr3, ptr4, buffer_size);
1537 }
1538 
1540  const size_t index) {
1541  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1542  using namespace std::string_literals;
1543 
1545  const auto cols = get_cols(
1546  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1547  auto key_col = cols.second;
1548  CHECK(key_col);
1549  auto val_col = cols.first;
1550  CHECK(val_col);
1551  CodeGenerator code_generator(executor_);
1552  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(key_col);
1553  const auto val_col_var = dynamic_cast<const Analyzer::ColumnVar*>(val_col);
1554  if (key_col_var && val_col_var &&
1556  key_col_var,
1557  val_col_var,
1558  get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
1559  throw std::runtime_error(
1560  "Query execution fails because the query contains not supported self-join "
1561  "pattern. We suspect the query requires multiple left-deep join tree due to the "
1562  "join condition of the self-join and is not supported for now. Please consider "
1563  "rewriting table order in "
1564  "FROM clause.");
1565  }
1566  const auto key_lvs = code_generator.codegen(key_col, true, co);
1567  CHECK_EQ(size_t(1), key_lvs.size());
1568  auto hash_ptr = codegenHashTableLoad(index);
1569  CHECK(hash_ptr);
1570  const int shard_count = shardCount();
1571  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);
1572 
1573  const auto& key_col_ti = key_col->get_type_info();
1574  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
1575  : "hash_join_idx"s);
1576 
1577  if (isBitwiseEq()) {
1578  fname += "_bitwise";
1579  }
1580  if (shard_count) {
1581  fname += "_sharded";
1582  }
1583 
1584  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
1585  fname += "_nullable";
1586  }
1587  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1588 }
1589 
1590 const InputTableInfo& JoinHashTable::getInnerQueryInfo(
1591  const Analyzer::ColumnVar* inner_col) const {
1592  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
1593 }
1594 
1595 const InputTableInfo& get_inner_query_info(
1596  const int inner_table_id,
1597  const std::vector<InputTableInfo>& query_infos) {
1598  std::optional<size_t> ti_idx;
1599  for (size_t i = 0; i < query_infos.size(); ++i) {
1600  if (inner_table_id == query_infos[i].table_id) {
1601  ti_idx = i;
1602  break;
1603  }
1604  }
1605  CHECK(ti_idx);
1606  return query_infos[*ti_idx];
1607 }
1608 
1609 size_t get_entries_per_device(const size_t total_entries,
1610  const size_t shard_count,
1611  const size_t device_count,
1612  const Data_Namespace::MemoryLevel memory_level) {
1613  const auto entries_per_shard =
1614  shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
1615  size_t entries_per_device = entries_per_shard;
1616  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
1617  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
1618  CHECK_GT(shards_per_device, 0u);
1619  entries_per_device = entries_per_shard * shards_per_device;
1620  }
1621  return entries_per_device;
1622 }
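For instance (numbers chosen only for illustration), 1000 total entries split over 6 shards give ceil(1000 / 6) = 167 entries per shard; on the GPU with 2 devices, each device hosts ceil(6 / 2) = 3 shards and therefore sizes its buffer for 167 * 3 = 501 entries. A minimal check of that arithmetic:

#include <cassert>
#include <cstddef>

int main() {
  const size_t total_entries = 1000, shard_count = 6, device_count = 2;
  const size_t entries_per_shard = (total_entries + shard_count - 1) / shard_count;
  const size_t shards_per_device = (shard_count + device_count - 1) / device_count;
  assert(entries_per_shard == 167);
  assert(shards_per_device == 3);
  assert(entries_per_shard * shards_per_device == 501);  // per-device entry count on GPU
  return 0;
}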
1623 
1624 // TODO(adb): unify with BaselineJoinHashTable
1625 size_t JoinHashTable::shardCount() const {
1626  return memory_level_ == Data_Namespace::GPU_LEVEL
1627  ? get_shard_count(qual_bin_oper_.get(), executor_)
1628  : 0;
1629 }
1630 
1631 bool JoinHashTable::isBitwiseEq() const {
1632  return qual_bin_oper_->get_optype() == kBW_EQ;
1633 }
1634 
1635 void JoinHashTable::freeHashBufferMemory() {
1636 #ifdef HAVE_CUDA
1637  freeHashBufferGpuMemory();
1638 #endif
1639  freeHashBufferCpuMemory();
1640 }
1641 
1642 void JoinHashTable::freeHashBufferGpuMemory() {
1643 #ifdef HAVE_CUDA
1644  const auto& catalog = *executor_->getCatalog();
1645  auto& data_mgr = catalog.getDataMgr();
1646  for (auto& buf : gpu_hash_table_buff_) {
1647  if (buf) {
1648  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1649  buf = nullptr;
1650  }
1651  }
1652  for (auto& buf : gpu_hash_table_err_buff_) {
1653  if (buf) {
1654  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1655  buf = nullptr;
1656  }
1657  }
1658 #else
1659  CHECK(false);
1660 #endif // HAVE_CUDA
1661 }
1662 
1663 void JoinHashTable::freeHashBufferCpuMemory() {
1664  cpu_hash_table_buff_.reset();
1665 }
int get_table_id() const
Definition: Analyzer.h:194
void initOneToManyHashTable(const ChunkKey &chunk_key, const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
void fill_one_to_many_hash_table_on_device(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const size_t block_size_x, const size_t grid_size_x)
int64_t getIntMin() const
bool isBitwiseEq() const
#define CHECK_EQ(x, y)
Definition: Logger.h:205
const std::vector< InputTableInfo > & query_infos_
std::vector< int > ChunkKey
Definition: types.h:37
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
std::string cat(Ts &&...args)
void freeHashBufferGpuMemory()
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:97
bool self_join_not_covered_by_left_deep_tree(const Analyzer::ColumnVar *key_side, const Analyzer::ColumnVar *val_side, const int max_rte_covered)
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
#define IS_EQUIVALENCE(X)
Definition: sqldefs.h:67
ExecutorDeviceType
const int device_count_
size_t num_elems
bool shard_count_less_or_equal_device_count(const int inner_table_id, const Executor *executor)
void initHashTableOnCpuFromCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info, const size_t block_size_x, const size_t grid_size_x)
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col_expr, const Executor *executor)
const Expr * get_right_operand() const
Definition: Analyzer.h:443
void freeHashBufferMemory()
size_t payloadBufferOff() const noexcept override
unsigned long long CUdeviceptr
Definition: nocuda.h:27
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const noexcept override
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:199
#define CHECK_GE(x, y)
Definition: Logger.h:210
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:899
Definition: sqldefs.h:49
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
Definition: Logger.h:318
void putHashTableOnCpuToCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
ChunkKey genHashTableKey(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
ColumnCacheMap & column_cache_
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
HashType hash_type_
T visit(const Analyzer::Expr *expr) const
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:330
JoinHashTable(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange &col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count)
static std::mutex join_hash_table_cache_mutex_
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::vector< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
const int get_max_rte_scan_table(std::unordered_map< int, llvm::Value * > &scan_idx_to_hash_pos)
void checkHashJoinReplicationConstraint(const int table_id) const
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:129
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:30
void reifyOneToOneForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const logger::ThreadId parent_thread_id)
static std::shared_ptr< JoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:183
#define CHECK_NE(x, y)
Definition: Logger.h:206
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const size_t block_size_x, const size_t grid_size_x)
size_t offsetBufferOff() const noexcept override
void reifyOneToManyForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const logger::ThreadId parent_thread_id)
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
std::set< DecodedJoinHashBufferEntry > toSet(const ExecutorDeviceType device_type, const int device_id) const override
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
int64_t bucket_normalization
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
#define AUTOMATIC_IR_METADATA(CGENSTATE)
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
static ExpressionRange makeIntRange(const int64_t int_min, const int64_t int_max, const int64_t bucket, const bool has_nulls)
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexcept override
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
#define VLOGGING(n)
Definition: Logger.h:195
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
std::vector< llvm::Value * > codegen(const Analyzer::Expr *, const bool fetch_columns, const CompilationOptions &)
Definition: IRCodegen.cpp:26
#define CHECK_LT(x, y)
Definition: Logger.h:207
size_t shardCount() const
Definition: sqltypes.h:55
Executor * executor_
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
void fill_one_to_many_hash_table(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
bool table_is_replicated(const TableDescriptor *td)
ExpressionRange col_range_
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
static void freeGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, Data_Namespace::AbstractBuffer *ab)
std::mutex cpu_hash_table_buff_mutex_
void freeHashBufferCpuMemory()
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const =0
uint64_t ThreadId
Definition: Logger.h:306
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
size_t getComponentBufferSize() const noexcept
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
ThreadId thread_id()
Definition: Logger.cpp:731
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
size_t getNormalizedHashEntryCount() const
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
int64_t getBucket() const
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
size_t countBufferOff() const noexcept override
Definition: sqldefs.h:31
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
bool g_cluster
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
const Expr * get_left_operand() const
Definition: Analyzer.h:442
bool is_overlaps_oper() const
Definition: Analyzer.h:440
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
virtual ~JoinHashTable()
static std::string getHashTypeString(HashType ht) noexcept
void initOneToOneHashTable(const ChunkKey &chunk_key, const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
HashType getHashType() const noexcept override
Definition: JoinHashTable.h:89
int get_column_id() const
Definition: Analyzer.h:195
const Data_Namespace::MemoryLevel memory_level_
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
int cpu_threads()
Definition: thread_count.h:24
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join)
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
virtual std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const =0
#define VLOG(n)
Definition: Logger.h:291
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
size_t get_hash_entry_count(const ExpressionRange &col_range, const bool is_bw_eq)
size_t hash_entry_count_
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)