OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
OverlapsJoinHashTable.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
23 
25  const size_t num_elements;
26  const std::vector<ChunkKey> chunk_keys;
27  const SQLOps optype;
28  const size_t max_hashtable_size;
29  const double bucket_threshold;
30  const std::vector<double> inverse_bucket_sizes;
31 
32  bool operator==(const struct OverlapsHashTableCacheKey& that) const {
33  if (inverse_bucket_sizes.size() != that.inverse_bucket_sizes.size()) {
34  return false;
35  }
36  for (size_t i = 0; i < inverse_bucket_sizes.size(); i++) {
37  // bucket sizes within 10^-4 are considered close enough
38  if (std::abs(inverse_bucket_sizes[i] - that.inverse_bucket_sizes[i]) > 1e-4) {
39  return false;
40  }
41  }
42  return num_elements == that.num_elements && chunk_keys == that.chunk_keys &&
45  }
46 
48  const std::vector<ChunkKey>& chunk_keys,
49  const SQLOps& optype,
50  const size_t max_hashtable_size,
51  const double bucket_threshold,
52  const std::vector<double> inverse_bucket_sizes = {})
53  : num_elements(num_elements)
54  , chunk_keys(chunk_keys)
55  , optype(optype)
56  , max_hashtable_size(max_hashtable_size)
57  , bucket_threshold(bucket_threshold)
59 
60  // "copy" constructor
62  const size_t max_hashtable_size,
63  const double bucket_threshold,
64  const std::vector<double>& inverse_bucket_sizes = {})
66  , chunk_keys(that.chunk_keys)
67  , optype(that.optype)
68  , max_hashtable_size(max_hashtable_size)
69  , bucket_threshold(bucket_threshold)
71 };
72 
73 template <class K, class V>
74 class OverlapsHashTableCache : public HashTableCache<K, V> {
75  public:
76  std::optional<std::pair<K, V>> getWithKey(const K& key) {
77  std::lock_guard<std::mutex> guard(this->mutex_);
78  for (const auto& kv : this->contents_) {
79  if (kv.first == key) {
80  return kv;
81  }
82  }
83  return std::nullopt;
84  }
85 };
86 
88  public:
89  OverlapsJoinHashTable(const std::shared_ptr<Analyzer::BinOper> condition,
90  const std::vector<InputTableInfo>& query_infos,
91  const Data_Namespace::MemoryLevel memory_level,
92  ColumnCacheMap& column_cache,
93  Executor* executor,
94  const std::vector<InnerOuter>& inner_outer_pairs,
95  const int device_count)
96  : condition_(condition)
97  , query_infos_(query_infos)
98  , memory_level_(memory_level)
99  , executor_(executor)
100  , column_cache_(column_cache)
101  , inner_outer_pairs_(inner_outer_pairs)
102  , device_count_(device_count) {
104  hash_tables_for_device_.resize(std::max(device_count_, 1));
106  }
107 
109 
// Make hash table from an in-flight SQL query's parse tree etc.
// Static factory: returns a shared_ptr to an OverlapsJoinHashTable. Only the
// declaration appears here; the definition is not part of this listing.
111  static std::shared_ptr<OverlapsJoinHashTable> getInstance(
112  const std::shared_ptr<Analyzer::BinOper> condition,
113  const std::vector<InputTableInfo>& query_infos,
114  const Data_Namespace::MemoryLevel memory_level,
115  const int device_count,
116  ColumnCacheMap& column_cache,
117  Executor* executor,
118  const RegisteredQueryHint& query_hint);
119 
120  static auto getCacheInvalidator() -> std::function<void()> {
121  return []() -> void {
123  auto auto_tuner_cache_invalidator = auto_tuner_cache_->getCacheInvalidator();
124  auto_tuner_cache_invalidator();
125 
127  auto main_cache_invalidator = hash_table_cache_->getCacheInvalidator();
128  main_cache_invalidator();
129  };
130  }
131 
133  // for unit tests
135  return hash_table_cache_->getNumberOfCachedHashTables() +
136  auto_tuner_cache_->getNumberOfCachedHashTables();
137  }
138 
139  protected:
140  void reify(const HashType preferred_layout);
141 
142  void reifyWithLayout(const HashType layout);
143 
144  virtual void reifyImpl(std::vector<ColumnsForDevice>& columns_per_device,
145  const Fragmenter_Namespace::TableInfo& query_info,
146  const HashType layout,
147  const size_t shard_count,
148  const size_t entry_count,
149  const size_t emitted_keys_count,
150  const bool skip_hashtable_caching,
151  const size_t chosen_max_hashtable_size,
152  const double chosen_bucket_threshold);
153 
154  void reifyForDevice(const ColumnsForDevice& columns_for_device,
155  const HashType layout,
156  const size_t entry_count,
157  const size_t emitted_keys_count,
158  const bool skip_hashtable_caching,
159  const size_t chosen_max_hashtable_size,
160  const double chosen_bucket_threshold,
161  const int device_id,
162  const logger::ThreadId parent_thread_id);
163 
164  size_t calculateHashTableSize(size_t number_of_dimensions,
165  size_t emitted_keys_count,
166  size_t entry_count) const;
167 
169  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
170  const int device_id,
171  DeviceAllocator* dev_buff_owner);
172 
173  // returns entry_count, emitted_keys_count
174  virtual std::pair<size_t, size_t> approximateTupleCount(
175  const std::vector<double>& inverse_bucket_sizes_for_dimension,
176  std::vector<ColumnsForDevice>&,
177  const size_t chosen_max_hashtable_size,
178  const double chosen_bucket_threshold);
179 
180  // returns entry_count, emitted_keys_count
181  virtual std::pair<size_t, size_t> computeHashTableCounts(
182  const size_t shard_count,
183  const std::vector<double>& inverse_bucket_sizes_for_dimension,
184  std::vector<ColumnsForDevice>& columns_per_device,
185  const size_t chosen_max_hashtable_size,
186  const double chosen_bucket_threshold);
187 
188  void setInverseBucketSizeInfo(const std::vector<double>& inverse_bucket_sizes,
189  std::vector<ColumnsForDevice>& columns_per_device,
190  const size_t device_count);
191 
192  size_t getKeyComponentWidth() const;
193 
194  size_t getKeyComponentCount() const;
195 
196  HashType getHashType() const noexcept override {
197  if (layout_override_) {
198  return *layout_override_;
199  }
200  auto hash_table = getHashTableForDevice(0);
201  CHECK(hash_table);
202  return hash_table->getLayout();
203  }
204 
206  return memory_level_;
207  }
208 
209  int getDeviceCount() const noexcept override { return device_count_; };
210 
211  std::shared_ptr<BaselineHashTable> initHashTableOnCpu(
212  const std::vector<JoinColumn>& join_columns,
213  const std::vector<JoinColumnTypeInfo>& join_column_types,
214  const std::vector<JoinBucketInfo>& join_bucket_info,
215  const HashType layout,
216  const size_t entry_count,
217  const size_t emitted_keys_count,
218  const bool skip_hashtable_caching,
219  const size_t chosen_max_hashtable_size,
220  const double chosen_bucket_threshold);
221 
// The two member declarations below exist only when HAVE_CUDA is defined.
// Both return a shared_ptr<BaselineHashTable> and take the target device_id as
// a size_t; their definitions are not part of this listing.
222 #ifdef HAVE_CUDA
223  std::shared_ptr<BaselineHashTable> initHashTableOnGpu(
224  const std::vector<JoinColumn>& join_columns,
225  const std::vector<JoinColumnTypeInfo>& join_column_types,
226  const std::vector<JoinBucketInfo>& join_bucket_info,
227  const HashType layout,
228  const size_t entry_count,
229  const size_t emitted_keys_count,
230  const size_t device_id);
231 
// Takes cpu_hash_table by rvalue reference, so callers must pass a temporary
// or std::move their pointer.
232  std::shared_ptr<BaselineHashTable> copyCpuHashTableToGpu(
233  std::shared_ptr<BaselineHashTable>&& cpu_hash_table,
234  const HashType layout,
235  const size_t entry_count,
236  const size_t emitted_keys_count,
237  const size_t device_id);
238 #endif // HAVE_CUDA
239 
241  const size_t) override;
242 
243  std::string toString(const ExecutorDeviceType device_type,
244  const int device_id = 0,
245  bool raw = false) const override;
246 
248  const int device_id) const override;
249 
250  llvm::Value* codegenSlot(const CompilationOptions&, const size_t) override {
251  UNREACHABLE(); // not applicable for overlaps join
252  return nullptr;
253  }
254 
256 
257  void registerQueryHint(const RegisteredQueryHint& query_hint) {
258  query_hint_ = query_hint;
259  }
260 
261  private:
262  size_t getEntryCount() const {
263  auto hash_table = getHashTableForDevice(0);
264  CHECK(hash_table);
265  return hash_table->getEntryCount();
266  }
267 
268  size_t getEmittedKeysCount() const {
269  auto hash_table = getHashTableForDevice(0);
270  CHECK(hash_table);
271  return hash_table->getEmittedKeysCount();
272  }
273 
274  size_t getComponentBufferSize() const noexcept override {
275  CHECK(!hash_tables_for_device_.empty());
276  auto hash_table = hash_tables_for_device_.front();
277  CHECK(hash_table);
278  return hash_table->getEntryCount() * sizeof(int32_t);
279  }
280 
281  size_t shardCount() const {
283  return 0;
284  }
287  }
288 
290  const std::vector<InnerOuter>& inner_outer_pairs) const;
291 
292  int getInnerTableId() const noexcept override;
293 
294  int getInnerTableRteIdx() const noexcept override {
295  CHECK(!inner_outer_pairs_.empty());
296  const auto first_inner_col = inner_outer_pairs_.front().first;
297  return first_inner_col->get_rte_idx();
298  }
299 
300  size_t getKeyBufferSize() const noexcept {
301  const auto key_component_width = getKeyComponentWidth();
302  CHECK(key_component_width == 4 || key_component_width == 8);
303  const auto key_component_count = getKeyComponentCount();
305  return getEntryCount() * key_component_count * key_component_width;
306  } else {
307  return getEntryCount() * (key_component_count + 1) * key_component_width;
308  }
309  }
310 
311  size_t offsetBufferOff() const noexcept override { return getKeyBufferSize(); }
312 
313  size_t countBufferOff() const noexcept override {
316  } else {
317  return getKeyBufferSize();
318  }
319  }
320 
321  size_t payloadBufferOff() const noexcept override {
324  } else {
325  return getKeyBufferSize();
326  }
327  }
328 
329  std::string getHashJoinType() const final { return "Overlaps"; }
330 
331  std::shared_ptr<HashTable> initHashTableOnCpuFromCache(
332  const OverlapsHashTableCacheKey& key);
333 
334  std::optional<std::pair<size_t, size_t>> getApproximateTupleCountFromCache(
336 
338  std::shared_ptr<HashTable> hash_table);
339 
340  llvm::Value* codegenKey(const CompilationOptions&);
341  std::vector<llvm::Value*> codegenManyKey(const CompilationOptions&);
342 
343  const std::shared_ptr<Analyzer::BinOper> condition_;
344  const std::vector<InputTableInfo>& query_infos_;
346 
347  Executor* executor_;
349 
350  std::vector<InnerOuter> inner_outer_pairs_;
351  const int device_count_;
352 
354 
// Optional layout override. When it holds a value, getHashType() returns that
// value instead of asking the device-0 hash table for its layout.
355  std::optional<HashType>
356  layout_override_; // allows us to use a 1:many hash table for many:many
357 
359 
360  using HashTableCacheValue = std::shared_ptr<HashTable>;
361  // includes bucket threshold
362  static std::unique_ptr<
365  // skips bucket threshold
366  using BucketThreshold = double;
367  using BucketSizes = std::vector<double>;
368  static std::unique_ptr<
371 
373 };
llvm::Value * codegenKey(const CompilationOptions &)
OverlapsHashTableCacheKey(const OverlapsHashTableCacheKey &that, const size_t max_hashtable_size, const double bucket_threshold, const std::vector< double > &inverse_bucket_sizes={})
int getInnerTableId() const noexcept override
void registerQueryHint(const RegisteredQueryHint &query_hint)
void reifyWithLayout(const HashType layout)
std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
virtual std::pair< size_t, size_t > approximateTupleCount(const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
const std::vector< double > inverse_bucket_sizes
std::vector< double > BucketSizes
ExecutorDeviceType
virtual void reifyImpl(std::vector< ColumnsForDevice > &columns_per_device, const Fragmenter_Namespace::TableInfo &query_info, const HashType layout, const size_t shard_count, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
std::string getHashJoinType() const final
#define const
SQLOps
Definition: sqldefs.h:29
int getInnerTableRteIdx() const noexcept override
std::optional< std::pair< K, V > > getWithKey(const K &key)
bool operator==(const struct OverlapsHashTableCacheKey &that) const
const RegisteredQueryHint & getRegisteredQueryHint()
std::shared_ptr< HashTable > initHashTableOnCpuFromCache(const OverlapsHashTableCacheKey &key)
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
Definition: HashJoin.h:269
#define UNREACHABLE()
Definition: Logger.h:247
RegisteredQueryHint query_hint_
Data_Namespace::MemoryLevel getMemoryLevel() const noexcept override
#define CHECK_GT(x, y)
Definition: Logger.h:215
virtual std::pair< size_t, size_t > computeHashTableCounts(const size_t shard_count, const std::vector< double > &inverse_bucket_sizes_for_dimension, std::vector< ColumnsForDevice > &columns_per_device, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
HashType getHashType() const noexcept override
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
const std::shared_ptr< Analyzer::BinOper > condition_
std::optional< std::pair< size_t, size_t > > getApproximateTupleCountFromCache(const OverlapsHashTableCacheKey &)
int getDeviceCount() const noexcept override
void reify(const HashType preferred_layout)
std::shared_ptr< BaselineHashTable > initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold)
ColumnCacheMap & column_cache_
void putHashTableOnCpuToCache(const OverlapsHashTableCacheKey &key, std::shared_ptr< HashTable > hash_table)
const std::vector< InputTableInfo > & query_infos_
size_t payloadBufferOff() const noexcept override
static std::shared_ptr< OverlapsJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const RegisteredQueryHint &query_hint)
Make hash table from an in-flight SQL query's parse tree etc.
DecodedJoinHashBufferSet toSet(const ExecutorDeviceType device_type, const int device_id) const override
static std::unique_ptr< OverlapsHashTableCache< OverlapsHashTableCacheKey, HashTableCacheValue > > hash_table_cache_
OverlapsHashTableCacheKey(const size_t num_elements, const std::vector< ChunkKey > &chunk_keys, const SQLOps &optype, const size_t max_hashtable_size, const double bucket_threshold, const std::vector< double > inverse_bucket_sizes={})
std::vector< llvm::Value * > codegenManyKey(const CompilationOptions &)
std::shared_ptr< HashTable > HashTableCacheValue
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
HashTable * getHashTableForDevice(const size_t device_id) const
Definition: HashJoin.h:214
std::vector< double > inverse_bucket_sizes_for_dimension_
static RegisteredQueryHint defaults()
Definition: QueryHint.h:175
void reifyForDevice(const ColumnsForDevice &columns_for_device, const HashType layout, const size_t entry_count, const size_t emitted_keys_count, const bool skip_hashtable_caching, const size_t chosen_max_hashtable_size, const double chosen_bucket_threshold, const int device_id, const logger::ThreadId parent_thread_id)
std::optional< HashType > layout_override_
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
std::mutex mutex_
static std::unique_ptr< HashTableCache< OverlapsHashTableCacheKey, std::pair< BucketThreshold, BucketSizes > > > auto_tuner_cache_
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
Definition: HashTable.h:34
static size_t getCombinedHashTableCacheSize()
void setInverseBucketSizeInfo(const std::vector< double > &inverse_bucket_sizes, std::vector< ColumnsForDevice > &columns_per_device, const size_t device_count)
ColumnsForDevice fetchColumnsForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner)
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
uint64_t ThreadId
Definition: Logger.h:312
size_t offsetBufferOff() const noexcept override
size_t countBufferOff() const noexcept override
#define CHECK(condition)
Definition: Logger.h:203
OverlapsJoinHashTable(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, ColumnCacheMap &column_cache, Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs, const int device_count)
const Data_Namespace::MemoryLevel memory_level_
size_t getEmittedKeysCount() const
size_t getComponentBufferSize() const noexcept override
std::vector< InnerOuter > inner_outer_pairs_
const std::vector< ChunkKey > chunk_keys
static auto getCacheInvalidator() -> std::function< void()>
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
std::vector< std::pair< K, V > > contents_
HashType
Definition: HashTable.h:19
size_t getKeyBufferSize() const noexcept
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept
Definition: HashJoin.h:129