OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
OverlapsJoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
23 
25  public:
  // Constructs the overlaps-join hash table shell; the actual table(s) are
  // built later via reify(). All arguments are stored verbatim for use during
  // reification and cache-key generation.
  // NOTE(review): the doc generator elided original lines 47/49/51 here, so
  // the initializer list below may be missing entries (the member index lists
  // e.g. query_hint_ / RegisteredQueryHint::defaults()) -- confirm against
  // the real header before relying on this list being complete.
  OverlapsJoinHashTable(const std::shared_ptr<Analyzer::BinOper> condition,
                        const JoinType join_type,
                        const std::vector<InputTableInfo>& query_infos,
                        const Data_Namespace::MemoryLevel memory_level,
                        ColumnCacheMap& column_cache,
                        Executor* executor,
                        const std::vector<InnerOuter>& inner_outer_pairs,
                        const int device_count,
                        QueryPlan query_plan_dag,
                        HashtableCacheMetaInfo hashtable_cache_meta_info,
                        const TableIdToNodeMap& table_id_to_node_map)
      : condition_(condition)
      , join_type_(join_type)
      , query_infos_(query_infos)
      , memory_level_(memory_level)
      , executor_(executor)
      , column_cache_(column_cache)
      , inner_outer_pairs_(inner_outer_pairs)
      , device_count_(device_count)
      , query_plan_dag_(query_plan_dag)
      , table_id_to_node_map_(table_id_to_node_map)
      , hashtable_cache_meta_info_(hashtable_cache_meta_info) {
    // Always allocate at least one per-device slot (CPU-only runs pass
    // device_count == 1, but guard against 0 regardless).
    hash_tables_for_device_.resize(std::max(device_count_, 1));
  }
53 
55 
  // Factory entry point (declaration only; defined in the .cpp). Per the
  // class docs: "Make hash table from an in-flight SQL query's parse tree
  // etc."
  static std::shared_ptr<OverlapsJoinHashTable> getInstance(
      const std::shared_ptr<Analyzer::BinOper> condition,
      const std::vector<InputTableInfo>& query_infos,
      const Data_Namespace::MemoryLevel memory_level,
      const JoinType join_type,
      const int device_count,
      ColumnCacheMap& column_cache,
      Executor* executor,
      const HashTableBuildDagMap& hashtable_build_dag_map,
      const RegisteredQueryHint& query_hint,
      const TableIdToNodeMap& table_id_to_node_map);
68 
69  static auto getCacheInvalidator() -> std::function<void()> {
70  return []() -> void {
72  auto auto_tuner_cache_invalidator = auto_tuner_cache_->getCacheInvalidator();
73  auto_tuner_cache_invalidator();
74 
76  auto main_cache_invalidator = hash_table_cache_->getCacheInvalidator();
77  main_cache_invalidator();
78  };
79  }
80 
83  return hash_table_cache_.get();
84  }
85 
88  return auto_tuner_cache_.get();
89  }
90 
 protected:
  // Drives hash table construction for the preferred layout (defined in .cpp).
  void reify(const HashType preferred_layout);

  // Layout-specific reification; virtual so subclasses can override.
  virtual void reifyWithLayout(const HashType layout);

  // Builds the table(s) across devices once sizes/layout are decided.
  virtual void reifyImpl(std::vector<ColumnsForDevice>& columns_per_device,
                         const Fragmenter_Namespace::TableInfo& query_info,
                         const HashType layout,
                         const size_t shard_count,
                         const size_t entry_count,
                         const size_t emitted_keys_count,
                         const bool skip_hashtable_caching,
                         const size_t chosen_max_hashtable_size,
                         const double chosen_bucket_threshold);

  // Per-device build step, run under the parent thread's logging context.
  void reifyForDevice(const ColumnsForDevice& columns_for_device,
                      const HashType layout,
                      const size_t entry_count,
                      const size_t emitted_keys_count,
                      const bool skip_hashtable_caching,
                      const int device_id,
                      const logger::ThreadId parent_thread_id);

  size_t calculateHashTableSize(size_t number_of_dimensions,
                                size_t emitted_keys_count,
                                size_t entry_count) const;

  // NOTE(review): the opening line of this declaration was dropped by the
  // doc generator; per the member index it is
  //   ColumnsForDevice fetchColumnsForDevice(...)
      const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
      const int device_id,
      DeviceAllocator* dev_buff_owner);

  // returns entry_count, emitted_keys_count
  virtual std::pair<size_t, size_t> approximateTupleCount(
      const std::vector<double>& inverse_bucket_sizes_for_dimension,
      std::vector<ColumnsForDevice>&,
      const size_t chosen_max_hashtable_size,
      const double chosen_bucket_threshold);

  // returns entry_count, emitted_keys_count
  virtual std::pair<size_t, size_t> computeHashTableCounts(
      const size_t shard_count,
      const std::vector<double>& inverse_bucket_sizes_for_dimension,
      std::vector<ColumnsForDevice>& columns_per_device,
      const size_t chosen_max_hashtable_size,
      const double chosen_bucket_threshold);

  // Records the chosen inverse bucket sizes on this object and on each
  // device's column set.
  void setInverseBucketSizeInfo(const std::vector<double>& inverse_bucket_sizes,
                                std::vector<ColumnsForDevice>& columns_per_device,
                                const size_t device_count);

  size_t getKeyComponentWidth() const;

  size_t getKeyComponentCount() const;
