OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
BaselineJoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef QUERYENGINE_BASELINEJOINHASHTABLE_H
17 #define QUERYENGINE_BASELINEJOINHASHTABLE_H
18 
19 #include "../Analyzer/Analyzer.h"
20 #include "../DataMgr/MemoryLevel.h"
21 #include "ColumnarResults.h"
23 #include "HashJoinRuntime.h"
24 #include "InputMetadata.h"
25 #include "JoinHashTableInterface.h"
26 
27 #ifdef HAVE_CUDA
28 #include <cuda.h>
29 #endif
30 #include <cstdint>
31 #include <map>
32 #include <mutex>
33 #include <thread>
34 #include <unordered_set>
35 #include <vector>
36 
37 class Executor;
38 
39 // Representation for a hash table using the baseline layout: an open-addressing
40 // hash with a fill rate of 50%. It is used for equi-joins on multiple columns and
41 // on single sparse columns (with very wide range), typically big integer. As of
42 // now, such tuples must be unique within the inner table.
44  public:
46  static std::shared_ptr<BaselineJoinHashTable> getInstance(
47  const std::shared_ptr<Analyzer::BinOper> condition,
48  const std::vector<InputTableInfo>& query_infos,
49  const Data_Namespace::MemoryLevel memory_level,
50  const HashType preferred_hash_type,
51  const int device_count,
52  ColumnCacheMap& column_cache,
53  Executor* executor);
54 
56  static std::shared_ptr<BaselineJoinHashTable> getSyntheticInstance(
57  std::string_view table1,
58  std::string_view column1,
59  std::string_view table2,
60  std::string_view column2,
61  const Data_Namespace::MemoryLevel memory_level,
62  const HashType preferred_hash_type,
63  const int device_count,
64  ColumnCacheMap& column_cache,
65  Executor* executor);
66 
67  static size_t getShardCountForCondition(
68  const Analyzer::BinOper* condition,
69  const Executor* executor,
70  const std::vector<InnerOuter>& inner_outer_pairs);
71 
72  int64_t getJoinHashBuffer(const ExecutorDeviceType device_type,
73  const int device_id) const noexcept override;
74 
75  size_t getJoinHashBufferSize(const ExecutorDeviceType device_type,
76  const int device_id) const noexcept override;
77 
78  std::string toString(const ExecutorDeviceType device_type,
79  const int device_id,
80  bool raw = false) const noexcept override;
81 
82  std::set<DecodedJoinHashBufferEntry> decodeJoinHashBuffer(
83  const ExecutorDeviceType device_type,
84  const int device_id) const noexcept override;
85 
86  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override;
87 
89  const size_t) override;
90 
91  int getInnerTableId() const noexcept override;
92 
93  int getInnerTableRteIdx() const noexcept override;
94 
95  JoinHashTableInterface::HashType getHashType() const noexcept override;
96 
97  size_t offsetBufferOff() const noexcept override;
98 
99  size_t countBufferOff() const noexcept override;
100 
101  size_t payloadBufferOff() const noexcept override;
102 
103  static auto yieldCacheInvalidator() -> std::function<void()> {
104  return []() -> void {
105  std::lock_guard<std::mutex> guard(hash_table_cache_mutex_);
106  hash_table_cache_.clear();
107  };
108  }
109 
111 
112  private:
113  size_t getComponentBufferSize() const noexcept;
114 
115  protected:
116  BaselineJoinHashTable(const std::shared_ptr<Analyzer::BinOper> condition,
117  const std::vector<InputTableInfo>& query_infos,
118  const Data_Namespace::MemoryLevel memory_level,
119  const HashType preferred_hash_type,
120  const size_t entry_count,
121  ColumnCacheMap& column_cache,
122  Executor* executor,
123  const std::vector<InnerOuter>& inner_outer_pairs);
124 
125  static int getInnerTableId(const std::vector<InnerOuter>& inner_outer_pairs);
126 
127  virtual void reifyWithLayout(const int device_count,
128  const JoinHashTableInterface::HashType layout);
129 
131  const std::vector<JoinColumn> join_columns;
132  const std::vector<JoinColumnTypeInfo> join_column_types;
133  const std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
134  const std::vector<JoinBucketInfo> join_buckets;
135  };
136 
138  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
139  const int device_id);
140 
141  virtual std::pair<size_t, size_t> approximateTupleCount(
142  const std::vector<ColumnsForDevice>&) const;
143 
144  virtual size_t getKeyComponentWidth() const;
145 
146  virtual size_t getKeyComponentCount() const;
147 
148  virtual int initHashTableOnCpu(const std::vector<JoinColumn>& join_columns,
149  const std::vector<JoinColumnTypeInfo>& join_column_types,
150  const std::vector<JoinBucketInfo>& join_bucket_info,
151  const JoinHashTableInterface::HashType layout);
152 
153  virtual int initHashTableOnGpu(const std::vector<JoinColumn>& join_columns,
154  const std::vector<JoinColumnTypeInfo>& join_column_types,
155  const std::vector<JoinBucketInfo>& join_bucket_info,
157  const size_t key_component_width,
158  const size_t key_component_count,
159  const int device_id);
160 
161  virtual llvm::Value* codegenKey(const CompilationOptions&);
162 
163  std::pair<const int8_t*, size_t> getAllColumnFragments(
164  const Analyzer::ColumnVar& hash_col,
165  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
166  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner);
167 
168  size_t shardCount() const;
169 
171  const std::vector<InnerOuter>& inner_outer_pairs) const;
172 
174  std::vector<const void*> sd_inner_proxy_per_key;
175  std::vector<const void*> sd_outer_proxy_per_key;
176  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
177  };
178 
180 
181  void reify(const int device_count);
182 
183  JoinColumn fetchColumn(const Analyzer::ColumnVar* inner_col,
184  const Data_Namespace::MemoryLevel& effective_memory_level,
185  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
186  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
187  const int device_id);
188 
189  void reifyForDevice(const ColumnsForDevice& columns_for_device,
191  const int device_id);
192 
193  void checkHashJoinReplicationConstraint(const int table_id) const;
194 
195  int initHashTableForDevice(const std::vector<JoinColumn>& join_columns,
196  const std::vector<JoinColumnTypeInfo>& join_column_types,
197  const std::vector<JoinBucketInfo>& join_buckets,
199  const Data_Namespace::MemoryLevel effective_memory_level,
200  const int device_id);
201 
202  llvm::Value* hashPtr(const size_t index);
203 
205  const size_t num_elements;
206  const std::vector<ChunkKey> chunk_keys;
207  const SQLOps optype;
208  const boost::optional<double> overlaps_hashjoin_bucket_threshold;
209 
210  bool operator==(const struct HashTableCacheKey& that) const {
211  bool oeq;
213  oeq = (std::abs(*overlaps_hashjoin_bucket_threshold -
214  *that.overlaps_hashjoin_bucket_threshold) <= 0.00000001);
215  } else {
218  }
219  return num_elements == that.num_elements && chunk_keys == that.chunk_keys &&
220  optype == that.optype && oeq;
221  }
222 
223  bool operator<(const struct HashTableCacheKey& that) const {
224  bool oeq;
226  oeq = (std::abs(*overlaps_hashjoin_bucket_threshold -
227  *that.overlaps_hashjoin_bucket_threshold) <= 0.00000001);
228  } else {
231  }
232  return num_elements < that.num_elements && chunk_keys < that.chunk_keys &&
233  optype < that.optype && !oeq &&
235  }
236  };
237 
238  void initHashTableOnCpuFromCache(const HashTableCacheKey&);
239 
240  void putHashTableOnCpuToCache(const HashTableCacheKey&);
241 
242  std::pair<ssize_t, size_t> getApproximateTupleCountFromCache(
243  const HashTableCacheKey&) const;
244 
245  bool isBitwiseEq() const;
246 
247  void freeHashBufferMemory();
250 
251  const std::shared_ptr<Analyzer::BinOper> condition_;
252  const std::vector<InputTableInfo>& query_infos_;
255  size_t entry_count_; // number of keys in the hash table
256  size_t emitted_keys_count_; // number of keys emitted across all rows
257  Executor* executor_;
259  std::shared_ptr<std::vector<int8_t>> cpu_hash_table_buff_;
261 #ifdef HAVE_CUDA
262  std::vector<Data_Namespace::AbstractBuffer*> gpu_hash_table_buff_;
263 #endif
264  typedef std::pair<const int8_t*, size_t> LinearizedColumn;
265  typedef std::pair<int, int> LinearizedColumnCacheKey;
266  std::map<LinearizedColumnCacheKey, LinearizedColumn> linearized_multifrag_columns_;
269  std::vector<InnerOuter> inner_outer_pairs_;
271 #ifdef HAVE_CUDA
272  unsigned block_size_;
273  unsigned grid_size_;
274 #endif // HAVE_CUDA
275 
277  const std::shared_ptr<std::vector<int8_t>> buffer;
279  const size_t entry_count;
280  const size_t emitted_keys_count;
281  };
282 
284 
285  static std::vector<std::pair<HashTableCacheKey, HashTableCacheValue>> hash_table_cache_;
286  static std::mutex hash_table_cache_mutex_;
287 
288  static const int ERR_FAILED_TO_FETCH_COLUMN{-3};
290 };
291 
293  public:
294  static void set(const std::vector<ChunkKey>& key,
295  const JoinHashTableInterface::HashType hash_type);
296 
297  static std::pair<JoinHashTableInterface::HashType, bool> get(
298  const std::vector<ChunkKey>& key);
299 
300  private:
301  static std::map<std::vector<ChunkKey>, JoinHashTableInterface::HashType>
303  static std::mutex hash_type_cache_mutex_;
304 };
305 
306 #endif // QUERYENGINE_BASELINEJOINHASHTABLE_H
Executor(const int db_id, const size_t block_size_x, const size_t grid_size_x, const std::string &debug_dir, const std::string &debug_file,::QueryRenderer::QueryRenderManager *render_manager)
Definition: Execute.cpp:106
size_t offsetBufferOff() const noexceptoverride
std::pair< const int8_t *, size_t > LinearizedColumn
static void set(const std::vector< ChunkKey > &key, const JoinHashTableInterface::HashType hash_type)
const boost::optional< double > overlaps_hashjoin_bucket_threshold
static std::map< std::vector< ChunkKey >, JoinHashTableInterface::HashType > hash_type_cache_
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:81
void putHashTableOnCpuToCache(const HashTableCacheKey &)
virtual int initHashTableOnGpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout, const size_t key_component_width, const size_t key_component_count, const int device_id)
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
static std::mutex hash_type_cache_mutex_
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
ExecutorDeviceType
size_t getComponentBufferSize() const noexcept
void reify(const int device_count)
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
static auto yieldCacheInvalidator() -> std::function< void()>
SQLOps
Definition: sqldefs.h:29
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache(const HashTableCacheKey &) const
JoinHashTableInterface::HashType layout_
std::pair< const int8_t *, size_t > getAllColumnFragments(const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
RowSetMemoryOwner linearized_multifrag_column_owner_
const std::vector< std::shared_ptr< Chunk_NS::Chunk > > chunks_owner
static std::shared_ptr< BaselineJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
int getInnerTableRteIdx() const noexceptoverride
const HashTableCacheValue * findHashTableOnCpuInCache(const HashTableCacheKey &)
const std::vector< InputTableInfo > & query_infos_
virtual llvm::Value * codegenKey(const CompilationOptions &)
size_t payloadBufferOff() const noexceptoverride
std::vector< InnerOuter > inner_outer_pairs_
static const int ERR_FAILED_TO_FETCH_COLUMN
CompositeKeyInfo getCompositeKeyInfo() const
ColumnCacheMap & column_cache_
JoinHashTableInterface::HashType getHashType() const noexceptoverride
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexceptoverride
std::pair< int, int > LinearizedColumnCacheKey
virtual std::pair< size_t, size_t > approximateTupleCount(const std::vector< ColumnsForDevice > &) const
const std::vector< JoinColumnTypeInfo > join_column_types
int initHashTableForDevice(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_buckets, const JoinHashTableInterface::HashType layout, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
const std::shared_ptr< std::vector< int8_t > > buffer
int getInnerTableId() const noexceptoverride
JoinColumn fetchColumn(const Analyzer::ColumnVar *inner_col, const Data_Namespace::MemoryLevel &effective_memory_level, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, const int device_id)
static const int ERR_FAILED_TO_JOIN_ON_VIRTUAL_COLUMN
void checkHashJoinReplicationConstraint(const int table_id) const
bool operator<(const struct HashTableCacheKey &that) const
const Catalog_Namespace::Catalog * catalog_
void reifyForDevice(const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id)
const std::vector< JoinColumn > join_columns
static std::mutex hash_table_cache_mutex_
std::string toString(const ExecutorDeviceType device_type, const int device_id, bool raw=false) const noexceptoverride
std::map< LinearizedColumnCacheKey, LinearizedColumn > linearized_multifrag_columns_
const Data_Namespace::MemoryLevel memory_level_
std::mutex linearized_multifrag_column_mutex_
virtual int initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout)
llvm::Value * hashPtr(const size_t index)
std::vector< const void * > sd_outer_proxy_per_key
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
void initHashTableOnCpuFromCache(const HashTableCacheKey &)
virtual ColumnsForDevice fetchColumnsForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
virtual size_t getKeyComponentCount() const
static std::vector< std::pair< HashTableCacheKey, HashTableCacheValue > > hash_table_cache_
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
const std::vector< JoinBucketInfo > join_buckets
static std::shared_ptr< BaselineJoinHashTable > getSyntheticInstance(std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
virtual size_t getKeyComponentWidth() const
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type, const int device_id) const noexceptoverride
virtual void reifyWithLayout(const int device_count, const JoinHashTableInterface::HashType layout)
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
std::vector< const void * > sd_inner_proxy_per_key
const std::shared_ptr< Analyzer::BinOper > condition_
std::set< DecodedJoinHashBufferEntry > decodeJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const noexceptoverride
const JoinHashTableInterface::HashType type
bool operator==(const struct HashTableCacheKey &that) const
size_t countBufferOff() const noexceptoverride