HashJoin.cpp
/*
 * Copyright 2019 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/JoinHashTable/HashJoin.h"

#include "QueryEngine/ColumnFetcher.h"
#include "QueryEngine/EquiJoinCondition.h"
#include "QueryEngine/Execute.h"
#include "QueryEngine/JoinHashTable/BaselineJoinHashTable.h"
#include "QueryEngine/JoinHashTable/OverlapsJoinHashTable.h"
#include "QueryEngine/JoinHashTable/PerfectJoinHashTable.h"
#include "QueryEngine/RangeTableIndexVisitor.h"
#include "QueryEngine/ScalarExprVisitor.h"

extern bool g_enable_overlaps_hashjoin;

void ColumnsForDevice::setBucketInfo(
    const std::vector<double>& inverse_bucket_sizes_for_dimension,
    const std::vector<InnerOuter> inner_outer_pairs) {
  join_buckets.clear();

  CHECK_EQ(inner_outer_pairs.size(), join_columns.size());
  CHECK_EQ(join_columns.size(), join_column_types.size());
  for (size_t i = 0; i < join_columns.size(); i++) {
    const auto& inner_outer_pair = inner_outer_pairs[i];
    const auto inner_col = inner_outer_pair.first;
    const auto& ti = inner_col->get_type_info();
    const auto elem_ti = ti.get_elem_type();
    // CHECK(elem_ti.is_fp());

    join_buckets.emplace_back(JoinBucketInfo{inverse_bucket_sizes_for_dimension,
                                             elem_ti.get_type() == kDOUBLE});
  }
}

//! fetchJoinColumn() calls ColumnFetcher::makeJoinColumn(), then copies the
//! JoinColumn's col_chunks_buff memory onto the GPU if required by the
//! effective_memory_level parameter.
JoinColumn HashJoin::fetchJoinColumn(
    const Analyzer::ColumnVar* hash_col,
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragment_info,
    const Data_Namespace::MemoryLevel effective_memory_level,
    const int device_id,
    std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
    DeviceAllocator* dev_buff_owner,
    std::vector<std::shared_ptr<void>>& malloc_owner,
    Executor* executor,
    ColumnCacheMap* column_cache) {
  static std::mutex fragment_fetch_mutex;
  std::lock_guard<std::mutex> fragment_fetch_lock(fragment_fetch_mutex);
  try {
    JoinColumn join_column = ColumnFetcher::makeJoinColumn(executor,
                                                           *hash_col,
                                                           fragment_info,
                                                           effective_memory_level,
                                                           device_id,
                                                           dev_buff_owner,
                                                           /*thread_idx=*/0,
                                                           chunks_owner,
                                                           malloc_owner,
                                                           *column_cache);
    if (effective_memory_level == Data_Namespace::GPU_LEVEL) {
      CHECK(dev_buff_owner);
      auto device_col_chunks_buff = dev_buff_owner->alloc(join_column.col_chunks_buff_sz);
      dev_buff_owner->copyToDevice(device_col_chunks_buff,
                                   join_column.col_chunks_buff,
                                   join_column.col_chunks_buff_sz);
      join_column.col_chunks_buff = device_col_chunks_buff;
    }
    return join_column;
  } catch (...) {
    throw FailedToFetchColumn();
  }
}
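
// Illustrative sketch (not original source): how a hash-table builder might
// call fetchJoinColumn(). The variable names below are hypothetical; see
// PerfectJoinHashTable / BaselineJoinHashTable for the real call sites.
//
//   std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
//   std::vector<std::shared_ptr<void>> malloc_owner;
//   const auto join_column = fetchJoinColumn(inner_col,
//                                            query_info.fragments,
//                                            effective_memory_level,
//                                            device_id,
//                                            chunks_owner,
//                                            dev_buff_owner.get(),
//                                            malloc_owner,
//                                            executor_,
//                                            &column_cache_);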

namespace {

template <typename T>
std::string toStringFlat(const HashJoin* hash_table,
                         const ExecutorDeviceType device_type,
                         const int device_id) {
  auto mem =
      reinterpret_cast<const T*>(hash_table->getJoinHashBuffer(device_type, device_id));
  auto memsz = hash_table->getJoinHashBufferSize(device_type, device_id) / sizeof(T);
  std::string txt;
  for (size_t i = 0; i < memsz; ++i) {
    if (i > 0) {
      txt += ", ";
    }
    txt += std::to_string(mem[i]);
  }
  return txt;
}

}  // anonymous namespace
111 
112 std::string HashJoin::toStringFlat64(const ExecutorDeviceType device_type,
113  const int device_id) const {
114  return toStringFlat<int64_t>(this, device_type, device_id);
115 }
116 
117 std::string HashJoin::toStringFlat32(const ExecutorDeviceType device_type,
118  const int device_id) const {
119  return toStringFlat<int32_t>(this, device_type, device_id);
120 }
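
// Debugging sketch (an assumed call, not a call site from this file): dump a
// small CPU-resident hash table as raw 64-bit slots.
//
//   VLOG(2) << "buffer: "
//           << join_hash_table->toStringFlat64(ExecutorDeviceType::CPU,
//                                              /*device_id=*/0);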

std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferEntry& e) {
  os << " {{";
  bool first = true;
  for (auto k : e.key) {
    if (!first) {
      os << ",";
    } else {
      first = false;
    }
    os << k;
  }
  os << "}, ";
  os << "{";
  first = true;
  for (auto p : e.payload) {
    if (!first) {
      os << ", ";
    } else {
      first = false;
    }
    os << p;
  }
  os << "}}";
  return os;
}

std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferSet& s) {
  os << "{\n";
  bool first = true;
  for (auto e : s) {
    if (!first) {
      os << ",\n";
    } else {
      first = false;
    }
    os << e;
  }
  if (!s.empty()) {
    os << "\n";
  }
  os << "}\n";
  return os;
}
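
// For reference, a decoded entry with composite key {1, 2} and payload rows
// {0, 3} would render like this under the two operators above (sketch):
//
//   {
//    {{1,2}, {0, 3}}
//   }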

HashJoinMatchingSet HashJoin::codegenMatchingSet(
    const std::vector<llvm::Value*>& hash_join_idx_args_in,
    const bool is_sharded,
    const bool col_is_nullable,
    const bool is_bw_eq,
    const int64_t sub_buff_size,
    Executor* executor,
    bool is_bucketized) {
  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
  using namespace std::string_literals;

  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);

  if (is_bw_eq) {
    fname += "_bitwise";
  }
  if (is_sharded) {
    fname += "_sharded";
  }
  if (!is_bw_eq && col_is_nullable) {
    fname += "_nullable";
  }

  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
      slot_lv, executor->cgen_state_->llInt(int64_t(0)));

  auto pos_ptr = hash_join_idx_args_in[0];
  CHECK(pos_ptr);

  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
      pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
  auto hash_join_idx_args = hash_join_idx_args_in;
  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
      count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));

  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
      slot_valid_lv,
      executor->cgen_state_->emitCall(fname, hash_join_idx_args),
      executor->cgen_state_->llInt(int64_t(0)));
  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
      executor->cgen_state_->ir_builder_.CreateAdd(
          pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
      llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
  auto rowid_ptr_i32 =
      executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
  return {rowid_ptr_i32, row_count_lv, slot_lv};
}
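
// The IR emitted above is equivalent to this C-style sketch. The layout
// assumption (three equally sized sub-buffers: slot offsets, then match
// counts, then row ids) follows from the pointer arithmetic on sub_buff_size
// in the code above, not from a separate specification.
//
//   int64_t slot = hash_join_idx(buff, key, ...);  // offset section
//   int64_t cnt =
//       slot >= 0 ? hash_join_idx(buff + sub_buff_size, key, ...) : 0;
//   int32_t* rowids = (int32_t*)(buff + 2 * sub_buff_size) + slot;
//   // the matching rows are rowids[0 .. cnt-1]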

llvm::Value* HashJoin::codegenHashTableLoad(const size_t table_idx, Executor* executor) {
  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
  llvm::Value* hash_ptr = nullptr;
  const auto total_table_count =
      executor->plan_state_->join_info_.join_hash_tables_.size();
  CHECK_LT(table_idx, total_table_count);
  if (total_table_count > 1) {
    auto hash_tables_ptr =
        get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
    auto hash_pptr =
        table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
                            hash_tables_ptr,
                            executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
                      : hash_tables_ptr;
    hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
  } else {
    hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
  }
  CHECK(hash_ptr);
  return hash_ptr;
}
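
// In C terms (a sketch of the generated code, not original source): with a
// single join, the "join_hash_tables" argument is the table pointer itself;
// with several joins, it is an array that gets indexed:
//
//   hash_ptr = (total_table_count > 1) ? join_hash_tables[table_idx]
//                                      : join_hash_tables;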

//! Make hash table from an in-flight SQL query's parse tree etc.
std::shared_ptr<HashJoin> HashJoin::getInstance(
    const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const JoinType join_type,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    const RegisteredQueryHint& query_hint,
    const TableIdToNodeMap& table_id_to_node_map) {
  auto timer = DEBUG_TIMER(__func__);
  std::shared_ptr<HashJoin> join_hash_table;
  CHECK_GT(device_count, 0);
  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
    throw std::runtime_error(
        "Overlaps hash join disabled, attempting to fall back to loop join");
  }
  if (qual_bin_oper->is_overlaps_oper()) {
    VLOG(1) << "Trying to build geo hash table:";
    join_hash_table = OverlapsJoinHashTable::getInstance(qual_bin_oper,
                                                         query_infos,
                                                         memory_level,
                                                         join_type,
                                                         device_count,
                                                         column_cache,
                                                         executor,
                                                         hashtable_build_dag_map,
                                                         query_hint,
                                                         table_id_to_node_map);
  } else if (dynamic_cast<const Analyzer::ExpressionTuple*>(
                 qual_bin_oper->get_left_operand())) {
    VLOG(1) << "Trying to build keyed hash table:";
    join_hash_table = BaselineJoinHashTable::getInstance(qual_bin_oper,
                                                         query_infos,
                                                         memory_level,
                                                         join_type,
                                                         preferred_hash_type,
                                                         device_count,
                                                         column_cache,
                                                         executor,
                                                         hashtable_build_dag_map,
                                                         table_id_to_node_map);
  } else {
    try {
      VLOG(1) << "Trying to build perfect hash table:";
      join_hash_table = PerfectJoinHashTable::getInstance(qual_bin_oper,
                                                          query_infos,
                                                          memory_level,
                                                          join_type,
                                                          preferred_hash_type,
                                                          device_count,
                                                          column_cache,
                                                          executor,
                                                          hashtable_build_dag_map,
                                                          table_id_to_node_map);
    } catch (TooManyHashEntries&) {
      const auto join_quals = coalesce_singleton_equi_join(qual_bin_oper);
      CHECK_EQ(join_quals.size(), size_t(1));
      const auto join_qual =
          std::dynamic_pointer_cast<Analyzer::BinOper>(join_quals.front());
      VLOG(1) << "Trying to build keyed hash table after perfect hash table:";
      join_hash_table = BaselineJoinHashTable::getInstance(join_qual,
                                                           query_infos,
                                                           memory_level,
                                                           join_type,
                                                           preferred_hash_type,
                                                           device_count,
                                                           column_cache,
                                                           executor,
                                                           hashtable_build_dag_map,
                                                           table_id_to_node_map);
    }
  }
  CHECK(join_hash_table);
  if (VLOGGING(2)) {
    if (join_hash_table->getMemoryLevel() == Data_Namespace::MemoryLevel::GPU_LEVEL) {
      for (int device_id = 0; device_id < join_hash_table->getDeviceCount();
           ++device_id) {
        if (join_hash_table->getJoinHashBufferSize(ExecutorDeviceType::GPU, device_id) <=
            1000) {
          VLOG(2) << "Built GPU hash table: "
                  << join_hash_table->toString(ExecutorDeviceType::GPU, device_id);
        }
      }
    } else {
      if (join_hash_table->getJoinHashBufferSize(ExecutorDeviceType::CPU) <= 1000) {
        VLOG(2) << "Built CPU hash table: "
                << join_hash_table->toString(ExecutorDeviceType::CPU);
      }
    }
  }
  return join_hash_table;
}
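
// Dispatch summary (comment added for clarity; the logic is entirely in the
// function above): overlaps quals go to OverlapsJoinHashTable, tuple-valued
// quals (multi-column keys) go to BaselineJoinHashTable, and everything else
// tries PerfectJoinHashTable first, falling back to a keyed/baseline table
// when the key range is too large (TooManyHashEntries).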

CompositeKeyInfo HashJoin::getCompositeKeyInfo(
    const std::vector<InnerOuter>& inner_outer_pairs,
    const Executor* executor) {
  CHECK(executor);
  std::vector<const void*> sd_inner_proxy_per_key;
  std::vector<const void*> sd_outer_proxy_per_key;
  std::vector<ChunkKey> cache_key_chunks;  // used for the cache key
  const auto db_id = executor->getCatalog()->getCurrentDB().dbId;
  for (const auto& inner_outer_pair : inner_outer_pairs) {
    const auto inner_col = inner_outer_pair.first;
    const auto outer_col = inner_outer_pair.second;
    const auto& inner_ti = inner_col->get_type_info();
    const auto& outer_ti = outer_col->get_type_info();
    ChunkKey cache_key_chunks_for_column{
        db_id, inner_col->get_table_id(), inner_col->get_column_id()};
    if (inner_ti.is_string() &&
        !(inner_ti.get_comp_param() == outer_ti.get_comp_param())) {
      CHECK(outer_ti.is_string());
      CHECK(inner_ti.get_compression() == kENCODING_DICT &&
            outer_ti.get_compression() == kENCODING_DICT);
      const auto sd_inner_proxy = executor->getStringDictionaryProxy(
          inner_ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
      const auto sd_outer_proxy = executor->getStringDictionaryProxy(
          outer_ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
      CHECK(sd_inner_proxy && sd_outer_proxy);
      sd_inner_proxy_per_key.push_back(sd_inner_proxy);
      sd_outer_proxy_per_key.push_back(sd_outer_proxy);
      cache_key_chunks_for_column.push_back(sd_outer_proxy->getGeneration());
    } else {
      sd_inner_proxy_per_key.emplace_back();
      sd_outer_proxy_per_key.emplace_back();
    }
    cache_key_chunks.push_back(cache_key_chunks_for_column);
  }
  return {sd_inner_proxy_per_key, sd_outer_proxy_per_key, cache_key_chunks};
}

std::shared_ptr<Analyzer::ColumnVar> getSyntheticColumnVar(std::string_view table,
                                                           std::string_view column,
                                                           int rte_idx,
                                                           Executor* executor) {
  auto catalog = executor->getCatalog();
  CHECK(catalog);

  auto tmeta = catalog->getMetadataForTable(std::string(table));
  CHECK(tmeta);

  auto cmeta = catalog->getMetadataForColumn(tmeta->tableId, std::string(column));
  CHECK(cmeta);

  auto ti = cmeta->columnType;

  if (ti.is_geometry() && ti.get_type() != kPOINT) {
    int geoColumnId{0};
    switch (ti.get_type()) {
      case kLINESTRING: {
        geoColumnId = cmeta->columnId + 2;
        break;
      }
      case kPOLYGON: {
        geoColumnId = cmeta->columnId + 3;
        break;
      }
      case kMULTIPOLYGON: {
        geoColumnId = cmeta->columnId + 4;
        break;
      }
      default:
        CHECK(false);
    }
    cmeta = catalog->getMetadataForColumn(tmeta->tableId, geoColumnId);
    CHECK(cmeta);
    ti = cmeta->columnType;
  }

  auto cv =
      std::make_shared<Analyzer::ColumnVar>(ti, tmeta->tableId, cmeta->columnId, rte_idx);
  return cv;
}
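
// Why the columnId offsets above: a logical geo column expands into several
// physical columns (coords, and for non-point types ring/poly sizes plus a
// bounds array), so the fixed offsets select the physical column carrying the
// bounds used by overlaps joins. The exact layout is defined by the catalog's
// geo column scheme; treat the offsets here as mirroring that scheme rather
// than defining it.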

class AllColumnVarsVisitor
    : public ScalarExprVisitor<std::set<const Analyzer::ColumnVar*>> {
 protected:
  std::set<const Analyzer::ColumnVar*> visitColumnVar(
      const Analyzer::ColumnVar* column) const override {
    return {column};
  }

  std::set<const Analyzer::ColumnVar*> visitColumnVarTuple(
      const Analyzer::ExpressionTuple* expr_tuple) const override {
    AllColumnVarsVisitor visitor;
    std::set<const Analyzer::ColumnVar*> result;
    for (const auto& expr_component : expr_tuple->getTuple()) {
      const auto component_rte_set = visitor.visit(expr_component.get());
      result.insert(component_rte_set.begin(), component_rte_set.end());
    }
    return result;
  }

  std::set<const Analyzer::ColumnVar*> aggregateResult(
      const std::set<const Analyzer::ColumnVar*>& aggregate,
      const std::set<const Analyzer::ColumnVar*>& next_result) const override {
    auto result = aggregate;
    result.insert(next_result.begin(), next_result.end());
    return result;
  }
};

void setupSyntheticCaching(std::set<const Analyzer::ColumnVar*> cvs, Executor* executor) {
  std::unordered_set<int> phys_table_ids;
  for (auto cv : cvs) {
    phys_table_ids.insert(cv->get_table_id());
  }

  std::unordered_set<PhysicalInput> phys_inputs;
  for (auto cv : cvs) {
    phys_inputs.emplace(PhysicalInput{cv->get_column_id(), cv->get_table_id()});
  }

  executor->setupCaching(phys_inputs, phys_table_ids);
}

std::vector<InputTableInfo> getSyntheticInputTableInfo(
    std::set<const Analyzer::ColumnVar*> cvs,
    Executor* executor) {
  auto catalog = executor->getCatalog();
  CHECK(catalog);

  std::unordered_set<int> phys_table_ids;
  for (auto cv : cvs) {
    phys_table_ids.insert(cv->get_table_id());
  }

  // NOTE(sy): This vector ordering seems to work for now, but maybe we need to
  // review how rte_idx is assigned for ColumnVars. See for example Analyzer.h
  // and RelAlgExecutor.cpp and rte_idx there.
  std::vector<InputTableInfo> query_infos(phys_table_ids.size());
  size_t i = 0;
  for (auto id : phys_table_ids) {
    auto tmeta = catalog->getMetadataForTable(id);
    query_infos[i].table_id = id;
    query_infos[i].info = tmeta->fragmenter->getFragmentsForQuery();
    ++i;
  }

  return query_infos;
}

//! Make hash table from named tables and columns (such as for testing).
std::shared_ptr<HashJoin> HashJoin::getSyntheticInstance(
    std::string_view table1,
    std::string_view column1,
    std::string_view table2,
    std::string_view column2,
    const Data_Namespace::MemoryLevel memory_level,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor) {
  auto a1 = getSyntheticColumnVar(table1, column1, 0, executor);
  auto a2 = getSyntheticColumnVar(table2, column2, 1, executor);

  auto qual_bin_oper = std::make_shared<Analyzer::BinOper>(kBOOLEAN, kEQ, kONE, a1, a2);

  std::set<const Analyzer::ColumnVar*> cvs =
      AllColumnVarsVisitor().visit(qual_bin_oper.get());
  auto query_infos = getSyntheticInputTableInfo(cvs, executor);
  setupSyntheticCaching(cvs, executor);
  RegisteredQueryHint query_hint = RegisteredQueryHint::defaults();

  auto hash_table = HashJoin::getInstance(qual_bin_oper,
                                          query_infos,
                                          memory_level,
                                          JoinType::INNER,
                                          preferred_hash_type,
                                          device_count,
                                          column_cache,
                                          executor,
                                          {},
                                          query_hint,
                                          {});
  return hash_table;
}
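
// Test-style usage sketch (hypothetical table and column names):
//
//   auto hash_table = HashJoin::getSyntheticInstance("t1",
//                                                    "a",
//                                                    "t2",
//                                                    "b",
//                                                    Data_Namespace::CPU_LEVEL,
//                                                    HashType::OneToOne,
//                                                    /*device_count=*/1,
//                                                    column_cache,
//                                                    executor);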

//! Make hash table from named tables and columns (such as for testing).
std::shared_ptr<HashJoin> HashJoin::getSyntheticInstance(
    const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
    const Data_Namespace::MemoryLevel memory_level,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor) {
  std::set<const Analyzer::ColumnVar*> cvs =
      AllColumnVarsVisitor().visit(qual_bin_oper.get());
  auto query_infos = getSyntheticInputTableInfo(cvs, executor);
  setupSyntheticCaching(cvs, executor);
  RegisteredQueryHint query_hint = RegisteredQueryHint::defaults();

  auto hash_table = HashJoin::getInstance(qual_bin_oper,
                                          query_infos,
                                          memory_level,
                                          JoinType::INNER,
                                          preferred_hash_type,
                                          device_count,
                                          column_cache,
                                          executor,
                                          {},
                                          query_hint,
                                          {});
  return hash_table;
}

std::pair<std::string, std::shared_ptr<HashJoin>> HashJoin::getSyntheticInstance(
    std::vector<std::shared_ptr<Analyzer::BinOper>> qual_bin_opers,
    const Data_Namespace::MemoryLevel memory_level,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor) {
  std::set<const Analyzer::ColumnVar*> cvs;
  for (auto& qual : qual_bin_opers) {
    auto cv = AllColumnVarsVisitor().visit(qual.get());
    cvs.insert(cv.begin(), cv.end());
  }
  auto query_infos = getSyntheticInputTableInfo(cvs, executor);
  setupSyntheticCaching(cvs, executor);
  RegisteredQueryHint query_hint = RegisteredQueryHint::defaults();
  std::shared_ptr<HashJoin> hash_table;
  std::string error_msg;
  for (auto& qual : qual_bin_opers) {
    try {
      auto candidate_hash_table = HashJoin::getInstance(qual,
                                                        query_infos,
                                                        memory_level,
                                                        JoinType::INNER,
                                                        preferred_hash_type,
                                                        device_count,
                                                        column_cache,
                                                        executor,
                                                        {},
                                                        query_hint,
                                                        {});
      if (candidate_hash_table) {
        hash_table = candidate_hash_table;
      }
    } catch (HashJoinFail& e) {
      error_msg = e.what();
      continue;
    }
  }
  return std::make_pair(error_msg, hash_table);
}

void HashJoin::checkHashJoinReplicationConstraint(const int table_id,
                                                  const size_t shard_count,
                                                  const Executor* executor) {
  if (!g_cluster) {
    return;
  }
  if (table_id >= 0) {
    CHECK(executor);
    const auto inner_td = executor->getCatalog()->getMetadataForTable(table_id);
    CHECK(inner_td);
    if (!shard_count && !table_is_replicated(inner_td)) {
      throw TableMustBeReplicated(inner_td->tableName);
    }
  }
}

InnerOuter HashJoin::normalizeColumnPair(const Analyzer::Expr* lhs,
                                         const Analyzer::Expr* rhs,
                                         const Catalog_Namespace::Catalog& cat,
                                         const TemporaryTables* temporary_tables,
                                         const bool is_overlaps_join) {
  const auto& lhs_ti = lhs->get_type_info();
  const auto& rhs_ti = rhs->get_type_info();
  if (!is_overlaps_join) {
    if (lhs_ti.get_type() != rhs_ti.get_type()) {
      throw HashJoinFail("Equijoin types must be identical, found: " +
                         lhs_ti.get_type_name() + ", " + rhs_ti.get_type_name());
    }
    if (!lhs_ti.is_integer() && !lhs_ti.is_time() && !lhs_ti.is_string() &&
        !lhs_ti.is_decimal()) {
      throw HashJoinFail("Cannot apply hash join to inner column type " +
                         lhs_ti.get_type_name());
    }
    // Decimal types should be identical.
    if (lhs_ti.is_decimal() && (lhs_ti.get_scale() != rhs_ti.get_scale() ||
                                lhs_ti.get_precision() != rhs_ti.get_precision())) {
      throw HashJoinFail("Equijoin with different decimal types");
    }
  }

  const auto lhs_cast = dynamic_cast<const Analyzer::UOper*>(lhs);
  const auto rhs_cast = dynamic_cast<const Analyzer::UOper*>(rhs);
  if (lhs_ti.is_string() && (static_cast<bool>(lhs_cast) != static_cast<bool>(rhs_cast) ||
                             (lhs_cast && lhs_cast->get_optype() != kCAST) ||
                             (rhs_cast && rhs_cast->get_optype() != kCAST))) {
    throw HashJoinFail("Cannot use hash join for given expression");
  }
  // Casts to decimal are not supported.
  if (lhs_ti.is_decimal() && (lhs_cast || rhs_cast)) {
    throw HashJoinFail("Cannot use hash join for given expression");
  }
  const auto lhs_col =
      lhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(lhs_cast->get_operand())
               : dynamic_cast<const Analyzer::ColumnVar*>(lhs);
  const auto rhs_col =
      rhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(rhs_cast->get_operand())
               : dynamic_cast<const Analyzer::ColumnVar*>(rhs);
  if (!lhs_col && !rhs_col) {
    throw HashJoinFail("Cannot use hash join for given expression");
  }
  const Analyzer::ColumnVar* inner_col{nullptr};
  const Analyzer::ColumnVar* outer_col{nullptr};
  auto outer_ti = lhs_ti;
  auto inner_ti = rhs_ti;
  const Analyzer::Expr* outer_expr{lhs};
  if (!lhs_col || (rhs_col && lhs_col->get_rte_idx() < rhs_col->get_rte_idx())) {
    inner_col = rhs_col;
    outer_col = lhs_col;
  } else {
    if (lhs_col && lhs_col->get_rte_idx() == 0) {
      throw HashJoinFail("Cannot use hash join for given expression");
    }
    inner_col = lhs_col;
    outer_col = rhs_col;
    std::swap(outer_ti, inner_ti);
    outer_expr = rhs;
  }
  if (!inner_col) {
    throw HashJoinFail("Cannot use hash join for given expression");
  }
  if (!outer_col) {
    // check whether outer_col is a constant, i.e., inner_col = K;
    const auto outer_constant_col = dynamic_cast<const Analyzer::Constant*>(outer_expr);
    if (outer_constant_col) {
      throw HashJoinFail(
          "Cannot use hash join for given expression: try to join with a constant "
          "value");
    }
    MaxRangeTableIndexVisitor rte_idx_visitor;
    int outer_rte_idx = rte_idx_visitor.visit(outer_expr);
    // The inner column candidate is not actually inner; the outer
    // expression contains columns which are at least as deep.
    if (inner_col->get_rte_idx() <= outer_rte_idx) {
      throw HashJoinFail("Cannot use hash join for given expression");
    }
  }
  // We need to fetch the actual type information from the catalog since Analyzer
  // always reports nullable as true for inner table columns in left joins.
  const auto inner_col_cd = get_column_descriptor_maybe(
      inner_col->get_column_id(), inner_col->get_table_id(), cat);
  const auto inner_col_real_ti = get_column_type(inner_col->get_column_id(),
                                                 inner_col->get_table_id(),
                                                 inner_col_cd,
                                                 temporary_tables);
  const auto& outer_col_ti =
      !(dynamic_cast<const Analyzer::FunctionOper*>(lhs)) && outer_col
          ? outer_col->get_type_info()
          : outer_ti;
  // Casts from decimal are not supported.
  if ((inner_col_real_ti.is_decimal() || outer_col_ti.is_decimal()) &&
      (lhs_cast || rhs_cast)) {
    throw HashJoinFail("Cannot use hash join for given expression");
  }
  if (is_overlaps_join) {
    if (!inner_col_real_ti.is_array()) {
      throw HashJoinFail(
          "Overlaps join only supported for inner columns with array type");
    }
    auto is_bounds_array = [](const auto ti) {
      return ti.is_fixlen_array() && ti.get_size() == 32;
    };
    if (!is_bounds_array(inner_col_real_ti)) {
      throw HashJoinFail(
          "Overlaps join only supported for 4-element double fixed length arrays");
    }
    if (!(outer_col_ti.get_type() == kPOINT || is_bounds_array(outer_col_ti) ||
          is_constructed_point(outer_expr))) {
      throw HashJoinFail(
          "Overlaps join only supported for geometry outer columns of type point, "
          "geometry columns with bounds or constructed points");
    }
  } else {
    if (!(inner_col_real_ti.is_integer() || inner_col_real_ti.is_time() ||
          inner_col_real_ti.is_decimal() ||
          (inner_col_real_ti.is_string() &&
           inner_col_real_ti.get_compression() == kENCODING_DICT))) {
      throw HashJoinFail(
          "Can only apply hash join to integer-like types and dictionary encoded "
          "strings");
    }
  }

  auto normalized_inner_col = inner_col;
  auto normalized_outer_col = outer_col ? outer_col : outer_expr;

  const auto& normalized_inner_ti = normalized_inner_col->get_type_info();
  const auto& normalized_outer_ti = normalized_outer_col->get_type_info();

  if (normalized_inner_ti.is_string() != normalized_outer_ti.is_string()) {
    throw HashJoinFail(std::string("Could not build hash tables for incompatible types " +
                                   normalized_inner_ti.get_type_name() + " and " +
                                   normalized_outer_ti.get_type_name()));
  }

  return {normalized_inner_col, normalized_outer_col};
}
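
// Worked example (a sketch; table names hypothetical): for a condition such
// as "t_outer.x = t_inner.y" where t_inner has the larger range-table index,
// the returned pair is {t_inner.y, t_outer.x} -- the build-side (inner)
// column first, the probe-side (outer) expression second. If no side is
// strictly deeper, or the only column sits at rte_idx 0, the function throws
// HashJoinFail and the caller falls back to another join strategy.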

std::vector<InnerOuter> HashJoin::normalizeColumnPairs(
    const Analyzer::BinOper* condition,
    const Catalog_Namespace::Catalog& cat,
    const TemporaryTables* temporary_tables) {
  std::vector<InnerOuter> result;
  const auto lhs_tuple_expr =
      dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_left_operand());
  const auto rhs_tuple_expr =
      dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_right_operand());

  CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
  if (lhs_tuple_expr) {
    const auto& lhs_tuple = lhs_tuple_expr->getTuple();
    const auto& rhs_tuple = rhs_tuple_expr->getTuple();
    CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
    for (size_t i = 0; i < lhs_tuple.size(); ++i) {
      result.push_back(normalizeColumnPair(lhs_tuple[i].get(),
                                           rhs_tuple[i].get(),
                                           cat,
                                           temporary_tables,
                                           condition->is_overlaps_oper()));
    }
  } else {
    CHECK(!lhs_tuple_expr && !rhs_tuple_expr);
    result.push_back(normalizeColumnPair(condition->get_left_operand(),
                                         condition->get_right_operand(),
                                         cat,
                                         temporary_tables,
                                         condition->is_overlaps_oper()));
  }

  return result;
}

namespace {

InnerOuter get_cols(const Analyzer::BinOper* qual_bin_oper,
                    const Catalog_Namespace::Catalog& cat,
                    const TemporaryTables* temporary_tables) {
  const auto lhs = qual_bin_oper->get_left_operand();
  const auto rhs = qual_bin_oper->get_right_operand();
  return HashJoin::normalizeColumnPair(lhs, rhs, cat, temporary_tables);
}

}  // namespace

size_t get_shard_count(const Analyzer::BinOper* join_condition,
                       const Executor* executor) {
  const Analyzer::ColumnVar* inner_col{nullptr};
  const Analyzer::Expr* outer_col{nullptr};
  std::shared_ptr<Analyzer::BinOper> redirected_bin_oper;
  try {
    std::tie(inner_col, outer_col) =
        get_cols(join_condition, *executor->getCatalog(), executor->getTemporaryTables());
  } catch (...) {
    return 0;
  }
  if (!inner_col || !outer_col) {
    return 0;
  }
  return get_shard_count({inner_col, outer_col}, executor);
}