OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PerfectHashTableBuilder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
21 
22 #include "Shared/scope.h"
23 
25  public:
27 
28  void allocateDeviceMemory(const size_t num_column_elems,
29  const HashType layout,
30  BucketizedHashEntryInfo hash_entry_info,
31  const size_t shard_count,
32  const int device_id,
33  const int device_count,
34  const Executor* executor) {
35 #ifdef HAVE_CUDA
36  if (shard_count) {
37  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
38  CHECK_GT(shards_per_device, 0u);
39  const size_t entries_per_shard =
40  get_entries_per_shard(hash_entry_info.bucketized_hash_entry_count, shard_count);
41  hash_entry_info.bucketized_hash_entry_count = entries_per_shard * shards_per_device;
42  }
43  const size_t total_count =
44  layout == HashType::OneToOne
45  ? hash_entry_info.getNormalizedHashEntryCount()
46  : 2 * hash_entry_info.getNormalizedHashEntryCount() + num_column_elems;
48  hash_table_ =
49  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
50  layout,
52  hash_entry_info.getNormalizedHashEntryCount(),
53  num_column_elems);
54  hash_table_->allocateGpuMemory(total_count, device_id);
55 #else
56  UNREACHABLE();
57 #endif // HAVE_CUDA
58  }
59 
60  void allocateDeviceMemory(const JoinColumn& join_column,
61  const HashType layout,
62  BucketizedHashEntryInfo& hash_entry_info,
63  const size_t shard_count,
64  const int device_id,
65  const int device_count,
66  const Executor* executor) {
67  allocateDeviceMemory(join_column.num_elems,
68  layout,
69  hash_entry_info,
70  shard_count,
71  device_id,
72  device_count,
73  executor);
74  }
75 
#ifdef HAVE_CUDA
  // Fills the previously allocated GPU hash table for this device: zeroes a
  // device-resident error flag, initializes every slot to
  // hash_join_invalid_val, then dispatches to the sharded/unsharded,
  // one-to-one/one-to-many fill kernel. Throws NeedsOneToManyHash when a
  // one-to-one build reports an error (duplicate keys), so the caller can
  // retry with a one-to-many layout.
  //
  // NOTE(review): the scraped source dropped the hyperlinked kernel-call
  // lines; the call names and JoinColumnTypeInfo fields below are restored
  // from the signatures in this page's reference section.
  void initHashTableOnGpu(const ChunkKey& chunk_key,
                          const JoinColumn& join_column,
                          const ExpressionRange& col_range,
                          const bool is_bitwise_eq,
                          const InnerOuter& cols,
                          const JoinType join_type,
                          const HashType layout,
                          const BucketizedHashEntryInfo hash_entry_info,
                          const size_t shard_count,
                          const int32_t hash_join_invalid_val,
                          const int device_id,
                          const int device_count,
                          const Executor* executor) {
    auto timer = DEBUG_TIMER(__func__);
    auto data_mgr = executor->getDataMgr();
    // Device int the fill kernels use to report failure (e.g. duplicate keys).
    Data_Namespace::AbstractBuffer* gpu_hash_table_err_buff =
        CudaAllocator::allocGpuAbstractBuffer(data_mgr, sizeof(int), device_id);
    ScopeGuard cleanup_error_buff = [&data_mgr, gpu_hash_table_err_buff]() {
      data_mgr->free(gpu_hash_table_err_buff);
    };
    CHECK(gpu_hash_table_err_buff);
    auto dev_err_buff = gpu_hash_table_err_buff->getMemoryPtr();
    int err{0};

    auto allocator = std::make_unique<CudaAllocator>(
        data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
    allocator->copyToDevice(dev_err_buff, &err, sizeof(err));

    CHECK(hash_table_);  // allocateDeviceMemory() must have run first
    auto gpu_hash_table_buff = hash_table_->getGpuBuffer();

    init_hash_join_buff_on_device(reinterpret_cast<int32_t*>(gpu_hash_table_buff),
                                  hash_entry_info.getNormalizedHashEntryCount(),
                                  hash_join_invalid_val);
    if (chunk_key.empty()) {
      return;  // no data to hash; leave the table initialized-but-empty
    }

    // TODO: pass this in? duplicated in JoinHashTable currently
    const auto inner_col = cols.first;
    CHECK(inner_col);
    const auto& ti = inner_col->get_type_info();
    // NULLs are translated to a value just outside the column's range so they
    // cannot collide with a real value.
    auto translated_null_val = col_range.getIntMax() + 1;
    if (col_range.getIntMax() < col_range.getIntMin()) {
      translated_null_val = col_range.getIntMin() - 1;
    }
    JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
                                 col_range.getIntMin(),
                                 col_range.getIntMax(),
                                 inline_fixed_encoding_null_val(ti),
                                 is_bitwise_eq,
                                 translated_null_val,
                                 get_join_column_type_kind(ti)};
    auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
    if (shard_count) {
      const size_t entries_per_shard =
          get_entries_per_shard(hash_entry_info.bucketized_hash_entry_count, shard_count);
      CHECK_GT(device_count, 0);
      // Each device owns every device_count-th shard, starting at its own id.
      for (size_t shard = device_id; shard < shard_count; shard += device_count) {
        ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count};
        if (layout == HashType::OneToOne) {
          fill_hash_join_buff_on_device_sharded_bucketized(
              reinterpret_cast<int32_t*>(gpu_hash_table_buff),
              hash_join_invalid_val,
              for_semi_anti_join(join_type),
              reinterpret_cast<int*>(dev_err_buff),
              join_column,
              type_info,
              shard_info,
              hash_entry_info.bucket_normalization);
        } else {
          fill_one_to_many_hash_table_on_device_sharded(
              reinterpret_cast<int32_t*>(gpu_hash_table_buff),
              hash_entry_info,
              join_column,
              type_info,
              shard_info);
        }
      }
    } else {
      if (layout == HashType::OneToOne) {
        fill_hash_join_buff_on_device_bucketized(
            reinterpret_cast<int32_t*>(gpu_hash_table_buff),
            hash_join_invalid_val,
            for_semi_anti_join(join_type),
            reinterpret_cast<int*>(dev_err_buff),
            join_column,
            type_info,
            hash_entry_info.bucket_normalization);
      } else {
        if (use_bucketization) {
          fill_one_to_many_hash_table_on_device_bucketized(
              reinterpret_cast<int32_t*>(gpu_hash_table_buff),
              hash_entry_info,
              join_column,
              type_info);
        } else {
          fill_one_to_many_hash_table_on_device(
              reinterpret_cast<int32_t*>(gpu_hash_table_buff),
              hash_entry_info,
              join_column,
              type_info,
              join_type == JoinType::WINDOW_FUNCTION_FRAMING);
        }
      }
    }
    allocator->copyFromDevice(&err, dev_err_buff, sizeof(err));
    if (err) {
      if (layout == HashType::OneToOne) {
        // Duplicate keys detected: caller retries with a one-to-many layout.
        throw NeedsOneToManyHash();
      } else {
        throw std::runtime_error("Unexpected error when building perfect hash table: " +
                                 std::to_string(err));
      }
    }
  }
