OmniSciDB  04ee39c94c
BaselineJoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef QUERYENGINE_BASELINEJOINHASHTABLE_H
17 #define QUERYENGINE_BASELINEJOINHASHTABLE_H
18 
19 #include "../Analyzer/Analyzer.h"
20 #include "../DataMgr/MemoryLevel.h"
21 #include "ColumnarResults.h"
23 #include "HashJoinRuntime.h"
24 #include "InputMetadata.h"
25 #include "JoinHashTableInterface.h"
26 
27 #ifdef HAVE_CUDA
28 #include <cuda.h>
29 #endif
30 #include <cstdint>
31 #include <map>
32 #include <mutex>
33 #include <thread>
34 #include <unordered_set>
35 #include <vector>
36 
37 class Executor;
38 
39 // Representation for a hash table using the baseline layout: an open-addressing
40 // hash with a fill rate of 50%. It is used for equi-joins on multiple columns and
41 // on single sparse columns (with very wide range), typically big integer. As of
42 // now, such tuples must be unique within the inner table.
44  public:
// Public static factory: hands back a shared_ptr to a BaselineJoinHashTable for
// the given join condition. The parameter list matches the protected
// constructor except that it takes a device count instead of an entry count
// and does not take the inner/outer column pairs.
// NOTE(review): presumably derives those values and builds the table before
// returning -- confirm in the .cpp.
45  static std::shared_ptr<BaselineJoinHashTable> getInstance(
46  const std::shared_ptr<Analyzer::BinOper> condition,
47  const std::vector<InputTableInfo>& query_infos,
48  const Data_Namespace::MemoryLevel memory_level,
49  const HashType preferred_hash_type,
50  const int device_count,
51  ColumnCacheMap& column_map,
52  Executor* executor);
53 
// Static helper returning a shard count (size_t) for a join condition, given
// the executor and the inner/outer column pairs of that condition.
// NOTE(review): what a zero result means (presumably "not sharded") is not
// visible from this header -- confirm in the .cpp.
54  static size_t getShardCountForCondition(
55  const Analyzer::BinOper* condition,
56  const Executor* executor,
57  const std::vector<InnerOuter>& inner_outer_pairs);
58 
// Interface override: returns the join hash buffer for the requested device
// type and device id as a 64-bit integer. Declared noexcept.
// NOTE(review): the value is presumably a buffer address cast to int64_t --
// confirm against the implementation.
59  int64_t getJoinHashBuffer(const ExecutorDeviceType device_type,
60  const int device_id) noexcept override;
61 
62  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override;
63 
65  const size_t) override;
66 
67  int getInnerTableId() const noexcept override;
68 
69  int getInnerTableRteIdx() const noexcept override;
70 
71  JoinHashTableInterface::HashType getHashType() const noexcept override;
72 
// Interface overrides reporting the positions of the offset, count and payload
// sections. All three are const and noexcept.
// NOTE(review): presumably byte offsets into the hash table buffer, meaningful
// for the one-to-many layout -- confirm against the implementation.
73  size_t offsetBufferOff() const noexcept override;
74 
75  size_t countBufferOff() const noexcept override;
76 
77  size_t payloadBufferOff() const noexcept override;
78 
79  static auto yieldCacheInvalidator() -> std::function<void()> {
80  return []() -> void {
81  std::lock_guard<std::mutex> guard(hash_table_cache_mutex_);
82  hash_table_cache_.clear();
83  };
84  }
85 
87 
88  private:
89  size_t getComponentBufferSize() const noexcept;
90 
91  protected:
// Constructor. It sits in the protected section, so outside code goes through
// the public static getInstance() factory instead.
// NOTE(review): the class keeps `const std::vector<InputTableInfo>& query_infos_`
// as a reference member; presumably it is bound to `query_infos`, in which case
// the argument must outlive this object -- confirm in the .cpp.
// NOTE(review): `entry_count` presumably seeds entry_count_ ("number of keys in
// the hash table") -- confirm.
92  BaselineJoinHashTable(const std::shared_ptr<Analyzer::BinOper> condition,
93  const std::vector<InputTableInfo>& query_infos,
94  const Data_Namespace::MemoryLevel memory_level,
95  const HashType preferred_hash_type,
96  const size_t entry_count,
97  ColumnCacheMap& column_map,
98  Executor* executor,
99  const std::vector<InnerOuter>& inner_outer_pairs);
100 
101  static int getInnerTableId(const std::vector<InnerOuter>& inner_outer_pairs);
102 
103  virtual void reifyWithLayout(const int device_count,
104  const JoinHashTableInterface::HashType layout);
105 
107  const std::vector<JoinColumn> join_columns;
108  const std::vector<JoinColumnTypeInfo> join_column_types;
109  const std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
110  const std::vector<JoinBucketInfo> join_buckets;
111  };
112 
114  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
115  const int device_id);
116 
117  virtual std::pair<size_t, size_t> approximateTupleCount(
118  const std::vector<ColumnsForDevice>&) const;
119 
120  virtual size_t getKeyComponentWidth() const;
121 
122  virtual size_t getKeyComponentCount() const;
123 
// Virtual hook that initializes the hash table on the CPU from the join
// columns, their type info, the bucket info and the requested layout.
// Returns an int status.
// NOTE(review): presumably 0 on success and a negative ERR_* code otherwise
// (the class declares ERR_FAILED_TO_FETCH_COLUMN = -3) -- confirm in the .cpp.
124  virtual int initHashTableOnCpu(const std::vector<JoinColumn>& join_columns,
125  const std::vector<JoinColumnTypeInfo>& join_column_types,
126  const std::vector<JoinBucketInfo>& join_bucket_info,
127  const JoinHashTableInterface::HashType layout);
128 
129  virtual int initHashTableOnGpu(const std::vector<JoinColumn>& join_columns,
130  const std::vector<JoinColumnTypeInfo>& join_column_types,
131  const std::vector<JoinBucketInfo>& join_bucket_info,
133  const size_t key_component_width,
134  const size_t key_component_count,
135  const int device_id);
136 
137  virtual llvm::Value* codegenKey(const CompilationOptions&);
138 
139  std::pair<const int8_t*, size_t> getAllColumnFragments(
140  const Analyzer::ColumnVar& hash_col,
141  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
142  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner);
143 
144  size_t shardCount() const;
145 
147  const std::vector<InnerOuter>& inner_outer_pairs) const;
148 
150  std::vector<const void*> sd_inner_proxy_per_key;
151  std::vector<const void*> sd_outer_proxy_per_key;
152  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
153  };
154 
156 
157  void reify(const int device_count);
158 
// Fetches one inner join column across the given fragments for a device and
// returns it as a JoinColumn.
// `chunks_owner` is taken by non-const reference.
// NOTE(review): presumably the fetched chunks are appended to it so their
// memory stays alive while the returned JoinColumn is in use -- confirm.
159  JoinColumn fetchColumn(const Analyzer::ColumnVar* inner_col,
160  const Data_Namespace::MemoryLevel& effective_memory_level,
161  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
162  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
163  const int device_id);
164 
165  void reifyForDevice(const ColumnsForDevice& columns_for_device,
167  const int device_id);
168 
169  void checkHashJoinReplicationConstraint(const int table_id) const;
170 
171  int initHashTableForDevice(const std::vector<JoinColumn>& join_columns,
172  const std::vector<JoinColumnTypeInfo>& join_column_types,
173  const std::vector<JoinBucketInfo>& join_buckets,
175  const Data_Namespace::MemoryLevel effective_memory_level,
176  const int device_id);
177 
178  llvm::Value* hashPtr(const size_t index);
179 
181  const size_t num_elements;
182  const std::vector<ChunkKey> chunk_keys;
183  const SQLOps optype;
184  const boost::optional<double> overlaps_hashjoin_bucket_threshold;
185 
186  bool operator==(const struct HashTableCacheKey& that) const {
187  bool oeq;
188  if (overlaps_hashjoin_bucket_threshold && that.overlaps_hashjoin_bucket_threshold) {
189  oeq = (std::abs(*overlaps_hashjoin_bucket_threshold -
190  *that.overlaps_hashjoin_bucket_threshold) <= 0.00000001);
191  } else {
192  oeq = (overlaps_hashjoin_bucket_threshold ==
194  }
195  return num_elements == that.num_elements && chunk_keys == that.chunk_keys &&
196  optype == that.optype && oeq;
197  }
198 
199  bool operator<(const struct HashTableCacheKey& that) const {
200  bool oeq;
201  if (overlaps_hashjoin_bucket_threshold && that.overlaps_hashjoin_bucket_threshold) {
202  oeq = (std::abs(*overlaps_hashjoin_bucket_threshold -
203  *that.overlaps_hashjoin_bucket_threshold) <= 0.00000001);
204  } else {
205  oeq = (overlaps_hashjoin_bucket_threshold ==
207  }
208  return num_elements < that.num_elements && chunk_keys < that.chunk_keys &&
209  optype < that.optype && !oeq &&
210  overlaps_hashjoin_bucket_threshold < that.overlaps_hashjoin_bucket_threshold;
211  }
212  };
213 
215 
217 
// Const lookup keyed by a HashTableCacheKey; returns a (ssize_t, size_t) pair.
// NOTE(review): the signed first element presumably allows a negative
// "not cached" sentinel, and the pair presumably mirrors the
// (entry count, emitted keys count) stored in the cache value -- confirm.
218  std::pair<ssize_t, size_t> getApproximateTupleCountFromCache(
219  const HashTableCacheKey&) const;
220 
221  bool isBitwiseEq() const;
222 
223  void freeHashBufferMemory();
226 
227  const std::shared_ptr<Analyzer::BinOper> condition_;
228  const std::vector<InputTableInfo>& query_infos_;
231  size_t entry_count_; // number of keys in the hash table
232  size_t emitted_keys_count_; // number of keys emitted across all rows
233  Executor* executor_;
235  std::shared_ptr<std::vector<int8_t>> cpu_hash_table_buff_;
237 #ifdef HAVE_CUDA
238  std::vector<Data_Namespace::AbstractBuffer*> gpu_hash_table_buff_;
239 #endif
240  typedef std::pair<const int8_t*, size_t> LinearizedColumn;
241  typedef std::pair<int, int> LinearizedColumnCacheKey;
242  std::map<LinearizedColumnCacheKey, LinearizedColumn> linearized_multifrag_columns_;
245  std::vector<InnerOuter> inner_outer_pairs_;
247 #ifdef HAVE_CUDA
248  unsigned block_size_;
249  unsigned grid_size_;
250 #endif // HAVE_CUDA
251 
253  const std::shared_ptr<std::vector<int8_t>> buffer;
255  const size_t entry_count;
256  const size_t emitted_keys_count;
257  };
258 
260 
261  static std::vector<std::pair<HashTableCacheKey, HashTableCacheValue>> hash_table_cache_;
262  static std::mutex hash_table_cache_mutex_;
263 
264  static const int ERR_FAILED_TO_FETCH_COLUMN{-3};
266 };
267 
269  public:
// Records `hash_type` for the given chunk-key vector.
// NOTE(review): presumably writes into the static hash_type_cache_ map while
// holding hash_type_cache_mutex_ -- confirm in the .cpp.
270  static void set(const std::vector<ChunkKey>& key,
271  const JoinHashTableInterface::HashType hash_type);
272 
// Looks up the hash type for the given chunk-key vector and returns it
// together with a bool.
// NOTE(review): the bool presumably reports whether an entry was found (the
// HashType being meaningful only when it is true) -- confirm in the .cpp.
273  static std::pair<JoinHashTableInterface::HashType, bool> get(
274  const std::vector<ChunkKey>& key);
275 
276  private:
277  static std::map<std::vector<ChunkKey>, JoinHashTableInterface::HashType>
279  static std::mutex hash_type_cache_mutex_;
280 };
281 
282 #endif // QUERYENGINE_BASELINEJOINHASHTABLE_H
size_t countBufferOff() const noexcept override
std::pair< const int8_t *, size_t > LinearizedColumn
const boost::optional< double > overlaps_hashjoin_bucket_threshold
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache(const HashTableCacheKey &) const
static std::map< std::vector< ChunkKey >, JoinHashTableInterface::HashType > hash_type_cache_
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:81
void putHashTableOnCpuToCache(const HashTableCacheKey &)
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) noexcept override
virtual int initHashTableOnGpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout, const size_t key_component_width, const size_t key_component_count, const int device_id)
virtual size_t getKeyComponentCount() const
static std::mutex hash_type_cache_mutex_
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
ExecutorDeviceType
size_t getComponentBufferSize() const noexcept
void reify(const int device_count)
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
static auto yieldCacheInvalidator() -> std::function< void()>
SQLOps
Definition: sqldefs.h:29
BaselineJoinHashTable(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const size_t entry_count, ColumnCacheMap &column_map, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
JoinHashTableInterface::HashType layout_
std::pair< const int8_t *, size_t > getAllColumnFragments(const Analyzer::ColumnVar &hash_col, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
CompositeKeyInfo getCompositeKeyInfo() const
RowSetMemoryOwner linearized_multifrag_column_owner_
const std::vector< std::shared_ptr< Chunk_NS::Chunk > > chunks_owner
const HashTableCacheValue * findHashTableOnCpuInCache(const HashTableCacheKey &)
const std::vector< InputTableInfo > & query_infos_
virtual llvm::Value * codegenKey(const CompilationOptions &)
bool operator<(const struct HashTableCacheKey &that) const
std::vector< InnerOuter > inner_outer_pairs_
int getInnerTableRteIdx() const noexcept override
static const int ERR_FAILED_TO_FETCH_COLUMN
ColumnCacheMap & column_cache_
std::pair< int, int > LinearizedColumnCacheKey
bool operator==(const struct HashTableCacheKey &that) const
int getInnerTableId() const noexcept override
const std::vector< JoinColumnTypeInfo > join_column_types
int initHashTableForDevice(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_buckets, const JoinHashTableInterface::HashType layout, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
const std::shared_ptr< std::vector< int8_t > > buffer
void checkHashJoinReplicationConstraint(const int table_id) const
size_t payloadBufferOff() const noexcept override
JoinColumn fetchColumn(const Analyzer::ColumnVar *inner_col, const Data_Namespace::MemoryLevel &effective_memory_level, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, const int device_id)
static const int ERR_FAILED_TO_JOIN_ON_VIRTUAL_COLUMN
const Catalog_Namespace::Catalog * catalog_
void reifyForDevice(const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id)
virtual size_t getKeyComponentWidth() const
const std::vector< JoinColumn > join_columns
size_t offsetBufferOff() const noexcept override
static std::mutex hash_table_cache_mutex_
std::map< LinearizedColumnCacheKey, LinearizedColumn > linearized_multifrag_columns_
const Data_Namespace::MemoryLevel memory_level_
std::mutex linearized_multifrag_column_mutex_
virtual int initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout)
llvm::Value * hashPtr(const size_t index)
std::vector< const void * > sd_outer_proxy_per_key
Executor(const int db_id, const size_t block_size_x, const size_t grid_size_x, const std::string &debug_dir, const std::string &debug_file, ::QueryRenderer::QueryRenderManager *render_manager)
Definition: Execute.cpp:101
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
void initHashTableOnCpuFromCache(const HashTableCacheKey &)
virtual ColumnsForDevice fetchColumnsForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id)
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
static std::vector< std::pair< HashTableCacheKey, HashTableCacheValue > > hash_table_cache_
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
const std::vector< JoinBucketInfo > join_buckets
virtual void reifyWithLayout(const int device_count, const JoinHashTableInterface::HashType layout)
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
static std::shared_ptr< BaselineJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_map, Executor *executor)
virtual std::pair< size_t, size_t > approximateTupleCount(const std::vector< ColumnsForDevice > &) const
JoinHashTableInterface::HashType getHashType() const noexcept override
std::vector< const void * > sd_inner_proxy_per_key
const std::shared_ptr< Analyzer::BinOper > condition_
const JoinHashTableInterface::HashType type