OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file JoinHashTable.h
19  * @author Alex Suhan <alex@mapd.com>
20  *
21  * Copyright (c) 2015 MapD Technologies, Inc. All rights reserved.
22  */
23 
24 #ifndef QUERYENGINE_JOINHASHTABLE_H
25 #define QUERYENGINE_JOINHASHTABLE_H
26 
27 #include "../Analyzer/Analyzer.h"
28 #include "../Catalog/Catalog.h"
29 #include "../Chunk/Chunk.h"
30 #include "../Shared/ExperimentalTypeUtilities.h"
32 #include "ColumnarResults.h"
35 #include "ExpressionRange.h"
36 #include "InputMetadata.h"
37 #include "JoinHashTableInterface.h"
38 
39 #include <llvm/IR/Value.h>
40 
41 #ifdef HAVE_CUDA
42 #include <cuda.h>
43 #endif
44 #include <functional>
45 #include <memory>
46 #include <mutex>
47 #include <stdexcept>
48 
49 class Executor;
50 struct HashEntryInfo;
51 
53  public:
55  static std::shared_ptr<JoinHashTable> getInstance(
56  const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
57  const std::vector<InputTableInfo>& query_infos,
58  const Data_Namespace::MemoryLevel memory_level,
59  const HashType preferred_hash_type,
60  const int device_count,
61  ColumnCacheMap& column_cache,
62  Executor* executor);
63 
65  static std::shared_ptr<JoinHashTable> getSyntheticInstance(
66  std::string_view table1,
67  std::string_view column1,
68  std::string_view table2,
69  std::string_view column2,
70  const Data_Namespace::MemoryLevel memory_level,
71  const HashType preferred_hash_type,
72  const int device_count,
73  ColumnCacheMap& column_cache,
74  Executor* executor);
75 
76  int64_t getJoinHashBuffer(const ExecutorDeviceType device_type,
77  const int device_id) const noexcept override;
78 
79  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
80  const int device_id) const noexcept override;
81 
82  std::string toString(const ExecutorDeviceType device_type,
83  const int device_id,
84  bool raw = false) const noexcept override;
85 
86  std::set<DecodedJoinHashBufferEntry> decodeJoinHashBuffer(
87  const ExecutorDeviceType device_type,
88  const int device_id) const noexcept override;
89 
90  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override;
91 
93  const size_t) override;
94 
95  int getInnerTableId() const noexcept override {
96  return col_var_.get()->get_table_id();
97  };
98 
99  int getInnerTableRteIdx() const noexcept override {
100  return col_var_.get()->get_rte_idx();
101  };
102 
103  HashType getHashType() const noexcept override { return hash_type_; }
104 
105  size_t offsetBufferOff() const noexcept override;
106 
107  size_t countBufferOff() const noexcept override;
108 
109  size_t payloadBufferOff() const noexcept override;
110 
111  static HashJoinMatchingSet codegenMatchingSet(
112  const std::vector<llvm::Value*>& hash_join_idx_args_in,
113  const bool is_sharded,
114  const bool col_is_nullable,
115  const bool is_bw_eq,
116  const int64_t sub_buff_size,
117  Executor* executor,
118  const bool is_bucketized = false);
119 
120  static llvm::Value* codegenHashTableLoad(const size_t table_idx, Executor* executor);
121 
122  static auto yieldCacheInvalidator() -> std::function<void()> {
123  return []() -> void {
124  std::lock_guard<std::mutex> guard(join_hash_table_cache_mutex_);
125  join_hash_table_cache_.clear();
126  };
127  }
128 
129  virtual ~JoinHashTable() {}
130 
131  private:
132  JoinHashTable(const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
133  const Analyzer::ColumnVar* col_var,
134  const std::vector<InputTableInfo>& query_infos,
135  const Data_Namespace::MemoryLevel memory_level,
136  const HashType preferred_hash_type,
137  const ExpressionRange& col_range,
138  ColumnCacheMap& column_cache,
139  Executor* executor,
140  const int device_count)
141  : qual_bin_oper_(qual_bin_oper)
142  , col_var_(std::dynamic_pointer_cast<Analyzer::ColumnVar>(col_var->deep_copy()))
143  , query_infos_(query_infos)
144  , memory_level_(memory_level)
145  , hash_type_(preferred_hash_type)
146  , hash_entry_count_(0)
147  , col_range_(col_range)
148  , executor_(executor)
149  , column_cache_(column_cache)
150  , device_count_(device_count) {
152  }
153 
154  std::pair<const int8_t*, size_t> getOneColumnFragment(
155  const Analyzer::ColumnVar& hash_col,
156  const Fragmenter_Namespace::FragmentInfo& fragment,
157  const Data_Namespace::MemoryLevel effective_mem_lvl,
158  const int device_id,
159  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner);
160 
161  std::pair<const int8_t*, size_t> getAllColumnFragments(
162  const Analyzer::ColumnVar& hash_col,
163  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
164  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner);
165 
167  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
168  const Analyzer::Expr* outer_col,
169  const Analyzer::ColumnVar* inner_col) const;
170 
171  void reify(const int device_count);
173  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
174  const int device_id);
176  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
177  const int device_id);
178  void checkHashJoinReplicationConstraint(const int table_id) const;
180  const ChunkKey& chunk_key,
181  const int8_t* col_buff,
182  const size_t num_elements,
183  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
184  const Data_Namespace::MemoryLevel effective_memory_level,
185  const int device_id);
187  const ChunkKey& chunk_key,
188  const int8_t* col_buff,
189  const size_t num_elements,
190  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
191  const Data_Namespace::MemoryLevel effective_memory_level,
192  const int device_id);
194  const ChunkKey& chunk_key,
195  const size_t num_elements,
196  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols);
198  const ChunkKey& chunk_key,
199  const size_t num_elements,
200  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols);
201  void initHashTableOnCpu(
202  const int8_t* col_buff,
203  const size_t num_elements,
204  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
205  const HashEntryInfo hash_entry_info,
206  const int32_t hash_join_invalid_val);
208  const int8_t* col_buff,
209  const size_t num_elements,
210  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
211  const HashEntryInfo hash_entry_info,
212  const int32_t hash_join_invalid_val);
213 
214  const InputTableInfo& getInnerQueryInfo(const Analyzer::ColumnVar* inner_col) const;
215 
216  size_t shardCount() const;
217 
218  llvm::Value* codegenHashTableLoad(const size_t table_idx);
219 
220  std::vector<llvm::Value*> getHashJoinArgs(llvm::Value* hash_ptr,
221  const Analyzer::Expr* key_col,
222  const int shard_count,
223  const CompilationOptions& co);
224 
225  std::pair<const int8_t*, size_t> fetchFragments(
226  const Analyzer::ColumnVar* hash_col,
227  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragment_info,
228  const Data_Namespace::MemoryLevel effective_memory_level,
229  const int device_id,
230  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
231  ThrustAllocator& dev_buff_owner);
232 
233  bool isBitwiseEq() const;
234 
235  void freeHashBufferMemory();
238 
239  size_t getComponentBufferSize() const noexcept;
240 
241  std::shared_ptr<Analyzer::BinOper> qual_bin_oper_;
242  std::shared_ptr<Analyzer::ColumnVar> col_var_;
243  const std::vector<InputTableInfo>& query_infos_;
244  const Data_Namespace::MemoryLevel memory_level_;
247  std::shared_ptr<std::vector<int32_t>> cpu_hash_table_buff_;
249 #ifdef HAVE_CUDA
250  std::vector<Data_Namespace::AbstractBuffer*> gpu_hash_table_buff_;
251  std::vector<Data_Namespace::AbstractBuffer*> gpu_hash_table_err_buff_;
252 #endif
254  Executor* executor_;
256  const int device_count_;
257  std::pair<const int8_t*, size_t> linearized_multifrag_column_;
260 
265  const size_t num_elements;
267  const SQLOps optype;
268 
269  bool operator==(const struct JoinHashTableCacheKey& that) const {
270  return col_range == that.col_range && inner_col == that.inner_col &&
271  outer_col == that.outer_col && num_elements == that.num_elements &&
272  chunk_key == that.chunk_key && optype == that.optype;
273  }
274  };
275 
276  static std::vector<
277  std::pair<JoinHashTableCacheKey, std::shared_ptr<std::vector<int32_t>>>>
279  static std::mutex join_hash_table_cache_mutex_;
280 };
281 
282 // TODO(alex): Functions below need to be moved to a separate translation unit, they don't
283 // belong here.
284 
285 size_t get_shard_count(const Analyzer::BinOper* join_condition, const Executor* executor);
286 
287 size_t get_shard_count(
288  std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
289  const Executor* executor);
290 
292  const Analyzer::Expr* outer_col,
293  const Executor* executor);
294 
295 // Swap the columns if needed and make the inner column the first component.
297  const Analyzer::Expr* rhs,
298  const Catalog_Namespace::Catalog& cat,
299  const TemporaryTables* temporary_tables,
300  const bool is_overlaps_join = false);
301 
302 // Normalize each expression tuple
303 std::vector<InnerOuter> normalize_column_pairs(const Analyzer::BinOper* condition,
304  const Catalog_Namespace::Catalog& cat,
305  const TemporaryTables* temporary_tables);
306 
307 std::deque<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
308  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
309  const int device_id,
310  const int device_count);
311 
313  const int inner_table_id,
314  const std::vector<InputTableInfo>& query_infos);
315 
316 size_t get_entries_per_device(const size_t total_entries,
317  const size_t shard_count,
318  const size_t device_count,
319  const Data_Namespace::MemoryLevel memory_level);
320 
321 #endif // QUERYENGINE_JOINHASHTABLE_H
Executor(const int db_id, const size_t block_size_x, const size_t grid_size_x, const std::string &debug_dir, const std::string &debug_file,::QueryRenderer::QueryRenderManager *render_manager)
Definition: Execute.cpp:106
bool isBitwiseEq() const
const std::vector< InputTableInfo > & query_infos_
std::vector< int > ChunkKey
Definition: types.h:35
std::deque< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
std::pair< const int8_t *, size_t > linearized_multifrag_column_
void freeHashBufferGpuMemory()
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:81
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
ExecutorDeviceType
void reifyOneToManyForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
const int device_count_
void initHashTableOnCpuFromCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
bool needs_dictionary_translation(const Analyzer::ColumnVar *inner_col, const Analyzer::Expr *outer_col_expr, const Executor *executor)
SQLOps
Definition: sqldefs.h:29
void freeHashBufferMemory()
size_t payloadBufferOff() const noexcept override
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const noexcept override
std::shared_ptr< std::vector< int32_t > > cpu_hash_table_buff_
void initOneToManyHashTableOnCpu(const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
void putHashTableOnCpuToCache(const ChunkKey &chunk_key, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols)
ColumnCacheMap & column_cache_
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
HashType hash_type_
JoinHashTable(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange &col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count)
static std::mutex join_hash_table_cache_mutex_
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
void checkHashJoinReplicationConstraint(const int table_id) const
std::mutex linearized_multifrag_column_mutex_
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
std::pair< const int8_t *, size_t > getOneColumnFragment(const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
const Analyzer::ColumnVar inner_col
CHECK(cgen_state)
static std::shared_ptr< JoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
void initHashTableForDevice(const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
size_t offsetBufferOff() const noexcept override
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:79
static std::vector< std::pair< JoinHashTableCacheKey, std::shared_ptr< std::vector< int32_t > > > > join_hash_table_cache_
void reifyOneToOneForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
int getInnerTableId() const noexcept override
Definition: JoinHashTable.h:95
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexcept override
size_t shardCount() const
Executor * executor_
RowSetMemoryOwner linearized_multifrag_column_owner_
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
ExpressionRange col_range_
int getInnerTableRteIdx() const noexcept override
Definition: JoinHashTable.h:99
std::mutex cpu_hash_table_buff_mutex_
void freeHashBufferCpuMemory()
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
std::pair< const int8_t *, size_t > getAllColumnFragments(const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
size_t getComponentBufferSize() const noexcept
bool operator==(const struct JoinHashTableCacheKey &that) const
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
ExpressionRangeType getType() const
void initHashTableOnCpu(const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val)
bool g_enable_watchdog false
Definition: Execute.cpp:71
std::set< DecodedJoinHashBufferEntry > decodeJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexcept override
std::shared_ptr< Analyzer::ColumnVar > col_var_
size_t countBufferOff() const noexcept override
void reify(const int device_count)
std::string toString(const ExecutorDeviceType device_type, const int device_id, bool raw=false) const noexcept override
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
virtual ~JoinHashTable()
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
void initOneToManyHashTable(const ChunkKey &chunk_key, const int8_t *col_buff, const size_t num_elements, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
HashType getHashType() const noexcept override
const Data_Namespace::MemoryLevel memory_level_
static std::shared_ptr< JoinHashTable > getSyntheticInstance(std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join)
static auto yieldCacheInvalidator() -> std::function< void()>
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
ChunkKey genHashTableKey(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
size_t hash_entry_count_
const Analyzer::ColumnVar outer_col
std::pair< const int8_t *, size_t > fetchFragments(const Analyzer::ColumnVar *hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ThrustAllocator &dev_buff_owner)