OmniSciDB  85c2d10cdc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
HashJoin.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
21 #include "QueryEngine/Execute.h"
28 
29 extern bool g_enable_overlaps_hashjoin;
30 
    const std::vector<double>& inverse_bucket_sizes_for_dimension,
    const std::vector<InnerOuter> inner_outer_pairs) {
  // Rebuild the per-join-column bucket metadata from the supplied
  // per-dimension inverse bucket sizes. Any previously set info is discarded.
  join_buckets.clear();

  // One inner/outer pair and one type descriptor per join column.
  CHECK_EQ(inner_outer_pairs.size(), join_columns.size());
  CHECK_EQ(join_columns.size(), join_column_types.size());
  for (size_t i = 0; i < join_columns.size(); i++) {
    const auto& inner_outer_pair = inner_outer_pairs[i];
    const auto inner_col = inner_outer_pair.first;
    const auto& ti = inner_col->get_type_info();
    const auto elem_ti = ti.get_elem_type();
    // Bucketing requires floating point array elements (see CHECK below);
    // the flag records whether they are double precision.
    CHECK(elem_ti.is_fp());

    join_buckets.emplace_back(JoinBucketInfo{inverse_bucket_sizes_for_dimension,
                                             elem_ti.get_type() == kDOUBLE});
  }
}
49 
    const Analyzer::ColumnVar* hash_col,
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragment_info,
    const Data_Namespace::MemoryLevel effective_memory_level,
    const int device_id,
    std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
    DeviceAllocator* dev_buff_owner,
    std::vector<std::shared_ptr<void>>& malloc_owner,
    Executor* executor,
    ColumnCacheMap* column_cache) {
  // Serialize fragment fetching across concurrent hash table builds.
  static std::mutex fragment_fetch_mutex;
  std::lock_guard<std::mutex> fragment_fetch_lock(fragment_fetch_mutex);
  try {
    // Gather the column's fragments into a single JoinColumn view.
    JoinColumn join_column = ColumnFetcher::makeJoinColumn(executor,
                                                          *hash_col,
                                                          fragment_info,
                                                          effective_memory_level,
                                                          device_id,
                                                          dev_buff_owner,
                                                          /*thread_idx=*/0,
                                                          chunks_owner,
                                                          malloc_owner,
                                                          *column_cache);
    if (effective_memory_level == Data_Namespace::GPU_LEVEL) {
      // Copy the chunk buffer to the device and repoint the JoinColumn at the
      // device-resident copy. The DeviceAllocator owns the allocation.
      CHECK(dev_buff_owner);
      auto device_col_chunks_buff = dev_buff_owner->alloc(join_column.col_chunks_buff_sz);
      dev_buff_owner->copyToDevice(device_col_chunks_buff,
                                   join_column.col_chunks_buff,
                                   join_column.col_chunks_buff_sz);
      join_column.col_chunks_buff = device_col_chunks_buff;
    }
    return join_column;
  } catch (...) {
    // Normalize any failure during column fetch into a single exception type.
    throw FailedToFetchColumn();
  }
}
90 
91 namespace {
92 
93 template <typename T>
94 std::string toStringFlat(const HashJoin* hash_table,
95  const ExecutorDeviceType device_type,
96  const int device_id) {
97  auto mem =
98  reinterpret_cast<const T*>(hash_table->getJoinHashBuffer(device_type, device_id));
99  auto memsz = hash_table->getJoinHashBufferSize(device_type, device_id) / sizeof(T);
100  std::string txt;
101  for (size_t i = 0; i < memsz; ++i) {
102  if (i > 0) {
103  txt += ", ";
104  }
105  txt += std::to_string(mem[i]);
106  }
107  return txt;
108 }
109 
110 } // anonymous namespace
111 
112 std::string HashJoin::toStringFlat64(const ExecutorDeviceType device_type,
113  const int device_id) const {
114  return toStringFlat<int64_t>(this, device_type, device_id);
115 }
116 
117 std::string HashJoin::toStringFlat32(const ExecutorDeviceType device_type,
118  const int device_id) const {
119  return toStringFlat<int32_t>(this, device_type, device_id);
120 }
121 
122 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferEntry& e) {
123  os << " {{";
124  bool first = true;
125  for (auto k : e.key) {
126  if (!first) {
127  os << ",";
128  } else {
129  first = false;
130  }
131  os << k;
132  }
133  os << "}, ";
134  os << "{";
135  first = true;
136  for (auto p : e.payload) {
137  if (!first) {
138  os << ", ";
139  } else {
140  first = false;
141  }
142  os << p;
143  }
144  os << "}}";
145  return os;
146 }
147 
148 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferSet& s) {
149  os << "{\n";
150  bool first = true;
151  for (auto e : s) {
152  if (!first) {
153  os << ",\n";
154  } else {
155  first = false;
156  }
157  os << e;
158  }
159  if (!s.empty()) {
160  os << "\n";
161  }
162  os << "}\n";
163  return os;
164 }
165 
    const std::vector<llvm::Value*>& hash_join_idx_args_in,
    const bool is_sharded,
    const bool col_is_nullable,
    const bool is_bw_eq,
    const int64_t sub_buff_size,
    Executor* executor,
    bool is_bucketized) {
  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
  using namespace std::string_literals;

  // Pick the runtime function variant matching this join's properties.
  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);

  if (is_bw_eq) {
    fname += "_bitwise";
  }
  if (is_sharded) {
    fname += "_sharded";
  }
  if (!is_bw_eq && col_is_nullable) {
    // The bitwise-eq variants are not combined with "_nullable".
    fname += "_nullable";
  }

  // First call yields the slot index; a non-negative value means a match.
  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
      slot_lv, executor->cgen_state_->llInt(int64_t(0)));

  auto pos_ptr = hash_join_idx_args_in[0];
  CHECK(pos_ptr);

  // The count sub-buffer starts sub_buff_size bytes past the position buffer.
  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
      pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
  auto hash_join_idx_args = hash_join_idx_args_in;
  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
      count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));

  // Second call (issued against the count buffer, guarded by slot validity)
  // yields the number of matching rows; 0 when there is no match.
  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
      slot_valid_lv,
      executor->cgen_state_->emitCall(fname, hash_join_idx_args),
      executor->cgen_state_->llInt(int64_t(0)));
  // The row-id payload starts 2 * sub_buff_size bytes past the position
  // buffer; index it by the slot to get this row's matching set.
  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
      executor->cgen_state_->ir_builder_.CreateAdd(
          pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
      llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
  auto rowid_ptr_i32 =
      executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
  return {rowid_ptr_i32, row_count_lv, slot_lv};
}
214 
215 llvm::Value* HashJoin::codegenHashTableLoad(const size_t table_idx, Executor* executor) {
216  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
217  llvm::Value* hash_ptr = nullptr;
218  const auto total_table_count =
219  executor->plan_state_->join_info_.join_hash_tables_.size();
220  CHECK_LT(table_idx, total_table_count);
221  if (total_table_count > 1) {
222  auto hash_tables_ptr =
223  get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
224  auto hash_pptr =
225  table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
226  hash_tables_ptr,
227  executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
228  : hash_tables_ptr;
229  hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
230  } else {
231  hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
232  }
233  CHECK(hash_ptr);
234  return hash_ptr;
235 }
236 
//! Make hash table from an in-flight SQL query's parse tree etc.
//! Dispatches to the overlaps, baseline (composite key) or perfect hash
//! table builder depending on the shape of the join qualifier, falling back
//! from perfect to baseline when the key range is too large.
std::shared_ptr<HashJoin> HashJoin::getInstance(
    const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const QueryHint& query_hint) {
  auto timer = DEBUG_TIMER(__func__);
  std::shared_ptr<HashJoin> join_hash_table;
  CHECK_GT(device_count, 0);
  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
    throw std::runtime_error(
        "Overlaps hash join disabled, attempting to fall back to loop join");
  }
  if (qual_bin_oper->is_overlaps_oper()) {
    // Geospatial overlaps join.
    VLOG(1) << "Trying to build geo hash table:";
    join_hash_table = OverlapsJoinHashTable::getInstance(qual_bin_oper,
                                                         query_infos,
                                                         memory_level,
                                                         device_count,
                                                         column_cache,
                                                         executor,
                                                         query_hint);
  } else if (dynamic_cast<const Analyzer::ExpressionTuple*>(
                 qual_bin_oper->get_left_operand())) {
    // Composite (multi-column) join key: use the baseline hash table.
    VLOG(1) << "Trying to build keyed hash table:";
    join_hash_table = BaselineJoinHashTable::getInstance(qual_bin_oper,
                                                         query_infos,
                                                         memory_level,
                                                         preferred_hash_type,
                                                         device_count,
                                                         column_cache,
                                                         executor);
  } else {
    try {
      // Single-column key: try the perfect hash table first.
      VLOG(1) << "Trying to build perfect hash table:";
      join_hash_table = PerfectJoinHashTable::getInstance(qual_bin_oper,
                                                          query_infos,
                                                          memory_level,
                                                          preferred_hash_type,
                                                          device_count,
                                                          column_cache,
                                                          executor);
    } catch (TooManyHashEntries&) {
      // Key range too wide for a perfect hash table; rebuild the qualifier
      // and fall back to a baseline (keyed) hash table.
      const auto join_quals = coalesce_singleton_equi_join(qual_bin_oper);
      CHECK_EQ(join_quals.size(), size_t(1));
      const auto join_qual =
          std::dynamic_pointer_cast<Analyzer::BinOper>(join_quals.front());
      VLOG(1) << "Trying to build keyed hash table after perfect hash table:";
      join_hash_table = BaselineJoinHashTable::getInstance(join_qual,
                                                           query_infos,
                                                           memory_level,
                                                           preferred_hash_type,
                                                           device_count,
                                                           column_cache,
                                                           executor);
    }
  }
  CHECK(join_hash_table);
  if (VLOGGING(2)) {
    // Debug aid: dump the contents of small (<= 1000 byte) hash tables.
    if (join_hash_table->getMemoryLevel() == Data_Namespace::MemoryLevel::GPU_LEVEL) {
      for (int device_id = 0; device_id < join_hash_table->getDeviceCount();
           ++device_id) {
        if (join_hash_table->getJoinHashBufferSize(ExecutorDeviceType::GPU, device_id) <=
            1000) {
          VLOG(2) << "Built GPU hash table: "
                  << join_hash_table->toString(ExecutorDeviceType::GPU, device_id);
        }
      }
    } else {
      if (join_hash_table->getJoinHashBufferSize(ExecutorDeviceType::CPU) <= 1000) {
        VLOG(2) << "Built CPU hash table: "
                << join_hash_table->toString(ExecutorDeviceType::CPU);
      }
    }
  }
  return join_hash_table;
}
318 
    const std::vector<InnerOuter>& inner_outer_pairs,
    const Executor* executor) {
  CHECK(executor);
  // Per-key string dictionary proxies (empty/null entries when no dictionary
  // translation is needed for that key) plus per-column cache key chunks.
  std::vector<const void*> sd_inner_proxy_per_key;
  std::vector<const void*> sd_outer_proxy_per_key;
  std::vector<ChunkKey> cache_key_chunks;  // used for the cache key
  const auto db_id = executor->getCatalog()->getCurrentDB().dbId;
  for (const auto& inner_outer_pair : inner_outer_pairs) {
    const auto inner_col = inner_outer_pair.first;
    const auto outer_col = inner_outer_pair.second;
    const auto& inner_ti = inner_col->get_type_info();
    const auto& outer_ti = outer_col->get_type_info();
    ChunkKey cache_key_chunks_for_column{
        db_id, inner_col->get_table_id(), inner_col->get_column_id()};
    if (inner_ti.is_string() &&
        !(inner_ti.get_comp_param() == outer_ti.get_comp_param())) {
      // Dictionary-encoded strings from different dictionaries: record both
      // proxies so key values can be translated across dictionaries.
      CHECK(outer_ti.is_string());
      CHECK(inner_ti.get_compression() == kENCODING_DICT &&
            outer_ti.get_compression() == kENCODING_DICT);
      const auto sd_inner_proxy = executor->getStringDictionaryProxy(
          inner_ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
      const auto sd_outer_proxy = executor->getStringDictionaryProxy(
          outer_ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
      CHECK(sd_inner_proxy && sd_outer_proxy);
      sd_inner_proxy_per_key.push_back(sd_inner_proxy);
      sd_outer_proxy_per_key.push_back(sd_outer_proxy);
      // The outer dictionary generation participates in the cache key —
      // presumably to invalidate cached tables when the dictionary grows;
      // TODO(review): confirm against the hash table cache.
      cache_key_chunks_for_column.push_back(sd_outer_proxy->getGeneration());
    } else {
      // No translation needed: keep positional alignment with null entries.
      sd_inner_proxy_per_key.emplace_back();
      sd_outer_proxy_per_key.emplace_back();
    }
    cache_key_chunks.push_back(cache_key_chunks_for_column);
  }
  return {sd_inner_proxy_per_key, sd_outer_proxy_per_key, cache_key_chunks};
}
355 
356 std::shared_ptr<Analyzer::ColumnVar> getSyntheticColumnVar(std::string_view table,
357  std::string_view column,
358  int rte_idx,
359  Executor* executor) {
360  auto catalog = executor->getCatalog();
361  CHECK(catalog);
362 
363  auto tmeta = catalog->getMetadataForTable(std::string(table));
364  CHECK(tmeta);
365 
366  auto cmeta = catalog->getMetadataForColumn(tmeta->tableId, std::string(column));
367  CHECK(cmeta);
368 
369  auto ti = cmeta->columnType;
370 
371  if (ti.is_geometry() && ti.get_type() != kPOINT) {
372  int geoColumnId{0};
373  switch (ti.get_type()) {
374  case kLINESTRING: {
375  geoColumnId = cmeta->columnId + 2;
376  break;
377  }
378  case kPOLYGON: {
379  geoColumnId = cmeta->columnId + 3;
380  break;
381  }
382  case kMULTIPOLYGON: {
383  geoColumnId = cmeta->columnId + 4;
384  break;
385  }
386  default:
387  CHECK(false);
388  }
389  cmeta = catalog->getMetadataForColumn(tmeta->tableId, geoColumnId);
390  CHECK(cmeta);
391  ti = cmeta->columnType;
392  }
393 
394  auto cv =
395  std::make_shared<Analyzer::ColumnVar>(ti, tmeta->tableId, cmeta->columnId, rte_idx);
396  return cv;
397 }
398 
    : public ScalarExprVisitor<std::set<const Analyzer::ColumnVar*>> {
 protected:
  // Leaf case: a single column var contributes itself.
  std::set<const Analyzer::ColumnVar*> visitColumnVar(
      const Analyzer::ColumnVar* column) const override {
    return {column};
  }

  // Tuple case: union of the column vars found in each tuple component.
  std::set<const Analyzer::ColumnVar*> visitColumnVarTuple(
      const Analyzer::ExpressionTuple* expr_tuple) const override {
    AllColumnVarsVisitor visitor;
    std::set<const Analyzer::ColumnVar*> result;
    for (const auto& expr_component : expr_tuple->getTuple()) {
      const auto component_rte_set = visitor.visit(expr_component.get());
      result.insert(component_rte_set.begin(), component_rte_set.end());
    }
    return result;
  }

  // Combine partial results from subexpressions by set union.
  std::set<const Analyzer::ColumnVar*> aggregateResult(
      const std::set<const Analyzer::ColumnVar*>& aggregate,
      const std::set<const Analyzer::ColumnVar*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }
};
426 
427 void setupSyntheticCaching(std::set<const Analyzer::ColumnVar*> cvs, Executor* executor) {
428  std::unordered_set<int> phys_table_ids;
429  for (auto cv : cvs) {
430  phys_table_ids.insert(cv->get_table_id());
431  }
432 
433  std::unordered_set<PhysicalInput> phys_inputs;
434  for (auto cv : cvs) {
435  phys_inputs.emplace(PhysicalInput{cv->get_column_id(), cv->get_table_id()});
436  }
437 
438  executor->setupCaching(phys_inputs, phys_table_ids);
439 }
440 
441 std::vector<InputTableInfo> getSyntheticInputTableInfo(
442  std::set<const Analyzer::ColumnVar*> cvs,
443  Executor* executor) {
444  auto catalog = executor->getCatalog();
445  CHECK(catalog);
446 
447  std::unordered_set<int> phys_table_ids;
448  for (auto cv : cvs) {
449  phys_table_ids.insert(cv->get_table_id());
450  }
451 
452  // NOTE(sy): This vector ordering seems to work for now, but maybe we need to
453  // review how rte_idx is assigned for ColumnVars. See for example Analyzer.h
454  // and RelAlgExecutor.cpp and rte_idx there.
455  std::vector<InputTableInfo> query_infos(phys_table_ids.size());
456  size_t i = 0;
457  for (auto id : phys_table_ids) {
458  auto tmeta = catalog->getMetadataForTable(id);
459  query_infos[i].table_id = id;
460  query_infos[i].info = tmeta->fragmenter->getFragmentsForQuery();
461  ++i;
462  }
463 
464  return query_infos;
465 }
466 
468 std::shared_ptr<HashJoin> HashJoin::getSyntheticInstance(
469  std::string_view table1,
470  std::string_view column1,
471  std::string_view table2,
472  std::string_view column2,
473  const Data_Namespace::MemoryLevel memory_level,
474  const HashType preferred_hash_type,
475  const int device_count,
476  ColumnCacheMap& column_cache,
477  Executor* executor) {
478  auto a1 = getSyntheticColumnVar(table1, column1, 0, executor);
479  auto a2 = getSyntheticColumnVar(table2, column2, 1, executor);
480 
481  auto qual_bin_oper = std::make_shared<Analyzer::BinOper>(kBOOLEAN, kEQ, kONE, a1, a2);
482 
483  std::set<const Analyzer::ColumnVar*> cvs =
484  AllColumnVarsVisitor().visit(qual_bin_oper.get());
485  auto query_infos = getSyntheticInputTableInfo(cvs, executor);
486  setupSyntheticCaching(cvs, executor);
487  QueryHint query_hint = QueryHint::defaults();
488 
489  auto hash_table = HashJoin::getInstance(qual_bin_oper,
490  query_infos,
491  memory_level,
492  preferred_hash_type,
493  device_count,
494  column_cache,
495  executor,
496  query_hint);
497  return hash_table;
498 }
499 
501 std::shared_ptr<HashJoin> HashJoin::getSyntheticInstance(
502  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
503  const Data_Namespace::MemoryLevel memory_level,
504  const HashType preferred_hash_type,
505  const int device_count,
506  ColumnCacheMap& column_cache,
507  Executor* executor) {
508  std::set<const Analyzer::ColumnVar*> cvs =
509  AllColumnVarsVisitor().visit(qual_bin_oper.get());
510  auto query_infos = getSyntheticInputTableInfo(cvs, executor);
511  setupSyntheticCaching(cvs, executor);
512  QueryHint query_hint = QueryHint::defaults();
513 
514  auto hash_table = HashJoin::getInstance(qual_bin_oper,
515  query_infos,
516  memory_level,
517  preferred_hash_type,
518  device_count,
519  column_cache,
520  executor,
521  query_hint);
522  return hash_table;
523 }
524 
    const size_t shard_count,
    const Executor* executor) {
  // Only relevant in distributed (cluster) mode.
  if (!g_cluster) {
    return;
  }
  if (table_id >= 0) {
    // NOTE(review): negative ids are skipped here — presumably temporary
    // tables; confirm against the catalog's table id scheme.
    CHECK(executor);
    const auto inner_td = executor->getCatalog()->getMetadataForTable(table_id);
    CHECK(inner_td);
    // Unsharded inner tables must be replicated across leaves for the join
    // to be valid in distributed mode.
    if (!shard_count && !table_is_replicated(inner_td)) {
      throw TableMustBeReplicated(inner_td->tableName);
    }
  }
}
540 
namespace {

// Normalizes a binary join qualifier into its (inner, outer) column pair.
// NOTE(review): a parameter line appears to be missing from this rendering —
// `cat` is used below but not declared here; the signature presumably
// includes `const Catalog_Namespace::Catalog& cat`. Confirm in the original.
InnerOuter get_cols(const Analyzer::BinOper* qual_bin_oper,
                    const TemporaryTables* temporary_tables) {
  const auto lhs = qual_bin_oper->get_left_operand();
  const auto rhs = qual_bin_oper->get_right_operand();
  return normalize_column_pair(lhs, rhs, cat, temporary_tables);
}

}  // namespace
552 
553 size_t get_shard_count(const Analyzer::BinOper* join_condition,
554  const Executor* executor) {
555  const Analyzer::ColumnVar* inner_col{nullptr};
556  const Analyzer::Expr* outer_col{nullptr};
557  std::shared_ptr<Analyzer::BinOper> redirected_bin_oper;
558  try {
559  std::tie(inner_col, outer_col) =
560  get_cols(join_condition, *executor->getCatalog(), executor->getTemporaryTables());
561  } catch (...) {
562  return 0;
563  }
564  if (!inner_col || !outer_col) {
565  return 0;
566  }
567  return get_shard_count({inner_col, outer_col}, executor);
568 }
569 
    const Analyzer::Expr* rhs,
    const TemporaryTables* temporary_tables,
    const bool is_overlaps_join) {
  // Validates that (lhs, rhs) can back a hash join and orders the pair as
  // (inner, outer). Throws HashJoinFail for any unsupported shape.
  const auto& lhs_ti = lhs->get_type_info();
  const auto& rhs_ti = rhs->get_type_info();
  if (!is_overlaps_join) {
    // Equijoins require identical, hashable types on both sides.
    if (lhs_ti.get_type() != rhs_ti.get_type()) {
      throw HashJoinFail("Equijoin types must be identical, found: " +
                         lhs_ti.get_type_name() + ", " + rhs_ti.get_type_name());
    }
    if (!lhs_ti.is_integer() && !lhs_ti.is_time() && !lhs_ti.is_string() &&
        !lhs_ti.is_decimal()) {
      throw HashJoinFail("Cannot apply hash join to inner column type " +
                         lhs_ti.get_type_name());
    }
    // Decimal types should be identical.
    if (lhs_ti.is_decimal() && (lhs_ti.get_scale() != rhs_ti.get_scale() ||
                                lhs_ti.get_precision() != rhs_ti.get_precision())) {
      throw HashJoinFail("Equijoin with different decimal types");
    }
  }

  // Strings may be joined through a cast, but only if both sides agree and
  // the unary operator really is a cast.
  const auto lhs_cast = dynamic_cast<const Analyzer::UOper*>(lhs);
  const auto rhs_cast = dynamic_cast<const Analyzer::UOper*>(rhs);
  if (lhs_ti.is_string() && (static_cast<bool>(lhs_cast) != static_cast<bool>(rhs_cast) ||
                             (lhs_cast && lhs_cast->get_optype() != kCAST) ||
                             (rhs_cast && rhs_cast->get_optype() != kCAST))) {
    throw HashJoinFail("Cannot use hash join for given expression");
  }
  // Casts to decimal are not supported.
  if (lhs_ti.is_decimal() && (lhs_cast || rhs_cast)) {
    throw HashJoinFail("Cannot use hash join for given expression");
  }
  // Strip the cast (if any) to reach the underlying column var on each side.
  const auto lhs_col =
      lhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(lhs_cast->get_operand())
               : dynamic_cast<const Analyzer::ColumnVar*>(lhs);
  const auto rhs_col =
      rhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(rhs_cast->get_operand())
               : dynamic_cast<const Analyzer::ColumnVar*>(rhs);
  if (!lhs_col && !rhs_col) {
    throw HashJoinFail("Cannot use hash join for given expression");
  }
  // Decide which side is inner: the column with the greater range table
  // index (deeper in the join order) is the inner one.
  const Analyzer::ColumnVar* inner_col{nullptr};
  const Analyzer::ColumnVar* outer_col{nullptr};
  auto outer_ti = lhs_ti;
  auto inner_ti = rhs_ti;
  const Analyzer::Expr* outer_expr{lhs};
  if ((!lhs_col || (rhs_col && lhs_col->get_rte_idx() < rhs_col->get_rte_idx())) &&
      (!rhs_col || (!lhs_col || lhs_col->get_rte_idx() < rhs_col->get_rte_idx()))) {
    inner_col = rhs_col;
    outer_col = lhs_col;
  } else {
    if (lhs_col && lhs_col->get_rte_idx() == 0) {
      throw HashJoinFail("Cannot use hash join for given expression");
    }
    inner_col = lhs_col;
    outer_col = rhs_col;
    std::swap(outer_ti, inner_ti);
    outer_expr = rhs;
  }
  if (!inner_col) {
    throw HashJoinFail("Cannot use hash join for given expression");
  }
  if (!outer_col) {
    MaxRangeTableIndexVisitor rte_idx_visitor;
    int outer_rte_idx = rte_idx_visitor.visit(outer_expr);
    // The inner column candidate is not actually inner; the outer
    // expression contains columns which are at least as deep.
    if (inner_col->get_rte_idx() <= outer_rte_idx) {
      throw HashJoinFail("Cannot use hash join for given expression");
    }
  }
  // We need to fetch the actual type information from the catalog since Analyzer
  // always reports nullable as true for inner table columns in left joins.
  const auto inner_col_cd = get_column_descriptor_maybe(
      inner_col->get_column_id(), inner_col->get_table_id(), cat);
  const auto inner_col_real_ti = get_column_type(inner_col->get_column_id(),
                                                 inner_col->get_table_id(),
                                                 inner_col_cd,
                                                 temporary_tables);
  const auto& outer_col_ti =
      !(dynamic_cast<const Analyzer::FunctionOper*>(lhs)) && outer_col
          ? outer_col->get_type_info()
          : outer_ti;
  // Casts from decimal are not supported.
  if ((inner_col_real_ti.is_decimal() || outer_col_ti.is_decimal()) &&
      (lhs_cast || rhs_cast)) {
    throw HashJoinFail("Cannot use hash join for given expression");
  }
  if (is_overlaps_join) {
    // Overlaps joins require a fixed-length fp array (bounds) inner column
    // and a point / bounds / constructed-point outer column.
    if (!inner_col_real_ti.is_array()) {
      throw HashJoinFail(
          "Overlaps join only supported for inner columns with array type");
    }
    auto is_bounds_array = [](const auto ti) {
      return ti.is_fixlen_array() && ti.get_size() == 32;
    };
    if (!is_bounds_array(inner_col_real_ti)) {
      throw HashJoinFail(
          "Overlaps join only supported for 4-element double fixed length arrays");
    }
    if (!(outer_col_ti.get_type() == kPOINT || is_bounds_array(outer_col_ti) ||
          is_constructed_point(outer_expr))) {
      throw HashJoinFail(
          "Overlaps join only supported for geometry outer columns of type point, "
          "geometry columns with bounds or constructed points");
    }
  } else {
    if (!(inner_col_real_ti.is_integer() || inner_col_real_ti.is_time() ||
          inner_col_real_ti.is_decimal() ||
          (inner_col_real_ti.is_string() &&
           inner_col_real_ti.get_compression() == kENCODING_DICT))) {
      throw HashJoinFail(
          "Can only apply hash join to integer-like types and dictionary encoded "
          "strings");
    }
  }

  auto normalized_inner_col = inner_col;
  auto normalized_outer_col = outer_col ? outer_col : outer_expr;

  const auto& normalized_inner_ti = normalized_inner_col->get_type_info();
  const auto& normalized_outer_ti = normalized_outer_col->get_type_info();

  // Final sanity check: either both sides are strings or neither is.
  if (normalized_inner_ti.is_string() != normalized_outer_ti.is_string()) {
    throw HashJoinFail(std::string("Could not build hash tables for incompatible types " +
                                   normalized_inner_ti.get_type_name() + " and " +
                                   normalized_outer_ti.get_type_name()));
  }

  return {normalized_inner_col, normalized_outer_col};
}
704 
// Normalizes a join condition into one (inner, outer) pair per key column:
// component-wise for tuple (composite key) conditions, a single pair
// otherwise.
// NOTE(review): a parameter line appears to be missing from this rendering —
// `cat` is used below but not declared here; the signature presumably
// includes `const Catalog_Namespace::Catalog& cat`. Confirm in the original.
std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
                                               const TemporaryTables* temporary_tables) {
  std::vector<InnerOuter> result;
  const auto lhs_tuple_expr =
      dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_left_operand());
  const auto rhs_tuple_expr =
      dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_right_operand());

  // Either both operands are tuples or neither is.
  CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
  if (lhs_tuple_expr) {
    const auto& lhs_tuple = lhs_tuple_expr->getTuple();
    const auto& rhs_tuple = rhs_tuple_expr->getTuple();
    CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
    for (size_t i = 0; i < lhs_tuple.size(); ++i) {
      result.push_back(normalize_column_pair(lhs_tuple[i].get(),
                                             rhs_tuple[i].get(),
                                             cat,
                                             temporary_tables,
                                             condition->is_overlaps_oper()));
    }
  } else {
    CHECK(!lhs_tuple_expr && !rhs_tuple_expr);
    result.push_back(normalize_column_pair(condition->get_left_operand(),
                                           condition->get_right_operand(),
                                           cat,
                                           temporary_tables,
                                           condition->is_overlaps_oper()));
  }

  return result;
}
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
std::vector< InputTableInfo > getSyntheticInputTableInfo(std::set< const Analyzer::ColumnVar * > cvs, Executor *executor)
Definition: HashJoin.cpp:441
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:233
std::string cat(Ts &&...args)
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:215
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:101
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:76
std::string toStringFlat(const HashJoin *hash_table, const ExecutorDeviceType device_type, const int device_id)
Definition: HashJoin.cpp:94
static std::shared_ptr< OverlapsJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const QueryHint &query_hint)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
ExecutorDeviceType
virtual std::string toStringFlat64(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:112
std::list< std::shared_ptr< Analyzer::Expr > > coalesce_singleton_equi_join(const std::shared_ptr< Analyzer::BinOper > &join_qual)
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:53
static JoinColumn makeJoinColumn(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
Creates a JoinColumn struct containing an array of JoinChunk structs.
void setBucketInfo(const std::vector< double > &bucket_sizes_for_dimension, const std::vector< InnerOuter > inner_outer_pairs)
Definition: HashJoin.cpp:31
std::set< const Analyzer::ColumnVar * > aggregateResult(const std::set< const Analyzer::ColumnVar * > &aggregate, const std::set< const Analyzer::ColumnVar * > &next_result) const override
Definition: HashJoin.cpp:418
const Expr * get_right_operand() const
Definition: Analyzer.h:443
bool is_constructed_point(const Analyzer::Expr *expr)
Definition: Execute.h:1176
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
Definition: HashJoin.cpp:54
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:238
Definition: sqldefs.h:49
Definition: sqldefs.h:30
Definition: HashTable.h:21
static std::shared_ptr< PerfectJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
virtual int8_t * alloc(const size_t num_bytes)=0
T visit(const Analyzer::Expr *expr) const
#define CHECK_GT(x, y)
Definition: Logger.h:209
static std::shared_ptr< BaselineJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
virtual std::string toStringFlat32(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:117
std::string to_string(char const *&&v)
bool g_enable_overlaps_hashjoin
Definition: Execute.cpp:96
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const QueryHint &query_hint)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
Definition: HashJoin.cpp:238
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
const std::vector< JoinColumnTypeInfo > join_column_types
Definition: HashJoin.h:80
size_t col_chunks_buff_sz
const std::vector< std::shared_ptr< Analyzer::Expr > > & getTuple() const
Definition: Analyzer.h:244
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:168
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:222
static QueryHint defaults()
Definition: QueryHint.h:76
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join)
Definition: HashJoin.cpp:570
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:705
#define AUTOMATIC_IR_METADATA(CGENSTATE)
const int8_t * col_chunks_buff
static void checkHashJoinReplicationConstraint(const int table_id, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:525
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
void setupSyntheticCaching(std::set< const Analyzer::ColumnVar * > cvs, Executor *executor)
Definition: HashJoin.cpp:427
#define VLOGGING(n)
Definition: Logger.h:195
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor)
Definition: HashJoin.cpp:319
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqldefs.h:69
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
Definition: HashJoin.h:219
bool table_is_replicated(const TableDescriptor *td)
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
std::set< const Analyzer::ColumnVar * > visitColumnVarTuple(const Analyzer::ExpressionTuple *expr_tuple) const override
Definition: HashJoin.cpp:407
std::set< const Analyzer::ColumnVar * > visitColumnVar(const Analyzer::ColumnVar *column) const override
Definition: HashJoin.cpp:402
#define CHECK(condition)
Definition: Logger.h:197
std::set< int32_t > payload
Definition: HashTable.h:23
#define DEBUG_TIMER(name)
Definition: Logger.h:313
bool g_cluster
const Expr * get_left_operand() const
Definition: Analyzer.h:442
bool is_overlaps_oper() const
Definition: Analyzer.h:440
static std::shared_ptr< HashJoin > getSyntheticInstance(std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
Definition: HashJoin.cpp:468
std::vector< int64_t > key
Definition: HashTable.h:22
std::vector< JoinBucketInfo > join_buckets
Definition: HashJoin.h:82
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:553
std::shared_ptr< Analyzer::ColumnVar > getSyntheticColumnVar(std::string_view table, std::string_view column, int rte_idx, Executor *executor)
Definition: HashJoin.cpp:356
HashType
Definition: HashTable.h:19
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
InnerOuter get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:543
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:79
#define VLOG(n)
Definition: Logger.h:291