OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
HashJoin.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <llvm/IR/Value.h>
20 #include <cstdint>
21 #include <set>
22 #include <string>
23 
24 #include "Analyzer/Analyzer.h"
31 
32 class TooManyHashEntries : public std::runtime_error {
33  public:
35  : std::runtime_error("Hash tables with more than 2B entries not supported yet") {}
36 
37  TooManyHashEntries(const std::string& reason) : std::runtime_error(reason) {}
38 };
39 
40 class TableMustBeReplicated : public std::runtime_error {
41  public:
42  TableMustBeReplicated(const std::string& table_name)
43  : std::runtime_error("Hash join failed: Table '" + table_name +
44  "' must be replicated.") {}
45 };
46 
47 class HashJoinFail : public std::runtime_error {
48  public:
49  HashJoinFail(const std::string& reason) : std::runtime_error(reason) {}
50 };
51 
53  public:
54  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
55 };
56 
58  public:
60  : HashJoinFail("Not enough memory for columns involved in join") {}
61 };
62 
64  public:
65  FailedToJoinOnVirtualColumn() : HashJoinFail("Cannot join on rowid") {}
66 };
67 
69  public:
70  OverlapsHashTableTooBig(const size_t overlaps_hash_table_max_bytes)
71  : HashJoinFail(
72  "Could not create overlaps hash table with less than max allowed size of " +
73  std::to_string(overlaps_hash_table_max_bytes) + " bytes") {}
74 };
75 
76 using InnerOuter = std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>;
77 
79  const std::vector<JoinColumn> join_columns;
80  const std::vector<JoinColumnTypeInfo> join_column_types;
81  const std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
82  std::vector<JoinBucketInfo> join_buckets;
83  const std::vector<std::shared_ptr<void>> malloc_owner;
84 
85  void setBucketInfo(const std::vector<double>& bucket_sizes_for_dimension,
86  const std::vector<InnerOuter> inner_outer_pairs);
87 };
88 
90  llvm::Value* elements;
91  llvm::Value* count;
92  llvm::Value* slot;
93 };
94 
96  std::vector<const void*> sd_inner_proxy_per_key;
97  std::vector<const void*> sd_outer_proxy_per_key;
98  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
99 };
100 
101 class DeviceAllocator;
102 
103 class HashJoin {
104  public:
105  virtual std::string toString(const ExecutorDeviceType device_type,
106  const int device_id = 0,
107  bool raw = false) const = 0;
108 
109  virtual std::string toStringFlat64(const ExecutorDeviceType device_type,
110  const int device_id) const;
111 
112  virtual std::string toStringFlat32(const ExecutorDeviceType device_type,
113  const int device_id) const;
114 
115  virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type,
116  const int device_id) const = 0;
117 
118  virtual llvm::Value* codegenSlot(const CompilationOptions&, const size_t) = 0;
119 
121  const size_t) = 0;
122 
123  virtual int getInnerTableId() const noexcept = 0;
124 
125  virtual int getInnerTableRteIdx() const noexcept = 0;
126 
127  virtual HashType getHashType() const noexcept = 0;
128 
129  static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept {
130  return (layout == HashType::ManyToMany || layout == HashType::OneToMany);
131  }
132 
133  static std::string getHashTypeString(HashType ht) noexcept {
134  const char* HashTypeStrings[3] = {"OneToOne", "OneToMany", "ManyToMany"};
135  return HashTypeStrings[static_cast<int>(ht)];
136  };
137 
139  const std::vector<llvm::Value*>& hash_join_idx_args_in,
140  const bool is_sharded,
141  const bool col_is_nullable,
142  const bool is_bw_eq,
143  const int64_t sub_buff_size,
144  Executor* executor,
145  const bool is_bucketized = false);
146 
147  static llvm::Value* codegenHashTableLoad(const size_t table_idx, Executor* executor);
148 
149  virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept = 0;
150 
151  virtual int getDeviceCount() const noexcept = 0;
152 
153  virtual size_t offsetBufferOff() const noexcept = 0;
154 
155  virtual size_t countBufferOff() const noexcept = 0;
156 
157  virtual size_t payloadBufferOff() const noexcept = 0;
158 
159  virtual std::string getHashJoinType() const = 0;
160 
162  const Analyzer::ColumnVar* hash_col,
163  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragment_info,
164  const Data_Namespace::MemoryLevel effective_memory_level,
165  const int device_id,
166  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
167  DeviceAllocator* dev_buff_owner,
168  std::vector<std::shared_ptr<void>>& malloc_owner,
169  Executor* executor,
170  ColumnCacheMap* column_cache);
171 
173  static std::shared_ptr<HashJoin> getInstance(
174  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
175  const std::vector<InputTableInfo>& query_infos,
176  const Data_Namespace::MemoryLevel memory_level,
177  const HashType preferred_hash_type,
178  const int device_count,
179  ColumnCacheMap& column_cache,
180  Executor* executor,
181  const RegisteredQueryHint& query_hint);
182 
184  static std::shared_ptr<HashJoin> getSyntheticInstance(
185  std::string_view table1,
186  std::string_view column1,
187  std::string_view table2,
188  std::string_view column2,
189  const Data_Namespace::MemoryLevel memory_level,
190  const HashType preferred_hash_type,
191  const int device_count,
192  ColumnCacheMap& column_cache,
193  Executor* executor);
194 
196  static std::shared_ptr<HashJoin> getSyntheticInstance(
197  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
198  const Data_Namespace::MemoryLevel memory_level,
199  const HashType preferred_hash_type,
200  const int device_count,
201  ColumnCacheMap& column_cache,
202  Executor* executor);
203 
204  static int getInnerTableId(const std::vector<InnerOuter>& inner_outer_pairs) {
205  CHECK(!inner_outer_pairs.empty());
206  const auto first_inner_col = inner_outer_pairs.front().first;
207  return first_inner_col->get_table_id();
208  }
209 
210  static void checkHashJoinReplicationConstraint(const int table_id,
211  const size_t shard_count,
212  const Executor* executor);
213 
214  HashTable* getHashTableForDevice(const size_t device_id) const {
215  CHECK_LT(device_id, hash_tables_for_device_.size());
216  return hash_tables_for_device_[device_id].get();
217  }
218 
219  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type) {
220  CHECK(device_type == ExecutorDeviceType::CPU);
221  return getJoinHashBufferSize(device_type, 0);
222  }
223 
224  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
225  const int device_id) const {
226  auto hash_table = getHashTableForDevice(device_id);
227  if (!hash_table) {
228  return 0;
229  }
230  return hash_table->getHashTableBufferSize(device_type);
231  }
232 
233  int64_t getJoinHashBuffer(const ExecutorDeviceType device_type,
234  const int device_id) const {
235  // TODO: just make device_id a size_t
236  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
237  if (!hash_tables_for_device_[device_id]) {
238  return 0;
239  }
240  CHECK(hash_tables_for_device_[device_id]);
241  auto hash_table = hash_tables_for_device_[device_id].get();
242 #ifdef HAVE_CUDA
243  if (device_type == ExecutorDeviceType::CPU) {
244  return reinterpret_cast<int64_t>(hash_table->getCpuBuffer());
245  } else {
246  CHECK(hash_table);
247  const auto gpu_buff = hash_table->getGpuBuffer();
248  return reinterpret_cast<CUdeviceptr>(gpu_buff);
249  }
250 #else
251  CHECK(device_type == ExecutorDeviceType::CPU);
252  return reinterpret_cast<int64_t>(hash_table->getCpuBuffer());
253 #endif
254  }
255 
257  auto empty_hash_tables =
259  hash_tables_for_device_.swap(empty_hash_tables);
260  }
261 
263  const std::vector<InnerOuter>& inner_outer_pairs,
264  const Executor* executor);
265 
266  protected:
267  virtual size_t getComponentBufferSize() const noexcept = 0;
268 
269  std::vector<std::shared_ptr<HashTable>> hash_tables_for_device_;
270 };
271 
272 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferEntry& e);
273 
274 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferSet& s);
275 
276 std::shared_ptr<Analyzer::ColumnVar> getSyntheticColumnVar(std::string_view table,
277  std::string_view column,
278  int rte_idx,
279  Executor* executor);
280 
281 size_t get_shard_count(const Analyzer::BinOper* join_condition, const Executor* executor);
282 
283 size_t get_shard_count(
284  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
285  const Executor* executor);
286 
287 // Swap the columns if needed and make the inner column the first component.
288 InnerOuter normalize_column_pair(const Analyzer::Expr* lhs,
289  const Analyzer::Expr* rhs,
290  const Catalog_Namespace::Catalog& cat,
291  const TemporaryTables* temporary_tables,
292  const bool is_overlaps_join = false);
293 
294 // Normalize each expression tuple
295 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
296  const Catalog_Namespace::Catalog& cat,
297  const TemporaryTables* temporary_tables);
Defines data structures for the semantic analysis phase of query processing.
virtual int getInnerTableRteIdx() const noexcept=0
virtual size_t payloadBufferOff() const noexcept=0
virtual std::string getHashJoinType() const =0
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:233
std::string cat(Ts &&...args)
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Definition: HashJoin.cpp:215
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:76
virtual HashType getHashType() const noexcept=0
ExecutorDeviceType
std::vector< ChunkKey > cache_key_chunks
Definition: HashJoin.h:98
std::vector< const void * > sd_inner_proxy_per_key
Definition: HashJoin.h:96
virtual int getDeviceCount() const noexcept=0
virtual std::string toStringFlat64(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:112
#define const
void setBucketInfo(const std::vector< double > &bucket_sizes_for_dimension, const std::vector< InnerOuter > inner_outer_pairs)
Definition: HashJoin.cpp:31
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
Definition: HashJoin.cpp:54
llvm::Value * elements
Definition: HashJoin.h:90
llvm::Value * count
Definition: HashJoin.h:91
unsigned long long CUdeviceptr
Definition: nocuda.h:27
virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept=0
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:269
Definition: HashTable.h:21
OverlapsHashTableTooBig(const size_t overlaps_hash_table_max_bytes)
Definition: HashJoin.h:70
virtual llvm::Value * codegenSlot(const CompilationOptions &, const size_t)=0
TableMustBeReplicated(const std::string &table_name)
Definition: HashJoin.h:42
void freeHashBufferMemory()
Definition: HashJoin.h:256
virtual size_t offsetBufferOff() const noexcept=0
virtual std::string toStringFlat32(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.cpp:117
std::string to_string(char const *&&v)
virtual size_t countBufferOff() const noexcept=0
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
const std::vector< JoinColumnTypeInfo > join_column_types
Definition: HashJoin.h:80
HashJoinFail(const std::string &reason)
Definition: HashJoin.h:49
std::vector< const void * > sd_outer_proxy_per_key
Definition: HashJoin.h:97
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join)
Definition: HashJoin.cpp:570
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:705
virtual int getInnerTableId() const noexcept=0
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:224
virtual size_t getComponentBufferSize() const noexcept=0
static void checkHashJoinReplicationConstraint(const int table_id, const size_t shard_count, const Executor *executor)
Definition: HashJoin.cpp:525
const std::vector< std::shared_ptr< Chunk_NS::Chunk > > chunks_owner
Definition: HashJoin.h:81
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
HashTable * getHashTableForDevice(const size_t device_id) const
Definition: HashJoin.h:214
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor)
Definition: HashJoin.cpp:319
#define CHECK_LT(x, y)
Definition: Logger.h:213
TooManyHashEntries(const std::string &reason)
Definition: HashJoin.h:37
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const RegisteredQueryHint &query_hint)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
Definition: HashJoin.cpp:238
static std::string getHashTypeString(HashType ht) noexcept
Definition: HashJoin.h:133
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
Definition: HashJoin.h:219
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:203
llvm::Value * slot
Definition: HashJoin.h:92
FileBuffer Chunk
A Chunk is the fundamental unit of execution in Map-D.
Definition: FileMgr.h:68
const std::vector< std::shared_ptr< void > > malloc_owner
Definition: HashJoin.h:83
static std::shared_ptr< HashJoin > getSyntheticInstance(std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
Definition: HashJoin.cpp:468
virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const =0
std::vector< JoinBucketInfo > join_buckets
Definition: HashJoin.h:82
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:553
virtual std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const =0
std::shared_ptr< Analyzer::ColumnVar > getSyntheticColumnVar(std::string_view table, std::string_view column, int rte_idx, Executor *executor)
Definition: HashJoin.cpp:356
HashType
Definition: HashTable.h:19
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:79
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:129