OmniSciDB  0fdbebe030
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
BaselineJoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef QUERYENGINE_BASELINEJOINHASHTABLE_H
17 #define QUERYENGINE_BASELINEJOINHASHTABLE_H
18 
19 #include "../Analyzer/Analyzer.h"
20 #include "../DataMgr/MemoryLevel.h"
21 #include "ColumnarResults.h"
23 #include "HashJoinRuntime.h"
24 #include "InputMetadata.h"
25 #include "JoinHashTableInterface.h"
26 
27 #ifdef HAVE_CUDA
28 #include <cuda.h>
29 #endif
30 #include <cstdint>
31 #include <map>
32 #include <mutex>
33 #include <thread>
34 #include <unordered_set>
35 #include <vector>
36 
37 class Executor;
38 
39 // Representation for a hash table using the baseline layout: an open-addressing
40 // hash with a fill rate of 50%. It is used for equi-joins on multiple columns and
41 // on single sparse columns (with a very wide range), typically big integers. As of
42 // now, such tuples must be unique within the inner table.
44  public:
// Make hash table from an in-flight SQL query's parse tree etc.
// Static factory: the result is handed back as a std::shared_ptr to a
// BaselineJoinHashTable. column_cache is taken by non-const reference and
// executor by non-const pointer; all other arguments are passed as const.
46  static std::shared_ptr<BaselineJoinHashTable> getInstance(
47  const std::shared_ptr<Analyzer::BinOper> condition,
48  const std::vector<InputTableInfo>& query_infos,
49  const Data_Namespace::MemoryLevel memory_level,
50  const HashType preferred_hash_type,
51  const int device_count,
52  ColumnCacheMap& column_cache,
53  Executor* executor);
54 
55  static size_t getShardCountForCondition(
56  const Analyzer::BinOper* condition,
57  const Executor* executor,
58  const std::vector<InnerOuter>& inner_outer_pairs);
59 
60  int64_t getJoinHashBuffer(const ExecutorDeviceType device_type,
61  const int device_id) const noexcept override;
62 
63  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
64  const int device_id) const noexcept override;
65 
66  std::string toString(const ExecutorDeviceType device_type,
67  const int device_id = 0,
68  bool raw = false) const override;
69 
70  std::set<DecodedJoinHashBufferEntry> toSet(const ExecutorDeviceType device_type,
71  const int device_id) const override;
72 
73  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override;
74 
76  const size_t) override;
77 
78  int getInnerTableId() const noexcept override;
79 
80  int getInnerTableRteIdx() const noexcept override;
81 
82  JoinHashTableInterface::HashType getHashType() const noexcept override;
83 
// Accessor: returns the memory_level_ member.
// NOTE(review): the ';' after the closing brace is a redundant empty declaration.
84  Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override {
85  return memory_level_;
86  };
87 
// Accessor: returns the device_count_ member.
// NOTE(review): the ';' after the closing brace is a redundant empty declaration.
88  int getDeviceCount() const noexcept override { return device_count_; };
89 
90  size_t offsetBufferOff() const noexcept override;
91 
92  size_t countBufferOff() const noexcept override;
93 
94  size_t payloadBufferOff() const noexcept override;
95 
// Returns a callable that empties the static hash_table_cache_. Each invocation
// holds hash_table_cache_mutex_ (via std::lock_guard) for the duration of the
// clear(). The lambda captures nothing, so it does not depend on any instance.
// NOTE(review): std::function needs <functional>, which is not among the includes
// visible in this listing; presumably it arrives transitively — confirm.
96  static auto yieldCacheInvalidator() -> std::function<void()> {
97  return []() -> void {
98  std::lock_guard<std::mutex> guard(hash_table_cache_mutex_);
99  hash_table_cache_.clear();
100  };
101  }
102 
104 
105  private:
106  size_t getKeyBufferSize() const noexcept;
107  size_t getComponentBufferSize() const noexcept;
108 
109  protected:
110  BaselineJoinHashTable(const std::shared_ptr<Analyzer::BinOper> condition,
111  const std::vector<InputTableInfo>& query_infos,
112  const Data_Namespace::MemoryLevel memory_level,
113  const HashType preferred_hash_type,
114  const size_t entry_count,
115  ColumnCacheMap& column_cache,
116  Executor* executor,
117  const std::vector<InnerOuter>& inner_outer_pairs,
118  const int device_count);
119 
120  static int getInnerTableId(const std::vector<InnerOuter>& inner_outer_pairs);
121 
122  virtual void reifyWithLayout(const JoinHashTableInterface::HashType layout);
123 
125  const std::vector<JoinColumn> join_columns;
126  const std::vector<JoinColumnTypeInfo> join_column_types;
127  const std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
128  const std::vector<JoinBucketInfo> join_buckets;
129  const std::vector<std::shared_ptr<void>> malloc_owner;
130  };
131 
133  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
134  const int device_id,
135  ThrustAllocator& dev_buff_owner);
136 
137  virtual std::pair<size_t, size_t> approximateTupleCount(
138  const std::vector<ColumnsForDevice>&) const;
139 
140  virtual size_t getKeyComponentWidth() const;
141 
142  virtual size_t getKeyComponentCount() const;
143 
144  virtual int initHashTableOnCpu(const std::vector<JoinColumn>& join_columns,
145  const std::vector<JoinColumnTypeInfo>& join_column_types,
146  const std::vector<JoinBucketInfo>& join_bucket_info,
147  const JoinHashTableInterface::HashType layout);
148 
149  virtual int initHashTableOnGpu(const std::vector<JoinColumn>& join_columns,
150  const std::vector<JoinColumnTypeInfo>& join_column_types,
151  const std::vector<JoinBucketInfo>& join_bucket_info,
153  const size_t key_component_width,
154  const size_t key_component_count,
155  const int device_id);
156 
157  virtual llvm::Value* codegenKey(const CompilationOptions&);
158 
159  size_t shardCount() const;
160 
162  const std::vector<InnerOuter>& inner_outer_pairs) const;
163 
165  std::vector<const void*> sd_inner_proxy_per_key;
166  std::vector<const void*> sd_outer_proxy_per_key;
167  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
168  };
169 
171 
172  void reify();
173 
174  void reifyForDevice(const ColumnsForDevice& columns_for_device,
176  const int device_id,
177  const logger::ThreadId parent_thread_id);
178 
179  void checkHashJoinReplicationConstraint(const int table_id) const;
180 
181  int initHashTableForDevice(const std::vector<JoinColumn>& join_columns,
182  const std::vector<JoinColumnTypeInfo>& join_column_types,
183  const std::vector<JoinBucketInfo>& join_buckets,
185  const Data_Namespace::MemoryLevel effective_memory_level,
186  const int device_id);
187 
188  llvm::Value* hashPtr(const size_t index);
189 
191  const size_t num_elements;
192  const std::vector<ChunkKey> chunk_keys;
193  const SQLOps optype;
194  const boost::optional<double> overlaps_hashjoin_bucket_threshold;
195 
// Equality for cache keys: num_elements, chunk_keys and optype must match exactly
// and oeq must be true. In the visible branch, oeq compares the two
// overlaps_hashjoin_bucket_threshold values with an absolute tolerance of 1e-8.
// The branch condition and the else-branch assignment are not visible in this
// listing.
196  bool operator==(const struct HashTableCacheKey& that) const {
197  bool oeq;
199  oeq = (std::abs(*overlaps_hashjoin_bucket_threshold -
200  *that.overlaps_hashjoin_bucket_threshold) <= 0.00000001);
201  } else {
204  }
205  return num_elements == that.num_elements && chunk_keys == that.chunk_keys &&
206  optype == that.optype && oeq;
207  }
208 
// Ordering for cache keys. oeq is computed the same way as in operator== (1e-8
// absolute tolerance in the visible branch). This listing omits the branch
// condition, the else-branch assignment and the last operand of the return
// expression.
// NOTE(review): the visible return requires num_elements, chunk_keys AND optype to
// all compare strictly less. That is not a lexicographic comparison: two keys that
// differ in opposite directions on two fields are "not less" both ways, so this
// looks like it is not a strict weak ordering. Confirm before relying on it in
// std::sort or an ordered container (a std::tie-style comparison is the usual form).
209  bool operator<(const struct HashTableCacheKey& that) const {
210  bool oeq;
212  oeq = (std::abs(*overlaps_hashjoin_bucket_threshold -
213  *that.overlaps_hashjoin_bucket_threshold) <= 0.00000001);
214  } else {
217  }
218  return num_elements < that.num_elements && chunk_keys < that.chunk_keys &&
219  optype < that.optype && !oeq &&
221  }
222  };
223 
224  void initHashTableOnCpuFromCache(const HashTableCacheKey&);
225 
226  void putHashTableOnCpuToCache(const HashTableCacheKey&);
227 
228  std::pair<ssize_t, size_t> getApproximateTupleCountFromCache(
229  const HashTableCacheKey&) const;
230 
231  bool isBitwiseEq() const;
232 
233  void freeHashBufferMemory();
236 
237  const std::shared_ptr<Analyzer::BinOper> condition_;
238  const std::vector<InputTableInfo>& query_infos_;
241  size_t entry_count_; // number of keys in the hash table
242  size_t emitted_keys_count_; // number of keys emitted across all rows
243  Executor* executor_;
245  std::shared_ptr<std::vector<int8_t>> cpu_hash_table_buff_;
247 #ifdef HAVE_CUDA
248  std::vector<Data_Namespace::AbstractBuffer*> gpu_hash_table_buff_;
249 #endif
250  std::vector<InnerOuter> inner_outer_pairs_;
252  const int device_count_;
253 #ifdef HAVE_CUDA
254  unsigned block_size_;
255  unsigned grid_size_;
256 #endif // HAVE_CUDA
257 
259  const std::shared_ptr<std::vector<int8_t>> buffer;
261  const size_t entry_count;
262  const size_t emitted_keys_count;
263  };
264 
266 
267  static std::vector<std::pair<HashTableCacheKey, HashTableCacheValue>> hash_table_cache_;
268  static std::mutex hash_table_cache_mutex_;
269 
270  static const int ERR_FAILED_TO_FETCH_COLUMN{-3};
272 };
273 
275  public:
276  static void set(const std::vector<ChunkKey>& key,
277  const JoinHashTableInterface::HashType hash_type);
278 
279  static std::pair<JoinHashTableInterface::HashType, bool> get(
280  const std::vector<ChunkKey>& key);
281 
282  private:
283  static std::map<std::vector<ChunkKey>, JoinHashTableInterface::HashType>
285  static std::mutex hash_type_cache_mutex_;
286 };
287 
288 #endif // QUERYENGINE_BASELINEJOINHASHTABLE_H
size_t offsetBufferOff() const noexcept override
std::set< DecodedJoinHashBufferEntry > toSet(const ExecutorDeviceType device_type, const int device_id) const override
static void set(const std::vector< ChunkKey > &key, const JoinHashTableInterface::HashType hash_type)
const boost::optional< double > overlaps_hashjoin_bucket_threshold
static std::map< std::vector< ChunkKey >, JoinHashTableInterface::HashType > hash_type_cache_
std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
virtual ColumnsForDevice fetchColumnsForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, ThrustAllocator &dev_buff_owner)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:86
void putHashTableOnCpuToCache(const HashTableCacheKey &)
virtual int initHashTableOnGpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout, const size_t key_component_width, const size_t key_component_count, const int device_id)
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
static std::mutex hash_type_cache_mutex_
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
ExecutorDeviceType
size_t getComponentBufferSize() const noexcept
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
static auto yieldCacheInvalidator() -> std::function< void()>
SQLOps
Definition: sqldefs.h:29
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache(const HashTableCacheKey &) const
size_t getKeyBufferSize() const noexcept
JoinHashTableInterface::HashType layout_
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
const std::vector< std::shared_ptr< Chunk_NS::Chunk > > chunks_owner
static std::shared_ptr< BaselineJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
int getInnerTableRteIdx() const noexcept override
const HashTableCacheValue * findHashTableOnCpuInCache(const HashTableCacheKey &)
const std::vector< InputTableInfo > & query_infos_
virtual llvm::Value * codegenKey(const CompilationOptions &)
Executor(const int db_id, const size_t block_size_x, const size_t grid_size_x, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:111
size_t payloadBufferOff() const noexcept override
std::vector< InnerOuter > inner_outer_pairs_
virtual void reifyWithLayout(const JoinHashTableInterface::HashType layout)
void reifyForDevice(const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id, const logger::ThreadId parent_thread_id)
static const int ERR_FAILED_TO_FETCH_COLUMN
CompositeKeyInfo getCompositeKeyInfo() const
ColumnCacheMap & column_cache_
JoinHashTableInterface::HashType getHashType() const noexcept override
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexcept override
Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override
virtual std::pair< size_t, size_t > approximateTupleCount(const std::vector< ColumnsForDevice > &) const
const std::vector< JoinColumnTypeInfo > join_column_types
int initHashTableForDevice(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_buckets, const JoinHashTableInterface::HashType layout, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
const std::shared_ptr< std::vector< int8_t > > buffer
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
int getInnerTableId() const noexcept override
static const int ERR_FAILED_TO_JOIN_ON_VIRTUAL_COLUMN
void checkHashJoinReplicationConstraint(const int table_id) const
bool operator<(const struct HashTableCacheKey &that) const
const Catalog_Namespace::Catalog * catalog_
const std::vector< JoinColumn > join_columns
uint64_t ThreadId
Definition: Logger.h:306
static std::mutex hash_table_cache_mutex_
const Data_Namespace::MemoryLevel memory_level_
virtual int initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout)
llvm::Value * hashPtr(const size_t index)
std::vector< const void * > sd_outer_proxy_per_key
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
void initHashTableOnCpuFromCache(const HashTableCacheKey &)
virtual size_t getKeyComponentCount() const
static std::vector< std::pair< HashTableCacheKey, HashTableCacheValue > > hash_table_cache_
const std::vector< JoinBucketInfo > join_buckets
const std::vector< std::shared_ptr< void > > malloc_owner
virtual size_t getKeyComponentWidth() const
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const noexcept override
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
int getDeviceCount() const noexcept override
std::vector< const void * > sd_inner_proxy_per_key
const std::shared_ptr< Analyzer::BinOper > condition_
const JoinHashTableInterface::HashType type
bool operator==(const struct HashTableCacheKey &that) const
size_t countBufferOff() const noexcept override