OmniSciDB  06b3bd477c
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
BaselineJoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef QUERYENGINE_BASELINEJOINHASHTABLE_H
17 #define QUERYENGINE_BASELINEJOINHASHTABLE_H
18 
19 #include "../Analyzer/Analyzer.h"
20 #include "../DataMgr/MemoryLevel.h"
21 #include "ColumnarResults.h"
23 #include "HashJoinRuntime.h"
24 #include "InputMetadata.h"
25 #include "JoinHashTableInterface.h"
26 
27 #ifdef HAVE_CUDA
28 #include <cuda.h>
29 #endif
30 #include <cstdint>
31 #include <map>
32 #include <mutex>
33 #include <thread>
34 #include <unordered_set>
35 #include <vector>
36 
37 class Executor;
38 
39 // Representation for a hash table using the baseline layout: an open-addressing
40 // hash with a fill rate of 50%. It is used for equi-joins on multiple columns and
41 // on single sparse columns (with very wide range), typically big integer. As of
42 // now, such tuples must be unique within the inner table.
44  public:
46  static std::shared_ptr<BaselineJoinHashTable> getInstance(
47  const std::shared_ptr<Analyzer::BinOper> condition,
48  const std::vector<InputTableInfo>& query_infos,
49  const Data_Namespace::MemoryLevel memory_level,
50  const HashType preferred_hash_type,
51  const int device_count,
52  ColumnCacheMap& column_cache,
53  Executor* executor);
54 
55  static size_t getShardCountForCondition(
56  const Analyzer::BinOper* condition,
57  const Executor* executor,
58  const std::vector<InnerOuter>& inner_outer_pairs);
59 
60  int64_t getJoinHashBuffer(const ExecutorDeviceType device_type,
61  const int device_id) const noexcept override;
62 
63  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
64  const int device_id) const noexcept override;
65 
66  std::string toString(const ExecutorDeviceType device_type,
67  const int device_id = 0,
68  bool raw = false) const override;
69 
70  std::set<DecodedJoinHashBufferEntry> toSet(const ExecutorDeviceType device_type,
71  const int device_id) const override;
72 
73  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override;
74 
76  const size_t) override;
77 
78  int getInnerTableId() const noexcept override;
79 
80  int getInnerTableRteIdx() const noexcept override;
81 
82  JoinHashTableInterface::HashType getHashType() const noexcept override;
83 
84  Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override {
85  return memory_level_;
86  };
87 
88  int getDeviceCount() const noexcept override { return device_count_; };
89 
90  size_t offsetBufferOff() const noexcept override;
91 
92  size_t countBufferOff() const noexcept override;
93 
94  size_t payloadBufferOff() const noexcept override;
95 
96  static auto yieldCacheInvalidator() -> std::function<void()> {
97  VLOG(1) << "Invalidate " << hash_table_cache_.size() << " cached baseline hashtable.";
98  return []() -> void {
99  std::lock_guard<std::mutex> guard(hash_table_cache_mutex_);
100  hash_table_cache_.clear();
101  };
102  }
103 
104  static const std::shared_ptr<std::vector<int8_t>>& getCachedHashTable(size_t idx) {
105  std::lock_guard<std::mutex> guard(hash_table_cache_mutex_);
106  CHECK(!hash_table_cache_.empty());
107  CHECK_LT(idx, hash_table_cache_.size());
108  return hash_table_cache_.at(idx).second.buffer;
109  }
110 
111  static size_t getEntryCntCachedHashTable(size_t idx) {
112  std::lock_guard<std::mutex> guard(hash_table_cache_mutex_);
113  CHECK(!hash_table_cache_.empty());
114  CHECK_LT(idx, hash_table_cache_.size());
115  return hash_table_cache_.at(idx).second.entry_count;
116  }
117 
118  static uint64_t getNumberOfCachedHashTables() {
119  std::lock_guard<std::mutex> guard(hash_table_cache_mutex_);
120  return hash_table_cache_.size();
121  }
122 
123  virtual ~BaselineJoinHashTable();
124 
125  private:
126  size_t getKeyBufferSize() const noexcept;
127  size_t getComponentBufferSize() const noexcept;
128 
129  protected:
130  BaselineJoinHashTable(const std::shared_ptr<Analyzer::BinOper> condition,
131  const std::vector<InputTableInfo>& query_infos,
132  const Data_Namespace::MemoryLevel memory_level,
133  const HashType preferred_hash_type,
134  const size_t entry_count,
135  ColumnCacheMap& column_cache,
136  Executor* executor,
137  const std::vector<InnerOuter>& inner_outer_pairs,
138  const int device_count);
139 
140  static int getInnerTableId(const std::vector<InnerOuter>& inner_outer_pairs);
141 
142  virtual void reifyWithLayout(const JoinHashTableInterface::HashType layout);
143 
145  const std::vector<JoinColumn> join_columns;
146  const std::vector<JoinColumnTypeInfo> join_column_types;
147  const std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
148  std::vector<JoinBucketInfo> join_buckets;
149  const std::vector<std::shared_ptr<void>> malloc_owner;
150  };
151 
153  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
154  const int device_id,
155  DeviceAllocator* dev_buff_owner);
156 
157  virtual std::pair<size_t, size_t> approximateTupleCount(
158  const std::vector<ColumnsForDevice>&) const;
159 
160  virtual size_t getKeyComponentWidth() const;
161 
162  virtual size_t getKeyComponentCount() const;
163 
164  virtual int initHashTableOnCpu(const std::vector<JoinColumn>& join_columns,
165  const std::vector<JoinColumnTypeInfo>& join_column_types,
166  const std::vector<JoinBucketInfo>& join_bucket_info,
167  const JoinHashTableInterface::HashType layout);
168 
169  virtual int initHashTableOnGpu(const std::vector<JoinColumn>& join_columns,
170  const std::vector<JoinColumnTypeInfo>& join_column_types,
171  const std::vector<JoinBucketInfo>& join_bucket_info,
173  const size_t key_component_width,
174  const size_t key_component_count,
175  const int device_id);
176 
177  virtual llvm::Value* codegenKey(const CompilationOptions&);
178 
179  size_t shardCount() const;
180 
182  const std::vector<InnerOuter>& inner_outer_pairs) const;
183 
185  std::vector<const void*> sd_inner_proxy_per_key;
186  std::vector<const void*> sd_outer_proxy_per_key;
187  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
188  };
189 
191 
192  void reify();
193 
194  void reifyForDevice(const ColumnsForDevice& columns_for_device,
196  const int device_id,
197  const logger::ThreadId parent_thread_id);
198 
199  void checkHashJoinReplicationConstraint(const int table_id) const;
200 
201  int initHashTableForDevice(const std::vector<JoinColumn>& join_columns,
202  const std::vector<JoinColumnTypeInfo>& join_column_types,
203  const std::vector<JoinBucketInfo>& join_buckets,
205  const Data_Namespace::MemoryLevel effective_memory_level,
206  const int device_id);
207 
208  llvm::Value* hashPtr(const size_t index);
209 
211  const size_t num_elements;
212  const std::vector<ChunkKey> chunk_keys;
213  const SQLOps optype;
214  const boost::optional<double> overlaps_hashjoin_bucket_threshold;
215 
216  bool operator==(const struct HashTableCacheKey& that) const {
217  bool oeq;
219  oeq = (std::abs(*overlaps_hashjoin_bucket_threshold -
220  *that.overlaps_hashjoin_bucket_threshold) <= 0.00000001);
221  } else {
224  }
225  return num_elements == that.num_elements && chunk_keys == that.chunk_keys &&
226  optype == that.optype && oeq;
227  }
228 
229  bool operator<(const struct HashTableCacheKey& that) const {
230  bool oeq;
232  oeq = (std::abs(*overlaps_hashjoin_bucket_threshold -
233  *that.overlaps_hashjoin_bucket_threshold) <= 0.00000001);
234  } else {
237  }
238  return num_elements < that.num_elements && chunk_keys < that.chunk_keys &&
239  optype < that.optype && !oeq &&
241  }
242  };
243 
244  void initHashTableOnCpuFromCache(const HashTableCacheKey&);
245 
246  void putHashTableOnCpuToCache(const HashTableCacheKey&);
247 
248  std::pair<ssize_t, size_t> getApproximateTupleCountFromCache(
249  const HashTableCacheKey&) const;
250 
251  bool isBitwiseEq() const;
252 
253  void freeHashBufferMemory();
256 
258  noexcept override {
261  };
262 
263  const std::shared_ptr<Analyzer::BinOper> condition_;
264  const std::vector<InputTableInfo>& query_infos_;
267  size_t entry_count_; // number of keys in the hash table
268  size_t emitted_keys_count_; // number of keys emitted across all rows
269  Executor* executor_;
271  std::shared_ptr<std::vector<int8_t>> cpu_hash_table_buff_;
273 #ifdef HAVE_CUDA
274  std::vector<Data_Namespace::AbstractBuffer*> gpu_hash_table_buff_;
275 #endif
276  std::vector<InnerOuter> inner_outer_pairs_;
278  const int device_count_;
279 #ifdef HAVE_CUDA
280  unsigned block_size_;
281  unsigned grid_size_;
282 #endif // HAVE_CUDA
283 
285  const std::shared_ptr<std::vector<int8_t>> buffer;
287  const size_t entry_count;
288  const size_t emitted_keys_count;
289  };
290 
292 
293  static std::vector<std::pair<HashTableCacheKey, HashTableCacheValue>> hash_table_cache_;
294  static std::mutex hash_table_cache_mutex_;
295 
296  static const int ERR_FAILED_TO_FETCH_COLUMN{-3};
298 };
299 
301  public:
302  static void set(const std::vector<ChunkKey>& key,
303  const JoinHashTableInterface::HashType hash_type);
304 
305  static std::pair<JoinHashTableInterface::HashType, bool> get(
306  const std::vector<ChunkKey>& key);
307 
308  private:
309  static std::map<std::vector<ChunkKey>, JoinHashTableInterface::HashType>
311  static std::mutex hash_type_cache_mutex_;
312 };
313 
314 #endif // QUERYENGINE_BASELINEJOINHASHTABLE_H
size_t offsetBufferOff() const noexcept override
bool layoutRequiresAdditionalBuffers(JoinHashTableInterface::HashType layout) const noexcept override
std::set< DecodedJoinHashBufferEntry > toSet(const ExecutorDeviceType device_type, const int device_id) const override
static void set(const std::vector< ChunkKey > &key, const JoinHashTableInterface::HashType hash_type)
const boost::optional< double > overlaps_hashjoin_bucket_threshold
static std::map< std::vector< ChunkKey >, JoinHashTableInterface::HashType > hash_type_cache_
std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
Class for a per-database catalog. Also includes metadata for the current database and the current use...
Definition: Catalog.h:86
void putHashTableOnCpuToCache(const HashTableCacheKey &)
virtual int initHashTableOnGpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout, const size_t key_component_width, const size_t key_component_count, const int device_id)
static const std::shared_ptr< std::vector< int8_t > > & getCachedHashTable(size_t idx)
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
static std::mutex hash_type_cache_mutex_
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
ExecutorDeviceType
size_t getComponentBufferSize() const noexcept
virtual ColumnsForDevice fetchColumnsForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner)
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
static auto yieldCacheInvalidator() -> std::function< void()>
SQLOps
Definition: sqldefs.h:29
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache(const HashTableCacheKey &) const
size_t getKeyBufferSize() const noexcept
JoinHashTableInterface::HashType layout_
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
const std::vector< std::shared_ptr< Chunk_NS::Chunk > > chunks_owner
static std::shared_ptr< BaselineJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
int getInnerTableRteIdx() const noexcept override
const HashTableCacheValue * findHashTableOnCpuInCache(const HashTableCacheKey &)
static uint64_t getNumberOfCachedHashTables()
const std::vector< InputTableInfo > & query_infos_
virtual llvm::Value * codegenKey(const CompilationOptions &)
size_t payloadBufferOff() const noexcept override
std::vector< InnerOuter > inner_outer_pairs_
virtual void reifyWithLayout(const JoinHashTableInterface::HashType layout)
void reifyForDevice(const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id, const logger::ThreadId parent_thread_id)
static const int ERR_FAILED_TO_FETCH_COLUMN
CHECK(cgen_state)
CompositeKeyInfo getCompositeKeyInfo() const
ColumnCacheMap & column_cache_
JoinHashTableInterface::HashType getHashType() const noexcept override
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexcept override
Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override
virtual std::pair< size_t, size_t > approximateTupleCount(const std::vector< ColumnsForDevice > &) const
const std::vector< JoinColumnTypeInfo > join_column_types
int initHashTableForDevice(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_buckets, const JoinHashTableInterface::HashType layout, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
const std::shared_ptr< std::vector< int8_t > > buffer
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
#define CHECK_LT(x, y)
Definition: Logger.h:207
int getInnerTableId() const noexcept override
static const int ERR_FAILED_TO_JOIN_ON_VIRTUAL_COLUMN
void checkHashJoinReplicationConstraint(const int table_id) const
bool operator<(const struct HashTableCacheKey &that) const
const Catalog_Namespace::Catalog * catalog_
const std::vector< JoinColumn > join_columns
std::vector< JoinBucketInfo > join_buckets
uint64_t ThreadId
Definition: Logger.h:306
static std::mutex hash_table_cache_mutex_
const Data_Namespace::MemoryLevel memory_level_
virtual int initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout)
llvm::Value * hashPtr(const size_t index)
static size_t getEntryCntCachedHashTable(size_t idx)
std::vector< const void * > sd_outer_proxy_per_key
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
void initHashTableOnCpuFromCache(const HashTableCacheKey &)
virtual size_t getKeyComponentCount() const
static std::vector< std::pair< HashTableCacheKey, HashTableCacheValue > > hash_table_cache_
const std::vector< std::shared_ptr< void > > malloc_owner
virtual size_t getKeyComponentWidth() const
Executor(const ExecutorId id, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:129
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const noexcept override
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
int getDeviceCount() const noexcept override
std::vector< const void * > sd_inner_proxy_per_key
#define VLOG(n)
Definition: Logger.h:291
const std::shared_ptr< Analyzer::BinOper > condition_
const JoinHashTableInterface::HashType type
bool operator==(const struct HashTableCacheKey &that) const
size_t countBufferOff() const noexcept override