OmniSciDB  0264ff685a
HashJoin.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
21 #include "QueryEngine/Execute.h"
28 
29 extern bool g_enable_overlaps_hashjoin;
30 
// HashJoin::fetchJoinColumn (signature start not visible in this view):
// materializes the join key column from the given fragments and, for GPU
// builds, copies the chunk index buffer to the device. Any failure is
// normalized into FailedToFetchColumn.
36  const Analyzer::ColumnVar* hash_col,
37  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragment_info,
38  const Data_Namespace::MemoryLevel effective_memory_level,
39  const int device_id,
40  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
41  DeviceAllocator* dev_buff_owner,
42  std::vector<std::shared_ptr<void>>& malloc_owner,
43  Executor* executor,
44  ColumnCacheMap* column_cache) {
// Serialize the whole fetch path behind one global mutex.
45  static std::mutex fragment_fetch_mutex;
46  std::lock_guard<std::mutex> fragment_fetch_lock(fragment_fetch_mutex);
47  try {
// Build the JoinColumn from the fragments at the requested memory level
// (possibly served out of column_cache).
48  JoinColumn join_column = ColumnFetcher::makeJoinColumn(executor,
49  *hash_col,
50  fragment_info,
51  effective_memory_level,
52  device_id,
53  dev_buff_owner,
54  chunks_owner,
55  malloc_owner,
56  *column_cache);
57  if (effective_memory_level == Data_Namespace::GPU_LEVEL) {
58  CHECK(dev_buff_owner);
// Copy the chunk index buffer to device memory and repoint the column at it.
59  auto device_col_chunks_buff = dev_buff_owner->alloc(join_column.col_chunks_buff_sz);
60  dev_buff_owner->copyToDevice(device_col_chunks_buff,
61  join_column.col_chunks_buff,
62  join_column.col_chunks_buff_sz);
63  join_column.col_chunks_buff = device_col_chunks_buff;
64  }
65  return join_column;
// Collapse every fetch error into a single exception type for callers.
66  } catch (...) {
67  throw FailedToFetchColumn();
68  }
69 }
70 
71 namespace {
72 
73 template <typename T>
74 std::string toStringFlat(const HashJoin* hash_table,
75  const ExecutorDeviceType device_type,
76  const int device_id) {
77  auto mem =
78  reinterpret_cast<const T*>(hash_table->getJoinHashBuffer(device_type, device_id));
79  auto memsz = hash_table->getJoinHashBufferSize(device_type, device_id) / sizeof(T);
80  std::string txt;
81  for (size_t i = 0; i < memsz; ++i) {
82  if (i > 0) {
83  txt += ", ";
84  }
85  txt += std::to_string(mem[i]);
86  }
87  return txt;
88 }
89 
90 } // anonymous namespace
91 
92 std::string HashJoin::toStringFlat64(const ExecutorDeviceType device_type,
93  const int device_id) const {
94  return toStringFlat<int64_t>(this, device_type, device_id);
95 }
96 
97 std::string HashJoin::toStringFlat32(const ExecutorDeviceType device_type,
98  const int device_id) const {
99  return toStringFlat<int32_t>(this, device_type, device_id);
100 }
101 
102 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferEntry& e) {
103  os << " {{";
104  bool first = true;
105  for (auto k : e.key) {
106  if (!first) {
107  os << ",";
108  } else {
109  first = false;
110  }
111  os << k;
112  }
113  os << "}, ";
114  os << "{";
115  first = true;
116  for (auto p : e.payload) {
117  if (!first) {
118  os << ", ";
119  } else {
120  first = false;
121  }
122  os << p;
123  }
124  os << "}}";
125  return os;
126 }
127 
128 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferSet& s) {
129  os << "{\n";
130  bool first = true;
131  for (auto e : s) {
132  if (!first) {
133  os << ",\n";
134  } else {
135  first = false;
136  }
137  os << e;
138  }
139  if (!s.empty()) {
140  os << "\n";
141  }
142  os << "}\n";
143  return os;
144 }
145 
// HashJoin::codegenMatchingSet (signature start not visible in this view):
// emits IR probing a one-to-many hash table. The code below addresses three
// regions at pos, pos + sub_buff_size and pos + 2 * sub_buff_size — slot
// lookup, per-slot match count, and 32-bit row-id payload respectively.
147  const std::vector<llvm::Value*>& hash_join_idx_args_in,
148  const bool is_sharded,
149  const bool is_bw_eq,
150  const bool col_is_nullable,
151  const int64_t sub_buff_size,
152  Executor* executor,
153  bool is_bucketized) {
154  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
155  using namespace std::string_literals;
156 
// Compose the runtime probe function name from the requested variants.
157  std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s);
158 
159  if (is_bw_eq) {
160  fname += "_bitwise";
161  }
162  if (is_sharded) {
163  fname += "_sharded";
164  }
// The bitwise-eq variant already covers NULLs; otherwise pick the
// "_nullable" variant for nullable columns.
165  if (!is_bw_eq && col_is_nullable) {
166  fname += "_nullable";
167  }
168 
// A non-negative slot index signals a key match.
169  const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
170  const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
171  slot_lv, executor->cgen_state_->llInt(int64_t(0)));
172 
173  auto pos_ptr = hash_join_idx_args_in[0];
174  CHECK(pos_ptr);
175 
// Re-probe at pos + sub_buff_size to read the matching-row count.
176  auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
177  pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
178  auto hash_join_idx_args = hash_join_idx_args_in;
179  hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
180  count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));
181 
// Count is zero when the slot probe missed.
182  const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
183  slot_valid_lv,
184  executor->cgen_state_->emitCall(fname, hash_join_idx_args),
185  executor->cgen_state_->llInt(int64_t(0)));
// Row ids live at pos + 2 * sub_buff_size, addressed as i32 elements.
186  auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
187  executor->cgen_state_->ir_builder_.CreateAdd(
188  pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
189  llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
190  auto rowid_ptr_i32 =
191  executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
192  return {rowid_ptr_i32, row_count_lv, slot_lv};
193 }
194 
195 llvm::Value* HashJoin::codegenHashTableLoad(const size_t table_idx, Executor* executor) {
196  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
197  llvm::Value* hash_ptr = nullptr;
198  const auto total_table_count =
199  executor->plan_state_->join_info_.join_hash_tables_.size();
200  CHECK_LT(table_idx, total_table_count);
201  if (total_table_count > 1) {
202  auto hash_tables_ptr =
203  get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
204  auto hash_pptr =
205  table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
206  hash_tables_ptr,
207  executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
208  : hash_tables_ptr;
209  hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
210  } else {
211  hash_ptr = get_arg_by_name(executor->cgen_state_->row_func_, "join_hash_tables");
212  }
213  CHECK(hash_ptr);
214  return hash_ptr;
215 }
216 
218 std::shared_ptr<HashJoin> HashJoin::getInstance(
219  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
220  const std::vector<InputTableInfo>& query_infos,
221  const Data_Namespace::MemoryLevel memory_level,
222  const HashType preferred_hash_type,
223  const int device_count,
224  ColumnCacheMap& column_cache,
225  Executor* executor,
226  const QueryHint& query_hint) {
227  auto timer = DEBUG_TIMER(__func__);
228  std::shared_ptr<HashJoin> join_hash_table;
229  CHECK_GT(device_count, 0);
230  if (!g_enable_overlaps_hashjoin && qual_bin_oper->is_overlaps_oper()) {
231  throw std::runtime_error(
232  "Overlaps hash join disabled, attempting to fall back to loop join");
233  }
234  if (qual_bin_oper->is_overlaps_oper()) {
235  VLOG(1) << "Trying to build geo hash table:";
236  join_hash_table = OverlapsJoinHashTable::getInstance(qual_bin_oper,
237  query_infos,
238  memory_level,
239  device_count,
240  column_cache,
241  executor,
242  query_hint);
243  } else if (dynamic_cast<const Analyzer::ExpressionTuple*>(
244  qual_bin_oper->get_left_operand())) {
245  VLOG(1) << "Trying to build keyed hash table:";
246  join_hash_table = BaselineJoinHashTable::getInstance(qual_bin_oper,
247  query_infos,
248  memory_level,
249  preferred_hash_type,
250  device_count,
251  column_cache,
252  executor);
253  } else {
254  try {
255  VLOG(1) << "Trying to build perfect hash table:";
256  join_hash_table = PerfectJoinHashTable::getInstance(qual_bin_oper,
257  query_infos,
258  memory_level,
259  preferred_hash_type,
260  device_count,
261  column_cache,
262  executor);
263  } catch (TooManyHashEntries&) {
264  const auto join_quals = coalesce_singleton_equi_join(qual_bin_oper);
265  CHECK_EQ(join_quals.size(), size_t(1));
266  const auto join_qual =
267  std::dynamic_pointer_cast<Analyzer::BinOper>(join_quals.front());
268  VLOG(1) << "Trying to build keyed hash table after perfect hash table:";
269  join_hash_table = BaselineJoinHashTable::getInstance(join_qual,
270  query_infos,
271  memory_level,
272  preferred_hash_type,
273  device_count,
274  column_cache,
275  executor);
276  }
277  }
278  CHECK(join_hash_table);
279  if (VLOGGING(2)) {
280  if (join_hash_table->getMemoryLevel() == Data_Namespace::MemoryLevel::GPU_LEVEL) {
281  for (int device_id = 0; device_id < join_hash_table->getDeviceCount();
282  ++device_id) {
283  if (join_hash_table->getJoinHashBufferSize(ExecutorDeviceType::GPU, device_id) <=
284  1000) {
285  VLOG(2) << "Built GPU hash table: "
286  << join_hash_table->toString(ExecutorDeviceType::GPU, device_id);
287  }
288  }
289  } else {
290  if (join_hash_table->getJoinHashBufferSize(ExecutorDeviceType::CPU) <= 1000) {
291  VLOG(2) << "Built CPU hash table: "
292  << join_hash_table->toString(ExecutorDeviceType::CPU);
293  }
294  }
295  }
296  return join_hash_table;
297 }
298 
// HashJoin::getCompositeKeyInfo (signature start not visible in this view):
// walks the inner/outer column pairs, collecting string dictionary proxies
// for pairs whose two sides use different dictionaries, and building a
// per-column ChunkKey used as the hash table cache key.
300  const std::vector<InnerOuter>& inner_outer_pairs,
301  const Executor* executor) {
302  CHECK(executor);
303  std::vector<const void*> sd_inner_proxy_per_key;
304  std::vector<const void*> sd_outer_proxy_per_key;
305  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
306  const auto db_id = executor->getCatalog()->getCurrentDB().dbId;
307  for (const auto& inner_outer_pair : inner_outer_pairs) {
308  const auto inner_col = inner_outer_pair.first;
309  const auto outer_col = inner_outer_pair.second;
310  const auto& inner_ti = inner_col->get_type_info();
311  const auto& outer_ti = outer_col->get_type_info();
312  ChunkKey cache_key_chunks_for_column{
313  db_id, inner_col->get_table_id(), inner_col->get_column_id()};
// Proxies are only fetched when the two sides use different dictionaries
// (differing comp_param); both sides must be dictionary-encoded strings.
314  if (inner_ti.is_string() &&
315  !(inner_ti.get_comp_param() == outer_ti.get_comp_param())) {
316  CHECK(outer_ti.is_string());
317  CHECK(inner_ti.get_compression() == kENCODING_DICT &&
318  outer_ti.get_compression() == kENCODING_DICT);
319  const auto sd_inner_proxy = executor->getStringDictionaryProxy(
320  inner_ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
321  const auto sd_outer_proxy = executor->getStringDictionaryProxy(
322  outer_ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
323  CHECK(sd_inner_proxy && sd_outer_proxy);
324  sd_inner_proxy_per_key.push_back(sd_inner_proxy);
325  sd_outer_proxy_per_key.push_back(sd_outer_proxy);
// The outer dictionary generation is folded into the cache key —
// NOTE(review): presumably so dictionary growth invalidates cached tables;
// confirm against the cache lookup code.
326  cache_key_chunks_for_column.push_back(sd_outer_proxy->getGeneration());
327  } else {
// No proxy needed; push null entries to keep the vectors key-aligned.
328  sd_inner_proxy_per_key.emplace_back();
329  sd_outer_proxy_per_key.emplace_back();
330  }
331  cache_key_chunks.push_back(cache_key_chunks_for_column);
332  }
333  return {sd_inner_proxy_per_key, sd_outer_proxy_per_key, cache_key_chunks};
334 }
335 
336 std::shared_ptr<Analyzer::ColumnVar> getSyntheticColumnVar(std::string_view table,
337  std::string_view column,
338  int rte_idx,
339  Executor* executor) {
340  auto catalog = executor->getCatalog();
341  CHECK(catalog);
342 
343  auto tmeta = catalog->getMetadataForTable(std::string(table));
344  CHECK(tmeta);
345 
346  auto cmeta = catalog->getMetadataForColumn(tmeta->tableId, std::string(column));
347  CHECK(cmeta);
348 
349  auto ti = cmeta->columnType;
350 
351  if (ti.is_geometry() && ti.get_type() != kPOINT) {
352  int geoColumnId{0};
353  switch (ti.get_type()) {
354  case kLINESTRING: {
355  geoColumnId = cmeta->columnId + 2;
356  break;
357  }
358  case kPOLYGON: {
359  geoColumnId = cmeta->columnId + 3;
360  break;
361  }
362  case kMULTIPOLYGON: {
363  geoColumnId = cmeta->columnId + 4;
364  break;
365  }
366  default:
367  CHECK(false);
368  }
369  cmeta = catalog->getMetadataForColumn(tmeta->tableId, geoColumnId);
370  CHECK(cmeta);
371  ti = cmeta->columnType;
372  }
373 
374  auto cv =
375  std::make_shared<Analyzer::ColumnVar>(ti, tmeta->tableId, cmeta->columnId, rte_idx);
376  return cv;
377 }
378 
// AllColumnVarsVisitor (class declaration line not visible in this view):
// a ScalarExprVisitor that collects every ColumnVar referenced anywhere in
// an expression tree into a single set.
380  : public ScalarExprVisitor<std::set<const Analyzer::ColumnVar*>> {
381  protected:
// Leaf case: a column contributes itself.
382  std::set<const Analyzer::ColumnVar*> visitColumnVar(
383  const Analyzer::ColumnVar* column) const override {
384  return {column};
385  }
386 
// Tuple case: union the column sets of all tuple components.
387  std::set<const Analyzer::ColumnVar*> visitColumnVarTuple(
388  const Analyzer::ExpressionTuple* expr_tuple) const override {
389  AllColumnVarsVisitor visitor;
390  std::set<const Analyzer::ColumnVar*> result;
391  for (const auto& expr_component : expr_tuple->getTuple()) {
392  const auto component_rte_set = visitor.visit(expr_component.get());
393  result.insert(component_rte_set.begin(), component_rte_set.end());
394  }
395  return result;
396  }
397 
// Combine partial results from sibling sub-expressions by set union.
398  std::set<const Analyzer::ColumnVar*> aggregateResult(
399  const std::set<const Analyzer::ColumnVar*>& aggregate,
400  const std::set<const Analyzer::ColumnVar*>& next_result) const override {
401  auto result = aggregate;
402  result.insert(next_result.begin(), next_result.end());
403  return result;
404  }
405 };
406 
407 void setupSyntheticCaching(std::set<const Analyzer::ColumnVar*> cvs, Executor* executor) {
408  std::unordered_set<int> phys_table_ids;
409  for (auto cv : cvs) {
410  phys_table_ids.insert(cv->get_table_id());
411  }
412 
413  std::unordered_set<PhysicalInput> phys_inputs;
414  for (auto cv : cvs) {
415  phys_inputs.emplace(PhysicalInput{cv->get_column_id(), cv->get_table_id()});
416  }
417 
418  executor->setupCaching(phys_inputs, phys_table_ids);
419 }
420 
421 std::vector<InputTableInfo> getSyntheticInputTableInfo(
422  std::set<const Analyzer::ColumnVar*> cvs,
423  Executor* executor) {
424  auto catalog = executor->getCatalog();
425  CHECK(catalog);
426 
427  std::unordered_set<int> phys_table_ids;
428  for (auto cv : cvs) {
429  phys_table_ids.insert(cv->get_table_id());
430  }
431 
432  // NOTE(sy): This vector ordering seems to work for now, but maybe we need to
433  // review how rte_idx is assigned for ColumnVars. See for example Analyzer.h
434  // and RelAlgExecutor.cpp and rte_idx there.
435  std::vector<InputTableInfo> query_infos(phys_table_ids.size());
436  size_t i = 0;
437  for (auto id : phys_table_ids) {
438  auto tmeta = catalog->getMetadataForTable(id);
439  query_infos[i].table_id = id;
440  query_infos[i].info = tmeta->fragmenter->getFragmentsForQuery();
441  ++i;
442  }
443 
444  return query_infos;
445 }
446 
448 std::shared_ptr<HashJoin> HashJoin::getSyntheticInstance(
449  std::string_view table1,
450  std::string_view column1,
451  std::string_view table2,
452  std::string_view column2,
453  const Data_Namespace::MemoryLevel memory_level,
454  const HashType preferred_hash_type,
455  const int device_count,
456  ColumnCacheMap& column_cache,
457  Executor* executor) {
458  auto a1 = getSyntheticColumnVar(table1, column1, 0, executor);
459  auto a2 = getSyntheticColumnVar(table2, column2, 1, executor);
460 
461  auto qual_bin_oper = std::make_shared<Analyzer::BinOper>(kBOOLEAN, kEQ, kONE, a1, a2);
462 
463  std::set<const Analyzer::ColumnVar*> cvs =
464  AllColumnVarsVisitor().visit(qual_bin_oper.get());
465  auto query_infos = getSyntheticInputTableInfo(cvs, executor);
466  setupSyntheticCaching(cvs, executor);
467  QueryHint query_hint = QueryHint::defaults();
468 
469  auto hash_table = HashJoin::getInstance(qual_bin_oper,
470  query_infos,
471  memory_level,
472  preferred_hash_type,
473  device_count,
474  column_cache,
475  executor,
476  query_hint);
477  return hash_table;
478 }
479 
481 std::shared_ptr<HashJoin> HashJoin::getSyntheticInstance(
482  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
483  const Data_Namespace::MemoryLevel memory_level,
484  const HashType preferred_hash_type,
485  const int device_count,
486  ColumnCacheMap& column_cache,
487  Executor* executor) {
488  std::set<const Analyzer::ColumnVar*> cvs =
489  AllColumnVarsVisitor().visit(qual_bin_oper.get());
490  auto query_infos = getSyntheticInputTableInfo(cvs, executor);
491  setupSyntheticCaching(cvs, executor);
492  QueryHint query_hint = QueryHint::defaults();
493 
494  auto hash_table = HashJoin::getInstance(qual_bin_oper,
495  query_infos,
496  memory_level,
497  preferred_hash_type,
498  device_count,
499  column_cache,
500  executor,
501  query_hint);
502  return hash_table;
503 }
504 
// HashJoin::checkHashJoinReplicationConstraint (signature start not visible
// in this view): in cluster mode, an unsharded inner table must be
// replicated for the hash join to be valid; throws otherwise.
506  const size_t shard_count,
507  const Executor* executor) {
// Single-node deployments have no replication constraint.
508  if (!g_cluster) {
509  return;
510  }
// table_id >= 0 — presumably restricts the check to physical (non-temporary)
// tables; confirm against the table-id numbering convention.
511  if (table_id >= 0) {
512  CHECK(executor);
513  const auto inner_td = executor->getCatalog()->getMetadataForTable(table_id);
514  CHECK(inner_td);
515  if (!shard_count && !table_is_replicated(inner_td)) {
516  throw TableMustBeReplicated(inner_td->tableName);
517  }
518  }
519 }
520 
521 namespace {
522 
// Extract the two operands of an equijoin qualifier and normalize them into
// an (inner, outer) column pair via normalize_column_pair. (The catalog
// parameter line was elided by the documentation generator in this view.)
523 InnerOuter get_cols(const Analyzer::BinOper* qual_bin_oper,
525  const TemporaryTables* temporary_tables) {
526  const auto lhs = qual_bin_oper->get_left_operand();
527  const auto rhs = qual_bin_oper->get_right_operand();
528  return normalize_column_pair(lhs, rhs, cat, temporary_tables);
529 }
530 
531 } // namespace
532 
533 size_t get_shard_count(const Analyzer::BinOper* join_condition,
534  const Executor* executor) {
535  const Analyzer::ColumnVar* inner_col{nullptr};
536  const Analyzer::Expr* outer_col{nullptr};
537  std::shared_ptr<Analyzer::BinOper> redirected_bin_oper;
538  try {
539  std::tie(inner_col, outer_col) =
540  get_cols(join_condition, *executor->getCatalog(), executor->getTemporaryTables());
541  } catch (...) {
542  return 0;
543  }
544  if (!inner_col || !outer_col) {
545  return 0;
546  }
547  return get_shard_count({inner_col, outer_col}, executor);
548 }
549 
// normalize_column_pair (signature start not visible in this view): validates
// that lhs/rhs form a hashable join pair and orders them as (inner, outer),
// where the inner side is the deeper range-table entry. Throws HashJoinFail
// with a caller-visible reason on any unsupported shape.
551  const Analyzer::Expr* rhs,
553  const TemporaryTables* temporary_tables,
554  const bool is_overlaps_join) {
555  const auto& lhs_ti = lhs->get_type_info();
556  const auto& rhs_ti = rhs->get_type_info();
// Equijoins require matching, hash-supported types on both sides.
557  if (!is_overlaps_join) {
558  if (lhs_ti.get_type() != rhs_ti.get_type()) {
559  throw HashJoinFail("Equijoin types must be identical, found: " +
560  lhs_ti.get_type_name() + ", " + rhs_ti.get_type_name());
561  }
562  if (!lhs_ti.is_integer() && !lhs_ti.is_time() && !lhs_ti.is_string() &&
563  !lhs_ti.is_decimal()) {
564  throw HashJoinFail("Cannot apply hash join to inner column type " +
565  lhs_ti.get_type_name());
566  }
567  // Decimal types should be identical.
568  if (lhs_ti.is_decimal() && (lhs_ti.get_scale() != rhs_ti.get_scale() ||
569  lhs_ti.get_precision() != rhs_ti.get_precision())) {
570  throw HashJoinFail("Equijoin with different decimal types");
571  }
572  }
573 
// Allow a top-level cast on either side only if it is a symmetric kCAST on
// string pairs; anything else defeats the hash layout.
574  const auto lhs_cast = dynamic_cast<const Analyzer::UOper*>(lhs);
575  const auto rhs_cast = dynamic_cast<const Analyzer::UOper*>(rhs);
576  if (lhs_ti.is_string() && (static_cast<bool>(lhs_cast) != static_cast<bool>(rhs_cast) ||
577  (lhs_cast && lhs_cast->get_optype() != kCAST) ||
578  (rhs_cast && rhs_cast->get_optype() != kCAST))) {
579  throw HashJoinFail("Cannot use hash join for given expression");
580  }
581  // Casts to decimal are not supported.
582  if (lhs_ti.is_decimal() && (lhs_cast || rhs_cast)) {
583  throw HashJoinFail("Cannot use hash join for given expression");
584  }
// Unwrap casts to find the underlying ColumnVar (if any) on each side.
585  const auto lhs_col =
586  lhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(lhs_cast->get_operand())
587  : dynamic_cast<const Analyzer::ColumnVar*>(lhs);
588  const auto rhs_col =
589  rhs_cast ? dynamic_cast<const Analyzer::ColumnVar*>(rhs_cast->get_operand())
590  : dynamic_cast<const Analyzer::ColumnVar*>(rhs);
591  if (!lhs_col && !rhs_col) {
592  throw HashJoinFail("Cannot use hash join for given expression");
593  }
// Decide which side is "inner" (the deeper rte_idx) and which is "outer".
594  const Analyzer::ColumnVar* inner_col{nullptr};
595  const Analyzer::ColumnVar* outer_col{nullptr};
596  auto outer_ti = lhs_ti;
597  auto inner_ti = rhs_ti;
598  const Analyzer::Expr* outer_expr{lhs};
599  if ((!lhs_col || (rhs_col && lhs_col->get_rte_idx() < rhs_col->get_rte_idx())) &&
600  (!rhs_col || (!lhs_col || lhs_col->get_rte_idx() < rhs_col->get_rte_idx()))) {
601  inner_col = rhs_col;
602  outer_col = lhs_col;
603  } else {
604  if (lhs_col && lhs_col->get_rte_idx() == 0) {
605  throw HashJoinFail("Cannot use hash join for given expression");
606  }
607  inner_col = lhs_col;
608  outer_col = rhs_col;
609  std::swap(outer_ti, inner_ti);
610  outer_expr = rhs;
611  }
612  if (!inner_col) {
613  throw HashJoinFail("Cannot use hash join for given expression");
614  }
615  if (!outer_col) {
616  MaxRangeTableIndexVisitor rte_idx_visitor;
617  int outer_rte_idx = rte_idx_visitor.visit(outer_expr);
618  // The inner column candidate is not actually inner; the outer
619  // expression contains columns which are at least as deep.
620  if (inner_col->get_rte_idx() <= outer_rte_idx) {
621  throw HashJoinFail("Cannot use hash join for given expression");
622  }
623  }
624  // We need to fetch the actual type information from the catalog since Analyzer
625  // always reports nullable as true for inner table columns in left joins.
626  const auto inner_col_cd = get_column_descriptor_maybe(
627  inner_col->get_column_id(), inner_col->get_table_id(), cat);
628  const auto inner_col_real_ti = get_column_type(inner_col->get_column_id(),
629  inner_col->get_table_id(),
630  inner_col_cd,
631  temporary_tables);
632  const auto& outer_col_ti =
633  !(dynamic_cast<const Analyzer::FunctionOper*>(lhs)) && outer_col
634  ? outer_col->get_type_info()
635  : outer_ti;
636  // Casts from decimal are not supported.
637  if ((inner_col_real_ti.is_decimal() || outer_col_ti.is_decimal()) &&
638  (lhs_cast || rhs_cast)) {
639  throw HashJoinFail("Cannot use hash join for given expression");
640  }
// Overlaps joins need a fixed-length bounds array on the inner side and a
// point or bounds array on the outer side; equijoins need integer-like or
// dictionary-encoded string inner columns.
641  if (is_overlaps_join) {
642  if (!inner_col_real_ti.is_array()) {
643  throw HashJoinFail(
644  "Overlaps join only supported for inner columns with array type");
645  }
646  auto is_bounds_array = [](const auto ti) {
647  return ti.is_fixlen_array() && ti.get_size() == 32;
648  };
649  if (!is_bounds_array(inner_col_real_ti)) {
650  throw HashJoinFail(
651  "Overlaps join only supported for 4-element double fixed length arrays");
652  }
653  if (!(outer_col_ti.get_type() == kPOINT || is_bounds_array(outer_col_ti))) {
654  throw HashJoinFail(
655  "Overlaps join only supported for geometry outer columns of type point or "
656  "geometry columns with bounds");
657  }
658  } else {
659  if (!(inner_col_real_ti.is_integer() || inner_col_real_ti.is_time() ||
660  inner_col_real_ti.is_decimal() ||
661  (inner_col_real_ti.is_string() &&
662  inner_col_real_ti.get_compression() == kENCODING_DICT))) {
663  throw HashJoinFail(
664  "Can only apply hash join to integer-like types and dictionary encoded "
665  "strings");
666  }
667  }
668 
// When no outer ColumnVar exists, return the raw outer expression instead.
669  auto normalized_inner_col = inner_col;
670  auto normalized_outer_col = outer_col ? outer_col : outer_expr;
671 
672  const auto& normalized_inner_ti = normalized_inner_col->get_type_info();
673  const auto& normalized_outer_ti = normalized_outer_col->get_type_info();
674 
675  if (normalized_inner_ti.is_string() != normalized_outer_ti.is_string()) {
676  throw HashJoinFail(std::string("Could not build hash tables for incompatible types " +
677  normalized_inner_ti.get_type_name() + " and " +
678  normalized_outer_ti.get_type_name()));
679  }
680 
681  return {normalized_inner_col, normalized_outer_col};
682 }
683 
// Normalize all column pairs of a join condition. Handles both a composite
// (tuple-vs-tuple) equality and a plain single-column equality by delegating
// each pair to normalize_column_pair. (The catalog parameter line was elided
// by the documentation generator in this view.)
684 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
686  const TemporaryTables* temporary_tables) {
687  std::vector<InnerOuter> result;
688  const auto lhs_tuple_expr =
689  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_left_operand());
690  const auto rhs_tuple_expr =
691  dynamic_cast<const Analyzer::ExpressionTuple*>(condition->get_right_operand());
692 
// Either both operands are tuples (composite key) or neither is.
693  CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
694  if (lhs_tuple_expr) {
695  const auto& lhs_tuple = lhs_tuple_expr->getTuple();
696  const auto& rhs_tuple = rhs_tuple_expr->getTuple();
697  CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
698  for (size_t i = 0; i < lhs_tuple.size(); ++i) {
699  result.push_back(normalize_column_pair(lhs_tuple[i].get(),
700  rhs_tuple[i].get(),
701  cat,
702  temporary_tables,
703  condition->is_overlaps_oper()));
704  }
705  } else {
706  CHECK(!lhs_tuple_expr && !rhs_tuple_expr);
707  result.push_back(normalize_column_pair(condition->get_left_operand(),
708  condition->get_right_operand(),
709  cat,
710  temporary_tables,
711  condition->is_overlaps_oper()));
712  }
713 
714  return result;
715 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:195
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:101
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:82
std::string toStringFlat(const HashJoin *hash_table, const ExecutorDeviceType device_type, const int device_id)
Definition: HashJoin.cpp:74
static std::shared_ptr< OverlapsJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const QueryHint &query_hint)
Make hash table from an in-flight SQL query's parse tree etc.
ExecutorDeviceType
std::list< std::shared_ptr< Analyzer::Expr > > coalesce_singleton_equi_join(const std::shared_ptr< Analyzer::BinOper > &join_qual)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
void setupSyntheticCaching(std::set< const Analyzer::ColumnVar *> cvs, Executor *executor)
Definition: HashJoin.cpp:407
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
Definition: HashJoin.cpp:35
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
Definition: Execute.h:232
std::vector< InputTableInfo > getSyntheticInputTableInfo(std::set< const Analyzer::ColumnVar *> cvs, Executor *executor)
Definition: HashJoin.cpp:421
Definition: sqldefs.h:49
Definition: sqldefs.h:30
Definition: HashTable.h:21
static std::shared_ptr< PerfectJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
virtual int8_t * alloc(const size_t num_bytes)=0
virtual std::string toStringFlat64(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:92
#define CHECK_GT(x, y)
Definition: Logger.h:209
static std::shared_ptr< BaselineJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
static JoinColumn makeJoinColumn(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
Creates a JoinColumn struct containing an array of JoinChunk structs.
std::string to_string(char const *&&v)
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const QueryHint &query_hint)
Make hash table from an in-flight SQL query's parse tree etc.
Definition: HashJoin.cpp:218
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
std::set< const Analyzer::ColumnVar * > aggregateResult(const std::set< const Analyzer::ColumnVar *> &aggregate, const std::set< const Analyzer::ColumnVar *> &next_result) const override
Definition: HashJoin.cpp:398
virtual std::string toStringFlat32(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:97
size_t col_chunks_buff_sz
std::string cat(Ts &&... args)
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
Definition: Execute.h:162
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:216
bool is_overlaps_oper() const
Definition: Analyzer.h:440
static QueryHint defaults()
Definition: QueryHint.h:70
const std::vector< std::shared_ptr< Analyzer::Expr > > & getTuple() const
Definition: Analyzer.h:244
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join)
Definition: HashJoin.cpp:550
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:222
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:684
#define AUTOMATIC_IR_METADATA(CGENSTATE)
const int8_t * col_chunks_buff
std::ostream & operator<<(std::ostream &os, const DecodedJoinHashBufferEntry &e)
Definition: HashJoin.cpp:102
static void checkHashJoinReplicationConstraint(const int table_id, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:505
bool g_enable_overlaps_hashjoin
Definition: Execute.cpp:94
#define VLOGGING(n)
Definition: Logger.h:195
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor)
Definition: HashJoin.cpp:299
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqldefs.h:69
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
Definition: HashJoin.h:208
bool table_is_replicated(const TableDescriptor *td)
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
std::set< const Analyzer::ColumnVar * > visitColumnVarTuple(const Analyzer::ExpressionTuple *expr_tuple) const override
Definition: HashJoin.cpp:387
T visit(const Analyzer::Expr *expr) const
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
std::set< const Analyzer::ColumnVar * > visitColumnVar(const Analyzer::ColumnVar *column) const override
Definition: HashJoin.cpp:382
#define CHECK(condition)
Definition: Logger.h:197
std::set< int32_t > payload
Definition: HashTable.h:23
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::vector< int > ChunkKey
Definition: types.h:37
bool g_cluster
DEVICE void swap(ARGS &&... args)
Definition: gpu_enabled.h:114
static std::shared_ptr< HashJoin > getSyntheticInstance(std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
Definition: HashJoin.cpp:448
std::vector< int64_t > key
Definition: HashTable.h:22
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:533
std::shared_ptr< Analyzer::ColumnVar > getSyntheticColumnVar(std::string_view table, std::string_view column, int rte_idx, Executor *executor)
Definition: HashJoin.cpp:336
HashType
Definition: HashTable.h:19
InnerOuter get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:523
#define VLOG(n)
Definition: Logger.h:291
const Expr * get_right_operand() const
Definition: Analyzer.h:443
const Expr * get_left_operand() const
Definition: Analyzer.h:442