OmniSciDB  06b3bd477c
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file JoinHashTable.h
19  * @author Alex Suhan <alex@mapd.com>
20  *
21  * Copyright (c) 2015 MapD Technologies, Inc. All rights reserved.
22  */
23 
24 #ifndef QUERYENGINE_JOINHASHTABLE_H
25 #define QUERYENGINE_JOINHASHTABLE_H
26 
27 #include "Analyzer/Analyzer.h"
28 #include "Catalog/Catalog.h"
29 #include "ColumnarResults.h"
31 #include "DataMgr/Chunk/Chunk.h"
34 #include "ExpressionRange.h"
35 #include "InputMetadata.h"
36 #include "JoinHashTableInterface.h"
37 
38 #include <llvm/IR/Value.h>
39 
40 #ifdef HAVE_CUDA
41 #include <cuda.h>
42 #endif
43 #include <functional>
44 #include <memory>
45 #include <mutex>
46 #include <stdexcept>
47 
48 class Executor;
49 struct HashEntryInfo;
50 
52  public:
// Make hash table from an in-flight SQL query's parse tree etc.
// Static factory returning a shared_ptr<JoinHashTable>. Only the declaration is
// present here; the definition lives elsewhere. The constructor below is declared
// under `private:`, so this is the creation entry point visible to outside callers.
54  static std::shared_ptr<JoinHashTable> getInstance(
55  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
56  const std::vector<InputTableInfo>& query_infos,
57  const Data_Namespace::MemoryLevel memory_level,
58  const HashType preferred_hash_type,
59  const int device_count,
60  ColumnCacheMap& column_cache,
61  Executor* executor);
62 
// Overrides of the base-class interface (each is marked `override`); all are
// declared here and defined elsewhere.
// getJoinHashBuffer: returns an int64_t for the given device type / device id
// (presumably the buffer address -- TODO confirm against the definition).
63  int64_t getJoinHashBuffer(const ExecutorDeviceType device_type,
64  const int device_id) const noexcept override;
65 
// getJoinHashBufferSize: size_t counterpart of getJoinHashBuffer for the same
// device type / device id (unit not visible here -- TODO confirm).
66  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
67  const int device_id) const noexcept override;
68 
// toString: string rendering of the table for a device; device_id defaults to 0
// and raw defaults to false.
69  std::string toString(const ExecutorDeviceType device_type,
70  const int device_id = 0,
71  bool raw = false) const override;
72 
// toSet: returns the table contents as a set of DecodedJoinHashBufferEntry.
73  std::set<DecodedJoinHashBufferEntry> toSet(const ExecutorDeviceType device_type,
74  const int device_id) const override;
75 
// codegenSlot: yields an llvm::Value*; the two parameters are unnamed in this
// declaration, so their meaning must be taken from the definition.
76  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override;
77 
79  const size_t) override;
80 
// Returns get_table_id() of the column variable held in col_var_.
// col_var_ is dereferenced without a null check.
81  int getInnerTableId() const noexcept override {
82  return col_var_->get_table_id();
83  }
84 
// Returns get_rte_idx() of the column variable held in col_var_.
// col_var_ is dereferenced without a null check.
85  int getInnerTableRteIdx() const noexcept override {
86  return col_var_->get_rte_idx();
87  }
88 
// Returns the stored hash_type_. The constructor initializes hash_type_ from its
// preferred_hash_type argument; whether it is changed later is not visible here.
89  HashType getHashType() const noexcept override { return hash_type_; }
90 
// Returns the memory level this table was constructed with (memory_level_).
91  Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override {
92  return this->memory_level_;
93  }
94 
// Returns the device count this table was constructed with (device_count_).
95  int getDeviceCount() const noexcept override { return this->device_count_; }
96 
// Three size_t accessors overriding the base-class interface; declared here and
// defined elsewhere. Judging by the names they are offsets of the offset, count
// and payload sections of the hash buffer -- TODO confirm against the definitions.
97  size_t offsetBufferOff() const noexcept override;
98 
99  size_t countBufferOff() const noexcept override;
100 
101  size_t payloadBufferOff() const noexcept override;
102 
// Static code-generation helper returning a HashJoinMatchingSet. It takes the
// Executor explicitly instead of using an instance's executor_. is_bucketized
// defaults to false. Declared here, defined elsewhere.
103  static HashJoinMatchingSet codegenMatchingSet(
104  const std::vector<llvm::Value*>& hash_join_idx_args_in,
105  const bool is_sharded,
106  const bool col_is_nullable,
107  const bool is_bw_eq,
108  const int64_t sub_buff_size,
109  Executor* executor,
110  const bool is_bucketized = false);
111 
// Static overload taking the Executor explicitly; a non-static overload with only
// table_idx is declared in the private section of this class.
112  static llvm::Value* codegenHashTableLoad(const size_t table_idx, Executor* executor);
113 
// Returns a callable that empties the static join hash table cache.
// The cache size is logged inside the callable while join_hash_table_cache_mutex_
// is held: the size is then read under the same lock as every other cache access
// in this class, and the "Invalidate" message is emitted when the invalidation
// actually runs rather than when the callable is merely created.
114  static auto yieldCacheInvalidator() -> std::function<void()> {
115  return []() -> void {
116  std::lock_guard<std::mutex> guard(join_hash_table_cache_mutex_);
117  VLOG(1) << "Invalidate " << join_hash_table_cache_.size()
118  << " cached baseline hashtable.";
119  join_hash_table_cache_.clear();
120  };
121  }
122 
// Returns the hash table buffer stored at position idx of the static cache.
// CHECKs that the cache is non-empty and that idx is in range.
// The shared_ptr is returned by value: the copy is made while the cache mutex is
// still held, so the caller keeps the buffer alive even if the cache is cleared
// (e.g. by the callable from yieldCacheInvalidator()) right after the lock is
// released. Returning a reference to the cached element would dangle in that case.
123  static std::shared_ptr<std::vector<int32_t>> getCachedHashTable(size_t idx) {
124  std::lock_guard<std::mutex> guard(join_hash_table_cache_mutex_);
125  CHECK(!join_hash_table_cache_.empty());
126  CHECK_LT(idx, join_hash_table_cache_.size());
127  return join_hash_table_cache_.at(idx).second;
128  }
129 
// Reports how many entries the static join hash table cache currently holds.
// The size is read while holding join_hash_table_cache_mutex_.
130  static uint64_t getNumberOfCachedHashTables() {
131  const std::lock_guard<std::mutex> cache_lock(join_hash_table_cache_mutex_);
132  return static_cast<uint64_t>(join_hash_table_cache_.size());
133  }
134 
// Virtual destructor; declared here and defined out of line.
135  virtual ~JoinHashTable();
136 
137  private:
138  JoinHashTable(const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
139  const Analyzer::ColumnVar* col_var,
140  const std::vector<InputTableInfo>& query_infos,
141  const Data_Namespace::MemoryLevel memory_level,
142  const HashType preferred_hash_type,
143  const ExpressionRange& col_range,
144  ColumnCacheMap& column_cache,
145  Executor* executor,
146  const int device_count)
147  : qual_bin_oper_(qual_bin_oper)
148  , col_var_(std::dynamic_pointer_cast<Analyzer::ColumnVar>(col_var->deep_copy()))
149  , query_infos_(query_infos)
150  , memory_level_(memory_level)
151  , hash_type_(preferred_hash_type)
152  , hash_entry_count_(0)
153  , col_range_(col_range)
154  , executor_(executor)
155  , column_cache_(column_cache)
156  , device_count_(device_count) {
159  }
160 
162  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
163  const Analyzer::Expr* outer_col,
164  const Analyzer::ColumnVar* inner_col) const;
165 
166  void reify();
168  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
169  const int device_id,
170  const logger::ThreadId parent_thread_id);
172  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
173  const int device_id,
174  const logger::ThreadId parent_thread_id);
175  void checkHashJoinReplicationConstraint(const int table_id) const;
177  const ChunkKey& chunk_key,
178  const JoinColumn& join_column,
179  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
180  const Data_Namespace::MemoryLevel effective_memory_level,
181  const int device_id);
183  const ChunkKey& chunk_key,
184  const JoinColumn& join_column,
185  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
186  const Data_Namespace::MemoryLevel effective_memory_level,
187  const int device_id);
189  const ChunkKey& chunk_key,
190  const size_t num_elements,
191  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols);
193  const ChunkKey& chunk_key,
194  const size_t num_elements,
195  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols);
197  const JoinColumn& join_column,
198  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
199  const HashEntryInfo hash_entry_info,
200  const int32_t hash_join_invalid_val);
202  const JoinColumn& join_column,
203  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
204  const HashEntryInfo hash_entry_info,
205  const int32_t hash_join_invalid_val);
206 
// Private helpers; all are declared here and defined elsewhere.
// Returns a reference to an InputTableInfo for the given inner column
// (presumably an element of query_infos_ -- TODO confirm).
207  const InputTableInfo& getInnerQueryInfo(const Analyzer::ColumnVar* inner_col) const;
208 
209  size_t shardCount() const;
210 
// Non-static counterpart of the public static codegenHashTableLoad(table_idx, executor).
211  llvm::Value* codegenHashTableLoad(const size_t table_idx);
212 
// Builds the list of llvm::Value* arguments from the hash table pointer, the key
// column expression, the shard count and the compilation options.
213  std::vector<llvm::Value*> getHashJoinArgs(llvm::Value* hash_ptr,
214  const Analyzer::Expr* key_col,
215  const int shard_count,
216  const CompilationOptions& co);
217 
218  bool isBitwiseEq() const;
219 
220  void freeHashBufferMemory();
223 
224  size_t getComponentBufferSize() const noexcept;
225 
227  noexcept override {
230  };
231 
232  std::shared_ptr<Analyzer::BinOper> qual_bin_oper_;
233  std::shared_ptr<Analyzer::ColumnVar> col_var_;
234  const std::vector<InputTableInfo>& query_infos_;
238  std::shared_ptr<std::vector<int32_t>> cpu_hash_table_buff_;
240 #ifdef HAVE_CUDA
241  std::vector<Data_Namespace::AbstractBuffer*> gpu_hash_table_buff_;
242  std::vector<Data_Namespace::AbstractBuffer*> gpu_hash_table_err_buff_;
243 #endif
245  Executor* executor_;
247  const int device_count_;
248 
253  const size_t num_elements;
255  const SQLOps optype;
256 
// Two cache keys are equal when the six members compared below all match.
// Comparisons run in the order col_range, inner_col, outer_col, num_elements,
// chunk_key, optype and stop at the first mismatch.
257  bool operator==(const JoinHashTableCacheKey& that) const {
258  if (!(col_range == that.col_range && inner_col == that.inner_col)) { return false; }
259  if (!(outer_col == that.outer_col && num_elements == that.num_elements)) { return false; }
260  return chunk_key == that.chunk_key && optype == that.optype;
261  }
262  };
263 
264  static std::vector<
265  std::pair<JoinHashTableCacheKey, std::shared_ptr<std::vector<int32_t>>>>
267  static std::mutex join_hash_table_cache_mutex_;
268 };
269 
270 // TODO(alex): Functions below need to be moved to a separate translation unit, they don't
271 // belong here.
272 
// Two free-function overloads returning a shard count as size_t: one takes the
// join condition (a BinOper), the other an already separated pair of
// (ColumnVar*, Expr*). Both take a const Executor*. Declared here, defined elsewhere.
273 size_t get_shard_count(const Analyzer::BinOper* join_condition, const Executor* executor);
274 
275 size_t get_shard_count(
276  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
277  const Executor* executor);
278 
280  const Analyzer::Expr* outer_col,
281  const Executor* executor);
282 
283 // Swap the columns if needed and make the inner column the first component.
285  const Analyzer::Expr* rhs,
287  const TemporaryTables* temporary_tables,
288  const bool is_overlaps_join = false);
289 
290 // Normalize each expression tuple
291 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
293  const TemporaryTables* temporary_tables);
294 
// Returns a new vector of FragmentInfo built from `fragments` for the given
// device_id / device_count (presumably the fragments whose shard maps to that
// device -- TODO confirm against the definition). Declared here, defined elsewhere.
295 std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
296  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
297  const int device_id,
298  const int device_count);
299 
301  const int inner_table_id,
302  const std::vector<InputTableInfo>& query_infos);
303 
// Returns a size_t entry count derived from the total entry count, shard count,
// device count and memory level (exact formula not visible here -- see the
// definition). Declared here, defined elsewhere.
304 size_t get_entries_per_device(const size_t total_entries,
305  const size_t shard_count,
306  const size_t device_count,
307  const Data_Namespace::MemoryLevel memory_level);
308 
309 #endif // QUERYENGINE_JOINHASHTABLE_H
Defines data structures for the semantic analysis phase of query processing.
void initOneToManyHashTable(const ChunkKey &chunk_key, const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
bool isBitwiseEq() const
const std::vector< InputTableInfo > & query_infos_
std::vector< int > ChunkKey
Definition: types.h:35
static const std::shared_ptr< std::vector< int32_t > > & getCachedHashTable(size_t idx)
std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
std::string cat(Ts &&...args)
void freeHashBufferGpuMemory()
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:86
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
ExecutorDeviceType
const int device_count_
void initHashTableOnCpuFromCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col_expr, const Executor *executor)
SQLOps
Definition: sqldefs.h:29
void freeHashBufferMemory()
size_t payloadBufferOff() const noexcept override
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const noexcept override
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
void putHashTableOnCpuToCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
ChunkKey genHashTableKey(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
ColumnCacheMap & column_cache_
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
HashType hash_type_
JoinHashTable(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange &col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count)
static std::mutex join_hash_table_cache_mutex_
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::vector< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
void checkHashJoinReplicationConstraint(const int table_id) const
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
This file contains the class specification and related data structures for Catalog.
const Analyzer::ColumnVar inner_col
CHECK(cgen_state)
void reifyOneToOneForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const logger::ThreadId parent_thread_id)
static std::shared_ptr< JoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
size_t offsetBufferOff() const noexcept override
void reifyOneToManyForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const logger::ThreadId parent_thread_id)
std::set< DecodedJoinHashBufferEntry > toSet(const ExecutorDeviceType device_type, const int device_id) const override
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
int getInnerTableId() const noexcept override
Definition: JoinHashTable.h:81
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexcept override
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
bool layoutRequiresAdditionalBuffers(JoinHashTableInterface::HashType layout) const noexcept override
#define CHECK_LT(x, y)
Definition: Logger.h:207
size_t shardCount() const
Executor * executor_
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
static uint64_t getNumberOfCachedHashTables()
ExpressionRange col_range_
int getInnerTableRteIdx() const noexcept override
Definition: JoinHashTable.h:85
std::mutex cpu_hash_table_buff_mutex_
void freeHashBufferCpuMemory()
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
uint64_t ThreadId
Definition: Logger.h:306
size_t getComponentBufferSize() const noexcept
bool operator==(const struct JoinHashTableCacheKey &that) const
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
ExpressionRangeType getType() const
bool g_enable_watchdog false
Definition: Execute.cpp:74
std::shared_ptr< Analyzer::ColumnVar > col_var_
size_t countBufferOff() const noexcept override
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
virtual ~JoinHashTable()
Executor(const ExecutorId id, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:129
void initOneToOneHashTable(const ChunkKey &chunk_key, const JoinColumn &join_column, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
HashType getHashType() const noexcept override
Definition: JoinHashTable.h:89
const Data_Namespace::MemoryLevel memory_level_
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join)
static auto yieldCacheInvalidator() -> std::function< void()>
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
int getDeviceCount() const noexcept override
Definition: JoinHashTable.h:95
#define VLOG(n)
Definition: Logger.h:291
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
size_t hash_entry_count_
const Analyzer::ColumnVar outer_col
Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override
Definition: JoinHashTable.h:91