OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
OverlapsJoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
23 
25  public:
26  OverlapsJoinHashTable(const std::shared_ptr<Analyzer::BinOper> condition,
27  const JoinType join_type,
28  const std::vector<InputTableInfo>& query_infos,
29  const Data_Namespace::MemoryLevel memory_level,
30  ColumnCacheMap& column_cache,
31  Executor* executor,
32  const std::vector<InnerOuter>& inner_outer_pairs,
33  const int device_count,
34  const HashTableBuildDagMap& hashtable_build_dag_map,
35  const TableIdToNodeMap& table_id_to_node_map)
36  : condition_(condition)
37  , join_type_(join_type)
38  , query_infos_(query_infos)
39  , memory_level_(memory_level)
40  , executor_(executor)
41  , column_cache_(column_cache)
42  , inner_outer_pairs_(inner_outer_pairs)
43  , device_count_(device_count)
44  , hashtable_build_dag_map_(hashtable_build_dag_map)
45  , table_id_to_node_map_(table_id_to_node_map) {
47  hash_tables_for_device_.resize(std::max(device_count_, 1));
49  }
50 
52 
54  static std::shared_ptr<OverlapsJoinHashTable> getInstance(
55  const std::shared_ptr<Analyzer::BinOper> condition,
56  const std::vector<InputTableInfo>& query_infos,
57  const Data_Namespace::MemoryLevel memory_level,
58  const JoinType join_type,
59  const int device_count,
60  ColumnCacheMap& column_cache,
61  Executor* executor,
62  const HashTableBuildDagMap& hashtable_build_dag_map,
63  const RegisteredQueryHint& query_hint,
64  const TableIdToNodeMap& table_id_to_node_map);
65 
66  static void invalidateCache() {
68  auto_tuner_cache_->clearCache();
69 
71  hash_table_cache_->clearCache();
72  }
73 
74  static void markCachedItemAsDirty(size_t table_key) {
77  auto candidate_table_keys =
78  hash_table_cache_->getMappedQueryPlanDagsWithTableKey(table_key);
79  if (candidate_table_keys.has_value()) {
80  auto_tuner_cache_->markCachedItemAsDirty(table_key,
81  *candidate_table_keys,
84  hash_table_cache_->markCachedItemAsDirty(table_key,
85  *candidate_table_keys,
88  }
89  }
90 
93  return hash_table_cache_.get();
94  }
95 
98  return auto_tuner_cache_.get();
99  }
100 
101  protected:
102  void reify(const HashType preferred_layout);
103 
104  virtual void reifyWithLayout(const HashType layout);
105 
106  virtual void reifyImpl(std::vector<ColumnsForDevice>& columns_per_device,
107  const Fragmenter_Namespace::TableInfo& query_info,
108  const HashType layout,
109  const size_t shard_count,
110  const size_t entry_count,
111  const size_t emitted_keys_count,
112  const bool skip_hashtable_caching,
113  const size_t chosen_max_hashtable_size,
114  const double chosen_bucket_threshold);
115 
116  void reifyForDevice(const ColumnsForDevice& columns_for_device,
117  const HashType layout,
118  const size_t entry_count,
119  const size_t emitted_keys_count,
120  const bool skip_hashtable_caching,
121  const int device_id,
122  const logger::ThreadId parent_thread_id);
123 
124  size_t calculateHashTableSize(size_t number_of_dimensions,
125  size_t emitted_keys_count,
126  size_t entry_count) const;
127 
129  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
130  const int device_id,
131  DeviceAllocator* dev_buff_owner);
132 
133  // returns entry_count, emitted_keys_count
134  virtual std::pair<size_t, size_t> approximateTupleCount(
135  const std::vector<double>& inverse_bucket_sizes_for_dimension,
136  std::vector<ColumnsForDevice>&,
137  const size_t chosen_max_hashtable_size,
138  const double chosen_bucket_threshold);
139 
140  // returns entry_count, emitted_keys_count
141  virtual std::pair<size_t, size_t> computeHashTableCounts(
142  const size_t shard_count,
143  const std::vector<double>& inverse_bucket_sizes_for_dimension,
144  std::vector<ColumnsForDevice>& columns_per_device,
145  const size_t chosen_max_hashtable_size,
146  const double chosen_bucket_threshold);
147 
148  void setInverseBucketSizeInfo(const std::vector<double>& inverse_bucket_sizes,
149  std::vector<ColumnsForDevice>& columns_per_device,
150  const size_t device_count);
151 
152  size_t getKeyComponentWidth() const;
153 
154  size_t getKeyComponentCount() const;
155 
// Reports the layout of this join's hash table.
// If layout_override_ holds a value it wins; otherwise the layout is read from
// the hash table registered for device 0, which must already exist (CHECK).
156  HashType getHashType() const noexcept override {
157  if (layout_override_) {
158  return *layout_override_;
159  }
// No override set: fall back to the layout of device 0's hash table.
160  auto hash_table = getHashTableForDevice(0);
161  CHECK(hash_table);
162  return hash_table->getLayout();
163  }
164 
// Returns the memory level stored in memory_level_ (set from the constructor
// argument of the same name).
165  Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override {
166  return memory_level_;
167  }
168 
// Returns the device count this object was constructed with (device_count_).
169  int getDeviceCount() const noexcept override { return device_count_; };
170 
171  std::shared_ptr<BaselineHashTable> initHashTableOnCpu(
172  const std::vector<JoinColumn>& join_columns,
173  const std::vector<JoinColumnTypeInfo>& join_column_types,
174  const std::vector<JoinBucketInfo>& join_bucket_info,
175  const HashType layout,
176  const size_t entry_count,
177  const size_t emitted_keys_count,
178  const bool skip_hashtable_caching);
179 
180 #ifdef HAVE_CUDA
181  std::shared_ptr<BaselineHashTable> initHashTableOnGpu(
182  const std::vector<JoinColumn>& join_columns,
183  const std::vector<JoinColumnTypeInfo>& join_column_types,
184  const std::vector<JoinBucketInfo>& join_bucket_info,
185  const HashType layout,
186  const size_t entry_count,
187  const size_t emitted_keys_count,
188  const size_t device_id);
189 
190  std::shared_ptr<BaselineHashTable> copyCpuHashTableToGpu(
191  std::shared_ptr<BaselineHashTable>& cpu_hash_table,
192  const HashType layout,
193  const size_t entry_count,
194  const size_t emitted_keys_count,
195  const size_t device_id);
196 #endif // HAVE_CUDA
197 
199  const size_t) override;
200 
201  std::string toString(const ExecutorDeviceType device_type,
202  const int device_id = 0,
203  bool raw = false) const override;
204 
206  const int device_id) const override;
207 
// Slot codegen hook required by the overridden interface. This class does not
// implement it: the body goes straight to UNREACHABLE(). The nullptr return that
// follows is presumably only there to satisfy the return type -- TODO confirm
// that UNREACHABLE() never returns.
208  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override {
209  UNREACHABLE(); // not applicable for overlaps join
210  return nullptr;
211  }
212 
214 
// Stores a copy of the given query hint in query_hint_, replacing whatever
// value was held before.
215  void registerQueryHint(const RegisteredQueryHint& query_hint) {
216  query_hint_ = query_hint;
217  }
218 
// Returns the entry count reported by the hash table of device 0.
// The hash table for device 0 must exist; a null table trips the CHECK.
219  size_t getEntryCount() const {
220  auto hash_table = getHashTableForDevice(0);
221  CHECK(hash_table);
222  return hash_table->getEntryCount();
223  }
224 
// Returns the emitted-keys count reported by the hash table of device 0.
// The hash table for device 0 must exist; a null table trips the CHECK.
225  size_t getEmittedKeysCount() const {
226  auto hash_table = getHashTableForDevice(0);
227  CHECK(hash_table);
228  return hash_table->getEmittedKeysCount();
229  }
230 
// Size in bytes of one component buffer: the entry count of the first
// per-device hash table multiplied by sizeof(int32_t).
// Requires hash_tables_for_device_ to be non-empty and its first element to be
// non-null; both conditions are enforced with CHECK.
231  size_t getComponentBufferSize() const noexcept override {
232  CHECK(!hash_tables_for_device_.empty());
233  auto hash_table = hash_tables_for_device_.front();
234  CHECK(hash_table);
235  return hash_table->getEntryCount() * sizeof(int32_t);
236  }
237 
238  size_t shardCount() const {
240  return 0;
241  }
244  }
245 
247  const std::vector<InnerOuter>& inner_outer_pairs) const;
248 
249  int getInnerTableId() const noexcept override;
250 
// Returns get_rte_idx() of the first element of the first inner/outer pair
// (the inner-side column, going by the local name first_inner_col).
// inner_outer_pairs_ must not be empty (CHECK).
251  int getInnerTableRteIdx() const noexcept override {
252  CHECK(!inner_outer_pairs_.empty());
253  const auto first_inner_col = inner_outer_pairs_.front().first;
254  return first_inner_col->get_rte_idx();
255  }
256 
257  size_t getKeyBufferSize() const noexcept {
258  const auto key_component_width = getKeyComponentWidth();
259  CHECK(key_component_width == 4 || key_component_width == 8);
260  const auto key_component_count = getKeyComponentCount();
262  return getEntryCount() * key_component_count * key_component_width;
263  } else {
264  return getEntryCount() * (key_component_count + 1) * key_component_width;
265  }
266  }
267 
// Returns exactly getKeyBufferSize(). Presumably this means the offset buffer
// starts immediately after the key buffer -- confirm against the buffer layout.
268  size_t offsetBufferOff() const noexcept override { return getKeyBufferSize(); }
269 
270  size_t countBufferOff() const noexcept override {
273  } else {
274  return getKeyBufferSize();
275  }
276  }
277 
278  size_t payloadBufferOff() const noexcept override {
281  } else {
282  return getKeyBufferSize();
283  }
284  }
285 
// Returns the fixed name "Overlaps" for this hash join type; declared final, so
// subclasses cannot change it.
286  std::string getHashJoinType() const final { return "Overlaps"; }
287 
288  bool isBitwiseEq() const override;
289 
290  std::shared_ptr<HashTable> initHashTableOnCpuFromCache(
291  QueryPlanHash key,
292  CacheItemType item_type,
293  DeviceIdentifier device_identifier);
294 
295  std::optional<std::pair<size_t, size_t>> getApproximateTupleCountFromCache(
296  QueryPlanHash key,
297  CacheItemType item_type,
298  DeviceIdentifier device_identifier);
299 
301  CacheItemType item_type,
302  std::shared_ptr<HashTable> hashtable_ptr,
303  DeviceIdentifier device_identifier,
304  size_t hashtable_building_time);
305 
306  llvm::Value* codegenKey(const CompilationOptions&);
307  std::vector<llvm::Value*> codegenManyKey(const CompilationOptions&);
308 
309  std::optional<OverlapsHashTableMetaInfo> getOverlapsHashTableMetaInfo() {
311  }
312 
314  std::vector<InnerOuter> inner_outer_pairs;
315  const size_t num_elements;
316  const size_t chunk_key_hash;
317  const SQLOps optype;
318  const size_t max_hashtable_size;
319  const double bucket_threshold;
320  const std::vector<double> inverse_bucket_sizes = {};
321  };
322 
324  auto hash = info.chunk_key_hash;
325  for (InnerOuter inner_outer : info.inner_outer_pairs) {
326  auto inner_col = inner_outer.first;
327  auto rhs_col_var = dynamic_cast<const Analyzer::ColumnVar*>(inner_outer.second);
328  auto outer_col = rhs_col_var ? rhs_col_var : inner_col;
329  boost::hash_combine(hash, inner_col->toString());
330  if (inner_col->get_type_info().is_string()) {
331  boost::hash_combine(hash, outer_col->toString());
332  }
333  }
334  boost::hash_combine(hash, info.num_elements);
335  boost::hash_combine(hash, info.optype);
336  boost::hash_combine(hash, info.max_hashtable_size);
337  boost::hash_combine(hash, info.bucket_threshold);
338  boost::hash_combine(hash, info.inverse_bucket_sizes);
339  return hash;
340  }
341 
343  const size_t max_hashtable_size,
344  const double bucket_threshold,
345  const std::vector<double>& bucket_sizes,
346  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>>& fragments_per_device,
347  int device_count) {
348  for (int device_id = 0; device_id < device_count; ++device_id) {
349  auto hash_val = boost::hash_value(hashtable_cache_key_[device_id]);
350  boost::hash_combine(hash_val, max_hashtable_size);
351  boost::hash_combine(hash_val, bucket_threshold);
352  boost::hash_combine(hash_val, bucket_sizes);
353  boost::hash_combine(hash_val,
354  HashJoin::collectFragmentIds(fragments_per_device[device_id]));
355  hashtable_cache_key_[device_id] = hash_val;
356  hash_table_cache_->addQueryPlanDagForTableKeys(hashtable_cache_key_[device_id],
357  table_keys_);
358  }
359  }
360 
// Returns the cache key stored for the given device in hashtable_cache_key_.
// device_id is used directly as an index with no bounds check, so the caller
// must pass an index that is valid for that vector.
361  QueryPlanHash getCacheKey(int device_id) const {
362  return hashtable_cache_key_[device_id];
363  }
364 
// Read-only access to inner_outer_pairs_; the returned reference refers to the
// member itself, not a copy.
365  const std::vector<InnerOuter>& getInnerOuterPairs() const { return inner_outer_pairs_; }
366 
// Records the overlaps-join parameters in hashtable_cache_meta_info_.
// A fresh OverlapsHashTableMetaInfo is filled with the three arguments, wrapped
// in a new HashtableCacheMetaInfo, and that object replaces the current member
// value. bucket_sizes is only read here (it is copied into the meta info) even
// though it is taken by non-const reference.
367  void setOverlapsHashtableMetaInfo(size_t max_table_size_bytes,
368  double bucket_threshold,
369  std::vector<double>& bucket_sizes) {
370  OverlapsHashTableMetaInfo overlaps_meta_info;
371  overlaps_meta_info.bucket_sizes = bucket_sizes;
372  overlaps_meta_info.overlaps_max_table_size_bytes = max_table_size_bytes;
373  overlaps_meta_info.overlaps_bucket_threshold = bucket_threshold;
// Only overlaps_meta_info is set on the wrapper; other fields keep their defaults.
374  HashtableCacheMetaInfo meta_info;
375  meta_info.overlaps_meta_info = overlaps_meta_info;
376  hashtable_cache_meta_info_ = meta_info;
377  }
378 
379  const std::shared_ptr<Analyzer::BinOper> condition_;
381  const std::vector<InputTableInfo>& query_infos_;
383 
384  Executor* executor_;
386 
387  std::vector<InnerOuter> inner_outer_pairs_;
388  const int device_count_;
389 
394 
395  std::optional<HashType>
396  layout_override_; // allows us to use a 1:many hash table for many:many
397 
399 
400  // cache a hashtable based on the cache key C
401  // C = query plan dag D + join col J + hashtable params P
402  // by varying overlaps join hashtable parameters P, we can build
403  // multiple (and different) hashtables for the same query plan dag D
404  // in this scenario, the rule we follow is cache everything
405  // with the assumption that varying P is intended by user
406  // for the performance and so worth to keep it for future recycling
407  static std::unique_ptr<HashtableRecycler> hash_table_cache_;
408  // auto tuner cache is maintained separately with hashtable cache
409  static std::unique_ptr<OverlapsTuningParamRecycler> auto_tuner_cache_;
410 
414  std::vector<QueryPlanHash> hashtable_cache_key_;
416  std::unordered_set<size_t> table_keys_;
418 };
static std::vector< int > collectFragmentIds(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments)
Definition: HashJoin.cpp:446
llvm::Value * codegenKey(const CompilationOptions &)
static void markCachedItemAsDirty(size_t table_key)
int getInnerTableId() const noexcept override
void registerQueryHint(const RegisteredQueryHint &query_hint)
size_t DeviceIdentifier
Definition: DataRecycler.h:129
virtual void reifyWithLayout(const HashType layout)
JoinType
Definition: sqldefs.h:157
std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
virtual std::pair< size_t, size_t > approximateTupleCount(const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
std::shared_ptr< HashTable > initHashTableOnCpuFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:95
const TableIdToNodeMap table_id_to_node_map_
ExecutorDeviceType
virtual void reifyImpl(std::vector< ColumnsForDevice > &columns_per_device, const Fragmenter_Namespace::TableInfo &query_info, const HashType layout, const size_t shard_count, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
std::string getHashJoinType() const final
std::string QueryPlanDAG
SQLOps
Definition: sqldefs.h:28
int getInnerTableRteIdx() const noexcept override
static std::shared_ptr< OverlapsJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query's parse tree etc.
const RegisteredQueryHint & getRegisteredQueryHint()
std::optional< OverlapsHashTableMetaInfo > overlaps_meta_info
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:351
#define UNREACHABLE()
Definition: Logger.h:266
std::optional< std::pair< size_t, size_t > > getApproximateTupleCountFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
static HashtableRecycler * getHashTableCache()
RegisteredQueryHint query_hint_
Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override
static std::unique_ptr< OverlapsTuningParamRecycler > auto_tuner_cache_
void putHashTableOnCpuToCache(QueryPlanHash key, CacheItemType item_type, std::shared_ptr< HashTable > hashtable_ptr, DeviceIdentifier device_identifier, size_t hashtable_building_time)
std::shared_ptr< BaselineHashTable > initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching)
#define CHECK_GT(x, y)
Definition: Logger.h:234
QueryPlanHash getCacheKey(int device_id) const
virtual std::pair< size_t, size_t > computeHashTableCounts(const size_t shard_count, const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &columns_per_device, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
HashType getHashType() const noexcept override
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
const std::shared_ptr< Analyzer::BinOper > condition_
int getDeviceCount() const noexcept override
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
void reify(const HashType preferred_layout)
CacheItemType
Definition: DataRecycler.h:38
ColumnCacheMap & column_cache_
QueryPlanHash getAlternativeCacheKey(AlternativeCacheKeyForOverlapsHashJoin &info)
const std::vector< InputTableInfo > & query_infos_
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
const std::vector< InnerOuter > & getInnerOuterPairs() const
size_t payloadBufferOff() const noexcept override
DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const override
HashTableBuildDagMap hashtable_build_dag_map_
std::vector< llvm::Value * > codegenManyKey(const CompilationOptions &)
std::vector< QueryPlanHash > hashtable_cache_key_
static OverlapsTuningParamRecycler * getOverlapsTuningParamCache()
void setOverlapsHashtableMetaInfo(size_t max_table_size_bytes, double bucket_threshold, std::vector< double > &bucket_sizes)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
HashTable * getHashTableForDevice(const size_t device_id) const
Definition: HashJoin.h:269
std::vector< double > inverse_bucket_sizes_for_dimension_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:269
std::optional< HashType > layout_override_
void reifyForDevice(const ColumnsForDevice &columns_for_device, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching, const int device_id, const logger::ThreadId parent_thread_id)
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
void setInverseBucketSizeInfo(const std::vector< double > &inverse_bucket_sizes, std::vector< ColumnsForDevice > &columns_per_device, const size_t device_count)
ColumnsForDevice fetchColumnsForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner)
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
uint64_t ThreadId
Definition: Logger.h:364
size_t QueryPlanHash
size_t offsetBufferOff() const noexcept override
size_t countBufferOff() const noexcept override
HashtableCacheMetaInfo hashtable_cache_meta_info_
#define CHECK(condition)
Definition: Logger.h:222
OverlapsJoinHashTable(const std::shared_ptr< Analyzer::BinOper > condition, const JoinType join_type, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs, const int device_count, const HashTableBuildDagMap &hashtable_build_dag_map, const TableIdToNodeMap &table_id_to_node_map)
const Data_Namespace::MemoryLevel memory_level_
void generateCacheKey(const size_t max_hashtable_size, const double bucket_threshold, const std::vector< double > &bucket_sizes, std::vector< std::vector< Fragmenter_Namespace::FragmentInfo >> &fragments_per_device, int device_count)
size_t getEmittedKeysCount() const
size_t getComponentBufferSize() const noexcept override
static std::unique_ptr< HashtableRecycler > hash_table_cache_
std::optional< OverlapsHashTableMetaInfo > getOverlapsHashTableMetaInfo()
std::vector< InnerOuter > inner_outer_pairs_
std::unordered_set< size_t > table_keys_
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
std::vector< double > bucket_sizes
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
Definition: DataRecycler.h:136
CompositeKeyInfo composite_key_info_
bool isBitwiseEq() const override
HashType
Definition: HashTable.h:19
size_t getKeyBufferSize() const noexcept
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:150