OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
HashJoin.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <llvm/IR/Value.h>
20 #include <cstdint>
21 #include <set>
22 #include <string>
23 
24 #include "Analyzer/Analyzer.h"
33 #include "StringOps/StringOpInfo.h"
34 
35 class CodeGenerator;
36 
/**
 * Exception for a hash table that would exceed the supported entry count.
 *
 * Default construction yields the stock "more than 2B entries" message; the
 * one-argument form lets the thrower supply its own text.
 */
class TooManyHashEntries : public std::runtime_error {
 public:
  /// Builds the exception with the fixed default message.
  TooManyHashEntries()
      : std::runtime_error("Hash tables with more than 2B entries not supported yet") {}

  /// @param reason message returned verbatim by what().
  /// Marked explicit so a std::string never converts to this exception silently.
  explicit TooManyHashEntries(const std::string& reason) : std::runtime_error(reason) {}
};
44 
/**
 * Exception reporting that a hash join failed because the named table has to
 * be replicated.
 */
class TableMustBeReplicated : public std::runtime_error {
 public:
  /// @param table_name name embedded, single-quoted, in the what() message.
  /// Marked explicit so a std::string never converts to this exception silently.
  explicit TableMustBeReplicated(const std::string& table_name)
      : std::runtime_error("Hash join failed: Table '" + table_name +
                           "' must be replicated.") {}
};
51 
/// Tag stored in HashJoinFail::inner_qual_decision. Enumerators are numbered
/// consecutively starting from IGNORE = 0.
enum class InnerQualDecision { IGNORE = 0, UNKNOWN, LHS, RHS };

#ifndef __CUDACC__
/**
 * Streams the enumerator's name ("IGNORE", "UNKNOWN", "LHS" or "RHS").
 *
 * A value outside the declared enumerators is written as
 * "InnerQualDecision(<int>)" instead of being used as an array index, so a
 * corrupted or cast-in value can no longer read out of bounds.
 *
 * @return the same stream, to allow chaining.
 */
