OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
HashJoin.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <llvm/IR/Value.h>
20 #include <cstdint>
21 #include <set>
22 #include <string>
23 
24 #include "Analyzer/Analyzer.h"
33 #include "Shared/DbObjectKeys.h"
34 #include "StringOps/StringOpInfo.h"
35 
36 class CodeGenerator;
37 
/**
 * @brief Exception reporting that a join hash table is larger than the threshold given
 * in a query hint.
 *
 * The message ends with both values rendered as "(<current> > <threshold>)".
 */
class JoinHashTableTooBig : public std::runtime_error {
 public:
  /**
   * @param cur_hash_table_size size of the hash table, printed on the left of '>'
   * @param query_hint_size threshold from the query hint, printed on the right of '>'
   */
  JoinHashTableTooBig(size_t cur_hash_table_size, size_t query_hint_size)
      : std::runtime_error(
            "The size of hash table is larger than a threshold defined in the query hint "
            "(" +
            // std::to_string keeps this in line with OverlapsHashTableTooBig, which
            // formats its size_t argument the same way.
            std::to_string(cur_hash_table_size) + " > " +
            std::to_string(query_hint_size) + ")") {}
};
47 
/**
 * @brief Exception reporting that a hash table would hold too many entries.
 */
class TooManyHashEntries : public std::runtime_error {
 public:
  /// Builds the exception with the fixed "more than 2B entries" message.
  TooManyHashEntries()
      : std::runtime_error("Hash tables with more than 2B entries not supported yet") {}

  /// @param reason caller-supplied text returned by what()
  TooManyHashEntries(const std::string& reason) : std::runtime_error(reason) {}
};
55 
/**
 * @brief Hash join exception stating that the named table must be replicated.
 */
class TableMustBeReplicated : public std::runtime_error {
 public:
  /// @param table_name name placed between single quotes in the message
  TableMustBeReplicated(const std::string& table_name)
      : std::runtime_error(buildMessage(table_name)) {}

 private:
  /// Assembles the text returned by what().
  static std::string buildMessage(const std::string& table_name) {
    std::string message{"Hash join failed: Table '"};
    message += table_name;
    message += "' must be replicated.";
    return message;
  }
};
62 
/// Decision tag stored by HashJoinFail. The enumerator order must match the name table
/// used by operator<< below.
enum class InnerQualDecision { IGNORE = 0, UNKNOWN, LHS, RHS };

#ifndef __CUDACC__
/**
 * @brief Writes the enumerator name of @p decision to @p os.
 *
 * A value that does not correspond to a declared enumerator is written as
 * "InnerQualDecision(<n>)" instead of indexing outside the name table.
 *
 * @return @p os, to allow chaining.
 */