145 
146  HashType getHashType() const noexcept override {
147  if (layout_override_) {
148  return *layout_override_;
149  }
150  auto hash_table = getHashTableForDevice(0);
151  CHECK(hash_table);
152  return hash_table->getLayout();
153  }
154 
156  return memory_level_;
157  }
158 
159  int getDeviceCount() const noexcept override { return device_count_; };
160 
  // Builds the baseline hash table on CPU (declaration; defined in .cpp).
  std::shared_ptr<BaselineHashTable> initHashTableOnCpu(
      const std::vector<JoinColumn>& join_columns,
      const std::vector<JoinColumnTypeInfo>& join_column_types,
      const std::vector<JoinBucketInfo>& join_bucket_info,
      const HashType layout,
      const size_t entry_count,
      const size_t emitted_keys_count,
      const bool skip_hashtable_caching);

#ifdef HAVE_CUDA
  // GPU-side build for one device.
  std::shared_ptr<BaselineHashTable> initHashTableOnGpu(
      const std::vector<JoinColumn>& join_columns,
      const std::vector<JoinColumnTypeInfo>& join_column_types,
      const std::vector<JoinBucketInfo>& join_bucket_info,
      const HashType layout,
      const size_t entry_count,
      const size_t emitted_keys_count,
      const size_t device_id);

  // Transfers a CPU-built table to the given GPU device; takes ownership of
  // the CPU table (rvalue parameter).
  std::shared_ptr<BaselineHashTable> copyCpuHashTableToGpu(
      std::shared_ptr<BaselineHashTable>&& cpu_hash_table,
      const HashType layout,
      const size_t entry_count,
      const size_t emitted_keys_count,
      const size_t device_id);
#endif  // HAVE_CUDA
187 
189  const size_t) override;
190 
  // Human-readable dump of the hash table contents for one device.
  std::string toString(const ExecutorDeviceType device_type,
                       const int device_id = 0,
                       bool raw = false) const override;

  // NOTE(review): the opening line of this toSet() declaration was dropped
  // by the doc generator; per the member index it is
  //   DecodedJoinHashBufferSet toSet(const ExecutorDeviceType, const int)
      const int device_id) const override;

  // Slot codegen does not apply to overlaps joins; matching is generated
  // via codegenMatchingSet() instead.
  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override {
    UNREACHABLE();  // not applicable for overlaps join
    return nullptr;
  }
202 
204 
  // Stores the query hint (overwrites any previously registered hint).
  void registerQueryHint(const RegisteredQueryHint& query_hint) {
    query_hint_ = query_hint;
  }