#endif
194 
196  const JoinColumn& join_column,
197  const ExpressionRange& col_range,
198  const bool is_bitwise_eq,
199  const InnerOuter& cols,
200  const StringDictionaryProxy::IdMap* str_proxy_translation_map,
201  const JoinType join_type,
202  const HashType hash_type,
203  const BucketizedHashEntryInfo hash_entry_info,
204  const int32_t hash_join_invalid_val,
205  const Executor* executor) {
206  auto timer = DEBUG_TIMER(__func__);
207  const auto inner_col = cols.first;
208  CHECK(inner_col);
209  const auto& ti = inner_col->get_type_info();
210 
211  CHECK(!hash_table_);
212  hash_table_ =
213  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
214  hash_type,
216  hash_entry_info.getNormalizedHashEntryCount(),
217  0);
218 
219  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
220  const int thread_count = cpu_threads();
221  std::vector<std::thread> init_cpu_buff_threads;
222 
223  {
224  auto timer_init = DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff");
225 #ifdef HAVE_TBB
226  init_hash_join_buff_tbb(cpu_hash_table_buff,
227  hash_entry_info.getNormalizedHashEntryCount(),
228  hash_join_invalid_val);
229 #else // #ifdef HAVE_TBB
230  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
231  init_cpu_buff_threads.emplace_back([hash_entry_info,
232  hash_join_invalid_val,
233  thread_idx,
234  thread_count,
235  cpu_hash_table_buff] {
236  init_hash_join_buff(cpu_hash_table_buff,
237  hash_entry_info.getNormalizedHashEntryCount(),
238  hash_join_invalid_val,
239  thread_idx,
240  thread_count);
241  });
242  }
243  for (auto& t : init_cpu_buff_threads) {
244  t.join();
245  }
246  init_cpu_buff_threads.clear();
247 #endif // !HAVE_TBB
248  }
249  const bool for_semi_join = for_semi_anti_join(join_type);
250  std::atomic<int> err{0};
251  {
252  auto timer_fill =
253  DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
254  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
255  init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
256  &join_column,
257  str_proxy_translation_map,
258  thread_idx,
259  thread_count,
260  &ti,
261  &err,
262  &col_range,
263  &is_bitwise_eq,
264  &for_semi_join,
265  cpu_hash_table_buff,
266  hash_entry_info] {
267  auto translated_null_val = col_range.getIntMax() + 1;
268  if (col_range.getIntMax() < col_range.getIntMin()) {
269  translated_null_val = col_range.getIntMin() - 1;
270  }
271  int partial_err = fill_hash_join_buff_bucketized(
272  cpu_hash_table_buff,
273  hash_join_invalid_val,
274  for_semi_join,
275  join_column,
276  {static_cast<size_t>(ti.get_size()),
277  col_range.getIntMin(),
278  col_range.getIntMax(),
280  is_bitwise_eq,
281  translated_null_val,
283  str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
284  str_proxy_translation_map ? str_proxy_translation_map->domainStart()
285  : 0, // 0 is dummy value
286  thread_idx,
287  thread_count,
288  hash_entry_info.bucket_normalization);
289  int zero{0};
290  err.compare_exchange_strong(zero, partial_err);
291  });
292  }
293  for (auto& t : init_cpu_buff_threads) {
294  t.join();
295  }
296  }
297  if (err) {
298  // Too many hash entries, need to retry with a 1:many table
299  hash_table_ = nullptr; // clear the hash table buffer
300  throw NeedsOneToManyHash();
301  }
302  }
303 
305  const JoinColumn& join_column,
306  const ExpressionRange& col_range,
307  const bool is_bitwise_eq,
308  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
309  const StringDictionaryProxy::IdMap* str_proxy_translation_map,
310  const JoinType join_type,
311  const BucketizedHashEntryInfo hash_entry_info,
312  const int32_t hash_join_invalid_val,
313  const Executor* executor) {
314  auto timer = DEBUG_TIMER(__func__);
315  const auto inner_col = cols.first;
316  CHECK(inner_col);
317  const auto& ti = inner_col->get_type_info();
318  CHECK(!hash_table_);
319  hash_table_ = std::make_unique<PerfectHashTable>(
320  executor->getDataMgr(),
323  hash_entry_info.getNormalizedHashEntryCount(),
324  join_column.num_elems,
325  join_type == JoinType::WINDOW_FUNCTION_FRAMING);
326  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
327 
328  int thread_count = cpu_threads();
329  {
330  auto timer_init =
331  DEBUG_TIMER("CPU One-To-Many Perfect Hash Table Builder: init_hash_join_buff");
332 #ifdef HAVE_TBB
333  init_hash_join_buff_tbb(cpu_hash_table_buff,
334  hash_entry_info.getNormalizedHashEntryCount(),
335  hash_join_invalid_val);
336 #else // #ifdef HAVE_TBB
337  std::vector<std::future<void> > init_threads;
338  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
339  init_threads.emplace_back(
342  cpu_hash_table_buff,
343  hash_entry_info.getNormalizedHashEntryCount(),
344  hash_join_invalid_val,
345  thread_idx,
346  thread_count));
347  }
348  for (auto& child : init_threads) {
349  child.wait();
350  }
351  for (auto& child : init_threads) {
352  child.get();
353  }
354 #endif // !HAVE_TBB
355  }
356  {
357  auto timer_fill = DEBUG_TIMER(
358  "CPU One-To-Many Perfect Hash Table Builder: fill_hash_join_buff_bucketized");
359  auto translated_null_val = col_range.getIntMax() + 1;
360  if (col_range.getIntMax() < col_range.getIntMin()) {
361  translated_null_val = col_range.getIntMin() - 1;
362  }
363  if (ti.get_type() == kDATE) {
365  cpu_hash_table_buff,
366  hash_entry_info,
367  join_column,
368  {static_cast<size_t>(ti.get_size()),
369  col_range.getIntMin(),
370  col_range.getIntMax(),
372  is_bitwise_eq,
373  translated_null_val,
375  str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
376  str_proxy_translation_map ? str_proxy_translation_map->domainStart()
377  : 0 /*dummy*/,
378  thread_count);
379  } else {
381  cpu_hash_table_buff,
382  hash_entry_info,
383  join_column,
384  {static_cast<size_t>(ti.get_size()),
385  col_range.getIntMin(),
386  col_range.getIntMax(),
388  is_bitwise_eq,
389  translated_null_val,
391  str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
392  str_proxy_translation_map ? str_proxy_translation_map->domainStart()
393  : 0 /*dummy*/,
394  thread_count,
395  join_type == JoinType::WINDOW_FUNCTION_FRAMING);
396  }
397  }
398  }
399 
400  std::unique_ptr<PerfectHashTable> getHashTable() { return std::move(hash_table_); }
401 
402  static size_t get_entries_per_shard(const size_t total_entry_count,
403  const size_t shard_count) {
404  CHECK_NE(size_t(0), shard_count);
405  return (total_entry_count + shard_count - 1) / shard_count;
406  }
407 
408  const bool for_semi_anti_join(const JoinType join_type) {
409  return join_type == JoinType::SEMI || join_type == JoinType::ANTI;
410  }
411 
412  private:
413  std::unique_ptr<PerfectHashTable> hash_table_;
414 };
int64_t getIntMin() const
std::vector< int > ChunkKey
Definition: types.h:36
JoinType
Definition: sqldefs.h:165
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int64_t bucket_normalization)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
void fill_one_to_many_hash_table(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const unsigned cpu_thread_count, const bool for_window_framing)
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:106
size_t num_elems
virtual int8_t * getMemoryPtr()=0
#define UNREACHABLE()
Definition: Logger.h:337
const bool for_semi_anti_join(const JoinType join_type)
static size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count)
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
#define CHECK_GT(x, y)
Definition: Logger.h:305
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const unsigned cpu_thread_count)
std::string to_string(char const *&&v)
future< Result > async(Fn &&fn, Args &&...args)
void allocateDeviceMemory(const size_t num_column_elems, const HashType layout, BucketizedHashEntryInfo hash_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
int64_t bucket_normalization
void allocateDeviceMemory(const JoinColumn &join_column, const HashType layout, BucketizedHashEntryInfo &hash_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
#define CHECK_NE(x, y)
Definition: Logger.h:302
An AbstractBuffer is a unit of data management for a data manager.
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
void fill_one_to_many_hash_table_on_device(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const bool for_window_framing)
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const JoinType join_type, const BucketizedHashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
std::unique_ptr< PerfectHashTable > getHashTable()
Definition: sqltypes.h:70
size_t getNormalizedHashEntryCount() const
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
Data_Namespace::DataMgr * getDataMgr() const
Definition: Execute.h:571
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const InnerOuter &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const JoinType join_type, const HashType hash_type, const BucketizedHashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
std::unique_ptr< PerfectHashTable > hash_table_
int cpu_threads()
Definition: thread_count.h:25
HashType
Definition: HashTable.h:19
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int64_t bucket_normalization)
size_t bucketized_hash_entry_count