OmniSciDB  0264ff685a
HashJoin.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <llvm/IR/Value.h>
20 #include <cstdint>
21 #include <set>
22 #include <string>
23 
24 #include "Analyzer/Analyzer.h"
31 
32 class TooManyHashEntries : public std::runtime_error {
33  public:
35  : std::runtime_error("Hash tables with more than 2B entries not supported yet") {}
36 
37  TooManyHashEntries(const std::string& reason) : std::runtime_error(reason) {}
38 };
39 
40 class TableMustBeReplicated : public std::runtime_error {
41  public:
42  TableMustBeReplicated(const std::string& table_name)
43  : std::runtime_error("Hash join failed: Table '" + table_name +
44  "' must be replicated.") {}
45 };
46 
47 class HashJoinFail : public std::runtime_error {
48  public:
49  HashJoinFail(const std::string& reason) : std::runtime_error(reason) {}
50 };
51 
53  public:
54  NeedsOneToManyHash() : HashJoinFail("Needs one to many hash") {}
55 };
56 
58  public:
60  : HashJoinFail("Not enough memory for columns involved in join") {}
61 };
62 
64  public:
65  FailedToJoinOnVirtualColumn() : HashJoinFail("Cannot join on rowid") {}
66 };
67 
69  const std::vector<JoinColumn> join_columns;
70  const std::vector<JoinColumnTypeInfo> join_column_types;
71  const std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
72  std::vector<JoinBucketInfo> join_buckets;
73  const std::vector<std::shared_ptr<void>> malloc_owner;
74 };
75 
77  llvm::Value* elements;
78  llvm::Value* count;
79  llvm::Value* slot;
80 };
81 
82 using InnerOuter = std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>;
83 
85  std::vector<const void*> sd_inner_proxy_per_key;
86  std::vector<const void*> sd_outer_proxy_per_key;
87  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
88 };
89 
90 class DeviceAllocator;
91 
92 class HashJoin {
93  public:
94  virtual std::string toString(const ExecutorDeviceType device_type,
95  const int device_id = 0,
96  bool raw = false) const = 0;
97 
98  virtual std::string toStringFlat64(const ExecutorDeviceType device_type,
99  const int device_id) const;
100 
101  virtual std::string toStringFlat32(const ExecutorDeviceType device_type,
102  const int device_id) const;
103 
104  virtual DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type,
105  const int device_id) const = 0;
106 
107  virtual llvm::Value* codegenSlot(const CompilationOptions&, const size_t) = 0;
108 
109  virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions&,
110  const size_t) = 0;
111 
112  virtual int getInnerTableId() const noexcept = 0;
113 
114  virtual int getInnerTableRteIdx() const noexcept = 0;
115 
116  virtual HashType getHashType() const noexcept = 0;
117 
118  static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept {
119  return (layout == HashType::ManyToMany || layout == HashType::OneToMany);
120  }
121 
122  static std::string getHashTypeString(HashType ht) noexcept {
123  const char* HashTypeStrings[3] = {"OneToOne", "OneToMany", "ManyToMany"};
124  return HashTypeStrings[static_cast<int>(ht)];
125  };
126 
127  static HashJoinMatchingSet codegenMatchingSet(
128  const std::vector<llvm::Value*>& hash_join_idx_args_in,
129  const bool is_sharded,
130  const bool col_is_nullable,
131  const bool is_bw_eq,
132  const int64_t sub_buff_size,
133  Executor* executor,
134  const bool is_bucketized = false);
135 
136  static llvm::Value* codegenHashTableLoad(const size_t table_idx, Executor* executor);
137 
138  virtual Data_Namespace::MemoryLevel getMemoryLevel() const noexcept = 0;
139 
140  virtual int getDeviceCount() const noexcept = 0;
141 
142  virtual size_t offsetBufferOff() const noexcept = 0;
143 
144  virtual size_t countBufferOff() const noexcept = 0;
145 
146  virtual size_t payloadBufferOff() const noexcept = 0;
147 
148  virtual std::string getHashJoinType() const = 0;
149 
150  JoinColumn fetchJoinColumn(
151  const Analyzer::ColumnVar* hash_col,
152  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragment_info,
153  const Data_Namespace::MemoryLevel effective_memory_level,
154  const int device_id,
155  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
156  DeviceAllocator* dev_buff_owner,
157  std::vector<std::shared_ptr<void>>& malloc_owner,
158  Executor* executor,
159  ColumnCacheMap* column_cache);
160 
162  static std::shared_ptr<HashJoin> getInstance(
163  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
164  const std::vector<InputTableInfo>& query_infos,
165  const Data_Namespace::MemoryLevel memory_level,
166  const HashType preferred_hash_type,
167  const int device_count,
168  ColumnCacheMap& column_cache,
169  Executor* executor,
170  const QueryHint& query_hint);
171 
173  static std::shared_ptr<HashJoin> getSyntheticInstance(
174  std::string_view table1,
175  std::string_view column1,
176  std::string_view table2,
177  std::string_view column2,
178  const Data_Namespace::MemoryLevel memory_level,
179  const HashType preferred_hash_type,
180  const int device_count,
181  ColumnCacheMap& column_cache,
182  Executor* executor);
183 
185  static std::shared_ptr<HashJoin> getSyntheticInstance(
186  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
187  const Data_Namespace::MemoryLevel memory_level,
188  const HashType preferred_hash_type,
189  const int device_count,
190  ColumnCacheMap& column_cache,
191  Executor* executor);
192 
193  static int getInnerTableId(const std::vector<InnerOuter>& inner_outer_pairs) {
194  CHECK(!inner_outer_pairs.empty());
195  const auto first_inner_col = inner_outer_pairs.front().first;
196  return first_inner_col->get_table_id();
197  }
198 
199  static void checkHashJoinReplicationConstraint(const int table_id,
200  const size_t shard_count,
201  const Executor* executor);
202 
203  HashTable* getHashTableForDevice(const size_t device_id) const {
204  CHECK_LT(device_id, hash_tables_for_device_.size());
205  return hash_tables_for_device_[device_id].get();
206  }
207 
208  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type) {
209  CHECK(device_type == ExecutorDeviceType::CPU);
210  return getJoinHashBufferSize(device_type, 0);
211  }
212 
213  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
214  const int device_id) const {
215  auto hash_table = getHashTableForDevice(device_id);
216  if (!hash_table) {
217  return 0;
218  }
219  return hash_table->getHashTableBufferSize(device_type);
220  }
221 
222  int64_t getJoinHashBuffer(const ExecutorDeviceType device_type,
223  const int device_id) const {
224  // TODO: just make device_id a size_t
225  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
226  if (!hash_tables_for_device_[device_id]) {
227  return 0;
228  }
229  CHECK(hash_tables_for_device_[device_id]);
230  auto hash_table = hash_tables_for_device_[device_id].get();
231 #ifdef HAVE_CUDA
232  if (device_type == ExecutorDeviceType::CPU) {
233  return reinterpret_cast<int64_t>(hash_table->getCpuBuffer());
234  } else {
235  CHECK(hash_table);
236  const auto gpu_buff = hash_table->getGpuBuffer();
237  return reinterpret_cast<CUdeviceptr>(gpu_buff);
238  }
239 #else
240  CHECK(device_type == ExecutorDeviceType::CPU);
241  return reinterpret_cast<int64_t>(hash_table->getCpuBuffer());
242 #endif
243  }
244 
246  auto empty_hash_tables =
247  decltype(hash_tables_for_device_)(hash_tables_for_device_.size());
248  hash_tables_for_device_.swap(empty_hash_tables);
249  }
250 
251  static CompositeKeyInfo getCompositeKeyInfo(
252  const std::vector<InnerOuter>& inner_outer_pairs,
253  const Executor* executor);
254 
255  protected:
256  virtual size_t getComponentBufferSize() const noexcept = 0;
257 
258  std::vector<std::shared_ptr<HashTable>> hash_tables_for_device_;
259 };
260 
261 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferEntry& e);
262 
263 std::ostream& operator<<(std::ostream& os, const DecodedJoinHashBufferSet& s);
264 
265 std::shared_ptr<Analyzer::ColumnVar> getSyntheticColumnVar(std::string_view table,
266  std::string_view column,
267  int rte_idx,
268  Executor* executor);
269 
270 size_t get_shard_count(const Analyzer::BinOper* join_condition, const Executor* executor);
271 
272 size_t get_shard_count(
273  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
274  const Executor* executor);
275 
276 // Swap the columns if needed and make the inner column the first component.
278  const Analyzer::Expr* rhs,
280  const TemporaryTables* temporary_tables,
281  const bool is_overlaps_join = false);
282 
283 // Normalize each expression tuple
284 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
286  const TemporaryTables* temporary_tables);
Defines data structures for the semantic analysis phase of query processing.
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:213
std::string toString(const ExtArgumentType &sig_type)
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Definition: HashJoin.cpp:533
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:101
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:82
ExecutorDeviceType
std::vector< ChunkKey > cache_key_chunks
Definition: HashJoin.h:87
std::vector< const void * > sd_inner_proxy_per_key
Definition: HashJoin.h:85
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
llvm::Value * elements
Definition: HashJoin.h:77
llvm::Value * count
Definition: HashJoin.h:78
unsigned long long CUdeviceptr
Definition: nocuda.h:27
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:258
Definition: HashTable.h:21
TableMustBeReplicated(const std::string &table_name)
Definition: HashJoin.h:42
void freeHashBufferMemory()
Definition: HashJoin.h:245
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join=false)
Definition: HashJoin.cpp:550
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
std::shared_ptr< Analyzer::ColumnVar > getSyntheticColumnVar(std::string_view table, std::string_view column, int rte_idx, Executor *executor)
Definition: HashJoin.cpp:336
const std::vector< JoinColumnTypeInfo > join_column_types
Definition: HashJoin.h:70
std::ostream & operator<<(std::ostream &os, const DecodedJoinHashBufferEntry &e)
Definition: HashJoin.cpp:102
HashJoinFail(const std::string &reason)
Definition: HashJoin.h:49
std::string cat(Ts &&... args)
std::vector< const void * > sd_outer_proxy_per_key
Definition: HashJoin.h:86
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
Definition: HashJoin.h:222
HashTable * getHashTableForDevice(const size_t device_id) const
Definition: HashJoin.h:203
const std::vector< std::shared_ptr< Chunk_NS::Chunk > > chunks_owner
Definition: HashJoin.h:71
#define CHECK_LT(x, y)
Definition: Logger.h:207
TooManyHashEntries(const std::string &reason)
Definition: HashJoin.h:37
static std::string getHashTypeString(HashType ht) noexcept
Definition: HashJoin.h:122
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
Definition: HashJoin.h:208
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
static int getInnerTableId(const std::vector< InnerOuter > &inner_outer_pairs)
Definition: HashJoin.h:193
#define CHECK(condition)
Definition: Logger.h:197
llvm::Value * slot
Definition: HashJoin.h:79
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Definition: HashJoin.cpp:684
const std::vector< std::shared_ptr< void > > malloc_owner
Definition: HashJoin.h:73
std::vector< JoinBucketInfo > join_buckets
Definition: HashJoin.h:72
HashType
Definition: HashTable.h:19
const std::vector< JoinColumn > join_columns
Definition: HashJoin.h:69
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:118