208 
209  size_t getEntryCount() const {
210  auto hash_table = getHashTableForDevice(0);
211  CHECK(hash_table);
212  return hash_table->getEntryCount();
213  }
214 
215  size_t getEmittedKeysCount() const {
216  auto hash_table = getHashTableForDevice(0);
217  CHECK(hash_table);
218  return hash_table->getEmittedKeysCount();
219  }
220 
221  size_t getComponentBufferSize() const noexcept override {
222  CHECK(!hash_tables_for_device_.empty());
223  auto hash_table = hash_tables_for_device_.front();
224  CHECK(hash_table);
225  return hash_table->getEntryCount() * sizeof(int32_t);
226  }
227 
228  size_t shardCount() const {
230  return 0;
231  }
234  }
235 
237  const std::vector<InnerOuter>& inner_outer_pairs) const;
238 
239  int getInnerTableId() const noexcept override;
240 
241  int getInnerTableRteIdx() const noexcept override {
242  CHECK(!inner_outer_pairs_.empty());
243  const auto first_inner_col = inner_outer_pairs_.front().first;
244  return first_inner_col->get_rte_idx();
245  }
246 
247  size_t getKeyBufferSize() const noexcept {
248  const auto key_component_width = getKeyComponentWidth();
249  CHECK(key_component_width == 4 || key_component_width == 8);
250  const auto key_component_count = getKeyComponentCount();
252  return getEntryCount() * key_component_count * key_component_width;
253  } else {
254  return getEntryCount() * (key_component_count + 1) * key_component_width;
255  }
256  }
257 
258  size_t offsetBufferOff() const noexcept override { return getKeyBufferSize(); }
259 
260  size_t countBufferOff() const noexcept override {
263  } else {
264  return getKeyBufferSize();
265  }
266  }
267 
268  size_t payloadBufferOff() const noexcept override {
271  } else {
272  return getKeyBufferSize();
273  }
274  }
275 
276  std::string getHashJoinType() const final { return "Overlaps"; }
277 
  // Cache lookup/store helpers, keyed by QueryPlanHash (declarations only).
  std::shared_ptr<HashTable> initHashTableOnCpuFromCache(
      QueryPlanHash key,
      CacheItemType item_type,
      DeviceIdentifier device_identifier);

  std::optional<std::pair<size_t, size_t>> getApproximateTupleCountFromCache(
      QueryPlanHash key,
      CacheItemType item_type,
      DeviceIdentifier device_identifier);

  // NOTE(review): the opening line of this declaration was dropped by the
  // doc generator; per the member index it is
  //   void putHashTableOnCpuToCache(QueryPlanHash key, ...)
      CacheItemType item_type,
      std::shared_ptr<HashTable> hashtable_ptr,
      DeviceIdentifier device_identifier,
      size_t hashtable_building_time);

  // Codegen for the composite key (single-key and many-key variants).
  llvm::Value* codegenKey(const CompilationOptions&);
  std::vector<llvm::Value*> codegenManyKey(const CompilationOptions&);
296 
297  std::optional<OverlapsHashTableMetaInfo> getOverlapsHashTableMetaInfo() {
299  }
300 
302  std::vector<InnerOuter> inner_outer_pairs;
303  const size_t num_elements;
304  const std::vector<ChunkKey> chunk_key;
305  const SQLOps optype;
306  const size_t max_hashtable_size;
307  const double bucket_threshold;
308  const std::vector<double> inverse_bucket_sizes = {};
309  };
310 
  // NOTE(review): the signature line was dropped by the doc generator; per
  // the member index this is
  //   QueryPlanHash getAlternativeCacheKey(AlternativeCacheKeyForOverlapsHashJoin& info)
  // Folds the join columns, tuple count, operator, and overlaps tuning
  // parameters into one hash, used as a fallback cache key.
  auto hash = boost::hash_value(::toString(info.chunk_key));
  for (InnerOuter inner_outer : info.inner_outer_pairs) {
  auto inner_col = inner_outer.first;
  // The outer side may be an arbitrary expression; only a ColumnVar is
  // usable here, otherwise fall back to the inner column.
  auto rhs_col_var = dynamic_cast<const Analyzer::ColumnVar*>(inner_outer.second);
  auto outer_col = rhs_col_var ? rhs_col_var : inner_col;
  boost::hash_combine(hash, inner_col->toString());
  if (inner_col->get_type_info().is_string()) {
  // String joins additionally key on the outer column -- presumably
  // because the outer side affects dictionary handling; verify in .cpp.
  boost::hash_combine(hash, outer_col->toString());
  }
  }
  boost::hash_combine(hash, info.num_elements);
  boost::hash_combine(hash, ::toString(info.optype));
  boost::hash_combine(hash, info.max_hashtable_size);
  boost::hash_combine(hash, info.bucket_threshold);
  boost::hash_combine(hash, ::toString(info.inverse_bucket_sizes));
  return hash;
  }