inline std::ostream& operator<<(std::ostream& os, InnerQualDecision const decision) {
  constexpr char const* strings[]{"IGNORE", "UNKNOWN", "LHS", "RHS"};
  constexpr int num_strings = static_cast<int>(sizeof(strings) / sizeof(strings[0]));
  const int idx = static_cast<int>(decision);
  if (idx < 0 || idx >= num_strings) {
    // An enum class can hold any value of its underlying type, so guard the lookup.
    return os << "InnerQualDecision(" << idx << ")";
  }
  return os << strings[idx];
}
#endif
71 
72 class HashJoinFail : public std::runtime_error {
73  public:
74  HashJoinFail(const std::string& err_msg)
75  : std::runtime_error(err_msg), inner_qual_decision(InnerQualDecision::UNKNOWN) {}
76  HashJoinFail(const std::string& err_msg, InnerQualDecision qual_decision)
77  : std::runtime_error(err_msg), inner_qual_decision(qual_decision) {}
78 
80 };
81 
83  public:
84  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
85 };
86 
88  public:
90  : HashJoinFail("Not enough memory for columns involved in join") {}
91 };
92 
94  public:
95  FailedToJoinOnVirtualColumn() : HashJoinFail("Cannot join on rowid") {}
96 };
97 
99  public:
100  OverlapsHashTableTooBig(const size_t overlaps_hash_table_max_bytes)
101  : HashJoinFail(
102  "Could not create overlaps hash table with less than max allowed size of " +
103  std::to_string(overlaps_hash_table_max_bytes) + " bytes") {}
104 };
105 
106 using InnerOuter = std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>;
107 using InnerOuterStringOpInfos = std::pair<std::vector<StringOps_Namespace::StringOpInfo>,
108  std::vector<StringOps_Namespace::StringOpInfo>>;
109 
111  const std::vector<JoinColumn> join_columns;
112  const std::vector<JoinColumnTypeInfo> join_column_types;
113  const std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
114  std::vector<JoinBucketInfo> join_buckets;
115  const std::vector<std::shared_ptr<void>> malloc_owner;
116 
117  void setBucketInfo(const std::vector<double>& bucket_sizes_for_dimension,
118  const std::vector<InnerOuter> inner_outer_pairs);
119 };
120 
122  llvm::Value* elements;
123  llvm::Value* count;
124  llvm::Value* slot;
125 };
126 
128  std::vector<const void*> sd_inner_proxy_per_key;
129  std::vector<void*> sd_outer_proxy_per_key;
130  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
131 };
132 
133 class DeviceAllocator;
134 
135 class HashJoin {
136  public:
137  virtual std::string toString(const ExecutorDeviceType device_type,
138  const int device_id = 0,
139  bool raw = false) const = 0;
140 
141  virtual std::string toStringFlat64(const ExecutorDeviceType device_type,
142  const int device_id) const;
143 
144  virtual std::string toStringFlat32(const ExecutorDeviceType device_type,
145  const int device_id) const;
146 
147  virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type,
148  const int device_id) const = 0;
149 
150  virtual llvm::Value* codegenSlot(const CompilationOptions&, const size_t) = 0;
151 
153  const size_t) = 0;
154 
155  virtual shared::TableKey getInnerTableId() const noexcept = 0;
156 
157  virtual int getInnerTableRteIdx() const noexcept = 0;
158 
159  virtual HashType getHashType() const noexcept = 0;
160 
161  static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept {
162  return (layout == HashType::ManyToMany || layout == HashType::OneToMany);
163  }
164 
165  static std::string getHashTypeString(HashType ht) noexcept {
166  const char* HashTypeStrings[3] = {"OneToOne", "OneToMany", "ManyToMany"};
167  return HashTypeStrings[static_cast<int>(ht)];
168  };
169 
171  const std::vector<llvm::Value*>& hash_join_idx_args_in,
172  const bool is_sharded,
173  const bool col_is_nullable,
174  const bool is_bw_eq,
175  const int64_t sub_buff_size,
176  Executor* executor,
177  const bool is_bucketized = false);
178 
179  static llvm::Value* codegenHashTableLoad(const size_t table_idx, Executor* executor);
180 
181  virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept = 0;
182 
183  virtual int getDeviceCount() const noexcept = 0;
184 
185  virtual size_t offsetBufferOff() const noexcept = 0;
186 
187  virtual size_t countBufferOff() const noexcept = 0;
188 
189  virtual size_t payloadBufferOff() const noexcept = 0;
190 
191  virtual std::string getHashJoinType() const = 0;
192 
193  virtual bool isBitwiseEq() const = 0;
194 
196  const Analyzer::ColumnVar* hash_col,
197  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragment_info,
198  const Data_Namespace::MemoryLevel effective_memory_level,
199  const int device_id,
200  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
201  DeviceAllocator* dev_buff_owner,
202  std::vector<std::shared_ptr<void>>& malloc_owner,
203  Executor* executor,
204  ColumnCacheMap* column_cache);
205 
207  static std::shared_ptr<HashJoin> getInstance(
208  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
209  const std::vector<InputTableInfo>& query_infos,
210  const Data_Namespace::MemoryLevel memory_level,
211  const JoinType join_type,
212  const HashType preferred_hash_type,
213  const int device_count,
214  ColumnCacheMap& column_cache,
215  Executor* executor,
216  const HashTableBuildDagMap& hashtable_build_dag_map,
217  const RegisteredQueryHint& query_hint,
218  const TableIdToNodeMap& table_id_to_node_map);
219 
221  static std::shared_ptr<HashJoin> getSyntheticInstance(
222  std::string_view table1,
223  std::string_view column1,
224  const Catalog_Namespace::Catalog& catalog1,
225  std::string_view table2,
226  std::string_view column2,
227  const Catalog_Namespace::Catalog& catalog2,
228  const Data_Namespace::MemoryLevel memory_level,
229  const HashType preferred_hash_type,
230  const int device_count,
231  ColumnCacheMap& column_cache,
232  Executor* executor);
233 
235  static std::shared_ptr<HashJoin> getSyntheticInstance(
236  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
237  const Data_Namespace::MemoryLevel memory_level,
238  const HashType preferred_hash_type,
239  const int device_count,
240  ColumnCacheMap& column_cache,
241  Executor* executor);
242 
243  static std::pair<std::string, std::shared_ptr<HashJoin>> getSyntheticInstance(
244  std::vector<std::shared_ptr<Analyzer::BinOper>>,
245  const Data_Namespace::MemoryLevel memory_level,
246  const HashType preferred_hash_type,
247  const int device_count,
248  ColumnCacheMap& column_cache,
249  Executor* executor);
250 
251  static shared::TableKey getInnerTableId(
252  const std::vector<InnerOuter>& inner_outer_pairs) {
253  CHECK(!inner_outer_pairs.empty());
254  const auto first_inner_col = inner_outer_pairs.front().first;
255  return first_inner_col->getTableKey();
256  }
257 
258  static bool canAccessHashTable(bool allow_hash_table_recycling,
259  bool invalid_cache_key,
260  JoinType join_type);
261 
262  static void checkHashJoinReplicationConstraint(const shared::TableKey& table_key,
263  const size_t shard_count,
264  const Executor* executor);
265 
266  // Swap the columns if needed and make the inner column the first component.
267  static std::pair<InnerOuter, InnerOuterStringOpInfos> normalizeColumnPair(
268  const Analyzer::Expr* lhs,
269  const Analyzer::Expr* rhs,
270  const TemporaryTables* temporary_tables,
271  const bool is_overlaps_join = false);
272 
273  template <typename T>
274  static const T* getHashJoinColumn(const Analyzer::Expr* expr);
275 
276  // Normalize each expression tuple
277  static std::pair<std::vector<InnerOuter>, std::vector<InnerOuterStringOpInfos>>
278  normalizeColumnPairs(const Analyzer::BinOper* condition,
279  const TemporaryTables* temporary_tables);
280 
281  HashTable* getHashTableForDevice(const size_t device_id) const {
282  CHECK_LT(device_id, hash_tables_for_device_.size());
283  return hash_tables_for_device_[device_id].get();
284  }
285 
286  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type) {
287  CHECK(device_type == ExecutorDeviceType::CPU);
288  return getJoinHashBufferSize(device_type, 0);
289  }
290 
291  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
292  const int device_id) const {
293  auto hash_table = getHashTableForDevice(device_id);
294  if (!hash_table) {
295  return 0;
296  }
297  return hash_table->getHashTableBufferSize(device_type);
298  }
299 
300  int8_t* getJoinHashBuffer(const ExecutorDeviceType device_type,
301  const int device_id) const {
302  // TODO: just make device_id a size_t
303  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
304  if (!hash_tables_for_device_[device_id]) {
305  return nullptr;
306  }
307  CHECK(hash_tables_for_device_[device_id]);
308  auto hash_table = hash_tables_for_device_[device_id].get();
309 #ifdef HAVE_CUDA
310  if (device_type == ExecutorDeviceType::CPU) {
311  return hash_table->getCpuBuffer();
312  } else {
313  CHECK(hash_table);
314  const auto gpu_buff = hash_table->getGpuBuffer();
315  return gpu_buff;
316  }
317 #else
318  CHECK(device_type == ExecutorDeviceType::CPU);
319  return hash_table->getCpuBuffer();
320 #endif
321  }
322 
324  auto empty_hash_tables =
326  hash_tables_for_device_.swap(empty_hash_tables);
327  }
328 
329  static std::vector<int> collectFragmentIds(
330  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments);
331 
333  const std::vector<InnerOuter>& inner_outer_pairs,
334  const Executor* executor,
335  const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs = {});
336 
337  static std::vector<const StringDictionaryProxy::IdMap*>
339  const CompositeKeyInfo& composite_key_info,
340  const std::vector<InnerOuterStringOpInfos>& string_op_infos_for_keys,
341  const Executor* executor);
342 
343  static std::pair<const StringDictionaryProxy*, StringDictionaryProxy*>
344  getStrDictProxies(const InnerOuter& cols,
345  const Executor* executor,
346  const bool has_string_ops);
347 
349  const InnerOuter& cols,
350  const InnerOuterStringOpInfos& inner_outer_string_op_infos,
351  ExpressionRange& old_col_range,
352  const Executor* executor);
353 
354  protected:
355  static llvm::Value* codegenColOrStringOper(
356  const Analyzer::Expr* col_or_string_oper,
357  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
358  CodeGenerator& code_generator,
359  const CompilationOptions& co);
360 
361  virtual size_t getComponentBufferSize() const noexcept = 0;
362 
363  std::vector<std::shared_ptr<HashTable>> hash_tables_for_device_;
364 };
365 
366 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferEntry& e);
367 
368 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferSet& s);
369 
370 std::ostream& operator<<(std::ostream& os,
371  const InnerOuterStringOpInfos& inner_outer_string_op_infos);
372 std::ostream& operator<<(
373  std::ostream& os,
374  const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs);
375 
376 std::string toString(const InnerOuterStringOpInfos& inner_outer_string_op_infos);
377 
378 std::string toString(
379  const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs);
380 
381 std::shared_ptr<Analyzer::ColumnVar> getSyntheticColumnVar(
382  std::string_view table,
383  std::string_view column,
384  int rte_idx,
385  const Catalog_Namespace::Catalog& catalog);
386 
387 size_t get_shard_count(const Analyzer::BinOper* join_condition, const Executor* executor);
388 
389 size_t get_shard_count(
390  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
391  const Executor* executor);
static std::vector< int > collectFragmentIds(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments)
Definition: HashJoin.cpp:452
static std::shared_ptr< HashJoin > getSyntheticInstance(std::string_view table1, std::string_view column1, const Catalog_Namespace::Catalog &catalog1, std::string_view table2, std::string_view column2, const Catalog_Namespace::Catalog &catalog2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
Definition: HashJoin.cpp:673
Defines data structures for the semantic analysis phase of query processing.
virtual int getInnerTableRteIdx() const noexcept=0
virtual size_t payloadBufferOff() const noexcept=0
virtual std::string getHashJoinType() const =0
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
JoinType
Definition: sqldefs.h:165
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:257
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:106
static bool canAccessHashTable(bool allow_hash_table_recycling, bool invalid_cache_key, JoinType join_type)
Definition: HashJoin.cpp:1030
virtual HashType getHashType() const noexcept=0
ExecutorDeviceType
std::vector< ChunkKey > cache_key_chunks
Definition: HashJoin.h:130
std::vector< const void * > sd_inner_proxy_per_key
Definition: HashJoin.h:128
virtual int getDeviceCount() const noexcept=0
virtual std::string toStringFlat64(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:116
static void checkHashJoinReplicationConstraint(const shared::TableKey &table_key, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:779
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:57
void setBucketInfo(const std::vector< double > &bucket_sizes_for_dimension, const std::vector< InnerOuter > inner_outer_pairs)
Definition: HashJoin.cpp:35
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
Definition: HashJoin.cpp:58
llvm::Value * elements
Definition: HashJoin.h:122
llvm::Value * count
Definition: HashJoin.h:123
virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept=0
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:363
static std::pair< const StringDictionaryProxy *, StringDictionaryProxy * > getStrDictProxies(const InnerOuter &cols, const Executor *executor, const bool has_string_ops)
Definition: HashJoin.cpp:385
Definition: HashTable.h:21
OverlapsHashTableTooBig(const size_t overlaps_hash_table_max_bytes)
Definition: HashJoin.h:100
virtual llvm::Value * codegenSlot(const CompilationOptions &, const size_t)=0
TableMustBeReplicated(const std::string &table_name)
Definition: HashJoin.h:58
static llvm::Value * codegenColOrStringOper(const Analyzer::Expr *col_or_string_oper, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, CodeGenerator &code_generator, const CompilationOptions &co)
Definition: HashJoin.cpp:545
void freeHashBufferMemory()
Definition: HashJoin.h:323
virtual size_t offsetBufferOff() const noexcept=0
virtual std::string toStringFlat32(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:121
std::string to_string(char const *&&v)
virtual size_t countBufferOff() const noexcept=0
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
JoinHashTableTooBig(size_t cur_hash_table_size, size_t query_hint_size)
Definition: HashJoin.h:40
const std::vector< JoinColumnTypeInfo > join_column_types
Definition: HashJoin.h:112
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
std::vector< void * > sd_outer_proxy_per_key
Definition: HashJoin.h:129
HashJoinFail(const std::string &err_msg, InnerQualDecision qual_decision)
Definition: HashJoin.h:76
int8_t * getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:300
static std::vector< const StringDictionaryProxy::IdMap * > translateCompositeStrDictProxies(const CompositeKeyInfo &composite_key_info, const std::vector< InnerOuterStringOpInfos > &string_op_infos_for_keys, const Executor *executor)
Definition: HashJoin.cpp:509
std::string toString(const ExecutorDeviceType &device_type)
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:291
virtual size_t getComponentBufferSize() const noexcept=0
const std::vector< std::shared_ptr< Chunk_NS::Chunk > > chunks_owner
Definition: HashJoin.h:113
static const StringDictionaryProxy::IdMap * translateInnerToOuterStrDictProxies(const InnerOuter &cols, const InnerOuterStringOpInfos &inner_outer_string_op_infos, ExpressionRange &old_col_range, const Executor *executor)
Definition: HashJoin.cpp:414
static std::pair< InnerOuter, InnerOuterStringOpInfos > normalizeColumnPair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const TemporaryTables *temporary_tables, const bool is_overlaps_join=false)
Definition: HashJoin.cpp:805
HashTable * getHashTableForDevice(const size_t device_id) const
Definition: HashJoin.h:281
virtual shared::TableKey getInnerTableId() const noexcept=0
std::unordered_map< shared::TableKey, const RelAlgNode * > TableIdToNodeMap
#define CHECK_LT(x, y)
Definition: Logger.h:303
TooManyHashEntries(const std::string &reason)
Definition: HashJoin.h:53
static std::string getHashTypeString(HashType ht) noexcept
Definition: HashJoin.h:165
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
Definition: HashJoin.h:286
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
HashJoinFail(const std::string &err_msg)
Definition: HashJoin.h:74
std::shared_ptr< Analyzer::ColumnVar > getSyntheticColumnVar(std::string_view table, std::string_view column, int rte_idx, const Catalog_Namespace::Catalog &catalog)
Definition: HashJoin.cpp:561
#define CHECK(condition)
Definition: Logger.h:291
std::pair< std::vector< StringOps_Namespace::StringOpInfo >, std::vector< StringOps_Namespace::StringOpInfo >> InnerOuterStringOpInfos
Definition: HashJoin.h:108
llvm::Value * slot
Definition: HashJoin.h:124
static const T * getHashJoinColumn(const Analyzer::Expr *expr)
Definition: HashJoin.cpp:796
static std::pair< std::vector< InnerOuter >, std::vector< InnerOuterStringOpInfos > > normalizeColumnPairs(const Analyzer::BinOper *condition, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:996
FileBuffer Chunk
A Chunk is the fundamental unit of execution in Map-D.
Definition: FileMgr.h:73
const std::vector< std::shared_ptr< void > > malloc_owner
Definition: HashJoin.h:115
InnerQualDecision
Definition: HashJoin.h:63
virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const =0
std::vector< JoinBucketInfo > join_buckets
Definition: HashJoin.h:114
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:1048
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query's parse tree etc.
Definition: HashJoin.cpp:283
virtual std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const =0
HashType
Definition: HashTable.h:19
InnerQualDecision inner_qual_decision
Definition: HashJoin.h:79
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:111
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:161
virtual bool isBitwiseEq() const =0
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs={})
Definition: HashJoin.cpp:461