OmniSciDB  04ee39c94c
JoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file JoinHashTable.h
19  * @author Alex Suhan <alex@mapd.com>
20  *
21  * Copyright (c) 2015 MapD Technologies, Inc. All rights reserved.
22  */
23 
24 #ifndef QUERYENGINE_JOINHASHTABLE_H
25 #define QUERYENGINE_JOINHASHTABLE_H
26 
27 #include "../Analyzer/Analyzer.h"
28 #include "../Catalog/Catalog.h"
29 #include "../Chunk/Chunk.h"
30 #include "../Shared/ExperimentalTypeUtilities.h"
32 #include "ColumnarResults.h"
35 #include "ExpressionRange.h"
36 #include "InputMetadata.h"
37 #include "JoinHashTableInterface.h"
38 
39 #include <llvm/IR/Value.h>
40 
41 #ifdef HAVE_CUDA
42 #include <cuda.h>
43 #endif
44 #include <functional>
45 #include <memory>
46 #include <mutex>
47 #include <stdexcept>
48 
49 class Executor;
50 struct HashEntryInfo;
51 
53  public:
// Public static factory for JoinHashTable; the constructor is private, so this
// is the way callers obtain an instance. Returns a shared_ptr to a table for
// the join condition `qual_bin_oper`.
// NOTE(review): query_infos and column_cache are presumably forwarded to the
// private constructor, which keeps them by reference (query_infos_ /
// column_cache_), so callers should keep both alive as long as the returned
// table — confirm against JoinHashTable.cpp.
54  static std::shared_ptr<JoinHashTable> getInstance(
55  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
56  const std::vector<InputTableInfo>& query_infos,
57  const Data_Namespace::MemoryLevel memory_level,
58  const HashType preferred_hash_type,
59  const int device_count,
60  ColumnCacheMap& column_cache,
61  Executor* executor);
62 
63  int64_t getJoinHashBuffer(const ExecutorDeviceType device_type,
64  const int device_id) noexcept override {
65  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
66  return 0;
67  }
68 #ifdef HAVE_CUDA
69  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
70  if (device_type == ExecutorDeviceType::CPU) {
71  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
72  } else {
73  return gpu_hash_table_buff_[device_id]
74  ? reinterpret_cast<CUdeviceptr>(
75  gpu_hash_table_buff_[device_id]->getMemoryPtr())
76  : reinterpret_cast<CUdeviceptr>(nullptr);
77  }
78 #else
79  CHECK(device_type == ExecutorDeviceType::CPU);
80  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
81 #endif
82  }
83 
84  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override;
85 
87  const size_t) override;
88 
89  int getInnerTableId() const noexcept override {
90  return col_var_.get()->get_table_id();
91  };
92 
93  int getInnerTableRteIdx() const noexcept override {
94  return col_var_.get()->get_rte_idx();
95  };
96 
97  HashType getHashType() const noexcept override { return hash_type_; }
98 
99  size_t offsetBufferOff() const noexcept override;
100 
101  size_t countBufferOff() const noexcept override;
102 
103  size_t payloadBufferOff() const noexcept override;
104 
106  const std::vector<llvm::Value*>& hash_join_idx_args_in,
107  const bool is_sharded,
108  const bool col_is_nullable,
109  const bool is_bw_eq,
110  const int64_t sub_buff_size,
111  Executor* executor,
112  const bool is_bucketized = false);
113 
114  static llvm::Value* codegenHashTableLoad(const size_t table_idx, Executor* executor);
115 
116  static auto yieldCacheInvalidator() -> std::function<void()> {
117  return []() -> void {
118  std::lock_guard<std::mutex> guard(join_hash_table_cache_mutex_);
119  join_hash_table_cache_.clear();
120  };
121  }
122 
123  virtual ~JoinHashTable() {}
124 
125  private:
// Private constructor; instances are created through the static getInstance()
// factory. It only records its arguments. hash_type_ starts as
// preferred_hash_type, and hash_entry_count_ starts at 0.
// Lifetime: query_infos_ is a reference member, and column_cache_ is a
// reference too, so the caller must keep `query_infos` and `column_cache`
// alive for the lifetime of this table.
126  JoinHashTable(const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
127  const Analyzer::ColumnVar* col_var,
128  const std::vector<InputTableInfo>& query_infos,
129  const Data_Namespace::MemoryLevel memory_level,
130  const HashType preferred_hash_type,
131  const ExpressionRange& col_range,
132  ColumnCacheMap& column_cache,
133  Executor* executor,
134  const int device_count)
135  : qual_bin_oper_(qual_bin_oper)
// col_var is deep-copied, so the table owns its own copy of the inner column
// rather than pointing into the caller's expression tree.
// NOTE(review): the dynamic_pointer_cast result is not checked here; it is
// null if deep_copy() ever yields a non-ColumnVar. Presumably it cannot.
136  , col_var_(std::dynamic_pointer_cast<Analyzer::ColumnVar>(col_var->deep_copy()))
137  , query_infos_(query_infos)
138  , memory_level_(memory_level)
139  , hash_type_(preferred_hash_type)
140  , hash_entry_count_(0)
141  , col_range_(col_range)
142  , executor_(executor)
143  , column_cache_(column_cache)
144  , device_count_(device_count) {
146  }
147 
148  std::pair<const int8_t*, size_t> getOneColumnFragment(
149  const Analyzer::ColumnVar& hash_col,
150  const Fragmenter_Namespace::FragmentInfo& fragment,
151  const Data_Namespace::MemoryLevel effective_mem_lvl,
152  const int device_id,
153  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner);
154 
155  std::pair<const int8_t*, size_t> getAllColumnFragments(
156  const Analyzer::ColumnVar& hash_col,
157  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
158  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner);
159 
161  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
162  const Analyzer::Expr* outer_col,
163  const Analyzer::ColumnVar* inner_col) const;
164 
165  void reify(const int device_count);
167  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
168  const int device_id);
170  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
171  const int device_id);
172  void checkHashJoinReplicationConstraint(const int table_id) const;
174  const ChunkKey& chunk_key,
175  const int8_t* col_buff,
176  const size_t num_elements,
177  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
178  const Data_Namespace::MemoryLevel effective_memory_level,
179  const int device_id);
181  const ChunkKey& chunk_key,
182  const int8_t* col_buff,
183  const size_t num_elements,
184  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
185  const Data_Namespace::MemoryLevel effective_memory_level,
186  const int device_id);
188  const ChunkKey& chunk_key,
189  const size_t num_elements,
190  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols);
192  const ChunkKey& chunk_key,
193  const size_t num_elements,
194  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols);
195  void initHashTableOnCpu(
196  const int8_t* col_buff,
197  const size_t num_elements,
198  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
199  const HashEntryInfo hash_entry_info,
200  const int32_t hash_join_invalid_val);
202  const int8_t* col_buff,
203  const size_t num_elements,
204  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
205  const HashEntryInfo hash_entry_info,
206  const int32_t hash_join_invalid_val);
207 
208  const InputTableInfo& getInnerQueryInfo(const Analyzer::ColumnVar* inner_col) const;
209 
210  size_t shardCount() const;
211 
212  llvm::Value* codegenHashTableLoad(const size_t table_idx);
213 
214  std::vector<llvm::Value*> getHashJoinArgs(llvm::Value* hash_ptr,
215  const Analyzer::Expr* key_col,
216  const int shard_count,
217  const CompilationOptions& co);
218 
219  std::pair<const int8_t*, size_t> fetchFragments(
220  const Analyzer::ColumnVar* hash_col,
221  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragment_info,
222  const Data_Namespace::MemoryLevel effective_memory_level,
223  const int device_id,
224  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
225  ThrustAllocator& dev_buff_owner);
226 
227  bool isBitwiseEq() const;
228 
229  void freeHashBufferMemory();
232 
233  size_t getComponentBufferSize() const noexcept;
234 
235  std::shared_ptr<Analyzer::BinOper> qual_bin_oper_;
236  std::shared_ptr<Analyzer::ColumnVar> col_var_;
237  const std::vector<InputTableInfo>& query_infos_;
241  std::shared_ptr<std::vector<int32_t>> cpu_hash_table_buff_;
243 #ifdef HAVE_CUDA
244  std::vector<Data_Namespace::AbstractBuffer*> gpu_hash_table_buff_;
245  std::vector<Data_Namespace::AbstractBuffer*> gpu_hash_table_err_buff_;
246 #endif
248  Executor* executor_;
250  const int device_count_;
251  std::pair<const int8_t*, size_t> linearized_multifrag_column_;
254 
259  const size_t num_elements;
261  const SQLOps optype;
262 
263  bool operator==(const struct JoinHashTableCacheKey& that) const {
264  return col_range == that.col_range && inner_col == that.inner_col &&
265  outer_col == that.outer_col && num_elements == that.num_elements &&
266  chunk_key == that.chunk_key && optype == that.optype;
267  }
268  };
269 
270  static std::vector<
271  std::pair<JoinHashTableCacheKey, std::shared_ptr<std::vector<int32_t>>>>
273  static std::mutex join_hash_table_cache_mutex_;
274 };
275 
276 inline std::string get_table_name_by_id(const int table_id,
277  const Catalog_Namespace::Catalog& cat) {
278  if (table_id >= 1) {
279  const auto td = cat.getMetadataForTable(table_id);
280  CHECK(td);
281  return td->tableName;
282  }
283  return "$TEMPORARY_TABLE" + std::to_string(-table_id);
284 }
285 
286 // TODO(alex): Functions below need to be moved to a separate translation unit, they don't
287 // belong here.
288 
// Shard count associated with a join condition.
// NOTE(review): the behavior for non-sharded joins is defined in
// JoinHashTable.cpp and is not visible here; confirm before relying on a
// specific value.
289 size_t get_shard_count(const Analyzer::BinOper* join_condition, const Executor* executor);
290 
// Overload taking an (inner column, outer expression) pair. This is the same
// pair type as InnerOuter, the result of normalize_column_pair, so presumably
// it expects the already-normalized order — confirm in JoinHashTable.cpp.
291 size_t get_shard_count(
292  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
293  const Executor* executor);
294 
296  const Analyzer::Expr* outer_col,
297  const Executor* executor);
298 
299 // Swap the columns if needed and make the inner column the first component.
301  const Analyzer::Expr* rhs,
302  const Catalog_Namespace::Catalog& cat,
303  const TemporaryTables* temporary_tables,
304  const bool is_overlaps_join = false);
305 
306 // Normalize each expression tuple
// Returns one InnerOuter pair (inner column first) per expression tuple of
// `condition`.
// NOTE(review): presumably this applies normalize_column_pair to each
// component of a tuple/composite-key condition — confirm in the .cpp.
307 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
308  const Catalog_Namespace::Catalog& cat,
309  const TemporaryTables* temporary_tables);
310 
// Returns a new deque containing the fragments from `fragments` that belong to
// device `device_id` out of `device_count` devices. By name, these are the
// shards owned by that device.
// NOTE(review): the fragment-to-device mapping rule is in the .cpp — confirm.
311 std::deque<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
312  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
313  const int device_id,
314  const int device_count);
315 
317  const int inner_table_id,
318  const std::vector<InputTableInfo>& query_infos);
319 
// By name, the number of hash table entries each device should hold, given
// the total entry count, the shard count, the device count and the memory
// level.
// NOTE(review): the rounding and the non-sharded / CPU-memory-level cases are
// defined in the .cpp — confirm before relying on them.
320 size_t get_entries_per_device(const size_t total_entries,
321  const size_t shard_count,
322  const size_t device_count,
323  const Data_Namespace::MemoryLevel memory_level);
324 
325 #endif // QUERYENGINE_JOINHASHTABLE_H
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
const std::vector< InputTableInfo > & query_infos_
std::pair< const int8_t *, size_t > linearized_multifrag_column_
void initOneToManyHashTableOnCpu(const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
void initHashTableForDevice(const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
void freeHashBufferGpuMemory()
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:81
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
ExecutorDeviceType
void reifyOneToManyForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
const int device_count_
bool isBitwiseEq() const
ChunkKey genHashTableKey(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
SQLOps
Definition: sqldefs.h:29
void freeHashBufferMemory()
unsigned long long CUdeviceptr
Definition: nocuda.h:27
size_t countBufferOff() const noexcept override
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
void checkHashJoinReplicationConstraint(const int table_id) const
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
ColumnCacheMap & column_cache_
HashType hash_type_
JoinHashTable(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange &col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count)
static std::mutex join_hash_table_cache_mutex_
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
ExpressionRangeType getType() const
std::string to_string(char const *&&v)
std::string get_table_name_by_id(const int table_id, const Catalog_Namespace::Catalog &cat)
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join=false)
std::deque< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
std::mutex linearized_multifrag_column_mutex_
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
void initHashTableOnCpuFromCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
std::pair< const int8_t *, size_t > getOneColumnFragment(const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
int getInnerTableRteIdx() const noexcept override
Definition: JoinHashTable.h:93
const Analyzer::ColumnVar inner_col
static std::shared_ptr< JoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
void initOneToManyHashTable(const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:79
size_t payloadBufferOff() const noexcept override
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
void reifyOneToOneForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col, const Executor *executor)
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
size_t shardCount() const
bool operator==(const struct JoinHashTableCacheKey &that) const
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) noexcept override
Definition: JoinHashTable.h:63
#define CHECK_LT(x, y)
Definition: Logger.h:197
Executor * executor_
RowSetMemoryOwner linearized_multifrag_column_owner_
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
ExpressionRange col_range_
std::mutex cpu_hash_table_buff_mutex_
void freeHashBufferCpuMemory()
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
std::pair< const int8_t *, size_t > getAllColumnFragments(const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
size_t getComponentBufferSize() const noexcept
void putHashTableOnCpuToCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols)
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
size_t offsetBufferOff() const noexcept override
void initHashTableOnCpu(const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr *> &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
Executor(const int db_id, const size_t block_size_x, const size_t grid_size_x, const std::string &debug_dir, const std::string &debug_file, ::QueryRenderer::QueryRenderManager *render_manager)
Definition: Execute.cpp:101
#define CHECK(condition)
Definition: Logger.h:187
std::shared_ptr< Analyzer::ColumnVar > col_var_
std::vector< int > ChunkKey
Definition: types.h:35
void reify(const int device_count)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
virtual ~JoinHashTable()
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
HashType getHashType() const noexcept override
Definition: JoinHashTable.h:97
int getInnerTableId() const noexcept override
Definition: JoinHashTable.h:89
const Data_Namespace::MemoryLevel memory_level_
static auto yieldCacheInvalidator() -> std::function< void()>
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
size_t hash_entry_count_
const Analyzer::ColumnVar outer_col
std::pair< const int8_t *, size_t > fetchFragments(const Analyzer::ColumnVar *hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ThrustAllocator &dev_buff_owner)