OmniSciDB  72180abbfe
JoinHashTable.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "JoinHashTable.h"
18 #include "CodeGenerator.h"
19 #include "ColumnFetcher.h"
20 #include "Execute.h"
21 #include "ExpressionRewrite.h"
22 #include "HashJoinRuntime.h"
23 #include "RangeTableIndexVisitor.h"
24 #include "RuntimeFunctions.h"
25 #include "Shared/Logger.h"
26 
27 #include <future>
28 #include <numeric>
29 #include <thread>
30 
31 namespace {
32 
33 class NeedsOneToManyHash : public HashJoinFail {
34  public:
35  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
36 };
37 
38 } // namespace
39 
40 InnerOuter normalize_column_pair(const Analyzer::Expr* lhs,
41  const Analyzer::Expr* rhs,
42  const Catalog_Namespace::Catalog& cat,
43  const TemporaryTables* temporary_tables,
44  const bool is_overlaps_join) {
45  const auto& lhs_ti = lhs->get_type_info();
46  const auto& rhs_ti = rhs->get_type_info();
47  if (!is_overlaps_join) {
48  if (lhs_ti.get_type() != rhs_ti.get_type()) {
49  throw HashJoinFail("Equijoin types must be identical, found: " +
50  lhs_ti.get_type_name() + ", " + rhs_ti.get_type_name());
51  }
52  if (!lhs_ti.is_integer() && !lhs_ti.is_time() && !lhs_ti.is_string() &&
53  !lhs_ti.is_decimal()) {
54  throw HashJoinFail("Cannot apply hash join to inner column type " +
55  lhs_ti.get_type_name());
56  }
57  // Decimal types should be identical.
58  if (lhs_ti.is_decimal() && (lhs_ti.get_scale() != rhs_ti.get_scale() ||
59  lhs_ti.get_precision() != rhs_ti.get_precision())) {
60  throw HashJoinFail("Equijoin with different decimal types");
61  }
62  }
63 
64  const auto lhs_cast = dynamic_cast<const Analyzer::UOper*>(lhs);
65  const auto rhs_cast = dynamic_cast<const Analyzer::UOper*>(rhs);
66  if (lhs_ti.is_string() && (static_cast<bool>(lhs_cast) != static_cast<bool>(rhs_cast) ||
67  (lhs_cast && lhs_cast->get_optype() != kCAST) ||
68  (rhs_cast && rhs_cast->get_optype() != kCAST))) {
69  throw HashJoinFail("Cannot use hash join for given expression");
70  }
71  // Casts to decimal are not supported.
72  if (lhs_ti.is_decimal() && (lhs_cast || rhs_cast)) {
73  throw HashJoinFail("Cannot use hash join for given expression");
74  }
75  const auto lhs_col =
76  lhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(lhs_cast->get_operand())
77  : dynamic_cast<const Analyzer::ColumnVar*>(lhs);
78  const auto rhs_col =
79  rhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(rhs_cast->get_operand())
80  : dynamic_cast<const Analyzer::ColumnVar*>(rhs);
81  if (!lhs_col && !rhs_col) {
82  throw HashJoinFail("Cannot use hash join for given expression");
83  }
84  const Analyzer::ColumnVar* inner_col{nullptr};
85  const Analyzer::ColumnVar* outer_col{nullptr};
86  auto outer_ti = lhs_ti;
87  auto inner_ti = rhs_ti;
88  const Analyzer::Expr* outer_expr{lhs};
89  if ((!lhs_col || (rhs_col && lhs_col->get_rte_idx() < rhs_col->get_rte_idx())) &&
90  (!rhs_col || (!lhs_col || lhs_col->get_rte_idx() < rhs_col->get_rte_idx()))) {
91  inner_col = rhs_col;
92  outer_col = lhs_col;
93  } else {
94  if (lhs_col && lhs_col->get_rte_idx() == 0) {
95  throw HashJoinFail("Cannot use hash join for given expression");
96  }
97  inner_col = lhs_col;
98  outer_col = rhs_col;
99  std::swap(outer_ti, inner_ti);
100  outer_expr = rhs;
101  }
102  if (!inner_col) {
103  throw HashJoinFail("Cannot use hash join for given expression");
104  }
105  if (!outer_col) {
106  MaxRangeTableIndexVisitor rte_idx_visitor;
107  int outer_rte_idx = rte_idx_visitor.visit(outer_expr);
108  // The inner column candidate is not actually inner; the outer
109  // expression contains columns which are at least as deep.
110  if (inner_col->get_rte_idx() <= outer_rte_idx) {
111  throw HashJoinFail("Cannot use hash join for given expression");
112  }
113  }
114  // We need to fetch the actual type information from the catalog since Analyzer
115  // always reports nullable as true for inner table columns in left joins.
116  const auto inner_col_cd = get_column_descriptor_maybe(
117  inner_col->get_column_id(), inner_col->get_table_id(), cat);
118  const auto inner_col_real_ti = get_column_type(inner_col->get_column_id(),
119  inner_col->get_table_id(),
120  inner_col_cd,
121  temporary_tables);
122  const auto& outer_col_ti =
123  !(dynamic_cast<const Analyzer::FunctionOper*>(lhs)) && outer_col
124  ? outer_col->get_type_info()
125  : outer_ti;
126  // Casts from decimal are not supported.
127  if ((inner_col_real_ti.is_decimal() || outer_col_ti.is_decimal()) &&
128  (lhs_cast || rhs_cast)) {
129  throw HashJoinFail("Cannot use hash join for given expression");
130  }
131  if (is_overlaps_join) {
132  if (!inner_col_real_ti.is_array()) {
133  throw HashJoinFail(
134  "Overlaps join only supported for inner columns with array type");
135  }
136  auto is_bounds_array = [](const auto ti) {
137  return ti.is_fixlen_array() && ti.get_size() == 32;
138  };
139  if (!is_bounds_array(inner_col_real_ti)) {
140  throw HashJoinFail(
141  "Overlaps join only supported for 4-element double fixed length arrays");
142  }
143  if (!(outer_col_ti.get_type() == kPOINT || is_bounds_array(outer_col_ti))) {
144  throw HashJoinFail(
145  "Overlaps join only supported for geometry outer columns of type point or "
146  "geometry columns with bounds");
147  }
148  } else {
149  if (!(inner_col_real_ti.is_integer() || inner_col_real_ti.is_time() ||
150  inner_col_real_ti.is_decimal() ||
151  (inner_col_real_ti.is_string() &&
152  inner_col_real_ti.get_compression() == kENCODING_DICT))) {
153  throw HashJoinFail(
154  "Can only apply hash join to integer-like types and dictionary encoded "
155  "strings");
156  }
157  }
158 
159  auto normalized_inner_col = inner_col;
160  auto normalized_outer_col = outer_col ? outer_col : outer_expr;
161 
162  const auto& normalized_inner_ti = normalized_inner_col->get_type_info();
163  const auto& normalized_outer_ti = normalized_outer_col->get_type_info();
164 
165  if (normalized_inner_ti.is_string() != normalized_outer_ti.is_string()) {
166  throw HashJoinFail(std::string("Could not build hash tables for incompatible types " +
167  normalized_inner_ti.get_type_name() + " and " +
168  normalized_outer_ti.get_type_name()));
169  }
170 
171  return {normalized_inner_col, normalized_outer_col};
172 }
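// Illustration of the normalization above: for a condition like "t1.x = t2.y"
// where t2 sits at the deeper range-table index, t2.y becomes the inner (build)
// column whose values populate the hash table, and t1.x becomes the outer
// (probe) expression evaluated per row of the outer table.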
173 
174 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
175  const Catalog_Namespace::Catalog& cat,
176  const TemporaryTables* temporary_tables) {
177  std::vector<InnerOuter> result;
178  const auto lhs_tuple_expr =
179  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_left_operand());
180  const auto rhs_tuple_expr =
181  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_right_operand());
182 
183  CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
184  if (lhs_tuple_expr) {
185  const auto& lhs_tuple = lhs_tuple_expr->getTuple();
186  const auto& rhs_tuple = rhs_tuple_expr->getTuple();
187  CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
188  for (size_t i = 0; i < lhs_tuple.size(); ++i) {
189  result.push_back(normalize_column_pair(lhs_tuple[i].get(),
190  rhs_tuple[i].get(),
191  cat,
192  temporary_tables,
193  condition->is_overlaps_oper()));
194  }
195  } else {
196  CHECK(!lhs_tuple_expr && !rhs_tuple_expr);
197  result.push_back(normalize_column_pair(condition->get_left_operand(),
198  condition->get_right_operand(),
199  cat,
200  temporary_tables,
201  condition->is_overlaps_oper()));
202  }
203 
204  return result;
205 }
206 
207 namespace {
208 
209 std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> get_cols(
210  const Analyzer::BinOper* qual_bin_oper,
211  const Catalog_Namespace::Catalog& cat,
212  const TemporaryTables* temporary_tables) {
213  const auto lhs = qual_bin_oper->get_left_operand();
214  const auto rhs = qual_bin_oper->get_right_operand();
215  return normalize_column_pair(lhs, rhs, cat, temporary_tables);
216 }
217 
218 HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
219  ExpressionRange const& col_range,
220  bool const is_bw_eq) {
221  using EmptyRangeSize = boost::optional<size_t>;
222  auto empty_range_check = [](ExpressionRange const& col_range,
223  bool const is_bw_eq) -> EmptyRangeSize {
224  if (col_range.getIntMin() > col_range.getIntMax()) {
225  CHECK_EQ(col_range.getIntMin(), int64_t(0));
226  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
227  if (is_bw_eq) {
228  return size_t(1);
229  }
230  return size_t(0);
231  }
232  return EmptyRangeSize{};
233  };
234 
235  auto empty_range = empty_range_check(col_range, is_bw_eq);
236  if (empty_range) {
237  return {size_t(*empty_range), 1};
238  }
239 
240  int64_t bucket_normalization =
241  context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
242  CHECK_GT(bucket_normalization, 0);
243  return {size_t(col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0)),
244  bucket_normalization};
245 }
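// Rough worked example of the bucketization above (assuming the usual day-sized
// bucket of 86,400 seconds on a kDATE range): a min/max span covering 30 days
// produces a raw entry count in the millions of second values, but a normalized
// entry count of only ~31 slots once divided by bucket_normalization.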
246 
247 size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
248  if (col_range.getIntMin() > col_range.getIntMax()) {
249  CHECK_EQ(col_range.getIntMin(), int64_t(0));
250  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
251  return is_bw_eq ? 1 : 0;
252  }
253  return col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0);
254 }
255 
256 } // namespace
257 
258 std::vector<std::pair<JoinHashTable::JoinHashTableCacheKey,
259  std::shared_ptr<std::vector<int32_t>>>>
260  JoinHashTable::join_hash_table_cache_;
261 std::mutex JoinHashTable::join_hash_table_cache_mutex_;
262 
263 size_t get_shard_count(const Analyzer::BinOper* join_condition,
264  const Executor* executor) {
265  const Analyzer::ColumnVar* inner_col{nullptr};
266  const Analyzer::Expr* outer_col{nullptr};
267  std::shared_ptr<Analyzer::BinOper> redirected_bin_oper;
268  try {
269  std::tie(inner_col, outer_col) =
270  get_cols(join_condition, *executor->getCatalog(), executor->getTemporaryTables());
271  } catch (...) {
272  return 0;
273  }
274  if (!inner_col || !outer_col) {
275  return 0;
276  }
277  return get_shard_count({inner_col, outer_col}, executor);
278 }
279 
280 namespace {
281 
282 bool shard_count_less_or_equal_device_count(const int inner_table_id,
283  const Executor* executor) {
284  const auto inner_table_info = executor->getTableInfo(inner_table_id);
285  std::unordered_set<int> device_holding_fragments;
286  auto cuda_mgr = executor->getCatalog()->getDataMgr().getCudaMgr();
287  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
288  for (const auto& fragment : inner_table_info.fragments) {
289  if (fragment.shard != -1) {
290  const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
291  if (!it_ok.second) {
292  return false;
293  }
294  }
295  }
296  return true;
297 }
298 
299 } // namespace
300 
301 size_t get_shard_count(
302  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
303  const Executor* executor) {
304  const auto inner_col = equi_pair.first;
305  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
306  if (!outer_col || inner_col->get_table_id() < 0 || outer_col->get_table_id() < 0) {
307  return 0;
308  }
309  if (outer_col->get_rte_idx()) {
310  return 0;
311  }
312  if (inner_col->get_type_info() != outer_col->get_type_info()) {
313  return 0;
314  }
315  const auto catalog = executor->getCatalog();
316  const auto inner_td = catalog->getMetadataForTable(inner_col->get_table_id());
317  CHECK(inner_td);
318  const auto outer_td = catalog->getMetadataForTable(outer_col->get_table_id());
319  CHECK(outer_td);
320  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
321  inner_td->nShards != outer_td->nShards) {
322  return 0;
323  }
324  if (!shard_count_less_or_equal_device_count(inner_td->tableId, executor)) {
325  return 0;
326  }
327  // The two columns involved must be the ones on which the tables have been sharded.
328  return (inner_td->shardedColumnId == inner_col->get_column_id() &&
329  outer_td->shardedColumnId == outer_col->get_column_id()) ||
330  (outer_td->shardedColumnId == inner_col->get_column_id() &&
331  inner_td->shardedColumnId == inner_col->get_column_id())
332  ? inner_td->nShards
333  : 0;
334 }
335 
337 std::shared_ptr<JoinHashTable> JoinHashTable::getInstance(
338  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
339  const std::vector<InputTableInfo>& query_infos,
340  const Data_Namespace::MemoryLevel memory_level,
341  const HashType preferred_hash_type,
342  const int device_count,
343  ColumnCacheMap& column_cache,
344  Executor* executor) {
345  decltype(std::chrono::steady_clock::now()) ts1, ts2;
346  if (VLOGGING(1)) {
347  VLOG(1) << "Building perfect hash table " << getHashTypeString(preferred_hash_type)
348  << " for qual: " << qual_bin_oper->toString();
349  ts1 = std::chrono::steady_clock::now();
350  }
351  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
352  const auto cols =
353  get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
354  const auto inner_col = cols.first;
355  CHECK(inner_col);
356  const auto& ti = inner_col->get_type_info();
357  auto col_range =
358  getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
359  if (col_range.getType() == ExpressionRangeType::Invalid) {
360  throw HashJoinFail(
361  "Could not compute range for the expressions involved in the equijoin");
362  }
363  if (ti.is_string()) {
364  // The nullable info must be the same as the source column.
365  const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
366  if (source_col_range.getType() == ExpressionRangeType::Invalid) {
367  throw HashJoinFail(
368  "Could not compute range for the expressions involved in the equijoin");
369  }
370  if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
371  // If the inner column expression range is empty, use the inner col range
372  CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
373  CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
374  col_range = source_col_range;
375  } else {
376  col_range = ExpressionRange::makeIntRange(
377  std::min(source_col_range.getIntMin(), col_range.getIntMin()),
378  std::max(source_col_range.getIntMax(), col_range.getIntMax()),
379  0,
380  source_col_range.hasNulls());
381  }
382  }
383  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
384  const auto max_hash_entry_count =
385  memory_level == Data_Namespace::GPU_LEVEL
386  ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
387  : static_cast<size_t>(std::numeric_limits<int32_t>::max());
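// In other words: at most INT32_MAX / 4 (roughly 536 million) 4-byte slots when
// the table lives on the GPU, i.e. about 2GB of contiguous device memory, and up
// to INT32_MAX slots when the table stays on the CPU.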
388 
389  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
390  ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
391  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
392 
393  if (bucketized_entry_count > max_hash_entry_count) {
394  throw TooManyHashEntries();
395  }
396 
397  if (qual_bin_oper->get_optype() == kBW_EQ &&
398  col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
399  throw HashJoinFail("Cannot translate null value for kBW_EQ");
400  }
401  auto join_hash_table =
402  std::shared_ptr<JoinHashTable>(new JoinHashTable(qual_bin_oper,
403  inner_col,
404  query_infos,
405  memory_level,
406  preferred_hash_type,
407  col_range,
408  column_cache,
409  executor,
410  device_count));
411  try {
412  join_hash_table->reify();
413  } catch (const TableMustBeReplicated& e) {
414  // Throw a runtime error to abort the query
415  join_hash_table->freeHashBufferMemory();
416  throw std::runtime_error(e.what());
417  } catch (const HashJoinFail& e) {
418  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
419  // possible)
420  join_hash_table->freeHashBufferMemory();
421  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
422  "involved in equijoin | ") +
423  e.what());
424  } catch (const ColumnarConversionNotSupported& e) {
425  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
426  e.what());
427  } catch (const OutOfMemory& e) {
428  throw HashJoinFail(
429  std::string("Ran out of memory while building hash tables for equijoin | ") +
430  e.what());
431  } catch (const std::exception& e) {
432  throw std::runtime_error(
433  std::string("Fatal error while attempting to build hash tables for join: ") +
434  e.what());
435  }
436  if (VLOGGING(1)) {
437  ts2 = std::chrono::steady_clock::now();
438  VLOG(1) << "Built perfect hash table "
439  << getHashTypeString(join_hash_table->getHashType()) << " in "
440  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
441  << " ms";
442  }
443  return join_hash_table;
444 }
445 
446 bool needs_dictionary_translation(const Analyzer::ColumnVar* inner_col,
447  const Analyzer::Expr* outer_col_expr,
448  const Executor* executor) {
449  const auto catalog = executor->getCatalog();
450  CHECK(catalog);
451  const auto inner_cd = get_column_descriptor_maybe(
452  inner_col->get_column_id(), inner_col->get_table_id(), *catalog);
453  const auto& inner_ti = get_column_type(inner_col->get_column_id(),
454  inner_col->get_table_id(),
455  inner_cd,
456  executor->getTemporaryTables());
457  // Only strings may need dictionary translation.
458  if (!inner_ti.is_string()) {
459  return false;
460  }
461  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
462  CHECK(outer_col);
463  const auto outer_cd = get_column_descriptor_maybe(
464  outer_col->get_column_id(), outer_col->get_table_id(), *catalog);
465  // Don't want to deal with temporary tables for now, require translation.
466  if (!inner_cd || !outer_cd) {
467  return true;
468  }
469  const auto& outer_ti = get_column_type(outer_col->get_column_id(),
470  outer_col->get_table_id(),
471  outer_cd,
472  executor->getTemporaryTables());
473  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
474  // If the two columns don't share the dictionary, translation is needed.
475  return outer_ti.get_comp_param() != inner_ti.get_comp_param();
476 }
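// In short: a join on dictionary-encoded strings only avoids translation when
// both sides resolve to real (non-temporary) columns that share the same string
// dictionary (identical comp_param); otherwise the table must be built on the
// CPU, where the dictionary payloads are available.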
477 
478 std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
479  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
480  const int device_id,
481  const int device_count) {
482  std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
483  for (const auto& fragment : fragments) {
484  CHECK_GE(fragment.shard, 0);
485  if (fragment.shard % device_count == device_id) {
486  shards_for_device.push_back(fragment);
487  }
488  }
489  return shards_for_device;
490 }
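// Shards are assigned to devices round-robin via "shard % device_count", so
// with, say, 4 shards and 2 GPUs, device 0 builds from shards 0 and 2 while
// device 1 builds from shards 1 and 3.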
491 
492 void JoinHashTable::reify() {
493  auto timer = DEBUG_TIMER(__func__);
494  CHECK_LT(0, device_count_);
495  const auto& catalog = *executor_->getCatalog();
496  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
497  const auto inner_col = cols.first;
498  checkHashJoinReplicationConstraint(inner_col->get_table_id());
499  const auto& query_info = getInnerQueryInfo(inner_col).info;
500  if (query_info.fragments.empty()) {
501  return;
502  }
503  if (query_info.getNumTuplesUpperBound() >
504  static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
505  throw TooManyHashEntries();
506  }
507 #ifdef HAVE_CUDA
508  gpu_hash_table_buff_.resize(device_count_);
509  gpu_hash_table_err_buff_.resize(device_count_);
510 #endif // HAVE_CUDA
511  std::vector<std::future<void>> init_threads;
512  const int shard_count = shardCount();
513 
514  try {
515  for (int device_id = 0; device_id < device_count_; ++device_id) {
516  const auto fragments =
517  shard_count
518  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
519  : query_info.fragments;
520  init_threads.push_back(
521  std::async(std::launch::async,
522  hash_type_ == JoinHashTableInterface::HashType::OneToOne
523  ? &JoinHashTable::reifyOneToOneForDevice
524  : &JoinHashTable::reifyOneToManyForDevice,
525  this,
526  fragments,
527  device_id,
528  logger::thread_id()));
529  }
530  for (auto& init_thread : init_threads) {
531  init_thread.wait();
532  }
533  for (auto& init_thread : init_threads) {
534  init_thread.get();
535  }
536 
537  } catch (const NeedsOneToManyHash& e) {
538  hash_type_ = JoinHashTableInterface::HashType::OneToMany;
539  freeHashBufferMemory();
540  init_threads.clear();
541  for (int device_id = 0; device_id < device_count_; ++device_id) {
542  const auto fragments =
543  shard_count
544  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
545  : query_info.fragments;
546 
547  init_threads.push_back(std::async(std::launch::async,
548  &JoinHashTable::reifyOneToManyForDevice,
549  this,
550  fragments,
551  device_id,
552  logger::thread_id()));
553  }
554  for (auto& init_thread : init_threads) {
555  init_thread.wait();
556  }
557  for (auto& init_thread : init_threads) {
558  init_thread.get();
559  }
560  }
561 }
562 
563 JoinHashTable::~JoinHashTable() {
564 #ifdef HAVE_CUDA
565  CHECK(executor_);
566  CHECK(executor_->catalog_);
567  auto& data_mgr = executor_->catalog_->getDataMgr();
568  for (auto& gpu_buffer : gpu_hash_table_buff_) {
569  if (gpu_buffer) {
570  data_mgr.free(gpu_buffer);
571  }
572  }
573  for (auto& gpu_buffer : gpu_hash_table_err_buff_) {
574  if (gpu_buffer) {
575  data_mgr.free(gpu_buffer);
576  }
577  }
578 #endif
579 }
580 
581 ChunkKey JoinHashTable::genHashTableKey(
582  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
583  const Analyzer::Expr* outer_col_expr,
584  const Analyzer::ColumnVar* inner_col) const {
585  ChunkKey hash_table_key{executor_->getCatalog()->getCurrentDB().dbId,
586  inner_col->get_table_id(),
587  inner_col->get_column_id()};
588  const auto& ti = inner_col->get_type_info();
589  if (ti.is_string()) {
590  CHECK_EQ(kENCODING_DICT, ti.get_compression());
591  size_t outer_elem_count = 0;
592  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
593  CHECK(outer_col);
594  const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
595  for (auto& frag : outer_query_info.fragments) {
596  outer_elem_count = frag.getNumTuples();
597  }
598  hash_table_key.push_back(outer_elem_count);
599  }
600  if (fragments.size() < 2) {
601  hash_table_key.push_back(fragments.front().fragmentId);
602  }
603  return hash_table_key;
604 }
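// The resulting key is {db_id, inner table_id, inner column_id}, extended with
// an outer element count for dictionary-encoded string joins and with the
// fragment id when the inner side has fewer than two fragments, so a cached
// table is only reused when the same inputs are scanned.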
605 
606 void JoinHashTable::reifyOneToOneForDevice(
607  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
608  const int device_id,
609  const logger::ThreadId parent_thread_id) {
610  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
611  const auto& catalog = *executor_->getCatalog();
612  auto& data_mgr = catalog.getDataMgr();
613  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
614  const auto inner_col = cols.first;
615  CHECK(inner_col);
616  const auto inner_cd = get_column_descriptor_maybe(
617  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
618  if (inner_cd && inner_cd->isVirtualCol) {
619  throw FailedToJoinOnVirtualColumn();
620  }
621  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
622  // Since we don't have the string dictionary payloads on the GPU, we'll build
623  // the join hash table on the CPU and transfer it to the GPU.
624  const auto effective_memory_level =
625  needs_dictionary_translation(inner_col, cols.second, executor_)
626  ? Data_Namespace::CPU_LEVEL
627  : memory_level_;
628  if (fragments.empty()) {
629  // No data in this fragment. Still need to create a hash table and initialize it
630  // properly.
631  ChunkKey empty_chunk;
632  initOneToOneHashTable(empty_chunk,
633  JoinColumn{nullptr, 0, 0, 0, 0},
634  cols,
635  effective_memory_level,
636  device_id);
637  return;
638  }
639 
640  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
641  std::unique_ptr<DeviceAllocator> device_allocator;
642  if (effective_memory_level == MemoryLevel::GPU_LEVEL) {
643  device_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
644  }
645  std::vector<std::shared_ptr<void>> malloc_owner;
646 
647  JoinColumn join_column = fetchJoinColumn(inner_col,
648  fragments,
649  effective_memory_level,
650  device_id,
651  chunks_owner,
652  device_allocator.get(),
653  malloc_owner,
654  executor_,
655  &column_cache_);
656 
657  initOneToOneHashTable(genHashTableKey(fragments, cols.second, inner_col),
658  join_column,
659  cols,
660  effective_memory_level,
661  device_id);
662 }
663 
664 void JoinHashTable::reifyOneToManyForDevice(
665  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
666  const int device_id,
667  const logger::ThreadId parent_thread_id) {
668  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
669  const auto& catalog = *executor_->getCatalog();
670  auto& data_mgr = catalog.getDataMgr();
671  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
672  const auto inner_col = cols.first;
673  CHECK(inner_col);
674  const auto inner_cd = get_column_descriptor_maybe(
675  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
676  if (inner_cd && inner_cd->isVirtualCol) {
677  throw FailedToJoinOnVirtualColumn();
678  }
679  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
680  // Since we don't have the string dictionary payloads on the GPU, we'll build
681  // the join hash table on the CPU and transfer it to the GPU.
682  const auto effective_memory_level =
683  needs_dictionary_translation(inner_col, cols.second, executor_)
684  ? Data_Namespace::CPU_LEVEL
685  : memory_level_;
686  if (fragments.empty()) {
687  ChunkKey empty_chunk;
688  initOneToManyHashTable(empty_chunk,
689  JoinColumn{nullptr, 0, 0, 0, 0},
690  cols,
691  effective_memory_level,
692  device_id);
693  return;
694  }
695 
696  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
697  std::unique_ptr<DeviceAllocator> device_allocator;
698  if (effective_memory_level == MemoryLevel::GPU_LEVEL) {
699  device_allocator = std::make_unique<CudaAllocator>(&data_mgr, device_id);
700  }
701  std::vector<std::shared_ptr<void>> malloc_owner;
702 
703  JoinColumn join_column = fetchJoinColumn(inner_col,
704  fragments,
705  effective_memory_level,
706  device_id,
707  chunks_owner,
708  device_allocator.get(),
709  malloc_owner,
710  executor_,
711  &column_cache_);
712 
713  initOneToManyHashTable(genHashTableKey(fragments, cols.second, inner_col),
714  join_column,
715  cols,
716  effective_memory_level,
717  device_id);
718 }
719 
720 void JoinHashTable::checkHashJoinReplicationConstraint(const int table_id) {
721  if (!g_cluster) {
722  return;
723  }
724  if (table_id >= 0) {
725  const auto inner_td = executor_->getCatalog()->getMetadataForTable(table_id);
726  CHECK(inner_td);
727  size_t shard_count{0};
728  shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
729  if (!shard_count && !table_is_replicated(inner_td)) {
730  throw TableMustBeReplicated(inner_td->tableName);
731  }
732  }
733 }
734 
735 void JoinHashTable::initOneToOneHashTableOnCpu(
736  const JoinColumn& join_column,
737  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
738  const HashEntryInfo hash_entry_info,
739  const int32_t hash_join_invalid_val) {
740  auto timer = DEBUG_TIMER(__func__);
741  const auto inner_col = cols.first;
742  CHECK(inner_col);
743  const auto& ti = inner_col->get_type_info();
744  if (!cpu_hash_table_buff_) {
745  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
746  hash_entry_info.getNormalizedHashEntryCount());
747  const StringDictionaryProxy* sd_inner_proxy{nullptr};
748  const StringDictionaryProxy* sd_outer_proxy{nullptr};
749  if (ti.is_string()) {
750  CHECK_EQ(kENCODING_DICT, ti.get_compression());
751  sd_inner_proxy = executor_->getStringDictionaryProxy(
752  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
753  CHECK(sd_inner_proxy);
754  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
755  CHECK(outer_col);
756  sd_outer_proxy = executor_->getStringDictionaryProxy(
757  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
758  CHECK(sd_outer_proxy);
759  }
760  int thread_count = cpu_threads();
761  std::vector<std::thread> init_cpu_buff_threads;
762  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
763  init_cpu_buff_threads.emplace_back(
764  [this, hash_entry_info, hash_join_invalid_val, thread_idx, thread_count] {
765  init_hash_join_buff(&(*cpu_hash_table_buff_)[0],
766  hash_entry_info.getNormalizedHashEntryCount(),
767  hash_join_invalid_val,
768  thread_idx,
769  thread_count);
770  });
771  }
772  for (auto& t : init_cpu_buff_threads) {
773  t.join();
774  }
775  init_cpu_buff_threads.clear();
776  int err{0};
777  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
778  init_cpu_buff_threads.emplace_back([this,
779  hash_join_invalid_val,
780  &join_column,
781  sd_inner_proxy,
782  sd_outer_proxy,
783  thread_idx,
784  thread_count,
785  &ti,
786  &err,
787  hash_entry_info] {
788  int partial_err =
789  fill_hash_join_buff_bucketized(&(*cpu_hash_table_buff_)[0],
790  hash_join_invalid_val,
791  join_column,
792  {static_cast<size_t>(ti.get_size()),
793  col_range_.getIntMin(),
794  col_range_.getIntMax(),
795  inline_fixed_encoding_null_val(ti),
796  isBitwiseEq(),
797  col_range_.getIntMax() + 1,
798  get_join_column_type_kind(ti)},
799  sd_inner_proxy,
800  sd_outer_proxy,
801  thread_idx,
802  thread_count,
803  hash_entry_info.bucket_normalization);
804  __sync_val_compare_and_swap(&err, 0, partial_err);
805  });
806  }
807  for (auto& t : init_cpu_buff_threads) {
808  t.join();
809  }
810  if (err) {
811  cpu_hash_table_buff_.reset();
812  // Too many hash entries, need to retry with a 1:many table
813  throw NeedsOneToManyHash();
814  }
815  } else {
816  if (cpu_hash_table_buff_->size() > hash_entry_info.getNormalizedHashEntryCount()) {
817  // Too many hash entries, need to retry with a 1:many table
818  throw NeedsOneToManyHash();
819  }
820  }
821 }
822 
823 void JoinHashTable::initOneToManyHashTableOnCpu(
824  const JoinColumn& join_column,
825  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
826  const HashEntryInfo hash_entry_info,
827  const int32_t hash_join_invalid_val) {
828  auto timer = DEBUG_TIMER(__func__);
829  const auto inner_col = cols.first;
830  CHECK(inner_col);
831  const auto& ti = inner_col->get_type_info();
832  if (cpu_hash_table_buff_) {
833  return;
834  }
835  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
836  2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems);
837  const StringDictionaryProxy* sd_inner_proxy{nullptr};
838  const StringDictionaryProxy* sd_outer_proxy{nullptr};
839  if (ti.is_string()) {
840  CHECK_EQ(kENCODING_DICT, ti.get_compression());
841  sd_inner_proxy = executor_->getStringDictionaryProxy(
842  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
843  CHECK(sd_inner_proxy);
844  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
845  CHECK(outer_col);
846  sd_outer_proxy = executor_->getStringDictionaryProxy(
847  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
848  CHECK(sd_outer_proxy);
849  }
850  int thread_count = cpu_threads();
851  std::vector<std::future<void>> init_threads;
852  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
853  init_threads.emplace_back(std::async(std::launch::async,
854  init_hash_join_buff,
855  &(*cpu_hash_table_buff_)[0],
856  hash_entry_info.getNormalizedHashEntryCount(),
857  hash_join_invalid_val,
858  thread_idx,
859  thread_count));
860  }
861  for (auto& child : init_threads) {
862  child.wait();
863  }
864  for (auto& child : init_threads) {
865  child.get();
866  }
867 
868  if (ti.get_type() == kDATE) {
869  fill_one_to_many_hash_table_bucketized(&(*cpu_hash_table_buff_)[0],
870  hash_entry_info,
871  hash_join_invalid_val,
872  join_column,
873  {static_cast<size_t>(ti.get_size()),
874  col_range_.getIntMin(),
875  col_range_.getIntMax(),
876  inline_fixed_encoding_null_val(ti),
877  isBitwiseEq(),
878  col_range_.getIntMax() + 1,
879  get_join_column_type_kind(ti)},
880  sd_inner_proxy,
881  sd_outer_proxy,
882  thread_count);
883  } else {
884  fill_one_to_many_hash_table(&(*cpu_hash_table_buff_)[0],
885  hash_entry_info,
886  hash_join_invalid_val,
887  join_column,
888  {static_cast<size_t>(ti.get_size()),
889  col_range_.getIntMin(),
890  col_range_.getIntMax(),
891  inline_fixed_encoding_null_val(ti),
892  isBitwiseEq(),
893  col_range_.getIntMax() + 1,
894  get_join_column_type_kind(ti)},
895  sd_inner_proxy,
896  sd_outer_proxy,
897  thread_count);
898  }
899 }
900 
901 namespace {
902 
903 #ifdef HAVE_CUDA
904 // Number of entries per shard, rounded up.
905 size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count) {
906  CHECK_NE(size_t(0), shard_count);
907  return (total_entry_count + shard_count - 1) / shard_count;
908 }
909 #endif // HAVE_CUDA
910 
911 } // namespace
912 
913 void JoinHashTable::initOneToOneHashTable(
914  const ChunkKey& chunk_key,
915  const JoinColumn& join_column,
916  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
917  const Data_Namespace::MemoryLevel effective_memory_level,
918  const int device_id) {
919  auto timer = DEBUG_TIMER(__func__);
920  const auto inner_col = cols.first;
921  CHECK(inner_col);
922 
923  auto hash_entry_info = get_bucketized_hash_entry_info(
924  inner_col->get_type_info(), col_range_, isBitwiseEq());
925  if (!hash_entry_info) {
926  return;
927  }
928 
929 #ifdef HAVE_CUDA
930  const auto shard_count = shardCount();
931  const size_t entries_per_shard{
932  shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
933  : 0};
934  // Even if we join on dictionary encoded strings, the memory on the GPU is still
935  // needed once the join hash table has been built on the CPU.
936  const auto catalog = executor_->getCatalog();
937  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
938  auto& data_mgr = catalog->getDataMgr();
939  if (shard_count) {
940  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
941  CHECK_GT(shards_per_device, 0u);
942  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
943  }
944  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
945  &data_mgr,
946  hash_entry_info.getNormalizedHashEntryCount() * sizeof(int32_t),
947  device_id);
948  }
949 #else
950  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
951 #endif
952  if (!device_id) {
953  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
954  }
955 
956 #ifdef HAVE_CUDA
957  const auto& ti = inner_col->get_type_info();
958 #endif
959  const int32_t hash_join_invalid_val{-1};
960  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
961  CHECK(!chunk_key.empty());
962  initHashTableOnCpuFromCache(chunk_key, join_column.num_elems, cols);
963  {
964  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
965  initOneToOneHashTableOnCpu(
966  join_column, cols, hash_entry_info, hash_join_invalid_val);
967  }
968  if (inner_col->get_table_id() > 0) {
969  putHashTableOnCpuToCache(chunk_key, join_column.num_elems, cols);
970  }
971  // Transfer the hash table on the GPU if we've only built it on CPU
972  // but the query runs on GPU (join on dictionary encoded columns).
973  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
974 #ifdef HAVE_CUDA
975  CHECK(ti.is_string());
976  auto& data_mgr = catalog->getDataMgr();
977  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
978 
979  copy_to_gpu(
980  &data_mgr,
981  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
982  &(*cpu_hash_table_buff_)[0],
983  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
984  device_id);
985 #else
986  CHECK(false);
987 #endif
988  }
989  } else {
990 #ifdef HAVE_CUDA
991  int err{0};
992  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
993  auto& data_mgr = catalog->getDataMgr();
994  gpu_hash_table_err_buff_[device_id] =
995  CudaAllocator::allocGpuAbstractBuffer(&data_mgr, sizeof(int), device_id);
996  auto dev_err_buff = reinterpret_cast<CUdeviceptr>(
997  gpu_hash_table_err_buff_[device_id]->getMemoryPtr());
998  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
999  init_hash_join_buff_on_device(
1000  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1001  hash_entry_info.getNormalizedHashEntryCount(),
1002  hash_join_invalid_val,
1003  executor_->blockSize(),
1004  executor_->gridSize());
1005  if (chunk_key.empty()) {
1006  return;
1007  }
1008  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1009  col_range_.getIntMin(),
1010  col_range_.getIntMax(),
1011  inline_fixed_encoding_null_val(ti),
1012  isBitwiseEq(),
1013  col_range_.getIntMax() + 1,
1014  get_join_column_type_kind(ti)};
1015  if (shard_count) {
1016  CHECK_GT(device_count_, 0);
1017  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1018  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1019  fill_hash_join_buff_on_device_sharded_bucketized(
1020  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1021  hash_join_invalid_val,
1022  reinterpret_cast<int*>(dev_err_buff),
1023  join_column,
1024  type_info,
1025  shard_info,
1026  executor_->blockSize(),
1027  executor_->gridSize(),
1028  hash_entry_info.bucket_normalization);
1029  }
1030  } else {
1031  fill_hash_join_buff_on_device_bucketized(
1032  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1033  hash_join_invalid_val,
1034  reinterpret_cast<int*>(dev_err_buff),
1035  join_column,
1036  type_info,
1037  executor_->blockSize(),
1038  executor_->gridSize(),
1039  hash_entry_info.bucket_normalization);
1040  }
1041  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
1042 
1043  if (err) {
1044  throw NeedsOneToManyHash();
1045  }
1046 #else
1047  CHECK(false);
1048 #endif
1049  }
1050 }
1051 
1052 void JoinHashTable::initOneToManyHashTable(
1053  const ChunkKey& chunk_key,
1054  const JoinColumn& join_column,
1055  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
1056  const Data_Namespace::MemoryLevel effective_memory_level,
1057  const int device_id) {
1058  auto timer = DEBUG_TIMER(__func__);
1059  auto const inner_col = cols.first;
1060  CHECK(inner_col);
1061 
1062  auto hash_entry_info = get_bucketized_hash_entry_info(
1063  inner_col->get_type_info(), col_range_, isBitwiseEq());
1064 
1065 #ifdef HAVE_CUDA
1066  const auto shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
1067  const size_t entries_per_shard =
1068  (shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
1069  : 0);
1070  // Even if we join on dictionary encoded strings, the memory on the GPU is still
1071  // needed once the join hash table has been built on the CPU.
1072  if (memory_level_ == Data_Namespace::GPU_LEVEL && shard_count) {
1073  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
1074  CHECK_GT(shards_per_device, 0u);
1075  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
1076  }
1077 #else
1078  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1079 #endif
1080  if (!device_id) {
1081  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
1082  }
1083 
1084 #ifdef HAVE_CUDA
1085  const auto& ti = inner_col->get_type_info();
1086  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1087  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1088  const size_t total_count =
1089  2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems;
1090  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
1091  &data_mgr, total_count * sizeof(int32_t), device_id);
1092  }
1093 #endif
1094  const int32_t hash_join_invalid_val{-1};
1095  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
1096  initHashTableOnCpuFromCache(chunk_key, join_column.num_elems, cols);
1097  {
1098  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1099  initOneToManyHashTableOnCpu(
1100  join_column, cols, hash_entry_info, hash_join_invalid_val);
1101  }
1102  if (inner_col->get_table_id() > 0) {
1103  putHashTableOnCpuToCache(chunk_key, join_column.num_elems, cols);
1104  }
1105  // Transfer the hash table on the GPU if we've only built it on CPU
1106  // but the query runs on GPU (join on dictionary encoded columns).
1107  // Don't transfer the buffer if there was an error since we'll bail anyway.
1108  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1109 #ifdef HAVE_CUDA
1110  CHECK(ti.is_string());
1111  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1112  copy_to_gpu(
1113  &data_mgr,
1114  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1115  &(*cpu_hash_table_buff_)[0],
1116  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1117  device_id);
1118 #else
1119  CHECK(false);
1120 #endif
1121  }
1122  } else {
1123 #ifdef HAVE_CUDA
1124  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
1125  data_mgr.getCudaMgr()->setContext(device_id);
1126  init_hash_join_buff_on_device(
1127  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1128  hash_entry_info.getNormalizedHashEntryCount(),
1129  hash_join_invalid_val,
1130  executor_->blockSize(),
1131  executor_->gridSize());
1132  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1133  col_range_.getIntMin(),
1134  col_range_.getIntMax(),
1135  inline_fixed_encoding_null_val(ti),
1136  isBitwiseEq(),
1137  col_range_.getIntMax() + 1,
1138  get_join_column_type_kind(ti)};
1139  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
1140 
1141  if (shard_count) {
1142  CHECK_GT(device_count_, 0);
1143  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1144  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1145  fill_one_to_many_hash_table_on_device_sharded(
1146  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1147  hash_entry_info,
1148  hash_join_invalid_val,
1149  join_column,
1150  type_info,
1151  shard_info,
1152  executor_->blockSize(),
1153  executor_->gridSize());
1154  }
1155  } else {
1156  if (use_bucketization) {
1157  fill_one_to_many_hash_table_on_device_bucketized(
1158  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1159  hash_entry_info,
1160  hash_join_invalid_val,
1161  join_column,
1162  type_info,
1163  executor_->blockSize(),
1164  executor_->gridSize());
1165  } else {
1166  fill_one_to_many_hash_table_on_device(
1167  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1168  hash_entry_info,
1169  hash_join_invalid_val,
1170  join_column,
1171  type_info,
1172  executor_->blockSize(),
1173  executor_->gridSize());
1174  }
1175  }
1176 #else
1177  CHECK(false);
1178 #endif
1179  }
1180 }
1181 
1182 void JoinHashTable::initHashTableOnCpuFromCache(
1183  const ChunkKey& chunk_key,
1184  const size_t num_elements,
1185  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols) {
1186  auto timer = DEBUG_TIMER(__func__);
1187  CHECK_GE(chunk_key.size(), size_t(2));
1188  if (chunk_key[1] < 0) {
1189  // Do not cache hash tables over intermediate results
1190  return;
1191  }
1192  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1193  JoinHashTableCacheKey cache_key{col_range_,
1194  *cols.first,
1195  outer_col ? *outer_col : *cols.first,
1196  num_elements,
1197  chunk_key,
1198  qual_bin_oper_->get_optype()};
1199  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1200  for (const auto& kv : join_hash_table_cache_) {
1201  if (kv.first == cache_key) {
1202  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1203  cpu_hash_table_buff_ = kv.second;
1204  break;
1205  }
1206  }
1207 }
1208 
1209 void JoinHashTable::putHashTableOnCpuToCache(
1210  const ChunkKey& chunk_key,
1211  const size_t num_elements,
1212  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols) {
1213  CHECK_GE(chunk_key.size(), size_t(2));
1214  if (chunk_key[1] < 0) {
1215  // Do not cache hash tables over intermediate results
1216  return;
1217  }
1218  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1219  JoinHashTableCacheKey cache_key{col_range_,
1220  *cols.first,
1221  outer_col ? *outer_col : *cols.first,
1222  num_elements,
1223  chunk_key,
1224  qual_bin_oper_->get_optype()};
1225  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1226  for (const auto& kv : join_hash_table_cache_) {
1227  if (kv.first == cache_key) {
1228  return;
1229  }
1230  }
1231  join_hash_table_cache_.emplace_back(cache_key, cpu_hash_table_buff_);
1232 }
1233 
1234 llvm::Value* JoinHashTable::codegenHashTableLoad(const size_t table_idx) {
1235  const auto hash_ptr = codegenHashTableLoad(table_idx, executor_);
1236  if (hash_ptr->getType()->isIntegerTy(64)) {
1237  return hash_ptr;
1238  }
1239  CHECK(hash_ptr->getType()->isPointerTy());
1240  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
1241  get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
1242  llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
1243 }
1244 
1245 llvm::Value* JoinHashTable::codegenHashTableLoad(const size_t table_idx,
1246  Executor* executor) {
1247  llvm::Value* hash_ptr = nullptr;
1248  const auto total_table_count =
1249  executor->plan_state_->join_info_.join_hash_tables_.size();
1250  CHECK_LT(table_idx, total_table_count);
1251  if (total_table_count > 1) {
1252  auto hash_tables_ptr =
1253  get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1254  auto hash_pptr =
1255  table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
1256  hash_tables_ptr,
1257  executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
1258  : hash_tables_ptr;
1259  hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
1260  } else {
1261  hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1262  }
1263  CHECK(hash_ptr);
1264  return hash_ptr;
1265 }
1266 
1267 std::vector<llvm::Value*> JoinHashTable::getHashJoinArgs(llvm::Value* hash_ptr,
1268  const Analyzer::Expr* key_col,
1269  const int shard_count,
1270  const CompilationOptions& co) {
1271  CodeGenerator code_generator(executor_);
1272  const auto key_lvs = code_generator.codegen(key_col, true, co);
1273  CHECK_EQ(size_t(1), key_lvs.size());
1274  auto const& key_col_ti = key_col->get_type_info();
1275  auto hash_entry_info =
1276  get_bucketized_hash_entry_info(key_col_ti, col_range_, isBitwiseEq());
1277 
1278  std::vector<llvm::Value*> hash_join_idx_args{
1279  hash_ptr,
1280  executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
1281  executor_->cgen_state_->llInt(col_range_.getIntMin()),
1282  executor_->cgen_state_->llInt(col_range_.getIntMax())};
1283  if (shard_count) {
1284  const auto expected_hash_entry_count =
1285  get_hash_entry_count(col_range_, isBitwiseEq());
1286  const auto entry_count_per_shard =
1287  (expected_hash_entry_count + shard_count - 1) / shard_count;
1288  hash_join_idx_args.push_back(
1289  executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
1290  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
1291  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
1292  }
1293  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
1294  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
1295  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1296  inline_fixed_encoding_null_val(key_col_logical_ti)));
1297  }
1298  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
1299  if (isBitwiseEq()) {
1300  if (special_date_bucketization_case) {
1301  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1302  col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
1303  } else {
1304  hash_join_idx_args.push_back(
1305  executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
1306  }
1307  }
1308 
1309  if (special_date_bucketization_case) {
1310  hash_join_idx_args.emplace_back(
1311  executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
1312  }
1313 
1314  return hash_join_idx_args;
1315 }
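// The argument list built above mirrors what the hash_join_idx family of runtime
// helpers expects: the table pointer, the 64-bit key, and the key range
// [min, max], optionally followed by the sharding parameters (entries per shard,
// shard count, device count), the inlined null value when the key is nullable or
// the join is a bitwise equality, the translated null (range max + 1) for
// bitwise-equality joins, and the bucket normalization for bucketized kDATE keys.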
1316 
1317 HashJoinMatchingSet JoinHashTable::codegenMatchingSet(const CompilationOptions& co,
1318  const size_t index) {
1319  const auto cols = get_cols(
1320  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1321  auto key_col = cols.second;
1322  CHECK(key_col);
1323  auto val_col = cols.first;
1324  CHECK(val_col);
1325  auto pos_ptr = codegenHashTableLoad(index);
1326  CHECK(pos_ptr);
1327  const int shard_count = shardCount();
1328  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
1329  const int64_t sub_buff_size = getComponentBufferSize();
1330  const auto& key_col_ti = key_col->get_type_info();
1331 
1332  auto bucketize = (key_col_ti.get_type() == kDATE);
1333  return codegenMatchingSet(hash_join_idx_args,
1334  shard_count,
1335  !key_col_ti.get_notnull(),
1336  isBitwiseEq(),
1337  sub_buff_size,
1338  executor_,
1339  bucketize);
1340 }
1341 
1342 HashJoinMatchingSet JoinHashTable::codegenMatchingSet(
1343  const std::vector<llvm::Value*>& hash_join_idx_args_in,
1344  const bool is_sharded,
1345  const bool col_is_nullable,
1346  const bool is_bw_eq,
1347  const int64_t sub_buff_size,
1348  Executor* executor,
1349  bool is_bucketized) {
1350  using namespace std::string_literals;
1351 
1352  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);
1353 
1354  if (is_bw_eq) {
1355  fname += "_bitwise";
1356  }
1357  if (is_sharded) {
1358  fname += "_sharded";
1359  }
1360  if (!is_bw_eq && col_is_nullable) {
1361  fname += "_nullable";
1362  }
1363 
1364  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
1365  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
1366  slot_lv, executor->cgen_state_->llInt(int64_t(0)));
1367 
1368  auto pos_ptr = hash_join_idx_args_in[0];
1369  CHECK(pos_ptr);
1370 
1371  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
1372  pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
1373  auto hash_join_idx_args = hash_join_idx_args_in;
1374  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
1375  count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));
1376 
1377  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
1378  slot_valid_lv,
1379  executor->cgen_state_->emitCall(fname, hash_join_idx_args),
1380  executor->cgen_state_->llInt(int64_t(0)));
1381  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
1382  executor->cgen_state_->ir_builder_.CreateAdd(
1383  pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
1384  llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
1385  auto rowid_ptr_i32 =
1386  executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
1387  return {rowid_ptr_i32, row_count_lv, slot_lv};
1388 }
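// The emitted probe therefore issues two hash_join_idx* calls: one against the
// offset buffer to find the slot, and one against the count buffer (located
// sub_buff_size bytes further in) to get the number of matching rows; the
// returned triple is {pointer into the row-id payload, match count, slot}.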
1389 
1390 size_t JoinHashTable::offsetBufferOff() const noexcept {
1391  return 0;
1392 }
1393 
1394 size_t JoinHashTable::countBufferOff() const noexcept {
1395  return getComponentBufferSize();
1396 }
1397 
1398 size_t JoinHashTable::payloadBufferOff() const noexcept {
1399  return 2 * getComponentBufferSize();
1400 }
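// For a one-to-many table the buffer holds three consecutive sections: per-slot
// offsets and per-slot counts (each getComponentBufferSize() bytes of int32_t
// values), followed by the row-id payload, which is why the count and payload
// offsets above sit one and two component sizes into the buffer.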
1401 
1402 size_t JoinHashTable::getComponentBufferSize() const noexcept {
1403  if (hash_type_ == JoinHashTableInterface::HashType::OneToMany) {
1404  return hash_entry_count_ * sizeof(int32_t);
1405  } else {
1406  return 0;
1407  }
1408 }
1409 
1410 int64_t JoinHashTable::getJoinHashBuffer(const ExecutorDeviceType device_type,
1411  const int device_id) const noexcept {
1412  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1413  return 0;
1414  }
1415 #ifdef HAVE_CUDA
1416  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1417  if (device_type == ExecutorDeviceType::CPU) {
1418  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1419  } else {
1420  return gpu_hash_table_buff_[device_id]
1421  ? reinterpret_cast<CUdeviceptr>(
1422  gpu_hash_table_buff_[device_id]->getMemoryPtr())
1423  : reinterpret_cast<CUdeviceptr>(nullptr);
1424  }
1425 #else
1426  CHECK(device_type == ExecutorDeviceType::CPU);
1427  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1428 #endif
1429 }
1430 
1431 size_t JoinHashTable::getJoinHashBufferSize(const ExecutorDeviceType device_type,
1432  const int device_id) const noexcept {
1433  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1434  return 0;
1435  }
1436 #ifdef HAVE_CUDA
1437  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1438  if (device_type == ExecutorDeviceType::CPU) {
1439  return cpu_hash_table_buff_->size() *
1440  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1441  } else {
1442  return gpu_hash_table_buff_[device_id]
1443  ? gpu_hash_table_buff_[device_id]->reservedSize()
1444  : 0;
1445  }
1446 #else
1447  CHECK(device_type == ExecutorDeviceType::CPU);
1448  return cpu_hash_table_buff_->size() *
1449  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1450 #endif
1451 }
1452 
1453 std::string JoinHashTable::toString(const ExecutorDeviceType device_type,
1454  const int device_id,
1455  bool raw) const {
1456  auto buffer = getJoinHashBuffer(device_type, device_id);
1457  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1458 #ifdef HAVE_CUDA
1459  std::unique_ptr<int8_t[]> buffer_copy;
1460  if (device_type == ExecutorDeviceType::GPU) {
1461  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1462 
1463  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1464  buffer_copy.get(),
1465  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1466  buffer_size,
1467  device_id);
1468  }
1469  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1470 #else
1471  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1472 #endif // HAVE_CUDA
1473  auto ptr2 = ptr1 + offsetBufferOff();
1474  auto ptr3 = ptr1 + countBufferOff();
1475  auto ptr4 = ptr1 + payloadBufferOff();
1476  return JoinHashTableInterface::toString("perfect",
1477  getHashTypeString(hash_type_),
1478  0,
1479  0,
1480  hash_entry_count_,
1481  ptr1,
1482  ptr2,
1483  ptr3,
1484  ptr4,
1485  buffer_size,
1486  raw);
1487 }
1488 
1489 std::set<DecodedJoinHashBufferEntry> JoinHashTable::toSet(
1490  const ExecutorDeviceType device_type,
1491  const int device_id) const {
1492  auto buffer = getJoinHashBuffer(device_type, device_id);
1493  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1494 #ifdef HAVE_CUDA
1495  std::unique_ptr<int8_t[]> buffer_copy;
1496  if (device_type == ExecutorDeviceType::GPU) {
1497  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1498 
1499  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1500  buffer_copy.get(),
1501  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1502  buffer_size,
1503  device_id);
1504  }
1505  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1506 #else
1507  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1508 #endif // HAVE_CUDA
1509  auto ptr2 = ptr1 + offsetBufferOff();
1510  auto ptr3 = ptr1 + countBufferOff();
1511  auto ptr4 = ptr1 + payloadBufferOff();
1512  return JoinHashTableInterface::toSet(
1513  0, 0, hash_entry_count_, ptr1, ptr2, ptr3, ptr4, buffer_size);
1514 }
1515 
1516 llvm::Value* JoinHashTable::codegenSlot(const CompilationOptions& co,
1517  const size_t index) {
1518  using namespace std::string_literals;
1519 
1520  CHECK(getHashType() == JoinHashTableInterface::HashType::OneToOne);
1521  const auto cols = get_cols(
1522  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1523  auto key_col = cols.second;
1524  CHECK(key_col);
1525  auto val_col = cols.first;
1526  CHECK(val_col);
1527  CodeGenerator code_generator(executor_);
1528  const auto key_lvs = code_generator.codegen(key_col, true, co);
1529  CHECK_EQ(size_t(1), key_lvs.size());
1530  auto hash_ptr = codegenHashTableLoad(index);
1531  CHECK(hash_ptr);
1532  const int shard_count = shardCount();
1533  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);
1534 
1535  const auto& key_col_ti = key_col->get_type_info();
1536  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
1537  : "hash_join_idx"s);
1538 
1539  if (isBitwiseEq()) {
1540  fname += "_bitwise";
1541  }
1542  if (shard_count) {
1543  fname += "_sharded";
1544  }
1545 
1546  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
1547  fname += "_nullable";
1548  }
1549  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1550 }
1551 
1552 const InputTableInfo& JoinHashTable::getInnerQueryInfo(
1553  const Analyzer::ColumnVar* inner_col) const {
1554  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
1555 }
1556 
1557 const InputTableInfo& get_inner_query_info(
1558  const int inner_table_id,
1559  const std::vector<InputTableInfo>& query_infos) {
1560  ssize_t ti_idx = -1;
1561  for (size_t i = 0; i < query_infos.size(); ++i) {
1562  if (inner_table_id == query_infos[i].table_id) {
1563  ti_idx = i;
1564  break;
1565  }
1566  }
1567  CHECK_NE(ssize_t(-1), ti_idx);
1568  return query_infos[ti_idx];
1569 }
1570 
1571 size_t get_entries_per_device(const size_t total_entries,
1572  const size_t shard_count,
1573  const size_t device_count,
1574  const Data_Namespace::MemoryLevel memory_level) {
1575  const auto entries_per_shard =
1576  shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
1577  size_t entries_per_device = entries_per_shard;
1578  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
1579  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
1580  CHECK_GT(shards_per_device, 0u);
1581  entries_per_device = entries_per_shard * shards_per_device;
1582  }
1583  return entries_per_device;
1584 }
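// For example, 1000 total entries split over 10 shards and 4 devices gives
// ceil(1000 / 10) = 100 entries per shard and ceil(10 / 4) = 3 shards per
// device, i.e. 300 entries per device on GPU; without sharding every device
// gets the full 1000.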
1585 
1586 // TODO(adb): unify with BaselineJoinHashTable
1587 size_t JoinHashTable::shardCount() const {
1588  return memory_level_ == Data_Namespace::GPU_LEVEL
1589  ? get_shard_count(qual_bin_oper_.get(), executor_)
1590  : 0;
1591 }
1592 
1593 bool JoinHashTable::isBitwiseEq() const {
1594  return qual_bin_oper_->get_optype() == kBW_EQ;
1595 }
1596 
1597 void JoinHashTable::freeHashBufferMemory() {
1598 #ifdef HAVE_CUDA
1599  freeHashBufferGpuMemory();
1600 #endif
1601  freeHashBufferCpuMemory();
1602 }
1603 
1604 void JoinHashTable::freeHashBufferGpuMemory() {
1605 #ifdef HAVE_CUDA
1606  const auto& catalog = *executor_->getCatalog();
1607  auto& data_mgr = catalog.getDataMgr();
1608  for (auto& buf : gpu_hash_table_buff_) {
1609  if (buf) {
1610  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1611  buf = nullptr;
1612  }
1613  }
1614  for (auto& buf : gpu_hash_table_err_buff_) {
1615  if (buf) {
1616  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1617  buf = nullptr;
1618  }
1619  }
1620 #else
1621  CHECK(false);
1622 #endif // HAVE_CUDA
1623 }
1624 
1625 void JoinHashTable::freeHashBufferCpuMemory() {
1626  cpu_hash_table_buff_.reset();
1627 }
Definition: sqltypes.h:257
JoinHashTable(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange &col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count)
static std::mutex join_hash_table_cache_mutex_
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::vector< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
void checkHashJoinReplicationConstraint(const int table_id) const
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
CHECK(cgen_state)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:94
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:30
void reifyOneToOneForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const logger::ThreadId parent_thread_id)
static std::shared_ptr< JoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:148
#define CHECK_NE(x, y)
Definition: Logger.h:206
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const size_t block_size_x, const size_t grid_size_x)
size_t offsetBufferOff() const noexceptoverride
void reifyOneToManyForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const logger::ThreadId parent_thread_id)
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
std::set< DecodedJoinHashBufferEntry > toSet(const ExecutorDeviceType device_type, const int device_id) const override
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
int64_t bucket_normalization
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:79
static ExpressionRange makeIntRange(const int64_t int_min, const int64_t int_max, const int64_t bucket, const bool has_nulls)
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexceptoverride
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
#define VLOGGING(n)
Definition: Logger.h:195
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
std::vector< llvm::Value * > codegen(const Analyzer::Expr *, const bool fetch_columns, const CompilationOptions &)
Definition: IRCodegen.cpp:26
#define CHECK_LT(x, y)
Definition: Logger.h:207
size_t shardCount() const
Definition: sqltypes.h:54
Executor * executor_
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
void fill_one_to_many_hash_table(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
bool table_is_replicated(const TableDescriptor *td)
ExpressionRange col_range_
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
static void freeGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, Data_Namespace::AbstractBuffer *ab)
std::mutex cpu_hash_table_buff_mutex_
void freeHashBufferCpuMemory()
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const =0
uint64_t ThreadId
Definition: Logger.h:306
void init_hash_join_buff_on_device(int32_t *buff, const int32_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
size_t getComponentBufferSize() const noexcept
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
ThreadId thread_id()
Definition: Logger.cpp:715
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
size_t getNormalizedHashEntryCount() const
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
int64_t getBucket() const
#define DEBUG_TIMER(name)
Definition: Logger.h:313
size_t countBufferOff() const noexceptoverride
Definition: sqldefs.h:31
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
bool g_cluster
const Expr * get_left_operand() const
Definition: Analyzer.h:443
bool is_overlaps_oper() const
Definition: Analyzer.h:441
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
virtual ~JoinHashTable()
static std::string getHashTypeString(HashType ht) noexcept
void initOneToOneHashTable(const ChunkKey &chunk_key, const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
HashType getHashType() const noexceptoverride
Definition: JoinHashTable.h:89
int get_column_id() const
Definition: Analyzer.h:196
const Data_Namespace::MemoryLevel memory_level_
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
int cpu_threads()
Definition: thread_count.h:25
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join)
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
virtual std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const =0
#define VLOG(n)
Definition: Logger.h:291
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
size_t get_hash_entry_count(const ExpressionRange &col_range, const bool is_bw_eq)
size_t hash_entry_count_
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const size_t block_size_x, const size_t grid_size_x, const int64_t bucket_normalization)