329 
330  void generateCacheKey(const size_t max_hashtable_size, const double bucket_threshold) {
331  std::ostringstream oss;
332  oss << query_plan_dag_;
333  oss << max_hashtable_size << "|";
334  oss << bucket_threshold;
335  hashtable_cache_key_ = boost::hash_value(oss.str());
336  }
337 
339 
340  const std::vector<InnerOuter>& getInnerOuterPairs() const { return inner_outer_pairs_; }
341 
342  void setOverlapsHashtableMetaInfo(size_t max_table_size_bytes,
343  double bucket_threshold,
344  std::vector<double>& bucket_sizes) {
345  OverlapsHashTableMetaInfo overlaps_meta_info;
346  overlaps_meta_info.bucket_sizes = bucket_sizes;
347  overlaps_meta_info.overlaps_max_table_size_bytes = max_table_size_bytes;
348  overlaps_meta_info.overlaps_bucket_threshold = bucket_threshold;
349  HashtableCacheMetaInfo meta_info;
350  meta_info.overlaps_meta_info = overlaps_meta_info;
351  hashtable_cache_meta_info_ = meta_info;
352  }
353 
  // NOTE(review): the doc generator elided several member lines in this
  // section (the index also lists join_type_, memory_level_, column_cache_,
  // query_plan_dag_, table_id_to_node_map_, query_hint_,
  // inverse_bucket_sizes_for_dimension_, composite_key_info_,
  // hashtable_cache_key_, hashtable_cache_meta_info_) -- confirm against
  // the real header.

  // The overlaps join condition this table was built for.
  const std::shared_ptr<Analyzer::BinOper> condition_;
  const std::vector<InputTableInfo>& query_infos_;

  Executor* executor_;

  std::vector<InnerOuter> inner_outer_pairs_;
  const int device_count_;

  std::optional<HashType>
      layout_override_;  // allows us to use a 1:many hash table for many:many

  // cache a hashtable based on the cache key C
  // C = query plan dag D + join col J + hashtable params P
  // by varying overlaps join hashtable parameters P, we can build
  // multiple (and different) hashtables for the same query plan dag D
  // in this scenario, the rule we follow is cache everything
  // with the assumption that varying P is intended by user
  // for the performance and so worth to keep it for future recycling
  static std::unique_ptr<HashtableRecycler> hash_table_cache_;
  // auto tuner cache is maintained separately with hashtable cache
  static std::unique_ptr<OverlapsTuningParamRecycler> auto_tuner_cache_;
