OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
HashJoin.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <llvm/IR/Value.h>
20 #include <cstdint>
21 #include <set>
22 #include <string>
23 
24 #include "Analyzer/Analyzer.h"
32 
33 class TooManyHashEntries : public std::runtime_error {
34  public:
36  : std::runtime_error("Hash tables with more than 2B entries not supported yet") {}
37 
38  TooManyHashEntries(const std::string& reason) : std::runtime_error(reason) {}
39 };
40 
/// Exception whose message reports that a hash join failed because the named
/// table must be replicated.
class TableMustBeReplicated : public std::runtime_error {
 public:
  /// @param table_name Table name embedded (single-quoted) in the error message.
  ///
  /// Marked explicit so a plain std::string never converts silently into this
  /// exception type.
  explicit TableMustBeReplicated(const std::string& table_name)
      : std::runtime_error("Hash join failed: Table '" + table_name +
                           "' must be replicated.") {}
};
47 
/// Base exception for hash join failures; the message is supplied by the caller.
class HashJoinFail : public std::runtime_error {
 public:
  /// @param reason Text returned by what().
  ///
  /// Marked explicit so a plain std::string never converts silently into this
  /// exception type; derived classes that direct-initialise the base are unaffected.
  explicit HashJoinFail(const std::string& reason) : std::runtime_error(reason) {}
};
52 
54  public:
55  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
56 };
57 
59  public:
61  : HashJoinFail("Not enough memory for columns involved in join") {}
62 };
63 
65  public:
66  FailedToJoinOnVirtualColumn() : HashJoinFail("Cannot join on rowid") {}
67 };
68 
70  public:
71  OverlapsHashTableTooBig(const size_t overlaps_hash_table_max_bytes)
72  : HashJoinFail(
73  "Could not create overlaps hash table with less than max allowed size of " +
74  std::to_string(overlaps_hash_table_max_bytes) + " bytes") {}
75 };
76 
77 using InnerOuter = std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>;
78 
80  const std::vector<JoinColumn> join_columns;
81  const std::vector<JoinColumnTypeInfo> join_column_types;
82  const std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
83  std::vector<JoinBucketInfo> join_buckets;
84  const std::vector<std::shared_ptr<void>> malloc_owner;
85 
86  void setBucketInfo(const std::vector<double>& bucket_sizes_for_dimension,
87  const std::vector<InnerOuter> inner_outer_pairs);
88 };
89 
91  llvm::Value* elements;
92  llvm::Value* count;
93  llvm::Value* slot;
94 };
95 
97  std::vector<const void*> sd_inner_proxy_per_key;
98  std::vector<const void*> sd_outer_proxy_per_key;
99  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
100 };
101 
102 class DeviceAllocator;
103 
104 class HashJoin {
105  public:
106  virtual std::string toString(const ExecutorDeviceType device_type,
107  const int device_id = 0,
108  bool raw = false) const = 0;
109 
110  virtual std::string toStringFlat64(const ExecutorDeviceType device_type,
111  const int device_id) const;
112 
113  virtual std::string toStringFlat32(const ExecutorDeviceType device_type,
114  const int device_id) const;
115 
116  virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type,
117  const int device_id) const = 0;
118 
119  virtual llvm::Value* codegenSlot(const CompilationOptions&, const size_t) = 0;
120 
122  const size_t) = 0;
123 
124  virtual int getInnerTableId() const noexcept = 0;
125 
126  virtual int getInnerTableRteIdx() const noexcept = 0;
127 
128  virtual HashType getHashType() const noexcept = 0;
129 
130  static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept {
131  return (layout == HashType::ManyToMany || layout == HashType::OneToMany);
132  }
133 
134  static std::string getHashTypeString(HashType ht) noexcept {
135  const char* HashTypeStrings[3] = {"OneToOne", "OneToMany", "ManyToMany"};
136  return HashTypeStrings[static_cast<int>(ht)];
137  };
138 
140  const std::vector<llvm::Value*>& hash_join_idx_args_in,
141  const bool is_sharded,
142  const bool col_is_nullable,
143  const bool is_bw_eq,
144  const int64_t sub_buff_size,
145  Executor* executor,
146  const bool is_bucketized = false);
147 
148  static llvm::Value* codegenHashTableLoad(const size_t table_idx, Executor* executor);
149 
150  virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept = 0;
151 
152  virtual int getDeviceCount() const noexcept = 0;
153 
154  virtual size_t offsetBufferOff() const noexcept = 0;
155 
156  virtual size_t countBufferOff() const noexcept = 0;
157 
158  virtual size_t payloadBufferOff() const noexcept = 0;
159 
160  virtual std::string getHashJoinType() const = 0;
161 
163  const Analyzer::ColumnVar* hash_col,
164  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragment_info,
165  const Data_Namespace::MemoryLevel effective_memory_level,
166  const int device_id,
167  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
168  DeviceAllocator* dev_buff_owner,
169  std::vector<std::shared_ptr<void>>& malloc_owner,
170  Executor* executor,
171  ColumnCacheMap* column_cache);
172 
/// Make hash table from an in-flight SQL query's parse tree etc.
174  static std::shared_ptr<HashJoin> getInstance(
175  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
176  const std::vector<InputTableInfo>& query_infos,
177  const Data_Namespace::MemoryLevel memory_level,
178  const JoinType join_type,
179  const HashType preferred_hash_type,
180  const int device_count,
181  ColumnCacheMap& column_cache,
182  Executor* executor,
183  const HashTableBuildDagMap& hashtable_build_dag_map,
184  const RegisteredQueryHint& query_hint,
185  const TableIdToNodeMap& table_id_to_node_map);
186 
/// Make hash table from named tables and columns (such as for testing).
188  static std::shared_ptr<HashJoin> getSyntheticInstance(
189  std::string_view table1,
190  std::string_view column1,
191  std::string_view table2,
192  std::string_view column2,
193  const Data_Namespace::MemoryLevel memory_level,
194  const HashType preferred_hash_type,
195  const int device_count,
196  ColumnCacheMap& column_cache,
197  Executor* executor);
198 
200  static std::shared_ptr<HashJoin> getSyntheticInstance(
201  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
202  const Data_Namespace::MemoryLevel memory_level,
203  const HashType preferred_hash_type,
204  const int device_count,
205  ColumnCacheMap& column_cache,
206  Executor* executor);
207 
208  static std::pair<std::string, std::shared_ptr<HashJoin>> getSyntheticInstance(
209  std::vector<std::shared_ptr<Analyzer::BinOper>>,
210  const Data_Namespace::MemoryLevel memory_level,
211  const HashType preferred_hash_type,
212  const int device_count,
213  ColumnCacheMap& column_cache,
214  Executor* executor);
215 
216  static int getInnerTableId(const std::vector<InnerOuter>& inner_outer_pairs) {
217  CHECK(!inner_outer_pairs.empty());
218  const auto first_inner_col = inner_outer_pairs.front().first;
219  return first_inner_col->get_table_id();
220  }
221 
222  static void checkHashJoinReplicationConstraint(const int table_id,
223  const size_t shard_count,
224  const Executor* executor);
225 
226  // Swap the columns if needed and make the inner column the first component.
228  const Analyzer::Expr* rhs,
230  const TemporaryTables* temporary_tables,
231  const bool is_overlaps_join = false);
232 
233  // Normalize each expression tuple
234  static std::vector<InnerOuter> normalizeColumnPairs(
235  const Analyzer::BinOper* condition,
237  const TemporaryTables* temporary_tables);
238 
239  HashTable* getHashTableForDevice(const size_t device_id) const {
240  CHECK_LT(device_id, hash_tables_for_device_.size());
241  return hash_tables_for_device_[device_id].get();
242  }
243 
244  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type) {
245  CHECK(device_type == ExecutorDeviceType::CPU);
246  return getJoinHashBufferSize(device_type, 0);
247  }
248 
249  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
250  const int device_id) const {
251  auto hash_table = getHashTableForDevice(device_id);
252  if (!hash_table) {
253  return 0;
254  }
255  return hash_table->getHashTableBufferSize(device_type);
256  }
257 
258  int8_t* getJoinHashBuffer(const ExecutorDeviceType device_type,
259  const int device_id) const {
260  // TODO: just make device_id a size_t
261  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
262  if (!hash_tables_for_device_[device_id]) {
263  return nullptr;
264  }
265  CHECK(hash_tables_for_device_[device_id]);
266  auto hash_table = hash_tables_for_device_[device_id].get();
267 #ifdef HAVE_CUDA
268  if (device_type == ExecutorDeviceType::CPU) {
269  return hash_table->getCpuBuffer();
270  } else {
271  CHECK(hash_table);
272  const auto gpu_buff = hash_table->getGpuBuffer();
273  return gpu_buff;
274  }
275 #else
276  CHECK(device_type == ExecutorDeviceType::CPU);
277  return hash_table->getCpuBuffer();
278 #endif
279  }
280 
282  auto empty_hash_tables =
284  hash_tables_for_device_.swap(empty_hash_tables);
285  }
286 
288  const std::vector<InnerOuter>& inner_outer_pairs,
289  const Executor* executor);
290 
291  protected:
292  virtual size_t getComponentBufferSize() const noexcept = 0;
293 
294  std::vector<std::shared_ptr<HashTable>> hash_tables_for_device_;
295 };
296 
297 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferEntry& e);
298 
299 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferSet& s);
300 
301 std::shared_ptr<Analyzer::ColumnVar> getSyntheticColumnVar(std::string_view table,
302  std::string_view column,
303  int rte_idx,
304  Executor* executor);
305 
306 size_t get_shard_count(const Analyzer::BinOper* join_condition, const Executor* executor);
307 
308 size_t get_shard_count(
309  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
310  const Executor* executor);
Defines data structures for the semantic analysis phase of query processing.
virtual int getInnerTableRteIdx() const noexcept=0
virtual size_t payloadBufferOff() const noexcept=0
virtual std::string getHashJoinType() const =0
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
JoinType
Definition: sqldefs.h:108
std::string cat(Ts &&...args)
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:215
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:111
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:77
virtual HashType getHashType() const noexcept=0
ExecutorDeviceType
std::vector< ChunkKey > cache_key_chunks
Definition: HashJoin.h:99
std::vector< const void * > sd_inner_proxy_per_key
Definition: HashJoin.h:97
virtual int getDeviceCount() const noexcept=0
virtual std::string toStringFlat64(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:112
#define const
void setBucketInfo(const std::vector< double > &bucket_sizes_for_dimension, const std::vector< InnerOuter > inner_outer_pairs)
Definition: HashJoin.cpp:31
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
Definition: HashJoin.cpp:54
llvm::Value * elements
Definition: HashJoin.h:91
llvm::Value * count
Definition: HashJoin.h:92
virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept=0
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:294
Definition: HashTable.h:21
OverlapsHashTableTooBig(const size_t overlaps_hash_table_max_bytes)
Definition: HashJoin.h:71
virtual llvm::Value * codegenSlot(const CompilationOptions &, const size_t)=0
TableMustBeReplicated(const std::string &table_name)
Definition: HashJoin.h:43
void freeHashBufferMemory()
Definition: HashJoin.h:281
virtual size_t offsetBufferOff() const noexcept=0
virtual std::string toStringFlat32(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:117
std::string to_string(char const *&&v)
virtual size_t countBufferOff() const noexcept=0
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
const std::vector< JoinColumnTypeInfo > join_column_types
Definition: HashJoin.h:81
HashJoinFail(const std::string &reason)
Definition: HashJoin.h:50
std::vector< const void * > sd_outer_proxy_per_key
Definition: HashJoin.h:98
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
int8_t * getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:258
virtual int getInnerTableId() const noexcept=0
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:249
virtual size_t getComponentBufferSize() const noexcept=0
static void checkHashJoinReplicationConstraint(const int table_id, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:587
const std::vector< std::shared_ptr< Chunk_NS::Chunk > > chunks_owner
Definition: HashJoin.h:82
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
HashTable * getHashTableForDevice(const size_t device_id) const
Definition: HashJoin.h:239
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor)
Definition: HashJoin.cpp:334
#define CHECK_LT(x, y)
Definition: Logger.h:219
TooManyHashEntries(const std::string &reason)
Definition: HashJoin.h:38
static std::string getHashTypeString(HashType ht) noexcept
Definition: HashJoin.h:134
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
Definition: HashJoin.h:244
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
static InnerOuter normalizeColumnPair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join=false)
Definition: HashJoin.cpp:603
static std::vector< InnerOuter > normalizeColumnPairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:744
#define CHECK(condition)
Definition: Logger.h:209
llvm::Value * slot
Definition: HashJoin.h:93
FileBuffer Chunk
A Chunk is the fundamental unit of execution in Map-D.
Definition: FileMgr.h:68
const std::vector< std::shared_ptr< void > > malloc_owner
Definition: HashJoin.h:84
static std::shared_ptr< HashJoin > getSyntheticInstance(std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
Definition: HashJoin.cpp:483
virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const =0
std::vector< JoinBucketInfo > join_buckets
Definition: HashJoin.h:83
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:790
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query's parse tree etc.
Definition: HashJoin.cpp:238
virtual std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const =0
std::shared_ptr< Analyzer::ColumnVar > getSyntheticColumnVar(std::string_view table, std::string_view column, int rte_idx, Executor *executor)
Definition: HashJoin.cpp:371
HashType
Definition: HashTable.h:19
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:80
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:130
std::unordered_map< JoinColumnsInfo, HashTableBuildDag > HashTableBuildDagMap