OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
OverlapsJoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
23 
25  public:
// Constructor: stores each argument in the member of the matching name.
// The visible body only sizes hash_tables_for_device_; no table is built here.
// NOTE(review): listing line 48 is absent from this extract, so any statement
// on that line is not shown.
26  OverlapsJoinHashTable(const std::shared_ptr<Analyzer::BinOper> condition,
27  const JoinType join_type,
28  const std::vector<InputTableInfo>& query_infos,
29  const Data_Namespace::MemoryLevel memory_level,
30  ColumnCacheMap& column_cache,
31  Executor* executor,
32  const std::vector<InnerOuter>& inner_outer_pairs,
33  const int device_count,
34  const RegisteredQueryHint& query_hints,
35  const HashTableBuildDagMap& hashtable_build_dag_map,
36  const TableIdToNodeMap& table_id_to_node_map)
37  : condition_(condition)
38  , join_type_(join_type)
// query_infos_ is declared as a const reference member, so the vector passed
// in must outlive this object.
39  , query_infos_(query_infos)
40  , memory_level_(memory_level)
41  , executor_(executor)
42  , column_cache_(column_cache)
43  , inner_outer_pairs_(inner_outer_pairs)
44  , device_count_(device_count)
45  , query_hints_(query_hints)
46  , hashtable_build_dag_map_(hashtable_build_dag_map)
47  , table_id_to_node_map_(table_id_to_node_map) {
// Keep at least one slot even when device_count_ is less than 1; several
// getters below read the table for device 0.
49  hash_tables_for_device_.resize(std::max(device_count_, 1));
50  }
51 
53 
// Static factory returning a shared_ptr to an OverlapsJoinHashTable.
// Declaration only; the definition is not part of this listing.
// Note the parameter order differs from the constructor: memory_level comes
// before join_type, device_count before column_cache, and
// hashtable_build_dag_map before query_hint.
55  static std::shared_ptr<OverlapsJoinHashTable> getInstance(
56  const std::shared_ptr<Analyzer::BinOper> condition,
57  const std::vector<InputTableInfo>& query_infos,
58  const Data_Namespace::MemoryLevel memory_level,
59  const JoinType join_type,
60  const int device_count,
61  ColumnCacheMap& column_cache,
62  Executor* executor,
63  const HashTableBuildDagMap& hashtable_build_dag_map,
64  const RegisteredQueryHint& query_hint,
65  const TableIdToNodeMap& table_id_to_node_map);
66 
// Clears the auto tuner cache and then the hash table cache. Both caches are
// static members, so this is not tied to any one instance.
// NOTE(review): listing lines 68 and 71 are absent from this extract.
67  static void invalidateCache() {
69  auto_tuner_cache_->clearCache();
70 
72  hash_table_cache_->clearCache();
73  }
74 
// Marks cached items related to `table_key` as dirty in both static caches.
// The query plan DAGs mapped to the table key are looked up in
// hash_table_cache_; if the lookup yields no value, nothing is marked.
// NOTE(review): the trailing arguments of both markCachedItemAsDirty calls
// (listing lines 83-84 and 87-88) are absent from this extract.
75  static void markCachedItemAsDirty(size_t table_key) {
78  auto candidate_table_keys =
79  hash_table_cache_->getMappedQueryPlanDagsWithTableKey(table_key);
80  if (candidate_table_keys.has_value()) {
81  auto_tuner_cache_->markCachedItemAsDirty(table_key,
82  *candidate_table_keys,
85  hash_table_cache_->markCachedItemAsDirty(table_key,
86  *candidate_table_keys,
89  }
90  }
91 
94  return hash_table_cache_.get();
95  }
96 
99  return auto_tuner_cache_.get();
100  }
101 
102  protected:
// Declaration only (defined outside this listing). Takes the layout the
// caller would prefer; how the preference is honoured is not visible here.
103  void reify(const HashType preferred_layout);
104 
// Declaration only (defined outside this listing). Virtual, so subclasses may
// override it.
105  virtual void reifyWithLayout(const HashType layout);
106 
// Declaration only (defined outside this listing); virtual.
// columns_per_device is taken by non-const reference, so the implementation
// may modify it. entry_count and emitted_keys_count are the same pair that
// approximateTupleCount / computeHashTableCounts are documented to return.
// presumably chosen_max_hashtable_size and chosen_bucket_threshold are the
// tuning parameters selected for this build — confirm against the definition.
107  virtual void reifyImpl(std::vector<ColumnsForDevice>& columns_per_device,
108  const Fragmenter_Namespace::TableInfo& query_info,
109  const HashType layout,
110  const size_t shard_count,
111  const size_t entry_count,
112  const size_t emitted_keys_count,
113  const bool skip_hashtable_caching,
114  const size_t chosen_max_hashtable_size,
115  const double chosen_bucket_threshold);
116 
// Declaration only (defined outside this listing). Per-device counterpart of
// reifyImpl: it takes one ColumnsForDevice and a device_id instead of the
// per-device vector.
// parent_thread_local_ids is passed by value.
// NOTE(review): presumably this runs on a worker thread — confirm at the
// call site.
117  void reifyForDevice(const ColumnsForDevice& columns_for_device,
118  const HashType layout,
119  const size_t entry_count,
120  const size_t emitted_keys_count,
121  const bool skip_hashtable_caching,
122  const int device_id,
123  const logger::ThreadLocalIds parent_thread_local_ids);
124 
// Declaration only (defined outside this listing). Const member returning a
// size_t computed from the dimension count and the two tuple counts.
// NOTE(review): the unit of the result (presumably bytes) is not visible
// here — confirm against the definition.
125  size_t calculateHashTableSize(size_t number_of_dimensions,
126  size_t emitted_keys_count,
127  size_t entry_count) const;
128 
130  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
131  const int device_id,
132  DeviceAllocator* dev_buff_owner);
133 
// Declaration only (defined outside this listing); virtual.
134  // Returns the pair (entry_count, emitted_keys_count), in that order.
135  virtual std::pair<size_t, size_t> approximateTupleCount(
136  const std::vector<double>& inverse_bucket_sizes_for_dimension,
137  std::vector<ColumnsForDevice>&,
138  const size_t chosen_max_hashtable_size,
139  const double chosen_bucket_threshold);
140 
// Declaration only (defined outside this listing); virtual.
// Same parameters as approximateTupleCount plus a leading shard_count.
141  // Returns the pair (entry_count, emitted_keys_count), in that order.
142  virtual std::pair<size_t, size_t> computeHashTableCounts(
143  const size_t shard_count,
144  const std::vector<double>& inverse_bucket_sizes_for_dimension,
145  std::vector<ColumnsForDevice>& columns_per_device,
146  const size_t chosen_max_hashtable_size,
147  const double chosen_bucket_threshold);
148 
// Declaration only (defined outside this listing).
// columns_per_device is a non-const reference, so it is the likely target of
// the update — confirm against the definition.
149  void setInverseBucketSizeInfo(const std::vector<double>& inverse_bucket_sizes,
150  std::vector<ColumnsForDevice>& columns_per_device,
151  const size_t device_count);
152 
// Declaration only (defined outside this listing).
// getKeyBufferSize() CHECKs that the value returned is 4 or 8.
153  size_t getKeyComponentWidth() const;
154 
// Declaration only (defined outside this listing).
// Used by getKeyBufferSize() as the number of key components per entry.
155  size_t getKeyComponentCount() const;
156 
// Returns the hash table layout: layout_override_ when it is set, otherwise
// the layout reported by the hash table for device 0.
157  HashType getHashType() const noexcept override {
158  if (layout_override_) {
159  return *layout_override_;
160  }
// No override: fall back to device 0's table, which must exist (CHECKed).
161  auto hash_table = getHashTableForDevice(0);
162  CHECK(hash_table);
163  return hash_table->getLayout();
164  }
165 
// Returns the memory level given to the constructor (memory_level_).
166  Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override {
167  return memory_level_;
168  }
169 
// Returns the device count given to the constructor (device_count_). This is
// the raw value and may differ from hash_tables_for_device_.size(), which the
// constructor sets to max(device_count_, 1).
170  int getDeviceCount() const noexcept override { return device_count_; };
171 
// Declaration only (defined outside this listing). Returns a shared_ptr to a
// BaselineHashTable.
// NOTE(review): skip_hashtable_caching presumably bypasses hash_table_cache_
// — confirm against the definition.
172  std::shared_ptr<BaselineHashTable> initHashTableOnCpu(
173  const std::vector<JoinColumn>& join_columns,
174  const std::vector<JoinColumnTypeInfo>& join_column_types,
175  const std::vector<JoinBucketInfo>& join_bucket_info,
176  const HashType layout,
177  const size_t entry_count,
178  const size_t emitted_keys_count,
179  const bool skip_hashtable_caching);
180 
181 #ifdef HAVE_CUDA
// Declaration only; compiled only when HAVE_CUDA is defined.
// Same parameters as initHashTableOnCpu, except that the final parameter is a
// device_id instead of skip_hashtable_caching. device_id is a size_t here,
// whereas reifyForDevice takes it as an int.
182  std::shared_ptr<BaselineHashTable> initHashTableOnGpu(
183  const std::vector<JoinColumn>& join_columns,
184  const std::vector<JoinColumnTypeInfo>& join_column_types,
185  const std::vector<JoinBucketInfo>& join_bucket_info,
186  const HashType layout,
187  const size_t entry_count,
188  const size_t emitted_keys_count,
189  const size_t device_id);
190 
// Declaration only; compiled only when HAVE_CUDA is defined.
// Takes the CPU table by non-const reference to the shared_ptr and returns a
// separate shared_ptr to a BaselineHashTable.
// NOTE(review): whether the CPU table is modified or released is not visible
// here — confirm against the definition.
191  std::shared_ptr<BaselineHashTable> copyCpuHashTableToGpu(
192  std::shared_ptr<BaselineHashTable>& cpu_hash_table,
193  const HashType layout,
194  const size_t entry_count,
195  const size_t emitted_keys_count,
196  const size_t device_id);
197 #endif // HAVE_CUDA
198 
200  const size_t) override;
201 
// Override; declaration only (defined outside this listing).
// device_id defaults to 0 and raw defaults to false.
202  std::string toString(const ExecutorDeviceType device_type,
203  const int device_id = 0,
204  bool raw = false) const override;
205 
207  const int device_id) const override;
208 
// Override that must never be called for this join type: it hits
// UNREACHABLE(). The `return nullptr` only satisfies the return type.
209  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override {
210  UNREACHABLE(); // not applicable for overlaps join
211  return nullptr;
212  }
213 
215 
// Returns the entry count of the hash table for device 0. That table must
// exist (CHECKed); other devices are not consulted.
216  size_t getEntryCount() const {
217  auto hash_table = getHashTableForDevice(0);
218  CHECK(hash_table);
219  return hash_table->getEntryCount();
220  }
221 
// Returns the emitted keys count of the hash table for device 0. That table
// must exist (CHECKed); other devices are not consulted.
222  size_t getEmittedKeysCount() const {
223  auto hash_table = getHashTableForDevice(0);
224  CHECK(hash_table);
225  return hash_table->getEmittedKeysCount();
226  }
227 
// Returns entry_count * sizeof(int32_t) for the first per-device hash table.
// Both the container being non-empty and the first table being non-null are
// CHECKed.
228  size_t getComponentBufferSize() const noexcept override {
229  CHECK(!hash_tables_for_device_.empty());
// Unlike getEntryCount(), this reads the vector directly rather than going
// through getHashTableForDevice(0).
230  auto hash_table = hash_tables_for_device_.front();
231  CHECK(hash_table);
232  return hash_table->getEntryCount() * sizeof(int32_t);
233  }
234 
235  size_t shardCount() const {
237  return 0;
238  }
241  }
242 
244  const std::vector<InnerOuter>& inner_outer_pairs) const;
245 
// Override; declaration only (defined outside this listing).
246  int getInnerTableId() const noexcept override;
247 
// Returns get_rte_idx() of the inner column in the first inner/outer pair.
// inner_outer_pairs_ must be non-empty (CHECKed); later pairs are not read.
248  int getInnerTableRteIdx() const noexcept override {
249  CHECK(!inner_outer_pairs_.empty());
250  const auto first_inner_col = inner_outer_pairs_.front().first;
251  return first_inner_col->get_rte_idx();
252  }
253 
// Computes the key buffer size from the device-0 entry count, the key
// component count and the key component width (CHECKed to be 4 or 8).
// Two formulas are visible:
//   entry_count * key_component_count * key_component_width
//   entry_count * (key_component_count + 1) * key_component_width
// NOTE(review): listing line 258, which holds the `if` condition selecting
// between them, is absent from this extract. Presumably it is a
// layoutRequiresAdditionalBuffers(...) test — confirm against the real header.
254  size_t getKeyBufferSize() const noexcept {
255  const auto key_component_width = getKeyComponentWidth();
256  CHECK(key_component_width == 4 || key_component_width == 8);
257  const auto key_component_count = getKeyComponentCount();
259  return getEntryCount() * key_component_count * key_component_width;
260  } else {
261  return getEntryCount() * (key_component_count + 1) * key_component_width;
262  }
263  }
264 
// Returns exactly getKeyBufferSize().
// presumably the offset buffer is laid out directly after the keys — confirm.
265  size_t offsetBufferOff() const noexcept override { return getKeyBufferSize(); }
266 
267  size_t countBufferOff() const noexcept override {
270  } else {
271  return getKeyBufferSize();
272  }
273  }
274 
275  size_t payloadBufferOff() const noexcept override {
278  } else {
279  return getKeyBufferSize();
280  }
281  }
282 
// Returns the fixed name of this join type. Marked final, so subclasses
// cannot change it.
283  std::string getHashJoinType() const final { return "Overlaps"; }
284 
// Override; declaration only (defined outside this listing).
285  bool isBitwiseEq() const override;
286 
// Declaration only (defined outside this listing). Returns a shared_ptr to a
// HashTable for the given key, item type and device identifier.
// NOTE(review): presumably the result is null on a cache miss — confirm
// against the definition.
287  std::shared_ptr<HashTable> initHashTableOnCpuFromCache(
288  QueryPlanHash key,
289  CacheItemType item_type,
290  DeviceIdentifier device_identifier);
291 
// Declaration only (defined outside this listing). The result is optional, so
// callers must handle the no-value case.
// presumably the pair is (entry_count, emitted_keys_count), matching
// approximateTupleCount — confirm against the definition.
292  std::optional<std::pair<size_t, size_t>> getApproximateTupleCountFromCache(
293  QueryPlanHash key,
294  CacheItemType item_type,
295  DeviceIdentifier device_identifier);
296 
298  CacheItemType item_type,
299  std::shared_ptr<HashTable> hashtable_ptr,
300  DeviceIdentifier device_identifier,
301  size_t hashtable_building_time);
302 
// Declaration only (defined outside this listing). Returns a single
// llvm::Value*.
303  llvm::Value* codegenKey(const CompilationOptions&);
// Declaration only (defined outside this listing). Returns a vector of
// llvm::Value* rather than the single value returned by codegenKey.
304  std::vector<llvm::Value*> codegenManyKey(const CompilationOptions&);
305 
306  std::optional<OverlapsHashTableMetaInfo> getOverlapsHashTableMetaInfo() {
308  }
309 
311  std::vector<InnerOuter> inner_outer_pairs;
312  const size_t num_elements;
313  const size_t chunk_key_hash;
314  const SQLOps optype;
315  const size_t max_hashtable_size;
316  const double bucket_threshold;
317  const std::vector<double> inverse_bucket_sizes = {};
318  };
319 
// Body of getAlternativeCacheKey; its signature line is absent from this
// extract. Builds a hash seeded with info.chunk_key_hash and returns it.
// Everything mixed into the hash comes from `info`; no member state is read.
321  auto hash = info.chunk_key_hash;
322  for (InnerOuter inner_outer : info.inner_outer_pairs) {
323  auto inner_col = inner_outer.first;
// If the outer expression is not a ColumnVar, fall back to the inner column
// so outer_col is never null.
324  auto rhs_col_var = dynamic_cast<const Analyzer::ColumnVar*>(inner_outer.second);
325  auto outer_col = rhs_col_var ? rhs_col_var : inner_col;
326  boost::hash_combine(hash, inner_col->toString());
// The outer column contributes to the key only when the inner column is a
// string type.
327  if (inner_col->get_type_info().is_string()) {
328  boost::hash_combine(hash, outer_col->toString());
329  }
330  }
// The remaining scalar and vector fields are mixed in, in this fixed order.
331  boost::hash_combine(hash, info.num_elements);
332  boost::hash_combine(hash, info.optype);
333  boost::hash_combine(hash, info.max_hashtable_size);
334  boost::hash_combine(hash, info.bucket_threshold);
335  boost::hash_combine(hash, info.inverse_bucket_sizes);
336  return hash;
337  }
338 
// Parameters and body of generateCacheKey; its first signature line is absent
// from this extract.
// For each device in [0, device_count) this refines the per-device cache key
// in place and registers it with hash_table_cache_ under table_keys_.
// Assumes hashtable_cache_key_ and fragments_per_device each already hold at
// least device_count entries: both are indexed with operator[] and neither is
// resized here.
340  const size_t max_hashtable_size,
341  const double bucket_threshold,
342  const std::vector<double>& bucket_sizes,
343  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>>& fragments_per_device,
344  int device_count) {
345  for (int device_id = 0; device_id < device_count; ++device_id) {
// Seed with the key already stored for this device, then mix in the
// hashtable parameters and the ids of the fragments on this device.
346  auto hash_val = boost::hash_value(hashtable_cache_key_[device_id]);
347  boost::hash_combine(hash_val, max_hashtable_size);
348  boost::hash_combine(hash_val, bucket_threshold);
349  boost::hash_combine(hash_val, bucket_sizes);
350  boost::hash_combine(hash_val,
351  HashJoin::collectFragmentIds(fragments_per_device[device_id]));
// Overwrite the stored key. A second call re-seeds from this new value, so
// the result depends on how many times the function has been called.
352  hashtable_cache_key_[device_id] = hash_val;
353  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_[device_id],
354  table_keys_);
355  }
356  }
357 
// Returns the cache key stored for `device_id`. The index is not
// bounds-checked (operator[]), so device_id must be a valid index into
// hashtable_cache_key_.
358  QueryPlanHash getCacheKey(int device_id) const {
359  return hashtable_cache_key_[device_id];
360  }
361 
// Returns a const reference to the stored inner/outer pairs. The reference is
// valid only as long as this object is.
362  const std::vector<InnerOuter>& getInnerOuterPairs() const { return inner_outer_pairs_; }
363 
// Packs the three tuning values into an OverlapsHashTableMetaInfo, wraps it
// in a fresh HashtableCacheMetaInfo, and stores that in
// hashtable_cache_meta_info_, replacing whatever was there.
// bucket_sizes is taken by non-const reference but is only read here.
364  void setOverlapsHashtableMetaInfo(size_t max_table_size_bytes,
365  double bucket_threshold,
366  std::vector<double>& bucket_sizes) {
367  OverlapsHashTableMetaInfo overlaps_meta_info;
368  overlaps_meta_info.bucket_sizes = bucket_sizes;
369  overlaps_meta_info.overlaps_max_table_size_bytes = max_table_size_bytes;
370  overlaps_meta_info.overlaps_bucket_threshold = bucket_threshold;
// Only overlaps_meta_info is set on the new HashtableCacheMetaInfo; any other
// fields keep their default-constructed state.
371  HashtableCacheMetaInfo meta_info;
372  meta_info.overlaps_meta_info = overlaps_meta_info;
373  hashtable_cache_meta_info_ = meta_info;
374  }
375 
// The join condition given to the constructor; shared ownership.
376  const std::shared_ptr<Analyzer::BinOper> condition_;
// Non-owning reference to the caller's vector: the referenced vector must
// outlive this object.
378  const std::vector<InputTableInfo>& query_infos_;
380 
381  Executor* executor_;
383 
384  std::vector<InnerOuter> inner_outer_pairs_;
385  const int device_count_;
387 
392 
393  std::optional<HashType>
394  layout_override_; // allows us to use a 1:many hash table for many:many
395 
397 
398  // Hash tables are cached under a cache key C, where
399  // C = query plan dag D + join col J + hashtable params P.
400  // Varying the overlaps join hashtable parameters P can therefore produce
401  // multiple (different) hash tables for the same query plan dag D.
402  // In that case the rule we follow is to cache all of them,
403  // on the assumption that the user varied P deliberately for performance,
404  // so each table is worth keeping for future recycling.
405  static std::unique_ptr<HashtableRecycler> hash_table_cache_;
406  // The auto tuner cache is maintained separately from the hash table cache.
407  static std::unique_ptr<OverlapsTuningParamRecycler> auto_tuner_cache_;
408 
// One cache key per device, indexed by device id (see getCacheKey and
// generateCacheKey).
411  std::vector<QueryPlanHash> hashtable_cache_key_;
// Table keys that generateCacheKey registers alongside each cache key via
// hash_table_cache_->addQueryPlanDagForTableKeys.
413  std::unordered_set<size_t> table_keys_;
415 };
static std::vector< int > collectFragmentIds(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments)
Definition: HashJoin.cpp:451
llvm::Value * codegenKey(const CompilationOptions &)
static void markCachedItemAsDirty(size_t table_key)
int getInnerTableId() const noexcept override
size_t DeviceIdentifier
Definition: DataRecycler.h:129
virtual void reifyWithLayout(const HashType layout)
JoinType
Definition: sqldefs.h:164
std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
virtual std::pair< size_t, size_t > approximateTupleCount(const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
std::shared_ptr< HashTable > initHashTableOnCpuFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:105
const TableIdToNodeMap table_id_to_node_map_
ExecutorDeviceType
virtual void reifyImpl(std::vector< ColumnsForDevice > &columns_per_device, const Fragmenter_Namespace::TableInfo &query_info, const HashType layout, const size_t shard_count, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
std::string getHashJoinType() const final
std::string QueryPlanDAG
SQLOps
Definition: sqldefs.h:28
int getInnerTableRteIdx() const noexcept override
static std::shared_ptr< OverlapsJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query's parse tree etc.
const RegisteredQueryHint & getRegisteredQueryHint()
std::optional< OverlapsHashTableMetaInfo > overlaps_meta_info
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:361
#define UNREACHABLE()
Definition: Logger.h:333
std::optional< std::pair< size_t, size_t > > getApproximateTupleCountFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
static HashtableRecycler * getHashTableCache()
Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override
static std::unique_ptr< OverlapsTuningParamRecycler > auto_tuner_cache_
void putHashTableOnCpuToCache(QueryPlanHash key, CacheItemType item_type, std::shared_ptr< HashTable > hashtable_ptr, DeviceIdentifier device_identifier, size_t hashtable_building_time)
std::shared_ptr< BaselineHashTable > initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching)
#define CHECK_GT(x, y)
Definition: Logger.h:301
QueryPlanHash getCacheKey(int device_id) const
virtual std::pair< size_t, size_t > computeHashTableCounts(const size_t shard_count, const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &columns_per_device, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
HashType getHashType() const noexcept override
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
const std::shared_ptr< Analyzer::BinOper > condition_
int getDeviceCount() const noexcept override
OverlapsJoinHashTable(const std::shared_ptr< Analyzer::BinOper > condition, const JoinType join_type, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs, const int device_count, const RegisteredQueryHint &query_hints, const HashTableBuildDagMap &hashtable_build_dag_map, const TableIdToNodeMap &table_id_to_node_map)
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
void reify(const HashType preferred_layout)
CacheItemType
Definition: DataRecycler.h:38
void reifyForDevice(const ColumnsForDevice &columns_for_device, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching, const int device_id, const logger::ThreadLocalIds parent_thread_local_ids)
ColumnCacheMap & column_cache_
RegisteredQueryHint query_hints_
QueryPlanHash getAlternativeCacheKey(AlternativeCacheKeyForOverlapsHashJoin &info)
const std::vector< InputTableInfo > & query_infos_
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
const std::vector< InnerOuter > & getInnerOuterPairs() const
size_t payloadBufferOff() const noexcept override
DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const override
HashTableBuildDagMap hashtable_build_dag_map_
std::vector< llvm::Value * > codegenManyKey(const CompilationOptions &)
std::vector< QueryPlanHash > hashtable_cache_key_
static OverlapsTuningParamRecycler * getOverlapsTuningParamCache()
void setOverlapsHashtableMetaInfo(size_t max_table_size_bytes, double bucket_threshold, std::vector< double > &bucket_sizes)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
HashTable * getHashTableForDevice(const size_t device_id) const
Definition: HashJoin.h:279
std::vector< double > inverse_bucket_sizes_for_dimension_
std::optional< HashType > layout_override_
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
void setInverseBucketSizeInfo(const std::vector< double > &inverse_bucket_sizes, std::vector< ColumnsForDevice > &columns_per_device, const size_t device_count)
ColumnsForDevice fetchColumnsForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner)
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
size_t QueryPlanHash
size_t offsetBufferOff() const noexcept override
size_t countBufferOff() const noexcept override
HashtableCacheMetaInfo hashtable_cache_meta_info_
#define CHECK(condition)
Definition: Logger.h:289
const Data_Namespace::MemoryLevel memory_level_
void generateCacheKey(const size_t max_hashtable_size, const double bucket_threshold, const std::vector< double > &bucket_sizes, std::vector< std::vector< Fragmenter_Namespace::FragmentInfo >> &fragments_per_device, int device_count)
size_t getEmittedKeysCount() const
size_t getComponentBufferSize() const noexcept override
static std::unique_ptr< HashtableRecycler > hash_table_cache_
std::optional< OverlapsHashTableMetaInfo > getOverlapsHashTableMetaInfo()
std::vector< InnerOuter > inner_outer_pairs_
std::unordered_set< size_t > table_keys_
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
std::vector< double > bucket_sizes
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
CompositeKeyInfo composite_key_info_
bool isBitwiseEq() const override
HashType
Definition: HashTable.h:19
size_t getKeyBufferSize() const noexcept
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:160