OmniSciDB  1dac507f6e
JoinHashTable.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "JoinHashTable.h"
18 #include "CodeGenerator.h"
19 #include "ColumnFetcher.h"
20 #include "Execute.h"
21 #include "ExpressionRewrite.h"
22 #include "HashJoinRuntime.h"
23 #include "RangeTableIndexVisitor.h"
24 #include "RuntimeFunctions.h"
25 #include "Shared/Logger.h"
26 
27 #include <future>
28 #include <numeric>
29 #include <thread>
30 
31 namespace {
32 
33 class NeedsOneToManyHash : public HashJoinFail {
34  public:
35  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
36 };
37 
38 } // namespace
39 
40 InnerOuter normalize_column_pair(const Analyzer::Expr* lhs,
41  const Analyzer::Expr* rhs,
42  const Catalog_Namespace::Catalog& cat,
43  const TemporaryTables* temporary_tables,
44  const bool is_overlaps_join) {
45  const auto& lhs_ti = lhs->get_type_info();
46  const auto& rhs_ti = rhs->get_type_info();
47  if (!is_overlaps_join) {
48  if (lhs_ti.get_type() != rhs_ti.get_type()) {
49  throw HashJoinFail("Equijoin types must be identical, found: " +
50  lhs_ti.get_type_name() + ", " + rhs_ti.get_type_name());
51  }
52  if (!lhs_ti.is_integer() && !lhs_ti.is_time() && !lhs_ti.is_string()) {
53  throw HashJoinFail("Cannot apply hash join to inner column type " +
54  lhs_ti.get_type_name());
55  }
56  }
57 
58  const auto lhs_cast = dynamic_cast<const Analyzer::UOper*>(lhs);
59  const auto rhs_cast = dynamic_cast<const Analyzer::UOper*>(rhs);
60  if (lhs_ti.is_string() && (static_cast<bool>(lhs_cast) != static_cast<bool>(rhs_cast) ||
61  (lhs_cast && lhs_cast->get_optype() != kCAST) ||
62  (rhs_cast && rhs_cast->get_optype() != kCAST))) {
63  throw HashJoinFail("Cannot use hash join for given expression");
64  }
65  const auto lhs_col =
66  lhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(lhs_cast->get_operand())
67  : dynamic_cast<const Analyzer::ColumnVar*>(lhs);
68  const auto rhs_col =
69  rhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(rhs_cast->get_operand())
70  : dynamic_cast<const Analyzer::ColumnVar*>(rhs);
71  if (!lhs_col && !rhs_col) {
72  throw HashJoinFail("Cannot use hash join for given expression");
73  }
74  const Analyzer::ColumnVar* inner_col{nullptr};
75  const Analyzer::ColumnVar* outer_col{nullptr};
76  auto outer_ti = lhs_ti;
77  auto inner_ti = rhs_ti;
78  const Analyzer::Expr* outer_expr{lhs};
79  if ((!lhs_col || (rhs_col && lhs_col->get_rte_idx() < rhs_col->get_rte_idx())) &&
80  (!rhs_col || (!lhs_col || lhs_col->get_rte_idx() < rhs_col->get_rte_idx()))) {
81  inner_col = rhs_col;
82  outer_col = lhs_col;
83  } else {
84  if (lhs_col && lhs_col->get_rte_idx() == 0) {
85  throw HashJoinFail("Cannot use hash join for given expression");
86  }
87  inner_col = lhs_col;
88  outer_col = rhs_col;
89  std::swap(outer_ti, inner_ti);
90  outer_expr = rhs;
91  }
92  if (!inner_col) {
93  throw HashJoinFail("Cannot use hash join for given expression");
94  }
95  if (!outer_col) {
96  MaxRangeTableIndexVisitor rte_idx_visitor;
97  int outer_rte_idx = rte_idx_visitor.visit(outer_expr);
98  // The inner column candidate is not actually inner; the outer
99  // expression contains columns which are at least as deep.
100  if (inner_col->get_rte_idx() <= outer_rte_idx) {
101  throw HashJoinFail("Cannot use hash join for given expression");
102  }
103  }
104  // We need to fetch the actual type information from the catalog since Analyzer
105  // always reports nullable as true for inner table columns in left joins.
106  const auto inner_col_cd = get_column_descriptor_maybe(
107  inner_col->get_column_id(), inner_col->get_table_id(), cat);
108  const auto inner_col_real_ti = get_column_type(inner_col->get_column_id(),
109  inner_col->get_table_id(),
110  inner_col_cd,
111  temporary_tables);
112  const auto& outer_col_ti =
113  !(dynamic_cast<const Analyzer::FunctionOper*>(lhs)) && outer_col
114  ? outer_col->get_type_info()
115  : outer_ti;
116  if (is_overlaps_join) {
117  if (!inner_col_real_ti.is_array()) {
118  throw HashJoinFail(
119  "Overlaps join only supported for inner columns with array type");
120  }
121  if (!(inner_col_real_ti.is_fixlen_array() && inner_col_real_ti.get_size() == 32)) {
122  throw HashJoinFail(
123  "Overlaps join only supported for 4-element double fixed length arrays");
124  }
125  if (!(outer_col_ti.get_type() == kPOINT)) {
126  throw HashJoinFail(
127  "Overlaps join only supported for geometry outer columns of type point");
128  }
129  } else {
130  if (!(inner_col_real_ti.is_integer() || inner_col_real_ti.is_time() ||
131  (inner_col_real_ti.is_string() &&
132  inner_col_real_ti.get_compression() == kENCODING_DICT))) {
133  throw HashJoinFail(
134  "Can only apply hash join to integer-like types and dictionary encoded "
135  "strings");
136  }
137  }
138  return {inner_col, outer_col ? outer_col : outer_expr};
139 }
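// The pair returned above is {inner column, outer column-or-expression}: the
// inner side (the one with the deeper range table index) is what the hash
// table is built over, and the outer side is the expression used to probe it.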
140 
141 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
142  const Catalog_Namespace::Catalog& cat,
143  const TemporaryTables* temporary_tables) {
144  std::vector<InnerOuter> result;
145  const auto lhs_tuple_expr =
146  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_left_operand());
147  const auto rhs_tuple_expr =
148  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_right_operand());
149 
150  CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
151  if (lhs_tuple_expr) {
152  const auto& lhs_tuple = lhs_tuple_expr->getTuple();
153  const auto& rhs_tuple = rhs_tuple_expr->getTuple();
154  CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
155  for (size_t i = 0; i < lhs_tuple.size(); ++i) {
156  result.push_back(normalize_column_pair(lhs_tuple[i].get(),
157  rhs_tuple[i].get(),
158  cat,
159  temporary_tables,
160  condition->is_overlaps_oper()));
161  }
162  } else {
163  CHECK(!lhs_tuple_expr && !rhs_tuple_expr);
164  result.push_back(normalize_column_pair(condition->get_left_operand(),
165  condition->get_right_operand(),
166  cat,
167  temporary_tables,
168  condition->is_overlaps_oper()));
169  }
170 
171  return result;
172 }
173 
174 namespace {
175 
176 std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> get_cols(
177  const Analyzer::BinOper* qual_bin_oper,
178  const Catalog_Namespace::Catalog& cat,
179  const TemporaryTables* temporary_tables) {
180  const auto lhs = qual_bin_oper->get_left_operand();
181  const auto rhs = qual_bin_oper->get_right_operand();
182  return normalize_column_pair(lhs, rhs, cat, temporary_tables);
183 }
184 
185 HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
186  ExpressionRange const& col_range,
187  bool const is_bw_eq) {
188  using EmptyRangeSize = boost::optional<size_t>;
189  auto empty_range_check = [](ExpressionRange const& col_range,
190  bool const is_bw_eq) -> EmptyRangeSize {
191  if (col_range.getIntMin() > col_range.getIntMax()) {
192  CHECK_EQ(col_range.getIntMin(), int64_t(0));
193  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
194  if (is_bw_eq) {
195  return size_t(1);
196  }
197  return size_t(0);
198  }
199  return EmptyRangeSize{};
200  };
201 
202  auto empty_range = empty_range_check(col_range, is_bw_eq);
203  if (empty_range) {
204  return {size_t(*empty_range), 1};
205  }
206 
207  int64_t bucket_normalization =
208  context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
209  CHECK_GT(bucket_normalization, 0);
210  return {size_t(col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0)),
211  bucket_normalization};
212 }
213 
214 size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
215  if (col_range.getIntMin() > col_range.getIntMax()) {
216  CHECK_EQ(col_range.getIntMin(), int64_t(0));
217  CHECK_EQ(col_range.getIntMax(), int64_t(-1));
218  return is_bw_eq ? 1 : 0;
219  }
220  return col_range.getIntMax() - col_range.getIntMin() + 1 + (is_bw_eq ? 1 : 0);
221 }
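// Worked example: for an integer column with range [10, 19] the perfect hash
// needs 19 - 10 + 1 = 10 entries, plus one extra entry when is_bw_eq is true
// so a NULL key can be matched as well. The bucketized variant above also
// records a bucket normalization factor for DATE columns, which is used when
// computing the normalized entry count.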
222 
223 } // namespace
224 
225 std::vector<std::pair<JoinHashTable::JoinHashTableCacheKey,
226  std::shared_ptr<std::vector<int32_t>>>>
227  JoinHashTable::join_hash_table_cache_;
228 std::mutex JoinHashTable::join_hash_table_cache_mutex_;
229 
230 size_t get_shard_count(const Analyzer::BinOper* join_condition,
231  const Executor* executor) {
232  const Analyzer::ColumnVar* inner_col{nullptr};
233  const Analyzer::Expr* outer_col{nullptr};
234  std::shared_ptr<Analyzer::BinOper> redirected_bin_oper;
235  try {
236  std::tie(inner_col, outer_col) =
237  get_cols(join_condition, *executor->getCatalog(), executor->getTemporaryTables());
238  } catch (...) {
239  return 0;
240  }
241  if (!inner_col || !outer_col) {
242  return 0;
243  }
244  return get_shard_count({inner_col, outer_col}, executor);
245 }
246 
247 namespace {
248 
249 bool shard_count_less_or_equal_device_count(const int inner_table_id,
250  const Executor* executor) {
251  const auto inner_table_info = executor->getTableInfo(inner_table_id);
252  std::unordered_set<int> device_holding_fragments;
253  auto cuda_mgr = executor->getCatalog()->getDataMgr().getCudaMgr();
254  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
255  for (const auto& fragment : inner_table_info.fragments) {
256  if (fragment.shard != -1) {
257  const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
258  if (!it_ok.second) {
259  return false;
260  }
261  }
262  }
263  return true;
264 }
265 
266 } // namespace
267 
268 size_t get_shard_count(
269  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
270  const Executor* executor) {
271  const auto inner_col = equi_pair.first;
272  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
273  if (!outer_col || inner_col->get_table_id() < 0 || outer_col->get_table_id() < 0) {
274  return 0;
275  }
276  if (outer_col->get_rte_idx()) {
277  return 0;
278  }
279  if (inner_col->get_type_info() != outer_col->get_type_info()) {
280  return 0;
281  }
282  const auto catalog = executor->getCatalog();
283  const auto inner_td = catalog->getMetadataForTable(inner_col->get_table_id());
284  CHECK(inner_td);
285  const auto outer_td = catalog->getMetadataForTable(outer_col->get_table_id());
286  CHECK(outer_td);
287  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
288  inner_td->nShards != outer_td->nShards) {
289  return 0;
290  }
291  if (!shard_count_less_or_equal_device_count(inner_td->tableId, executor)) {
292  return 0;
293  }
294  // The two columns involved must be the ones on which the tables have been sharded on.
295  return (inner_td->shardedColumnId == inner_col->get_column_id() &&
296  outer_td->shardedColumnId == outer_col->get_column_id()) ||
297  (outer_td->shardedColumnId == inner_col->get_column_id() &&
298  inner_td->shardedColumnId == inner_col->get_column_id())
299  ? inner_td->nShards
300  : 0;
301 }
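// A non-zero shard count is returned only when both join columns are the
// sharding columns of physical tables with the same shard count, and the
// inner table's fragments place at most one shard on each device; otherwise
// callers fall back to the unsharded path.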
302 
304 std::shared_ptr<JoinHashTable> JoinHashTable::getInstance(
305  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
306  const std::vector<InputTableInfo>& query_infos,
307  const Data_Namespace::MemoryLevel memory_level,
308  const HashType preferred_hash_type,
309  const int device_count,
310  ColumnCacheMap& column_cache,
311  Executor* executor) {
312  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
313  const auto cols =
314  get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
315  const auto inner_col = cols.first;
316  CHECK(inner_col);
317  const auto& ti = inner_col->get_type_info();
318  auto col_range =
319  getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
320  if (col_range.getType() == ExpressionRangeType::Invalid) {
321  throw HashJoinFail(
322  "Could not compute range for the expressions involved in the equijoin");
323  }
324  if (ti.is_string()) {
325  // The nullable info must be the same as the source column.
326  const auto source_col_range = getExpressionRange(inner_col, query_infos, executor);
327  if (source_col_range.getType() == ExpressionRangeType::Invalid) {
328  throw HashJoinFail(
329  "Could not compute range for the expressions involved in the equijoin");
330  }
331  if (source_col_range.getIntMin() > source_col_range.getIntMax()) {
332  // If the inner column expression range is empty, use the inner col range
333  CHECK_EQ(source_col_range.getIntMin(), int64_t(0));
334  CHECK_EQ(source_col_range.getIntMax(), int64_t(-1));
335  col_range = source_col_range;
336  } else {
337  col_range = ExpressionRange::makeIntRange(
338  std::min(source_col_range.getIntMin(), col_range.getIntMin()),
339  std::max(source_col_range.getIntMax(), col_range.getIntMax()),
340  0,
341  source_col_range.hasNulls());
342  }
343  }
344  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
345  const auto max_hash_entry_count =
346  memory_level == Data_Namespace::GPU_LEVEL
347  ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
348  : static_cast<size_t>(std::numeric_limits<int32_t>::max());
349 
350  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
351  ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
352  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
353 
354  if (bucketized_entry_count > max_hash_entry_count) {
355  throw TooManyHashEntries();
356  }
357 
358  if (qual_bin_oper->get_optype() == kBW_EQ &&
359  col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
360  throw HashJoinFail("Cannot translate null value for kBW_EQ");
361  }
362  auto join_hash_table =
363  std::shared_ptr<JoinHashTable>(new JoinHashTable(qual_bin_oper,
364  inner_col,
365  query_infos,
366  memory_level,
367  preferred_hash_type,
368  col_range,
369  column_cache,
370  executor,
371  device_count));
372  try {
373  join_hash_table->reify(device_count);
374  } catch (const TableMustBeReplicated& e) {
375  // Throw a runtime error to abort the query
376  join_hash_table->freeHashBufferMemory();
377  throw std::runtime_error(e.what());
378  } catch (const HashJoinFail& e) {
379  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
380  // possible)
381  join_hash_table->freeHashBufferMemory();
382  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
383  "involved in equijoin | ") +
384  e.what());
385  } catch (const ColumnarConversionNotSupported& e) {
386  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
387  e.what());
388  } catch (const OutOfMemory& e) {
389  throw HashJoinFail(
390  std::string("Ran out of memory while building hash tables for equijoin | ") +
391  e.what());
392  } catch (const std::exception& e) {
393  throw std::runtime_error(
394  std::string("Fatal error while attempting to build hash tables for join: ") +
395  e.what());
396  }
397  return join_hash_table;
398 }
399 
401 std::shared_ptr<JoinHashTable> JoinHashTable::getSyntheticInstance(
402  std::string_view table1,
403  std::string_view column1,
404  std::string_view table2,
405  std::string_view column2,
406  const Data_Namespace::MemoryLevel memory_level,
407  const HashType preferred_hash_type,
408  const int device_count,
409  ColumnCacheMap& column_cache,
410  Executor* executor) {
411  auto catalog = executor->getCatalog();
412  CHECK(catalog);
413 
414  auto tmeta1 = catalog->getMetadataForTable(std::string(table1));
415  auto tmeta2 = catalog->getMetadataForTable(std::string(table2));
416 
417  CHECK(tmeta1);
418  CHECK(tmeta2);
419 
420  auto cmeta1 = catalog->getMetadataForColumn(tmeta1->tableId, std::string(column1));
421  auto cmeta2 = catalog->getMetadataForColumn(tmeta2->tableId, std::string(column2));
422 
423  CHECK(cmeta1);
424  CHECK(cmeta2);
425 
426  auto ti1 = cmeta1->columnType;
427  auto ti2 = cmeta2->columnType;
428 
429  auto a1 =
430  std::make_shared<Analyzer::ColumnVar>(ti1, tmeta1->tableId, cmeta1->columnId, 0);
431  auto a2 =
432  std::make_shared<Analyzer::ColumnVar>(ti2, tmeta2->tableId, cmeta2->columnId, 1);
433 
434  auto op = std::make_shared<Analyzer::BinOper>(kBOOLEAN, kEQ, kONE, a1, a2);
435 
436  size_t number_of_join_tables{2};
437  std::vector<InputTableInfo> query_infos(number_of_join_tables);
438  query_infos[0].table_id = tmeta1->tableId;
439  query_infos[0].info = tmeta1->fragmenter->getFragmentsForQuery();
440  query_infos[1].table_id = tmeta2->tableId;
441  query_infos[1].info = tmeta2->fragmenter->getFragmentsForQuery();
442 
443  std::unordered_set<PhysicalInput> phys_inputs;
444  phys_inputs.emplace(PhysicalInput{cmeta1->columnId, cmeta1->tableId});
445  phys_inputs.emplace(PhysicalInput{cmeta2->columnId, cmeta2->tableId});
446 
447  std::unordered_set<int> phys_table_ids;
448  phys_table_ids.insert(cmeta1->tableId);
449  phys_table_ids.insert(cmeta2->tableId);
450 
451  executor->setupCaching(phys_inputs, phys_table_ids);
452 
453  auto hash_table = JoinHashTable::getInstance(op,
454  query_infos,
455  memory_level,
456  preferred_hash_type,
457  device_count,
458  column_cache,
459  executor);
460 
461  return hash_table;
462 }
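// Rough usage sketch for tests (the table/column names below are illustrative,
// not part of any real catalog):
//
//   ColumnCacheMap column_cache;
//   auto hash_table = JoinHashTable::getSyntheticInstance(
//       "t1", "a", "t2", "b",
//       Data_Namespace::CPU_LEVEL, preferred_hash_type,
//       /*device_count=*/1, column_cache, executor);
//
// This synthesizes the predicate t1.a = t2.b from catalog metadata and then
// defers to getInstance() above.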
463 
464 std::pair<const int8_t*, size_t> JoinHashTable::getOneColumnFragment(
465  const Analyzer::ColumnVar& hash_col,
466  const Fragmenter_Namespace::FragmentInfo& fragment,
467  const Data_Namespace::MemoryLevel effective_mem_lvl,
468  const int device_id,
469  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
470  return ColumnFetcher::getOneColumnFragment(executor_,
471  hash_col,
472  fragment,
473  effective_mem_lvl,
474  device_id,
475  chunks_owner,
476  column_cache_);
477 }
478 
479 std::pair<const int8_t*, size_t> JoinHashTable::getAllColumnFragments(
480  const Analyzer::ColumnVar& hash_col,
481  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
482  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
483  std::lock_guard<std::mutex> linearized_multifrag_column_lock(
484  linearized_multifrag_column_mutex_);
485  if (linearized_multifrag_column_.first) {
486  return linearized_multifrag_column_;
487  }
488  const int8_t* col_buff;
489  size_t total_elem_count;
490  std::tie(col_buff, total_elem_count) = ColumnFetcher::getAllColumnFragments(
491  executor_, hash_col, fragments, chunks_owner, column_cache_);
493  if (!shardCount()) {
494  linearized_multifrag_column_ = {col_buff, total_elem_count};
495  }
496  return {col_buff, total_elem_count};
497 }
498 
499 bool needs_dictionary_translation(const Analyzer::ColumnVar* inner_col,
500  const Analyzer::Expr* outer_col_expr,
501  const Executor* executor) {
502  const auto catalog = executor->getCatalog();
503  CHECK(catalog);
504  const auto inner_cd = get_column_descriptor_maybe(
505  inner_col->get_column_id(), inner_col->get_table_id(), *catalog);
506  const auto& inner_ti = get_column_type(inner_col->get_column_id(),
507  inner_col->get_table_id(),
508  inner_cd,
509  executor->getTemporaryTables());
510  // Only strings may need dictionary translation.
511  if (!inner_ti.is_string()) {
512  return false;
513  }
514  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
515  CHECK(outer_col);
516  const auto outer_cd = get_column_descriptor_maybe(
517  outer_col->get_column_id(), outer_col->get_table_id(), *catalog);
518  // Don't want to deal with temporary tables for now, require translation.
519  if (!inner_cd || !outer_cd) {
520  return true;
521  }
522  const auto& outer_ti = get_column_type(outer_col->get_column_id(),
523  outer_col->get_table_id(),
524  outer_cd,
525  executor->getTemporaryTables());
526  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
527  // If the two columns don't share the dictionary, translation is needed.
528  return outer_ti.get_comp_param() != inner_ti.get_comp_param();
529 }
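// Translation is only a concern for dictionary-encoded string joins: if either
// side is a temporary table, or the two columns use different dictionaries
// (different comp_param), the join keys have to be translated between the two
// dictionaries via the StringDictionaryProxy pair used below instead of being
// compared directly.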
530 
531 std::deque<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
532  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
533  const int device_id,
534  const int device_count) {
535  std::deque<Fragmenter_Namespace::FragmentInfo> shards_for_device;
536  for (const auto& fragment : fragments) {
537  CHECK_GE(fragment.shard, 0);
538  if (fragment.shard % device_count == device_id) {
539  shards_for_device.push_back(fragment);
540  }
541  }
542  return shards_for_device;
543 }
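// Shards are assigned to devices round-robin: a fragment belongs to device
// `device_id` iff fragment.shard % device_count == device_id.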
544 
545 void JoinHashTable::reify(const int device_count) {
546  CHECK_LT(0, device_count);
547  const auto& catalog = *executor_->getCatalog();
548  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
549  const auto inner_col = cols.first;
550  checkHashJoinReplicationConstraint(inner_col->get_table_id());
551  const auto& query_info = getInnerQueryInfo(inner_col).info;
552  if (query_info.fragments.empty()) {
553  return;
554  }
555  if (query_info.getNumTuplesUpperBound() >
556  static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
557  throw TooManyHashEntries();
558  }
559 #ifdef HAVE_CUDA
560  gpu_hash_table_buff_.resize(device_count);
561  gpu_hash_table_err_buff_.resize(device_count);
562 #endif // HAVE_CUDA
563  std::vector<std::future<void>> init_threads;
564  const int shard_count = shardCount();
565 
566  try {
567  for (int device_id = 0; device_id < device_count; ++device_id) {
568  const auto fragments =
569  shard_count
570  ? only_shards_for_device(query_info.fragments, device_id, device_count)
571  : query_info.fragments;
572  init_threads.push_back(
573  std::async(std::launch::async,
577  this,
578  fragments,
579  device_id));
580  }
581  for (auto& init_thread : init_threads) {
582  init_thread.wait();
583  }
584  for (auto& init_thread : init_threads) {
585  init_thread.get();
586  }
587 
588  } catch (const NeedsOneToManyHash& e) {
591  init_threads.clear();
592  for (int device_id = 0; device_id < device_count; ++device_id) {
593  const auto fragments =
594  shard_count
595  ? only_shards_for_device(query_info.fragments, device_id, device_count)
596  : query_info.fragments;
597 
598  init_threads.push_back(std::async(std::launch::async,
599  &JoinHashTable::reifyOneToManyForDevice,
600  this,
601  fragments,
602  device_id));
603  }
604  for (auto& init_thread : init_threads) {
605  init_thread.wait();
606  }
607  for (auto& init_thread : init_threads) {
608  init_thread.get();
609  }
610  }
611 }
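// reify() first tries to build a one-to-one (perfect hash) table on every
// device; if any device reports duplicate keys it throws NeedsOneToManyHash,
// and the per-device loop is rerun with the one-to-many builder instead.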
612 
613 std::pair<const int8_t*, size_t> JoinHashTable::fetchFragments(
614  const Analyzer::ColumnVar* hash_col,
615  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragment_info,
616  const Data_Namespace::MemoryLevel effective_memory_level,
617  const int device_id,
618  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
619  ThrustAllocator& dev_buff_owner) {
620  static std::mutex fragment_fetch_mutex;
621  const bool has_multi_frag = fragment_info.size() > 1;
622  const auto& catalog = *executor_->getCatalog();
623  auto& data_mgr = catalog.getDataMgr();
624  const auto& first_frag = fragment_info.front();
625  const int8_t* col_buff = nullptr;
626  size_t elem_count = 0;
627 
628  const size_t elem_width = hash_col->get_type_info().get_size();
629  if (has_multi_frag) {
630  std::tie(col_buff, elem_count) =
631  getAllColumnFragments(*hash_col, fragment_info, chunks_owner);
632  }
633 
634  {
635  std::lock_guard<std::mutex> fragment_fetch_lock(fragment_fetch_mutex);
636  if (has_multi_frag) {
637  if (effective_memory_level == Data_Namespace::GPU_LEVEL && col_buff) {
638  CHECK_NE(elem_count, size_t(0));
639  int8_t* dev_col_buff = nullptr;
640  dev_col_buff = dev_buff_owner.allocate(elem_count * elem_width);
641  copy_to_gpu(&data_mgr,
642  reinterpret_cast<CUdeviceptr>(dev_col_buff),
643  col_buff,
644  elem_count * elem_width,
645  device_id);
646  col_buff = dev_col_buff;
647  }
648  } else {
649  std::tie(col_buff, elem_count) = getOneColumnFragment(
650  *hash_col, first_frag, effective_memory_level, device_id, chunks_owner);
651  }
652  }
653  return {col_buff, elem_count};
654 }
655 
656 ChunkKey JoinHashTable::genHashTableKey(
657  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
658  const Analyzer::Expr* outer_col_expr,
659  const Analyzer::ColumnVar* inner_col) const {
660  ChunkKey hash_table_key{executor_->getCatalog()->getCurrentDB().dbId,
661  inner_col->get_table_id(),
662  inner_col->get_column_id()};
663  const auto& ti = inner_col->get_type_info();
664  if (ti.is_string()) {
665  CHECK_EQ(kENCODING_DICT, ti.get_compression());
666  size_t outer_elem_count = 0;
667  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
668  CHECK(outer_col);
669  const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
670  for (auto& frag : outer_query_info.fragments) {
671  outer_elem_count = frag.getNumTuples();
672  }
673  hash_table_key.push_back(outer_elem_count);
674  }
675  if (fragments.size() < 2) {
676  hash_table_key.push_back(fragments.front().fragmentId);
677  }
678  return hash_table_key;
679 }
680 
681 void JoinHashTable::reifyOneToOneForDevice(
682  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
683  const int device_id) {
684  const auto& catalog = *executor_->getCatalog();
685  auto& data_mgr = catalog.getDataMgr();
686  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
687  const auto inner_col = cols.first;
688  CHECK(inner_col);
689  const auto inner_cd = get_column_descriptor_maybe(
690  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
691  if (inner_cd && inner_cd->isVirtualCol) {
692  throw FailedToJoinOnVirtualColumn();
693  }
694  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
695  // Since we don't have the string dictionary payloads on the GPU, we'll build
696  // the join hash table on the CPU and transfer it to the GPU.
697  const auto effective_memory_level =
698  needs_dictionary_translation(inner_col, cols.second, executor_)
699  ? Data_Namespace::CPU_LEVEL
700  : memory_level_;
701  if (fragments.empty()) {
702  // No data in this fragment. Still need to create a hash table and initialize it
703  // properly.
704  ChunkKey empty_chunk;
705  return initHashTableForDevice(
706  empty_chunk, nullptr, 0, cols, effective_memory_level, device_id);
707  }
708 
709  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
710  ThrustAllocator dev_buff_owner(&data_mgr, device_id);
711  const int8_t* col_buff = nullptr;
712  size_t elem_count = 0;
713 
714  std::tie(col_buff, elem_count) = fetchFragments(inner_col,
715  fragments,
716  effective_memory_level,
717  device_id,
718  chunks_owner,
719  dev_buff_owner);
720 
721  initHashTableForDevice(genHashTableKey(fragments, cols.second, inner_col),
722  col_buff,
723  elem_count,
724  cols,
725  effective_memory_level,
726  device_id);
727 }
728 
729 void JoinHashTable::reifyOneToManyForDevice(
730  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
731  const int device_id) {
732  const auto& catalog = *executor_->getCatalog();
733  auto& data_mgr = catalog.getDataMgr();
734  const auto cols = get_cols(qual_bin_oper_.get(), catalog, executor_->temporary_tables_);
735  const auto inner_col = cols.first;
736  CHECK(inner_col);
737  const auto inner_cd = get_column_descriptor_maybe(
738  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
739  if (inner_cd && inner_cd->isVirtualCol) {
740  throw FailedToJoinOnVirtualColumn();
741  }
742  CHECK(!inner_cd || !(inner_cd->isVirtualCol));
743  // Since we don't have the string dictionary payloads on the GPU, we'll build
744  // the join hash table on the CPU and transfer it to the GPU.
745  const auto effective_memory_level =
746  needs_dictionary_translation(inner_col, cols.second, executor_)
747  ? Data_Namespace::CPU_LEVEL
748  : memory_level_;
749  if (fragments.empty()) {
750  ChunkKey empty_chunk;
751  initOneToManyHashTable(
752  empty_chunk, nullptr, 0, cols, effective_memory_level, device_id);
753  return;
754  }
755 
756  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
757  ThrustAllocator dev_buff_owner(&data_mgr, device_id);
758  const int8_t* col_buff = nullptr;
759  size_t elem_count = 0;
760 
761  std::tie(col_buff, elem_count) = fetchFragments(inner_col,
762  fragments,
763  effective_memory_level,
764  device_id,
765  chunks_owner,
766  dev_buff_owner);
767 
768  initOneToManyHashTable(genHashTableKey(fragments, cols.second, inner_col),
769  col_buff,
770  elem_count,
771  cols,
772  effective_memory_level,
773  device_id);
774 }
775 
776 void JoinHashTable::checkHashJoinReplicationConstraint(const int table_id) {
777  if (!g_cluster) {
778  return;
779  }
780  if (table_id >= 0) {
781  const auto inner_td = executor_->getCatalog()->getMetadataForTable(table_id);
782  CHECK(inner_td);
783  size_t shard_count{0};
784  shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
785  if (!shard_count && !table_is_replicated(inner_td)) {
786  throw TableMustBeReplicated(inner_td->tableName);
787  }
788  }
789 }
790 
791 void JoinHashTable::initHashTableOnCpu(
792  const int8_t* col_buff,
793  const size_t num_elements,
794  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
795  const HashEntryInfo hash_entry_info,
796  const int32_t hash_join_invalid_val) {
797  const auto inner_col = cols.first;
798  CHECK(inner_col);
799  const auto& ti = inner_col->get_type_info();
800  if (!cpu_hash_table_buff_) {
801  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
802  hash_entry_info.getNormalizedHashEntryCount());
803  const StringDictionaryProxy* sd_inner_proxy{nullptr};
804  const StringDictionaryProxy* sd_outer_proxy{nullptr};
805  if (ti.is_string()) {
806  CHECK_EQ(kENCODING_DICT, ti.get_compression());
807  sd_inner_proxy = executor_->getStringDictionaryProxy(
808  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
809  CHECK(sd_inner_proxy);
810  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
811  CHECK(outer_col);
812  sd_outer_proxy = executor_->getStringDictionaryProxy(
813  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
814  CHECK(sd_outer_proxy);
815  }
816  int thread_count = cpu_threads();
817  std::vector<std::thread> init_cpu_buff_threads;
818  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
819  init_cpu_buff_threads.emplace_back(
820  [this, hash_entry_info, hash_join_invalid_val, thread_idx, thread_count] {
821  init_hash_join_buff(&(*cpu_hash_table_buff_)[0],
822  hash_entry_info.getNormalizedHashEntryCount(),
823  hash_join_invalid_val,
824  thread_idx,
825  thread_count);
826  });
827  }
828  for (auto& t : init_cpu_buff_threads) {
829  t.join();
830  }
831  init_cpu_buff_threads.clear();
832  int err{0};
833  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
834  init_cpu_buff_threads.emplace_back([this,
835  hash_join_invalid_val,
836  col_buff,
837  num_elements,
838  sd_inner_proxy,
839  sd_outer_proxy,
840  thread_idx,
841  thread_count,
842  &ti,
843  &err,
844  hash_entry_info] {
845  int partial_err =
847  hash_join_invalid_val,
848  {col_buff, num_elements},
849  {static_cast<size_t>(ti.get_size()),
853  isBitwiseEq(),
854  col_range_.getIntMax() + 1,
856  sd_inner_proxy,
857  sd_outer_proxy,
858  thread_idx,
859  thread_count,
860  hash_entry_info.bucket_normalization);
861  __sync_val_compare_and_swap(&err, 0, partial_err);
862  });
863  }
864  for (auto& t : init_cpu_buff_threads) {
865  t.join();
866  }
867  if (err) {
868  cpu_hash_table_buff_.reset();
869  // Too many hash entries, need to retry with a 1:many table
870  throw NeedsOneToManyHash();
871  }
872  } else {
873  if (cpu_hash_table_buff_->size() > hash_entry_info.getNormalizedHashEntryCount()) {
874  // Too many hash entries, need to retry with a 1:many table
875  throw NeedsOneToManyHash();
876  }
877  }
878 }
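// A non-zero partial_err from the fill threads means the one-to-one layout
// cannot hold the data (typically a slot would have to hold more than one
// row), so the buffer is dropped and NeedsOneToManyHash is thrown for the
// caller to rebuild the table as one-to-many.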
879 
880 void JoinHashTable::initOneToManyHashTableOnCpu(
881  const int8_t* col_buff,
882  const size_t num_elements,
883  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
884  const HashEntryInfo hash_entry_info,
885  const int32_t hash_join_invalid_val) {
886  const auto inner_col = cols.first;
887  CHECK(inner_col);
888  const auto& ti = inner_col->get_type_info();
889  if (cpu_hash_table_buff_) {
890  return;
891  }
892  cpu_hash_table_buff_ = std::make_shared<std::vector<int32_t>>(
893  2 * hash_entry_info.getNormalizedHashEntryCount() + num_elements);
894  const StringDictionaryProxy* sd_inner_proxy{nullptr};
895  const StringDictionaryProxy* sd_outer_proxy{nullptr};
896  if (ti.is_string()) {
897  CHECK_EQ(kENCODING_DICT, ti.get_compression());
898  sd_inner_proxy = executor_->getStringDictionaryProxy(
899  inner_col->get_comp_param(), executor_->row_set_mem_owner_, true);
900  CHECK(sd_inner_proxy);
901  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
902  CHECK(outer_col);
903  sd_outer_proxy = executor_->getStringDictionaryProxy(
904  outer_col->get_comp_param(), executor_->row_set_mem_owner_, true);
905  CHECK(sd_outer_proxy);
906  }
907  int thread_count = cpu_threads();
908  std::vector<std::future<void>> init_threads;
909  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
910  init_threads.emplace_back(std::async(std::launch::async,
911  init_hash_join_buff,
912  &(*cpu_hash_table_buff_)[0],
913  hash_entry_info.getNormalizedHashEntryCount(),
914  hash_join_invalid_val,
915  thread_idx,
916  thread_count));
917  }
918  for (auto& child : init_threads) {
919  child.wait();
920  }
921  for (auto& child : init_threads) {
922  child.get();
923  }
924 
925  if (ti.get_type() == kDATE) {
927  hash_entry_info,
928  hash_join_invalid_val,
929  {col_buff, num_elements},
930  {static_cast<size_t>(ti.get_size()),
934  isBitwiseEq(),
935  col_range_.getIntMax() + 1,
937  sd_inner_proxy,
938  sd_outer_proxy,
939  thread_count);
940  } else {
942  hash_entry_info,
943  hash_join_invalid_val,
944  {col_buff, num_elements},
945  {static_cast<size_t>(ti.get_size()),
949  isBitwiseEq(),
950  col_range_.getIntMax() + 1,
952  sd_inner_proxy,
953  sd_outer_proxy,
954  thread_count);
955  }
956 }
957 
958 namespace {
959 
960 #ifdef HAVE_CUDA
961 // Number of entries per shard, rounded up.
962 size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count) {
963  CHECK_NE(size_t(0), shard_count);
964  return (total_entry_count + shard_count - 1) / shard_count;
965 }
966 #endif // HAVE_CUDA
967 
968 } // namespace
969 
970 void JoinHashTable::initHashTableForDevice(
971  const ChunkKey& chunk_key,
972  const int8_t* col_buff,
973  const size_t num_elements,
974  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
975  const Data_Namespace::MemoryLevel effective_memory_level,
976  const int device_id) {
977  const auto inner_col = cols.first;
978  CHECK(inner_col);
979 
980  auto hash_entry_info = get_bucketized_hash_entry_info(
981  inner_col->get_type_info(), col_range_, isBitwiseEq());
982  if (!hash_entry_info) {
983  return;
984  }
985 
986 #ifdef HAVE_CUDA
987  const auto shard_count = shardCount();
988  const size_t entries_per_shard{
989  shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
990  : 0};
991  // Even if we join on dictionary encoded strings, the memory on the GPU is still
992  // needed once the join hash table has been built on the CPU.
993  const auto catalog = executor_->getCatalog();
994  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
995  auto& data_mgr = catalog->getDataMgr();
996  if (shard_count) {
997  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
998  CHECK_GT(shards_per_device, 0u);
999  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
1000  }
1001  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
1002  &data_mgr,
1003  hash_entry_info.getNormalizedHashEntryCount() * sizeof(int32_t),
1004  device_id);
1005  }
1006 #else
1007  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1008 #endif
1009 
1010 #ifdef HAVE_CUDA
1011  const auto& ti = inner_col->get_type_info();
1012 #endif
1013  const int32_t hash_join_invalid_val{-1};
1014  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
1015  CHECK(!chunk_key.empty());
1016  initHashTableOnCpuFromCache(chunk_key, num_elements, cols);
1017  {
1018  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1019  initHashTableOnCpu(
1020  col_buff, num_elements, cols, hash_entry_info, hash_join_invalid_val);
1021  }
1022  if (inner_col->get_table_id() > 0) {
1023  putHashTableOnCpuToCache(chunk_key, num_elements, cols);
1024  }
1025  // Transfer the hash table on the GPU if we've only built it on CPU
1026  // but the query runs on GPU (join on dictionary encoded columns).
1027  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1028 #ifdef HAVE_CUDA
1029  CHECK(ti.is_string());
1030  auto& data_mgr = catalog->getDataMgr();
1031  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1032 
1033  copy_to_gpu(
1034  &data_mgr,
1035  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1036  &(*cpu_hash_table_buff_)[0],
1037  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1038  device_id);
1039 #else
1040  CHECK(false);
1041 #endif
1042  }
1043  } else {
1044 #ifdef HAVE_CUDA
1045  int err{0};
1046  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
1047  auto& data_mgr = catalog->getDataMgr();
1048  gpu_hash_table_err_buff_[device_id] =
1049  CudaAllocator::allocGpuAbstractBuffer(&data_mgr, sizeof(int), device_id);
1050  auto dev_err_buff = reinterpret_cast<CUdeviceptr>(
1051  gpu_hash_table_err_buff_[device_id]->getMemoryPtr());
1052  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
1053  init_hash_join_buff_on_device(
1054  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1055  hash_entry_info.getNormalizedHashEntryCount(),
1056  hash_join_invalid_val,
1057  executor_->blockSize(),
1058  executor_->gridSize());
1059  if (chunk_key.empty()) {
1060  return;
1061  }
1062  JoinColumn join_column{col_buff, num_elements};
1063  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1067  isBitwiseEq(),
1068  col_range_.getIntMax() + 1,
1070  if (shard_count) {
1071  CHECK_GT(device_count_, 0);
1072  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1073  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1075  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1076  hash_join_invalid_val,
1077  reinterpret_cast<int*>(dev_err_buff),
1078  join_column,
1079  type_info,
1080  shard_info,
1081  executor_->blockSize(),
1082  executor_->gridSize(),
1083  hash_entry_info.bucket_normalization);
1084  }
1085  } else {
1087  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1088  hash_join_invalid_val,
1089  reinterpret_cast<int*>(dev_err_buff),
1090  join_column,
1091  type_info,
1092  executor_->blockSize(),
1093  executor_->gridSize(),
1094  hash_entry_info.bucket_normalization);
1095  }
1096  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
1097 
1098  if (err) {
1099  throw NeedsOneToManyHash();
1100  }
1101 #else
1102  CHECK(false);
1103 #endif
1104  }
1105 }
1106 
1107 void JoinHashTable::initOneToManyHashTable(
1108  const ChunkKey& chunk_key,
1109  const int8_t* col_buff,
1110  const size_t num_elements,
1111  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
1112  const Data_Namespace::MemoryLevel effective_memory_level,
1113  const int device_id) {
1114  auto const inner_col = cols.first;
1115  CHECK(inner_col);
1116 
1117  auto hash_entry_info = get_bucketized_hash_entry_info(
1118  inner_col->get_type_info(), col_range_, isBitwiseEq());
1119 
1120 #ifdef HAVE_CUDA
1121  const auto shard_count = get_shard_count(qual_bin_oper_.get(), executor_);
1122  const size_t entries_per_shard =
1123  (shard_count ? get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count)
1124  : 0);
1125  // Even if we join on dictionary encoded strings, the memory on the GPU is still
1126  // needed once the join hash table has been built on the CPU.
1127  if (memory_level_ == Data_Namespace::GPU_LEVEL && shard_count) {
1128  const auto shards_per_device = (shard_count + device_count_ - 1) / device_count_;
1129  CHECK_GT(shards_per_device, 0u);
1130  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
1131  }
1132 #else
1133  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
1134 #endif
1135  if (!device_id) {
1136  hash_entry_count_ = hash_entry_info.getNormalizedHashEntryCount();
1137  }
1138 
1139 #ifdef HAVE_CUDA
1140  const auto& ti = inner_col->get_type_info();
1141  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1142  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1143  const size_t total_count =
1144  2 * hash_entry_info.getNormalizedHashEntryCount() + num_elements;
1145  gpu_hash_table_buff_[device_id] = CudaAllocator::allocGpuAbstractBuffer(
1146  &data_mgr, total_count * sizeof(int32_t), device_id);
1147  }
1148 #endif
1149  const int32_t hash_join_invalid_val{-1};
1150  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
1151  initHashTableOnCpuFromCache(chunk_key, num_elements, cols);
1152  {
1153  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1154  initOneToManyHashTableOnCpu(
1155  col_buff, num_elements, cols, hash_entry_info, hash_join_invalid_val);
1156  }
1157  if (inner_col->get_table_id() > 0) {
1158  putHashTableOnCpuToCache(chunk_key, num_elements, cols);
1159  }
1160  // Transfer the hash table on the GPU if we've only built it on CPU
1161  // but the query runs on GPU (join on dictionary encoded columns).
1162  // Don't transfer the buffer if there was an error since we'll bail anyway.
1163  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
1164 #ifdef HAVE_CUDA
1165  CHECK(ti.is_string());
1166  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1167  copy_to_gpu(
1168  &data_mgr,
1169  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1170  &(*cpu_hash_table_buff_)[0],
1171  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1172  device_id);
1173 #else
1174  CHECK(false);
1175 #endif
1176  }
1177  } else {
1178 #ifdef HAVE_CUDA
1179  CHECK_EQ(Data_Namespace::GPU_LEVEL, effective_memory_level);
1180  data_mgr.getCudaMgr()->setContext(device_id);
1181  init_hash_join_buff_on_device(
1182  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1183  hash_entry_info.getNormalizedHashEntryCount(),
1184  hash_join_invalid_val,
1185  executor_->blockSize(),
1186  executor_->gridSize());
1187  JoinColumn join_column{col_buff, num_elements};
1188  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
1192  isBitwiseEq(),
1193  col_range_.getIntMax() + 1,
1195  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
1196 
1197  if (shard_count) {
1198  CHECK_GT(device_count_, 0);
1199  for (size_t shard = device_id; shard < shard_count; shard += device_count_) {
1200  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count_};
1201  fill_one_to_many_hash_table_on_device_sharded(
1202  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1203  hash_entry_info,
1204  hash_join_invalid_val,
1205  join_column,
1206  type_info,
1207  shard_info,
1208  executor_->blockSize(),
1209  executor_->gridSize());
1210  }
1211  } else {
1212  if (use_bucketization) {
1214  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1215  hash_entry_info,
1216  hash_join_invalid_val,
1217  join_column,
1218  type_info,
1219  executor_->blockSize(),
1220  executor_->gridSize());
1221  } else {
1222  fill_one_to_many_hash_table_on_device(
1223  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1224  hash_entry_info,
1225  hash_join_invalid_val,
1226  join_column,
1227  type_info,
1228  executor_->blockSize(),
1229  executor_->gridSize());
1230  }
1231  }
1232 #else
1233  CHECK(false);
1234 #endif
1235  }
1236 }
1237 
1238 void JoinHashTable::initHashTableOnCpuFromCache(
1239  const ChunkKey& chunk_key,
1240  const size_t num_elements,
1241  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols) {
1242  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1243  JoinHashTableCacheKey cache_key{col_range_,
1244  *cols.first,
1245  outer_col ? *outer_col : *cols.first,
1246  num_elements,
1247  chunk_key,
1248  qual_bin_oper_->get_optype()};
1249  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1250  for (const auto& kv : join_hash_table_cache_) {
1251  if (kv.first == cache_key) {
1252  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
1253  cpu_hash_table_buff_ = kv.second;
1254  break;
1255  }
1256  }
1257 }
1258 
1259 void JoinHashTable::putHashTableOnCpuToCache(
1260  const ChunkKey& chunk_key,
1261  const size_t num_elements,
1262  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols) {
1263  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
1264  JoinHashTableCacheKey cache_key{col_range_,
1265  *cols.first,
1266  outer_col ? *outer_col : *cols.first,
1267  num_elements,
1268  chunk_key,
1269  qual_bin_oper_->get_optype()};
1270  std::lock_guard<std::mutex> join_hash_table_cache_lock(join_hash_table_cache_mutex_);
1271  for (const auto& kv : join_hash_table_cache_) {
1272  if (kv.first == cache_key) {
1273  return;
1274  }
1275  }
1276  join_hash_table_cache_.emplace_back(cache_key, cpu_hash_table_buff_);
1277 }
1278 
1279 llvm::Value* JoinHashTable::codegenHashTableLoad(const size_t table_idx) {
1280  const auto hash_ptr = codegenHashTableLoad(table_idx, executor_);
1281  if (hash_ptr->getType()->isIntegerTy(64)) {
1282  return hash_ptr;
1283  }
1284  CHECK(hash_ptr->getType()->isPointerTy());
1285  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
1286  get_arg_by_name(executor_->cgen_state_->row_func_, "join_hash_tables"),
1287  llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
1288 }
1289 
1290 llvm::Value* JoinHashTable::codegenHashTableLoad(const size_t table_idx,
1291  Executor* executor) {
1292  llvm::Value* hash_ptr = nullptr;
1293  const auto total_table_count =
1294  executor->plan_state_->join_info_.join_hash_tables_.size();
1295  CHECK_LT(table_idx, total_table_count);
1296  if (total_table_count > 1) {
1297  auto hash_tables_ptr =
1298  get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1299  auto hash_pptr =
1300  table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
1301  hash_tables_ptr,
1302  executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
1303  : hash_tables_ptr;
1304  hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
1305  } else {
1306  hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
1307  }
1308  CHECK(hash_ptr);
1309  return hash_ptr;
1310 }
1311 
1312 std::vector<llvm::Value*> JoinHashTable::getHashJoinArgs(llvm::Value* hash_ptr,
1313  const Analyzer::Expr* key_col,
1314  const int shard_count,
1315  const CompilationOptions& co) {
1316  CodeGenerator code_generator(executor_);
1317  const auto key_lvs = code_generator.codegen(key_col, true, co);
1318  CHECK_EQ(size_t(1), key_lvs.size());
1319  auto const& key_col_ti = key_col->get_type_info();
1320  auto hash_entry_info =
1321  get_bucketized_hash_entry_info(key_col_ti, col_range_, isBitwiseEq());
1322 
1323  std::vector<llvm::Value*> hash_join_idx_args{
1324  hash_ptr,
1325  executor_->cgen_state_->castToTypeIn(key_lvs.front(), 64),
1326  executor_->cgen_state_->llInt(col_range_.getIntMin()),
1327  executor_->cgen_state_->llInt(col_range_.getIntMax())};
1328  if (shard_count) {
1329  const auto expected_hash_entry_count =
1331  const auto entry_count_per_shard =
1332  (expected_hash_entry_count + shard_count - 1) / shard_count;
1333  hash_join_idx_args.push_back(
1334  executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
1335  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
1336  hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(device_count_));
1337  }
1338  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
1339  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
1340  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1341  inline_fixed_encoding_null_val(key_col_logical_ti)));
1342  }
1343  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
1344  if (isBitwiseEq()) {
1345  if (special_date_bucketization_case) {
1346  hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
1347  col_range_.getIntMax() / hash_entry_info.bucket_normalization + 1));
1348  } else {
1349  hash_join_idx_args.push_back(
1350  executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
1351  }
1352  }
1353 
1354  if (special_date_bucketization_case) {
1355  hash_join_idx_args.emplace_back(
1356  executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
1357  }
1358 
1359  return hash_join_idx_args;
1360 }
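// Argument order of the generated hash_join_idx*() runtime call: hash table
// pointer, key cast to int64, range minimum, range maximum; then, when
// sharded, entries-per-shard, shard count and device count; then the null
// sentinel for nullable keys (and the translated null for bitwise-eq), and
// finally the bucket normalization for DATE columns.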
1361 
1362 HashJoinMatchingSet JoinHashTable::codegenMatchingSet(const CompilationOptions& co,
1363  const size_t index) {
1364  const auto cols = get_cols(
1365  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1366  auto key_col = cols.second;
1367  CHECK(key_col);
1368  auto val_col = cols.first;
1369  CHECK(val_col);
1370  auto pos_ptr = codegenHashTableLoad(index);
1371  CHECK(pos_ptr);
1372  const int shard_count = shardCount();
1373  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co);
1374  const int64_t sub_buff_size = getComponentBufferSize();
1375  const auto& key_col_ti = key_col->get_type_info();
1376 
1377  auto bucketize = (key_col_ti.get_type() == kDATE);
1378  return codegenMatchingSet(hash_join_idx_args,
1379  shard_count,
1380  !key_col_ti.get_notnull(),
1381  isBitwiseEq(),
1382  sub_buff_size,
1383  executor_,
1384  bucketize);
1385 }
1386 
1387 HashJoinMatchingSet JoinHashTable::codegenMatchingSet(
1388  const std::vector<llvm::Value*>& hash_join_idx_args_in,
1389  const bool is_sharded,
1390  const bool col_is_nullable,
1391  const bool is_bw_eq,
1392  const int64_t sub_buff_size,
1393  Executor* executor,
1394  bool is_bucketized) {
1395  using namespace std::string_literals;
1396 
1397  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);
1398 
1399  if (is_bw_eq) {
1400  fname += "_bitwise";
1401  }
1402  if (is_sharded) {
1403  fname += "_sharded";
1404  }
1405  if (!is_bw_eq && col_is_nullable) {
1406  fname += "_nullable";
1407  }
1408 
1409  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
1410  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
1411  slot_lv, executor->cgen_state_->llInt(int64_t(0)));
1412 
1413  auto pos_ptr = hash_join_idx_args_in[0];
1414  CHECK(pos_ptr);
1415 
1416  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
1417  pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
1418  auto hash_join_idx_args = hash_join_idx_args_in;
1419  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
1420  count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));
1421 
1422  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
1423  slot_valid_lv,
1424  executor->cgen_state_->emitCall(fname, hash_join_idx_args),
1425  executor->cgen_state_->llInt(int64_t(0)));
1426  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
1427  executor->cgen_state_->ir_builder_.CreateAdd(
1428  pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
1429  llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
1430  auto rowid_ptr_i32 =
1431  executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
1432  return {rowid_ptr_i32, row_count_lv, slot_lv};
1433 }
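// The emitted probe calls the hash_join_idx*() helper twice: once on the
// offset section to get the slot, and once on the count section (the same
// pointer advanced by sub_buff_size) to get the number of matching rows; the
// row ids themselves start another sub_buff_size further in.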
1434 
1435 size_t JoinHashTable::offsetBufferOff() const noexcept {
1437  return 0;
1438 }
1439 
1440 size_t JoinHashTable::countBufferOff() const noexcept {
1442  return getComponentBufferSize();
1443 }
1444 
1445 size_t JoinHashTable::payloadBufferOff() const noexcept {
1447  return 2 * getComponentBufferSize();
1448 }
1449 
1450 size_t JoinHashTable::getComponentBufferSize() const noexcept {
1451  return hash_entry_count_ * sizeof(int32_t);
1452 }
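// One-to-many layout: getComponentBufferSize() is the byte size of one
// hash_entry_count_-slot section, so the buffer holds the offsets at byte 0,
// the counts at getComponentBufferSize(), and the row-id payload at twice
// that, matching the three *BufferOff() accessors above.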
1453 
1454 int64_t JoinHashTable::getJoinHashBuffer(const ExecutorDeviceType device_type,
1455  const int device_id) const noexcept {
1456  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1457  return 0;
1458  }
1459 #ifdef HAVE_CUDA
1460  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1461  if (device_type == ExecutorDeviceType::CPU) {
1462  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1463  } else {
1464  return gpu_hash_table_buff_[device_id]
1465  ? reinterpret_cast<CUdeviceptr>(
1466  gpu_hash_table_buff_[device_id]->getMemoryPtr())
1467  : reinterpret_cast<CUdeviceptr>(nullptr);
1468  }
1469 #else
1470  CHECK(device_type == ExecutorDeviceType::CPU);
1471  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
1472 #endif
1473 }
1474 
1475 size_t JoinHashTable::getJoinHashBufferSize(const ExecutorDeviceType device_type,
1476  const int device_id) const noexcept {
1477  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
1478  return 0;
1479  }
1480 #ifdef HAVE_CUDA
1481  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
1482  if (device_type == ExecutorDeviceType::CPU) {
1483  return cpu_hash_table_buff_->size() *
1484  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1485  } else {
1486  return gpu_hash_table_buff_[device_id]
1487  ? gpu_hash_table_buff_[device_id]->reservedSize()
1488  : 0;
1489  }
1490 #else
1491  CHECK(device_type == ExecutorDeviceType::CPU);
1492  return cpu_hash_table_buff_->size() *
1493  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
1494 #endif
1495 }
1496 
1497 std::string JoinHashTable::toString(const ExecutorDeviceType device_type,
1498  const int device_id,
1499  bool raw) const noexcept {
1500  auto buffer = getJoinHashBuffer(device_type, device_id);
1501  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1502 #ifdef HAVE_CUDA
1503  std::unique_ptr<int8_t[]> buffer_copy;
1504  if (device_type == ExecutorDeviceType::GPU) {
1505  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1506 
1507  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1508  buffer_copy.get(),
1509  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1510  buffer_size,
1511  device_id);
1512  }
1513  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1514 #else
1515  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1516 #endif // HAVE_CUDA
1517  auto ptr2 = ptr1 + offsetBufferOff();
1518  auto ptr3 = ptr1 + countBufferOff();
1519  auto ptr4 = ptr1 + payloadBufferOff();
1521  1, sizeof(int32_t), ptr1, ptr2, ptr3, ptr4, buffer_size, raw);
1522 }
1523 
1524 std::set<DecodedJoinHashBufferEntry> JoinHashTable::decodeJoinHashBuffer(
1525  const ExecutorDeviceType device_type,
1526  const int device_id) const noexcept {
1527  auto buffer = getJoinHashBuffer(device_type, device_id);
1528  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
1529 #ifdef HAVE_CUDA
1530  std::unique_ptr<int8_t[]> buffer_copy;
1531  if (device_type == ExecutorDeviceType::GPU) {
1532  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1533 
1534  copy_from_gpu(&executor_->getCatalog()->getDataMgr(),
1535  buffer_copy.get(),
1536  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1537  buffer_size,
1538  device_id);
1539  }
1540  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1541 #else
1542  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1543 #endif // HAVE_CUDA
1544  auto ptr2 = ptr1 + offsetBufferOff();
1545  auto ptr3 = ptr1 + countBufferOff();
1546  auto ptr4 = ptr1 + payloadBufferOff();
1547  return ::decodeJoinHashBuffer(1, sizeof(int32_t), ptr1, ptr2, ptr3, ptr4, buffer_size);
1548 }
1549 
1550 llvm::Value* JoinHashTable::codegenSlot(const CompilationOptions& co,
1551  const size_t index) {
1552  using namespace std::string_literals;
1553 
1555  const auto cols = get_cols(
1556  qual_bin_oper_.get(), *executor_->getCatalog(), executor_->temporary_tables_);
1557  auto key_col = cols.second;
1558  CHECK(key_col);
1559  auto val_col = cols.first;
1560  CHECK(val_col);
1561  CodeGenerator code_generator(executor_);
1562  const auto key_lvs = code_generator.codegen(key_col, true, co);
1563  CHECK_EQ(size_t(1), key_lvs.size());
1564  auto hash_ptr = codegenHashTableLoad(index);
1565  CHECK(hash_ptr);
1566  const int shard_count = shardCount();
1567  const auto hash_join_idx_args = getHashJoinArgs(hash_ptr, key_col, shard_count, co);
1568 
1569  const auto& key_col_ti = key_col->get_type_info();
1570  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
1571  : "hash_join_idx"s);
1572 
1573  if (isBitwiseEq()) {
1574  fname += "_bitwise";
1575  }
1576  if (shard_count) {
1577  fname += "_sharded";
1578  }
1579 
1580  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
1581  fname += "_nullable";
1582  }
1583  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1584 }
1585 
1586 const InputTableInfo& JoinHashTable::getInnerQueryInfo(
1587  const Analyzer::ColumnVar* inner_col) const {
1588  return get_inner_query_info(inner_col->get_table_id(), query_infos_);
1589 }
1590 
1591 const InputTableInfo& get_inner_query_info(
1592  const int inner_table_id,
1593  const std::vector<InputTableInfo>& query_infos) {
1594  ssize_t ti_idx = -1;
1595  for (size_t i = 0; i < query_infos.size(); ++i) {
1596  if (inner_table_id == query_infos[i].table_id) {
1597  ti_idx = i;
1598  break;
1599  }
1600  }
1601  CHECK_NE(ssize_t(-1), ti_idx);
1602  return query_infos[ti_idx];
1603 }
1604 
1605 size_t get_entries_per_device(const size_t total_entries,
1606  const size_t shard_count,
1607  const size_t device_count,
1608  const Data_Namespace::MemoryLevel memory_level) {
1609  const auto entries_per_shard =
1610  shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
1611  size_t entries_per_device = entries_per_shard;
1612  if (memory_level == Data_Namespace::GPU_LEVEL && shard_count) {
1613  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
1614  CHECK_GT(shards_per_device, 0u);
1615  entries_per_device = entries_per_shard * shards_per_device;
1616  }
1617  return entries_per_device;
1618 }
1619 
1620 // TODO(adb): unify with BaselineJoinHashTable
1621 size_t JoinHashTable::shardCount() const {
1622  return memory_level_ == Data_Namespace::GPU_LEVEL
1623      ? get_shard_count(qual_bin_oper_.get(), executor_)
1624      : 0;
1625 }
1626 
1627 bool JoinHashTable::isBitwiseEq() const {
1628  return qual_bin_oper_->get_optype() == kBW_EQ;
1629 }
1630 
1631 void JoinHashTable::freeHashBufferMemory() {
1632 #ifdef HAVE_CUDA
1633  freeHashBufferGpuMemory();
1634 #endif
1635  freeHashBufferCpuMemory();
1636 }
1637 
1638 void JoinHashTable::freeHashBufferGpuMemory() {
1639 #ifdef HAVE_CUDA
1640  const auto& catalog = *executor_->getCatalog();
1641  auto& data_mgr = catalog.getDataMgr();
1642  for (auto& buf : gpu_hash_table_buff_) {
1643  if (buf) {
1644  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1645  buf = nullptr;
1646  }
1647  }
1648  for (auto& buf : gpu_hash_table_err_buff_) {
1649  if (buf) {
1650  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1651  buf = nullptr;
1652  }
1653  }
1654 #else
1655  CHECK(false);
1656 #endif // HAVE_CUDA
1657 }
1658 
1659 void JoinHashTable::freeHashBufferCpuMemory() {
1660  cpu_hash_table_buff_.reset();
1661 }