391 };
llvm::Value * codegenKey(const CompilationOptions &)
int getInnerTableId() const noexceptoverride
void registerQueryHint(const RegisteredQueryHint &query_hint)
size_t DeviceIdentifier
Definition: DataRecycler.h:111
virtual void reifyWithLayout(const HashType layout)
JoinType
Definition: sqldefs.h:108
QueryPlanHash getCacheKey() const
std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
virtual std::pair< size_t, size_t > approximateTupleCount(const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
std::shared_ptr< HashTable > initHashTableOnCpuFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:77
const TableIdToNodeMap table_id_to_node_map_
ExecutorDeviceType
virtual void reifyImpl(std::vector< ColumnsForDevice > &columns_per_device, const Fragmenter_Namespace::TableInfo &query_info, const HashType layout, const size_t shard_count, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
void generateCacheKey(const size_t max_hashtable_size, const double bucket_threshold)
std::string getHashJoinType() const final
#define const
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
SQLOps
Definition: sqldefs.h:29
int getInnerTableRteIdx() const noexceptoverride
static std::shared_ptr< OverlapsJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const RegisteredQueryHint &query_hint, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query&#39;s parse tree etc.
const RegisteredQueryHint & getRegisteredQueryHint()
std::optional< OverlapsHashTableMetaInfo > overlaps_meta_info
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:294
#define UNREACHABLE()
Definition: Logger.h:253
std::optional< std::pair< size_t, size_t > > getApproximateTupleCountFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
static HashtableRecycler * getHashTableCache()
RegisteredQueryHint query_hint_
Data_Namespace::MemoryLevel getMemoryLevel() const noexceptoverride
static std::unique_ptr< OverlapsTuningParamRecycler > auto_tuner_cache_
void putHashTableOnCpuToCache(QueryPlanHash key, CacheItemType item_type, std::shared_ptr< HashTable > hashtable_ptr, DeviceIdentifier device_identifier, size_t hashtable_building_time)
std::shared_ptr< BaselineHashTable > initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching)
#define CHECK_GT(x, y)
Definition: Logger.h:221
virtual std::pair< size_t, size_t > computeHashTableCounts(const size_t shard_count, const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &columns_per_device, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
HashType getHashType() const noexceptoverride
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
const std::shared_ptr< Analyzer::BinOper > condition_
int getDeviceCount() const noexceptoverride
OverlapsJoinHashTable(const std::shared_ptr< Analyzer::BinOper > condition, const JoinType join_type, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs, const int device_count, QueryPlan query_plan_dag, HashtableCacheMetaInfo hashtable_cache_meta_info, const TableIdToNodeMap &table_id_to_node_map)
void reify(const HashType preferred_layout)
CacheItemType
Definition: DataRecycler.h:36
ColumnCacheMap & column_cache_
QueryPlanHash getAlternativeCacheKey(AlternativeCacheKeyForOverlapsHashJoin &info)
const std::vector< InputTableInfo > & query_infos_
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
const std::vector< InnerOuter > & getInnerOuterPairs() const
size_t payloadBufferOff() const noexceptoverride
DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const override
std::vector< llvm::Value * > codegenManyKey(const CompilationOptions &)
static OverlapsTuningParamRecycler * getOverlapsTuningParamCache()
void setOverlapsHashtableMetaInfo(size_t max_table_size_bytes, double bucket_threshold, std::vector< double > &bucket_sizes)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
HashTable * getHashTableForDevice(const size_t device_id) const
Definition: HashJoin.h:239
std::vector< double > inverse_bucket_sizes_for_dimension_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:187
std::optional< HashType > layout_override_
void reifyForDevice(const ColumnsForDevice &columns_for_device, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching, const int device_id, const logger::ThreadId parent_thread_id)
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
void setInverseBucketSizeInfo(const std::vector< double > &inverse_bucket_sizes, std::vector< ColumnsForDevice > &columns_per_device, const size_t device_count)
ColumnsForDevice fetchColumnsForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner)
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
uint64_t ThreadId
Definition: Logger.h:345
size_t QueryPlanHash
size_t offsetBufferOff() const noexceptoverride
size_t countBufferOff() const noexceptoverride
HashtableCacheMetaInfo hashtable_cache_meta_info_
#define CHECK(condition)
Definition: Logger.h:209
const Data_Namespace::MemoryLevel memory_level_
size_t getEmittedKeysCount() const
size_t getComponentBufferSize() const noexceptoverride
static std::unique_ptr< HashtableRecycler > hash_table_cache_
std::optional< OverlapsHashTableMetaInfo > getOverlapsHashTableMetaInfo()
std::vector< InnerOuter > inner_outer_pairs_
static auto getCacheInvalidator() -> std::function< void()>
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
std::vector< double > bucket_sizes
CompositeKeyInfo composite_key_info_
std::string QueryPlan
HashType
Definition: HashTable.h:19
size_t getKeyBufferSize() const noexcept
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:130
std::unordered_map< JoinColumnsInfo, HashTableBuildDag > HashTableBuildDagMap