OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
HashJoin.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <llvm/IR/Value.h>
20 #include <cstdint>
21 #include <set>
22 #include <string>
23 
24 #include "Analyzer/Analyzer.h"
33 #include "StringOps/StringOpInfo.h"
34 
35 class CodeGenerator;
36 
37 class JoinHashTableTooBig : public std::runtime_error {
38  public:
39  JoinHashTableTooBig(size_t cur_hash_table_size, size_t query_hint_size)
40  : std::runtime_error(
41  "The size of hash table is larger than a threshold defined in the query hint "
42  "(" +
43  ::toString(cur_hash_table_size) + " > " + ::toString(query_hint_size) + ")") {
44  }
45 };
46 
47 class TooManyHashEntries : public std::runtime_error {
48  public:
50  : std::runtime_error("Hash tables with more than 2B entries not supported yet") {}
51 
52  TooManyHashEntries(const std::string& reason) : std::runtime_error(reason) {}
53 };
54 
55 class TableMustBeReplicated : public std::runtime_error {
56  public:
57  TableMustBeReplicated(const std::string& table_name)
58  : std::runtime_error("Hash join failed: Table '" + table_name +
59  "' must be replicated.") {}
60 };
61 
62 enum class InnerQualDecision { IGNORE = 0, UNKNOWN, LHS, RHS };
63 
64 #ifndef __CUDACC__
65 inline std::ostream& operator<<(std::ostream& os, InnerQualDecision const decision) {
66  constexpr char const* strings[]{"IGNORE", "UNKNOWN", "LHS", "RHS"};
67  return os << strings[static_cast<int>(decision)];
68 }
69 #endif
70 
71 class HashJoinFail : public std::runtime_error {
72  public:
73  HashJoinFail(const std::string& err_msg)
74  : std::runtime_error(err_msg), inner_qual_decision(InnerQualDecision::UNKNOWN) {}
75  HashJoinFail(const std::string& err_msg, InnerQualDecision qual_decision)
76  : std::runtime_error(err_msg), inner_qual_decision(qual_decision) {}
77 
79 };
80 
82  public:
83  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
84 };
85 
87  public:
89  : HashJoinFail("Not enough memory for columns involved in join") {}
90 };
91 
93  public:
94  FailedToJoinOnVirtualColumn() : HashJoinFail("Cannot join on rowid") {}
95 };
96 
98  public:
99  OverlapsHashTableTooBig(const size_t overlaps_hash_table_max_bytes)
100  : HashJoinFail(
101  "Could not create overlaps hash table with less than max allowed size of " +
102  std::to_string(overlaps_hash_table_max_bytes) + " bytes") {}
103 };
104 
105 using InnerOuter = std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>;
106 using InnerOuterStringOpInfos = std::pair<std::vector<StringOps_Namespace::StringOpInfo>,
107  std::vector<StringOps_Namespace::StringOpInfo>>;
108 
110  const std::vector<JoinColumn> join_columns;
111  const std::vector<JoinColumnTypeInfo> join_column_types;
112  const std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
113  std::vector<JoinBucketInfo> join_buckets;
114  const std::vector<std::shared_ptr<void>> malloc_owner;
115 
116  void setBucketInfo(const std::vector<double>& bucket_sizes_for_dimension,
117  const std::vector<InnerOuter> inner_outer_pairs);
118 };
119 
121  llvm::Value* elements;
122  llvm::Value* count;
123  llvm::Value* slot;
124 };
125 
127  std::vector<const void*> sd_inner_proxy_per_key;
128  std::vector<void*> sd_outer_proxy_per_key;
129  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
130 };
131 
132 class DeviceAllocator;
133 
134 class HashJoin {
135  public:
136  virtual std::string toString(const ExecutorDeviceType device_type,
137  const int device_id = 0,
138  bool raw = false) const = 0;
139 
140  virtual std::string toStringFlat64(const ExecutorDeviceType device_type,
141  const int device_id) const;
142 
143  virtual std::string toStringFlat32(const ExecutorDeviceType device_type,
144  const int device_id) const;
145 
146  virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type,
147  const int device_id) const = 0;
148 
149  virtual llvm::Value* codegenSlot(const CompilationOptions&, const size_t) = 0;
150 
152  const size_t) = 0;
153 
154  virtual int getInnerTableId() const noexcept = 0;
155 
156  virtual int getInnerTableRteIdx() const noexcept = 0;
157 
158  virtual HashType getHashType() const noexcept = 0;
159 
160  static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept {
161  return (layout == HashType::ManyToMany || layout == HashType::OneToMany);
162  }
163 
164  static std::string getHashTypeString(HashType ht) noexcept {
165  const char* HashTypeStrings[3] = {"OneToOne", "OneToMany", "ManyToMany"};
166  return HashTypeStrings[static_cast<int>(ht)];
167  };
168 
170  const std::vector<llvm::Value*>& hash_join_idx_args_in,
171  const bool is_sharded,
172  const bool col_is_nullable,
173  const bool is_bw_eq,
174  const int64_t sub_buff_size,
175  Executor* executor,
176  const bool is_bucketized = false);
177 
178  static llvm::Value* codegenHashTableLoad(const size_t table_idx, Executor* executor);
179 
180  virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept = 0;
181 
182  virtual int getDeviceCount() const noexcept = 0;
183 
184  virtual size_t offsetBufferOff() const noexcept = 0;
185 
186  virtual size_t countBufferOff() const noexcept = 0;
187 
188  virtual size_t payloadBufferOff() const noexcept = 0;
189 
190  virtual std::string getHashJoinType() const = 0;
191 
192  virtual bool isBitwiseEq() const = 0;
193 
195  const Analyzer::ColumnVar* hash_col,
196  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragment_info,
197  const Data_Namespace::MemoryLevel effective_memory_level,
198  const int device_id,
199  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
200  DeviceAllocator* dev_buff_owner,
201  std::vector<std::shared_ptr<void>>& malloc_owner,
202  Executor* executor,
203  ColumnCacheMap* column_cache);
204 
206  static std::shared_ptr<HashJoin> getInstance(
207  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
208  const std::vector<InputTableInfo>& query_infos,
209  const Data_Namespace::MemoryLevel memory_level,
210  const JoinType join_type,
211  const HashType preferred_hash_type,
212  const int device_count,
213  ColumnCacheMap& column_cache,
214  Executor* executor,
215  const HashTableBuildDagMap& hashtable_build_dag_map,
216  const RegisteredQueryHint& query_hint,
217  const TableIdToNodeMap& table_id_to_node_map);
218 
220  static std::shared_ptr<HashJoin> getSyntheticInstance(
221  std::string_view table1,
222  std::string_view column1,
223  std::string_view table2,
224  std::string_view column2,
225  const Data_Namespace::MemoryLevel memory_level,
226  const HashType preferred_hash_type,
227  const int device_count,
228  ColumnCacheMap& column_cache,
229  Executor* executor);
230 
232  static std::shared_ptr<HashJoin> getSyntheticInstance(
233  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
234  const Data_Namespace::MemoryLevel memory_level,
235  const HashType preferred_hash_type,
236  const int device_count,
237  ColumnCacheMap& column_cache,
238  Executor* executor);
239 
240  static std::pair<std::string, std::shared_ptr<HashJoin>> getSyntheticInstance(
241  std::vector<std::shared_ptr<Analyzer::BinOper>>,
242  const Data_Namespace::MemoryLevel memory_level,
243  const HashType preferred_hash_type,
244  const int device_count,
245  ColumnCacheMap& column_cache,
246  Executor* executor);
247 
248  static int getInnerTableId(const std::vector<InnerOuter>& inner_outer_pairs) {
249  CHECK(!inner_outer_pairs.empty());
250  const auto first_inner_col = inner_outer_pairs.front().first;
251  return first_inner_col->get_table_id();
252  }
253 
254  static bool canAccessHashTable(bool allow_hash_table_recycling,
255  bool invalid_cache_key,
256  JoinType join_type);
257 
258  static void checkHashJoinReplicationConstraint(const int table_id,
259  const size_t shard_count,
260  const Executor* executor);
261 
262  // Swap the columns if needed and make the inner column the first component.
263  static std::pair<InnerOuter, InnerOuterStringOpInfos> normalizeColumnPair(
264  const Analyzer::Expr* lhs,
265  const Analyzer::Expr* rhs,
267  const TemporaryTables* temporary_tables,
268  const bool is_overlaps_join = false);
269 
270  template <typename T>
271  static const T* getHashJoinColumn(const Analyzer::Expr* expr);
272 
273  // Normalize each expression tuple
274  static std::pair<std::vector<InnerOuter>, std::vector<InnerOuterStringOpInfos>>
275  normalizeColumnPairs(const Analyzer::BinOper* condition,
277  const TemporaryTables* temporary_tables);
278 
279  HashTable* getHashTableForDevice(const size_t device_id) const {
280  CHECK_LT(device_id, hash_tables_for_device_.size());
281  return hash_tables_for_device_[device_id].get();
282  }
283 
284  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type) {
285  CHECK(device_type == ExecutorDeviceType::CPU);
286  return getJoinHashBufferSize(device_type, 0);
287  }
288 
289  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
290  const int device_id) const {
291  auto hash_table = getHashTableForDevice(device_id);
292  if (!hash_table) {
293  return 0;
294  }
295  return hash_table->getHashTableBufferSize(device_type);
296  }
297 
298  int8_t* getJoinHashBuffer(const ExecutorDeviceType device_type,
299  const int device_id) const {
300  // TODO: just make device_id a size_t
301  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
302  if (!hash_tables_for_device_[device_id]) {
303  return nullptr;
304  }
305  CHECK(hash_tables_for_device_[device_id]);
306  auto hash_table = hash_tables_for_device_[device_id].get();
307 #ifdef HAVE_CUDA
308  if (device_type == ExecutorDeviceType::CPU) {
309  return hash_table->getCpuBuffer();
310  } else {
311  CHECK(hash_table);
312  const auto gpu_buff = hash_table->getGpuBuffer();
313  return gpu_buff;
314  }
315 #else
316  CHECK(device_type == ExecutorDeviceType::CPU);
317  return hash_table->getCpuBuffer();
318 #endif
319  }
320 
322  auto empty_hash_tables =
324  hash_tables_for_device_.swap(empty_hash_tables);
325  }
326 
327  static std::vector<int> collectFragmentIds(
328  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments);
329 
331  const std::vector<InnerOuter>& inner_outer_pairs,
332  const Executor* executor,
333  const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs = {});
334 
335  static std::vector<const StringDictionaryProxy::IdMap*>
337  const CompositeKeyInfo& composite_key_info,
338  const std::vector<InnerOuterStringOpInfos>& string_op_infos_for_keys,
339  const Executor* executor);
340 
341  static std::pair<const StringDictionaryProxy*, StringDictionaryProxy*>
342  getStrDictProxies(const InnerOuter& cols,
343  const Executor* executor,
344  const bool has_string_ops);
345 
347  const InnerOuter& cols,
348  const InnerOuterStringOpInfos& inner_outer_string_op_infos,
349  ExpressionRange& old_col_range,
350  const Executor* executor);
351 
352  protected:
353  static llvm::Value* codegenColOrStringOper(
354  const Analyzer::Expr* col_or_string_oper,
355  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
356  CodeGenerator& code_generator,
357  const CompilationOptions& co);
358 
359  virtual size_t getComponentBufferSize() const noexcept = 0;
360 
361  std::vector<std::shared_ptr<HashTable>> hash_tables_for_device_;
362 };
363 
364 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferEntry& e);
365 
366 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferSet& s);
367 
368 std::ostream& operator<<(std::ostream& os,
369  const InnerOuterStringOpInfos& inner_outer_string_op_infos);
370 std::ostream& operator<<(
371  std::ostream& os,
372  const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs);
373 
374 std::string toString(const InnerOuterStringOpInfos& inner_outer_string_op_infos);
375 
376 std::string toString(
377  const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs);
378 
379 std::shared_ptr<Analyzer::ColumnVar> getSyntheticColumnVar(std::string_view table,
380  std::string_view column,
381  int rte_idx,
382  Executor* executor);
383 
384 size_t get_shard_count(const Analyzer::BinOper* join_condition, const Executor* executor);
385 
386 size_t get_shard_count(
387  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
388  const Executor* executor);
static std::vector< int > collectFragmentIds(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments)
Definition: HashJoin.cpp:451
Defines data structures for the semantic analysis phase of query processing.
virtual int getInnerTableRteIdx() const noexcept=0
virtual size_t payloadBufferOff() const noexcept=0
virtual std::string getHashJoinType() const =0
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
JoinType
Definition: sqldefs.h:164
std::string cat(Ts &&...args)
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:257
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:105
static bool canAccessHashTable(bool allow_hash_table_recycling, bool invalid_cache_key, JoinType join_type)
Definition: HashJoin.cpp:1033
virtual HashType getHashType() const noexcept=0
ExecutorDeviceType
std::vector< ChunkKey > cache_key_chunks
Definition: HashJoin.h:129
std::vector< const void * > sd_inner_proxy_per_key
Definition: HashJoin.h:127
virtual int getDeviceCount() const noexcept=0
virtual std::string toStringFlat64(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:116
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:57
static std::pair< std::vector< InnerOuter >, std::vector< InnerOuterStringOpInfos > > normalizeColumnPairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:996
void setBucketInfo(const std::vector< double > &bucket_sizes_for_dimension, const std::vector< InnerOuter > inner_outer_pairs)
Definition: HashJoin.cpp:35
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
Definition: HashJoin.cpp:58
llvm::Value * elements
Definition: HashJoin.h:121
llvm::Value * count
Definition: HashJoin.h:122
virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept=0
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:361
static std::pair< const StringDictionaryProxy *, StringDictionaryProxy * > getStrDictProxies(const InnerOuter &cols, const Executor *executor, const bool has_string_ops)
Definition: HashJoin.cpp:385
Definition: HashTable.h:21
OverlapsHashTableTooBig(const size_t overlaps_hash_table_max_bytes)
Definition: HashJoin.h:99
virtual llvm::Value * codegenSlot(const CompilationOptions &, const size_t)=0
TableMustBeReplicated(const std::string &table_name)
Definition: HashJoin.h:57
static llvm::Value * codegenColOrStringOper(const Analyzer::Expr *col_or_string_oper, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, CodeGenerator &code_generator, const CompilationOptions &co)
Definition: HashJoin.cpp:544
void freeHashBufferMemory()
Definition: HashJoin.h:321
virtual size_t offsetBufferOff() const noexcept=0
virtual std::string toStringFlat32(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:121
std::string to_string(char const *&&v)
virtual size_t countBufferOff() const noexcept=0
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
JoinHashTableTooBig(size_t cur_hash_table_size, size_t query_hint_size)
Definition: HashJoin.h:39
const std::vector< JoinColumnTypeInfo > join_column_types
Definition: HashJoin.h:111
static std::pair< InnerOuter, InnerOuterStringOpInfos > normalizeColumnPair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join=false)
Definition: HashJoin.cpp:802
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
std::vector< void * > sd_outer_proxy_per_key
Definition: HashJoin.h:128
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
HashJoinFail(const std::string &err_msg, InnerQualDecision qual_decision)
Definition: HashJoin.h:75
int8_t * getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:298
static std::vector< const StringDictionaryProxy::IdMap * > translateCompositeStrDictProxies(const CompositeKeyInfo &composite_key_info, const std::vector< InnerOuterStringOpInfos > &string_op_infos_for_keys, const Executor *executor)
Definition: HashJoin.cpp:508
std::string toString(const ExecutorDeviceType &device_type)
virtual int getInnerTableId() const noexcept=0
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:289
virtual size_t getComponentBufferSize() const noexcept=0
static void checkHashJoinReplicationConstraint(const int table_id, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:776
const std::vector< std::shared_ptr< Chunk_NS::Chunk > > chunks_owner
Definition: HashJoin.h:112
static const StringDictionaryProxy::IdMap * translateInnerToOuterStrDictProxies(const InnerOuter &cols, const InnerOuterStringOpInfos &inner_outer_string_op_infos, ExpressionRange &old_col_range, const Executor *executor)
Definition: HashJoin.cpp:413
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
HashTable * getHashTableForDevice(const size_t device_id) const
Definition: HashJoin.h:279
#define CHECK_LT(x, y)
Definition: Logger.h:299
TooManyHashEntries(const std::string &reason)
Definition: HashJoin.h:52
static std::string getHashTypeString(HashType ht) noexcept
Definition: HashJoin.h:164
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
Definition: HashJoin.h:284
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
HashJoinFail(const std::string &err_msg)
Definition: HashJoin.h:73
#define CHECK(condition)
Definition: Logger.h:289
std::pair< std::vector< StringOps_Namespace::StringOpInfo >, std::vector< StringOps_Namespace::StringOpInfo >> InnerOuterStringOpInfos
Definition: HashJoin.h:107
llvm::Value * slot
Definition: HashJoin.h:123
static const T * getHashJoinColumn(const Analyzer::Expr *expr)
Definition: HashJoin.cpp:793
FileBuffer Chunk
A Chunk is the fundamental unit of execution in Map-D.
Definition: FileMgr.h:73
const std::vector< std::shared_ptr< void > > malloc_owner
Definition: HashJoin.h:114
static std::shared_ptr< HashJoin > getSyntheticInstance(std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
Definition: HashJoin.cpp:672
InnerQualDecision
Definition: HashJoin.h:62
virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const =0
std::vector< JoinBucketInfo > join_buckets
Definition: HashJoin.h:113
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:1052
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
Definition: HashJoin.cpp:283
virtual std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const =0
std::shared_ptr< Analyzer::ColumnVar > getSyntheticColumnVar(std::string_view table, std::string_view column, int rte_idx, Executor *executor)
Definition: HashJoin.cpp:560
HashType
Definition: HashTable.h:19
InnerQualDecision inner_qual_decision
Definition: HashJoin.h:78
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:110
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:160
virtual bool isBitwiseEq() const =0
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs={})
Definition: HashJoin.cpp:460