inline std::ostream& operator<<(std::ostream& os, InnerQualDecision const decision) {
  switch (decision) {
    case InnerQualDecision::IGNORE:
      return os << "IGNORE";
    case InnerQualDecision::UNKNOWN:
      return os << "UNKNOWN";
    case InnerQualDecision::LHS:
      return os << "LHS";
    case InnerQualDecision::RHS:
      return os << "RHS";
  }
  // Only reached for values that are not declared enumerators.
  return os << "InnerQualDecision(" << static_cast<int>(decision) << ')';
}
#endif
60 
61 class HashJoinFail : public std::runtime_error {
62  public:
63  HashJoinFail(const std::string& err_msg)
64  : std::runtime_error(err_msg), inner_qual_decision(InnerQualDecision::UNKNOWN) {}
65  HashJoinFail(const std::string& err_msg, InnerQualDecision qual_decision)
66  : std::runtime_error(err_msg), inner_qual_decision(qual_decision) {}
67 
69 };
70 
72  public:
73  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
74 };
75 
77  public:
79  : HashJoinFail("Not enough memory for columns involved in join") {}
80 };
81 
83  public:
84  FailedToJoinOnVirtualColumn() : HashJoinFail("Cannot join on rowid") {}
85 };
86 
88  public:
89  OverlapsHashTableTooBig(const size_t overlaps_hash_table_max_bytes)
90  : HashJoinFail(
91  "Could not create overlaps hash table with less than max allowed size of " +
92  std::to_string(overlaps_hash_table_max_bytes) + " bytes") {}
93 };
94 
95 using InnerOuter = std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>;
96 using InnerOuterStringOpInfos = std::pair<std::vector<StringOps_Namespace::StringOpInfo>,
97  std::vector<StringOps_Namespace::StringOpInfo>>;
98 
100  const std::vector<JoinColumn> join_columns;
101  const std::vector<JoinColumnTypeInfo> join_column_types;
102  const std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
103  std::vector<JoinBucketInfo> join_buckets;
104  const std::vector<std::shared_ptr<void>> malloc_owner;
105 
106  void setBucketInfo(const std::vector<double>& bucket_sizes_for_dimension,
107  const std::vector<InnerOuter> inner_outer_pairs);
108 };
109 
111  llvm::Value* elements;
112  llvm::Value* count;
113  llvm::Value* slot;
114 };
115 
117  std::vector<const void*> sd_inner_proxy_per_key;
118  std::vector<void*> sd_outer_proxy_per_key;
119  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
120 };
121 
122 class DeviceAllocator;
123 
// Abstract interface for a join hash table. It declares the pure-virtual hooks
// that concrete implementations provide, a set of static helpers, and holds one
// shared HashTable pointer per device in hash_tables_for_device_.
// NOTE(review): this text is a documentation-site listing. The leading integer on
// each line is the listing line number, and listing lines 141, 159, 184, 195, 209,
// 221, 256, 266, 311, 313, 320, 326 and 336 are absent. Each gap is marked below.
// NOTE(review): no virtual destructor appears in the visible lines although the
// class has virtual members — confirm against the real header.
124 class HashJoin {
125  public:
// Pure virtual. Implementers return a string form for one device; device_id
// defaults to 0 and raw to false.
126  virtual std::string toString(const ExecutorDeviceType device_type,
127  const int device_id = 0,
128  bool raw = false) const = 0;
129 
// Virtual but not pure; the cross-reference index at the end of this file places
// the bodies of both toStringFlat variants in HashJoin.cpp.
130  virtual std::string toStringFlat64(const ExecutorDeviceType device_type,
131  const int device_id) const;
132 
133  virtual std::string toStringFlat32(const ExecutorDeviceType device_type,
134  const int device_id) const;
135 
// Pure virtual. Returns a DecodedJoinHashBufferSet for one device.
136  virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type,
137  const int device_id) const = 0;
138 
// Pure virtual code-generation hook returning an llvm::Value*.
139  virtual llvm::Value* codegenSlot(const CompilationOptions&, const size_t) = 0;
140 
// NOTE(review): listing line 141 is absent. Per the cross-reference index the full
// declaration is
//   virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions&,
//                                                  const size_t) = 0;
142  const size_t) = 0;
143 
// Pure virtual noexcept accessors.
144  virtual int getInnerTableId() const noexcept = 0;
145 
146  virtual int getInnerTableRteIdx() const noexcept = 0;
147 
148  virtual HashType getHashType() const noexcept = 0;
149 
// True exactly when layout is HashType::ManyToMany or HashType::OneToMany.
150  static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept {
151  return (layout == HashType::ManyToMany || layout == HashType::OneToMany);
152  }
153 
// Maps ht to a name by using its integer value as an index into a 3-entry table.
// NOTE(review): there is no bounds check, so this relies on HashType having exactly
// these three enumerators valued 0..2 in this order — confirm in HashTable.h.
154  static std::string getHashTypeString(HashType ht) noexcept {
155  const char* HashTypeStrings[3] = {"OneToOne", "OneToMany", "ManyToMany"};
156  return HashTypeStrings[static_cast<int>(ht)];
157  };
158 
// NOTE(review): listing line 159 (return type and name of this declaration) is
// absent and is not recoverable from this file; only the parameter list survives.
// is_bucketized defaults to false.
160  const std::vector<llvm::Value*>& hash_join_idx_args_in,
161  const bool is_sharded,
162  const bool col_is_nullable,
163  const bool is_bw_eq,
164  const int64_t sub_buff_size,
165  Executor* executor,
166  const bool is_bucketized = false);
167 
// Static helper; body is in HashJoin.cpp per the cross-reference index.
168  static llvm::Value* codegenHashTableLoad(const size_t table_idx, Executor* executor);
169 
// Pure virtual accessors. The three *BufferOff() members return size_t values.
170  virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept = 0;
171 
172  virtual int getDeviceCount() const noexcept = 0;
173 
174  virtual size_t offsetBufferOff() const noexcept = 0;
175 
176  virtual size_t countBufferOff() const noexcept = 0;
177 
178  virtual size_t payloadBufferOff() const noexcept = 0;
179 
180  virtual std::string getHashJoinType() const = 0;
181 
182  virtual bool isBitwiseEq() const = 0;
183 
// NOTE(review): listing line 184 is absent. Per the cross-reference index this
// declaration begins `JoinColumn fetchJoinColumn(`. chunks_owner and malloc_owner
// are taken by non-const reference.
185  const Analyzer::ColumnVar* hash_col,
186  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragment_info,
187  const Data_Namespace::MemoryLevel effective_memory_level,
188  const int device_id,
189  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
190  DeviceAllocator* dev_buff_owner,
191  std::vector<std::shared_ptr<void>>& malloc_owner,
192  Executor* executor,
193  ColumnCacheMap* column_cache);
194 
// Listing line 195 is absent. The cross-reference index documents getInstance as:
// "Make hash table from an in-flight SQL query's parse tree etc."
196  static std::shared_ptr<HashJoin> getInstance(
197  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
198  const std::vector<InputTableInfo>& query_infos,
199  const Data_Namespace::MemoryLevel memory_level,
200  const JoinType join_type,
201  const HashType preferred_hash_type,
202  const int device_count,
203  ColumnCacheMap& column_cache,
204  Executor* executor,
205  const HashTableBuildDagMap& hashtable_build_dag_map,
206  const RegisteredQueryHint& query_hint,
207  const TableIdToNodeMap& table_id_to_node_map);
208 
// Listing line 209 is absent. The cross-reference index documents this overload as:
// "Make hash table from named tables and columns (such as for testing)."
210  static std::shared_ptr<HashJoin> getSyntheticInstance(
211  std::string_view table1,
212  std::string_view column1,
213  std::string_view table2,
214  std::string_view column2,
215  const Data_Namespace::MemoryLevel memory_level,
216  const HashType preferred_hash_type,
217  const int device_count,
218  ColumnCacheMap& column_cache,
219  Executor* executor);
220 
// NOTE(review): listing line 221 is absent (presumably a doc comment — confirm).
// Overload taking a single join condition instead of table and column names.
222  static std::shared_ptr<HashJoin> getSyntheticInstance(
223  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
224  const Data_Namespace::MemoryLevel memory_level,
225  const HashType preferred_hash_type,
226  const int device_count,
227  ColumnCacheMap& column_cache,
228  Executor* executor);
229 
// Overload taking a vector of conditions; returns a (string, HashJoin) pair.
230  static std::pair<std::string, std::shared_ptr<HashJoin>> getSyntheticInstance(
231  std::vector<std::shared_ptr<Analyzer::BinOper>>,
232  const Data_Namespace::MemoryLevel memory_level,
233  const HashType preferred_hash_type,
234  const int device_count,
235  ColumnCacheMap& column_cache,
236  Executor* executor);
237 
// Returns get_table_id() of the first pair's inner (first) column.
// CHECKs that inner_outer_pairs is not empty before reading front().
238  static int getInnerTableId(const std::vector<InnerOuter>& inner_outer_pairs) {
239  CHECK(!inner_outer_pairs.empty());
240  const auto first_inner_col = inner_outer_pairs.front().first;
241  return first_inner_col->get_table_id();
242  }
243 
// Static helpers; bodies are in HashJoin.cpp per the cross-reference index.
244  static bool canAccessHashTable(bool allow_hash_table_recycling,
245  bool invalid_cache_key,
246  JoinType join_type);
247 
248  static void checkHashJoinReplicationConstraint(const int table_id,
249  const size_t shard_count,
250  const Executor* executor);
251 
252  // Swap the columns if needed and make the inner column the first component.
// NOTE(review): listing line 256 is absent. Per the cross-reference index the
// missing parameter is `const Catalog_Namespace::Catalog& cat,`.
// is_overlaps_join defaults to false.
253  static std::pair<InnerOuter, InnerOuterStringOpInfos> normalizeColumnPair(
254  const Analyzer::Expr* lhs,
255  const Analyzer::Expr* rhs,
257  const TemporaryTables* temporary_tables,
258  const bool is_overlaps_join = false);
259 
// Declared here only; the cross-reference index places the definition in
// HashJoin.cpp.
260  template <typename T>
261  static const T* getHashJoinColumn(const Analyzer::Expr* expr);
262 
263  // Normalize each expression tuple
// NOTE(review): listing line 266 is absent. Per the cross-reference index the
// missing parameter is `const Catalog_Namespace::Catalog& cat,`.
264  static std::pair<std::vector<InnerOuter>, std::vector<InnerOuterStringOpInfos>>
265  normalizeColumnPairs(const Analyzer::BinOper* condition,
267  const TemporaryTables* temporary_tables);
268 
// Returns the raw pointer held in the device's slot. CHECKs that device_id is in
// range. The result may be null: getJoinHashBufferSize below tests for that.
269  HashTable* getHashTableForDevice(const size_t device_id) const {
270  CHECK_LT(device_id, hash_tables_for_device_.size());
271  return hash_tables_for_device_[device_id].get();
272  }
273 
// CPU-only convenience overload: CHECKs device_type is CPU, then forwards with
// device_id 0. Unlike the two-argument overload it is not declared const.
274  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type) {
275  CHECK(device_type == ExecutorDeviceType::CPU);
276  return getJoinHashBufferSize(device_type, 0);
277  }
278 
// Returns 0 when the device has no hash table; otherwise the table's
// getHashTableBufferSize(device_type).
279  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
280  const int device_id) const {
281  auto hash_table = getHashTableForDevice(device_id);
282  if (!hash_table) {
283  return 0;
284  }
285  return hash_table->getHashTableBufferSize(device_type);
286  }
287 
// Returns the device's buffer pointer, or nullptr when the device has no table.
// With HAVE_CUDA the CPU or GPU buffer is chosen by device_type; without it only
// CPU is accepted (CHECKed).
288  int8_t* getJoinHashBuffer(const ExecutorDeviceType device_type,
289  const int device_id) const {
290  // TODO: just make device_id a size_t
291  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
292  if (!hash_tables_for_device_[device_id]) {
293  return nullptr;
294  }
// This CHECK repeats the null test that already returned just above.
295  CHECK(hash_tables_for_device_[device_id]);
296  auto hash_table = hash_tables_for_device_[device_id].get();
297 #ifdef HAVE_CUDA
298  if (device_type == ExecutorDeviceType::CPU) {
299  return hash_table->getCpuBuffer();
300  } else {
// hash_table came from a slot already verified non-null above.
301  CHECK(hash_table);
302  const auto gpu_buff = hash_table->getGpuBuffer();
303  return gpu_buff;
304  }
305 #else
306  CHECK(device_type == ExecutorDeviceType::CPU);
307  return hash_table->getCpuBuffer();
308 #endif
309  }
310 
// NOTE(review): listing lines 311 and 313 are absent. Per the cross-reference
// index line 311 begins `void freeHashBufferMemory()`; the initializer of
// empty_hash_tables on line 313 is not recoverable from this file. The visible
// part swaps hash_tables_for_device_ with that local.
312  auto empty_hash_tables =
314  hash_tables_for_device_.swap(empty_hash_tables);
315  }
316 
// Static helper; body is in HashJoin.cpp per the cross-reference index.
317  static std::vector<int> collectFragmentIds(
318  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments);
319 
// NOTE(review): listing line 320 is absent. Per the cross-reference index this
// declaration begins `static CompositeKeyInfo getCompositeKeyInfo(`.
// inner_outer_string_op_infos_pairs defaults to an empty vector.
321  const std::vector<InnerOuter>& inner_outer_pairs,
322  const Executor* executor,
323  const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs = {});
324 
// NOTE(review): listing line 326 is absent. Per the cross-reference index the
// function name on that line is `translateCompositeStrDictProxies(`.
325  static std::vector<const StringDictionaryProxy::IdMap*>
327  const CompositeKeyInfo& composite_key_info,
328  const std::vector<InnerOuterStringOpInfos>& string_op_infos_for_keys,
329  const Executor* executor);
330 
// Returns a pair of proxy pointers; the first is pointer-to-const, the second is
// not.
331  static std::pair<const StringDictionaryProxy*, StringDictionaryProxy*>
332  getStrDictProxies(const InnerOuter& cols,
333  const Executor* executor,
334  const bool has_string_ops);
335 
// NOTE(review): listing line 336 is absent. Per the cross-reference index this
// declaration begins
//   static const StringDictionaryProxy::IdMap* translateInnerToOuterStrDictProxies(
// old_col_range is taken by non-const reference.
337  const InnerOuter& cols,
338  const InnerOuterStringOpInfos& inner_outer_string_op_infos,
339  ExpressionRange& old_col_range,
340  const Executor* executor);
341 
342  protected:
// Code-generation helper available to subclasses; body is in HashJoin.cpp per the
// cross-reference index.
343  static llvm::Value* codegenColOrStringOper(
344  const Analyzer::Expr* col_or_string_oper,
345  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
346  CodeGenerator& code_generator,
347  const CompilationOptions& co);
348 
// Pure virtual size accessor that subclasses must implement.
349  virtual size_t getComponentBufferSize() const noexcept = 0;
350 
// One slot per device, indexed by device id in the accessors above. A slot may
// hold a null pointer.
351  std::vector<std::shared_ptr<HashTable>> hash_tables_for_device_;
352 };
353 
354 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferEntry& e);
355 
356 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferSet& s);
357 
358 std::ostream& operator<<(std::ostream& os,
359  const InnerOuterStringOpInfos& inner_outer_string_op_infos);
360 std::ostream& operator<<(
361  std::ostream& os,
362  const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs);
363 
364 std::string toString(const InnerOuterStringOpInfos& inner_outer_string_op_infos);
365 
366 std::string toString(
367  const std::vector<InnerOuterStringOpInfos>& inner_outer_string_op_infos_pairs);
368 
369 std::shared_ptr<Analyzer::ColumnVar> getSyntheticColumnVar(std::string_view table,
370  std::string_view column,
371  int rte_idx,
372  Executor* executor);
373 
374 size_t get_shard_count(const Analyzer::BinOper* join_condition, const Executor* executor);
375 
376 size_t get_shard_count(
377  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
378  const Executor* executor);
static std::vector< int > collectFragmentIds(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments)
Definition: HashJoin.cpp:446
Defines data structures for the semantic analysis phase of query processing.
virtual int getInnerTableRteIdx() const noexcept=0
virtual size_t payloadBufferOff() const noexcept=0
virtual std::string getHashJoinType() const =0
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
JoinType
Definition: sqldefs.h:151
std::string cat(Ts &&...args)
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:257
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:95
static bool canAccessHashTable(bool allow_hash_table_recycling, bool invalid_cache_key, JoinType join_type)
Definition: HashJoin.cpp:1026
virtual HashType getHashType() const noexcept=0
ExecutorDeviceType
std::vector< ChunkKey > cache_key_chunks
Definition: HashJoin.h:119
std::vector< const void * > sd_inner_proxy_per_key
Definition: HashJoin.h:117
virtual int getDeviceCount() const noexcept=0
virtual std::string toStringFlat64(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:116
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:57
static std::pair< std::vector< InnerOuter >, std::vector< InnerOuterStringOpInfos > > normalizeColumnPairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:989
void setBucketInfo(const std::vector< double > &bucket_sizes_for_dimension, const std::vector< InnerOuter > inner_outer_pairs)
Definition: HashJoin.cpp:35
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
Definition: HashJoin.cpp:58
llvm::Value * elements
Definition: HashJoin.h:111
llvm::Value * count
Definition: HashJoin.h:112
virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept=0
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:351
static std::pair< const StringDictionaryProxy *, StringDictionaryProxy * > getStrDictProxies(const InnerOuter &cols, const Executor *executor, const bool has_string_ops)
Definition: HashJoin.cpp:380
Definition: HashTable.h:21
OverlapsHashTableTooBig(const size_t overlaps_hash_table_max_bytes)
Definition: HashJoin.h:89
virtual llvm::Value * codegenSlot(const CompilationOptions &, const size_t)=0
TableMustBeReplicated(const std::string &table_name)
Definition: HashJoin.h:47
static llvm::Value * codegenColOrStringOper(const Analyzer::Expr *col_or_string_oper, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, CodeGenerator &code_generator, const CompilationOptions &co)
Definition: HashJoin.cpp:539
void freeHashBufferMemory()
Definition: HashJoin.h:311
virtual size_t offsetBufferOff() const noexcept=0
virtual std::string toStringFlat32(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:121
std::string to_string(char const *&&v)
virtual size_t countBufferOff() const noexcept=0
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
const std::vector< JoinColumnTypeInfo > join_column_types
Definition: HashJoin.h:101
static std::pair< InnerOuter, InnerOuterStringOpInfos > normalizeColumnPair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join=false)
Definition: HashJoin.cpp:797
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
std::vector< void * > sd_outer_proxy_per_key
Definition: HashJoin.h:118
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
HashJoinFail(const std::string &err_msg, InnerQualDecision qual_decision)
Definition: HashJoin.h:65
int8_t * getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:288
static std::vector< const StringDictionaryProxy::IdMap * > translateCompositeStrDictProxies(const CompositeKeyInfo &composite_key_info, const std::vector< InnerOuterStringOpInfos > &string_op_infos_for_keys, const Executor *executor)
Definition: HashJoin.cpp:503
virtual int getInnerTableId() const noexcept=0
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:279
virtual size_t getComponentBufferSize() const noexcept=0
static void checkHashJoinReplicationConstraint(const int table_id, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:771
const std::vector< std::shared_ptr< Chunk_NS::Chunk > > chunks_owner
Definition: HashJoin.h:102
static const StringDictionaryProxy::IdMap * translateInnerToOuterStrDictProxies(const InnerOuter &cols, const InnerOuterStringOpInfos &inner_outer_string_op_infos, ExpressionRange &old_col_range, const Executor *executor)
Definition: HashJoin.cpp:408
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
HashTable * getHashTableForDevice(const size_t device_id) const
Definition: HashJoin.h:269
#define CHECK_LT(x, y)
Definition: Logger.h:232
TooManyHashEntries(const std::string &reason)
Definition: HashJoin.h:42
static std::string getHashTypeString(HashType ht) noexcept
Definition: HashJoin.h:154
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
Definition: HashJoin.h:274
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
HashJoinFail(const std::string &err_msg)
Definition: HashJoin.h:63
#define CHECK(condition)
Definition: Logger.h:222
std::pair< std::vector< StringOps_Namespace::StringOpInfo >, std::vector< StringOps_Namespace::StringOpInfo >> InnerOuterStringOpInfos
Definition: HashJoin.h:97
llvm::Value * slot
Definition: HashJoin.h:113
static const T * getHashJoinColumn(const Analyzer::Expr *expr)
Definition: HashJoin.cpp:788
FileBuffer Chunk
A Chunk is the fundamental unit of execution in Map-D.
Definition: FileMgr.h:73
const std::vector< std::shared_ptr< void > > malloc_owner
Definition: HashJoin.h:104
static std::shared_ptr< HashJoin > getSyntheticInstance(std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
Definition: HashJoin.cpp:667
InnerQualDecision
Definition: HashJoin.h:52
virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const =0
std::vector< JoinBucketInfo > join_buckets
Definition: HashJoin.h:103
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:1045
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query's parse tree etc.
Definition: HashJoin.cpp:283
virtual std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const =0
std::shared_ptr< Analyzer::ColumnVar > getSyntheticColumnVar(std::string_view table, std::string_view column, int rte_idx, Executor *executor)
Definition: HashJoin.cpp:555
HashType
Definition: HashTable.h:19
InnerQualDecision inner_qual_decision
Definition: HashJoin.h:68
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:100
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:150
virtual bool isBitwiseEq() const =0
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs={})
Definition: HashJoin.cpp:455