OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PerfectHashTableBuilder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
22 
23 #include "Shared/scope.h"
24 
26  public:
28 
30  PerfectHashTableEntryInfo hash_table_entry_info,
31  const size_t shard_count,
32  const int device_id,
33  const int device_count,
34  const Executor* executor) {
35 #ifdef HAVE_CUDA
36  if (shard_count) {
37  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
38  CHECK_GT(shards_per_device, 0u);
39  const size_t entries_per_shard =
40  get_entries_per_shard(hash_entry_info.bucketized_hash_entry_count, shard_count);
41  hash_entry_info.bucketized_hash_entry_count = entries_per_shard * shards_per_device;
42  hash_table_entry_info.setNumHashEntries(
43  hash_entry_info.getNormalizedHashEntryCount());
44  }
46  hash_table_ = std::make_unique<PerfectHashTable>(ExecutorDeviceType::GPU,
47  hash_table_entry_info,
48  executor->getDataMgr(),
49  device_id);
50  if (hash_table_entry_info.getNumKeys() == 0) {
51  VLOG(1) << "Stop building a hash table based on a column: an input table is empty";
52  return;
53  }
54  hash_table_->allocateGpuMemory(hash_table_entry_info.computeTotalNumSlots());
55 #else
56  UNREACHABLE();
57 #endif // HAVE_CUDA
58  }
59 
60 #ifdef HAVE_CUDA
// Initializes and fills the GPU buffer of hash_table_ for a perfect hash join.
//
// Visible flow:
//   1. Return immediately (after a VLOG) when hash_table_entry_info reports
//      zero keys.
//   2. Allocate a sizeof(int) device buffer used as an error flag, zero it,
//      and register a ScopeGuard that frees it through data_mgr.
//   3. Call init_hash_join_buff_on_device on the table buffer with
//      hash_join_invalid_val.
//   4. Return if chunk_key is empty; otherwise call the one-to-one or
//      one-to-many fill routine, optionally once per shard.
//   5. Copy the error flag back to the host and throw if it is non-zero.
//
// Throws NeedsOneToManyHash when the fill reports an error for a OneToOne
// layout, and std::runtime_error for any other layout.
//
// NOTE(review): hash_table_ is dereferenced without a null check, so this
// presumably has to run after the table was allocated elsewhere - confirm
// against the caller.
// NOTE(review): this listing skips several embedded line numbers inside the
// body (113, 116, 120, 135-136, 145, 149, 169-170); the declarations of
// one_to_one_args / one_to_many_args and parts of some initializers are
// therefore not visible here.
61  void initHashTableOnGpu(const ChunkKey& chunk_key,
62  const JoinColumn& join_column,
63  const ExpressionRange& col_range,
64  const bool is_bitwise_eq,
65  const InnerOuter& cols,
66  const JoinType join_type,
67  const BucketizedHashEntryInfo hash_entry_info,
68  PerfectHashTableEntryInfo hash_table_entry_info,
69  const size_t shard_count,
70  const int32_t hash_join_invalid_val,
71  const int device_id,
72  const int device_count,
73  const Executor* executor) {
74  auto timer = DEBUG_TIMER(__func__);
    // Nothing to build for an empty input table.
75  if (hash_table_entry_info.getNumKeys() == 0) {
76  VLOG(1) << "Stop building a hash table based on a column: an input table is empty";
77  return;
78  }
79  auto data_mgr = executor->getDataMgr();
    // One int on the device, used by the fill routines as an error flag.
80  Data_Namespace::AbstractBuffer* gpu_hash_table_err_buff =
81  CudaAllocator::allocGpuAbstractBuffer(data_mgr, sizeof(int), device_id);
    // The guard releases the error buffer on every exit path, including the
    // throws at the end of this function.
82  ScopeGuard cleanup_error_buff = [&data_mgr, gpu_hash_table_err_buff]() {
83  data_mgr->free(gpu_hash_table_err_buff);
84  };
85  CHECK(gpu_hash_table_err_buff);
86  auto dev_err_buff = gpu_hash_table_err_buff->getMemoryPtr();
    // Start with "no error" on the device side.
87  int err{0};
88  auto allocator = std::make_unique<CudaAllocator>(
89  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
90  allocator->copyToDevice(dev_err_buff, &err, sizeof(err));
92  auto gpu_hash_table_buff = hash_table_->getGpuBuffer();
93  {
94  auto timer_init = DEBUG_TIMER("Initialize GPU Perfect Hash Table");
    // Passes hash_join_invalid_val as the invalid-slot value for the
    // normalized entry count.
95  init_hash_join_buff_on_device(reinterpret_cast<int32_t*>(gpu_hash_table_buff),
96  hash_entry_info.getNormalizedHashEntryCount(),
97  hash_join_invalid_val);
98  }
    // With an empty chunk key the table is left initialized but unfilled.
99  if (chunk_key.empty()) {
100  return;
101  }
102  // TODO: pass this in? duplicated in JoinHashTable currently
103  const auto inner_col = cols.first;
104  CHECK(inner_col);
105  const auto& ti = inner_col->get_type_info();
    // The translated null value is max + 1, or min - 1 when the range is
    // inverted (max < min).
106  auto translated_null_val = col_range.getIntMax() + 1;
107  if (col_range.getIntMax() < col_range.getIntMin()) {
108  translated_null_val = col_range.getIntMin() - 1;
109  }
    // NOTE(review): initializer is incomplete in this listing (lines 113 and
    // 116 are missing).
110  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
111  col_range.getIntMin(),
112  col_range.getIntMax(),
114  is_bitwise_eq,
115  translated_null_val,
    // Bucketized fill variants are selected for kDATE inner columns.
117  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
118  auto timer_fill = DEBUG_TIMER("Fill GPU Perfect Hash Table");
119  if (hash_table_entry_info.getHashTableLayout() == HashType::OneToOne) {
    // NOTE(review): the declaration that opens this argument list (listing
    // line 120) is missing; later code refers to it as one_to_one_args.
121  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
122  reinterpret_cast<int32_t*>(dev_err_buff),
123  hash_join_invalid_val,
124  for_semi_anti_join(join_type),
125  join_column,
126  type_info,
127  nullptr,
128  -1,
129  hash_entry_info.bucket_normalization};
130  if (shard_count) {
131  const size_t entries_per_shard = get_entries_per_shard(
132  hash_entry_info.bucketized_hash_entry_count, shard_count);
133  CHECK_GT(device_count, 0);
    // NOTE(review): the right-hand side of this initializer (listing lines
    // 135-136) is missing.
134  decltype(&fill_hash_join_buff_on_device_sharded) const hash_table_fill_func =
    // Visits shards device_id, device_id + device_count, ... below shard_count.
137  for (size_t shard = device_id; shard < shard_count; shard += device_count) {
138  auto const shard_info =
139  ShardInfo{shard, entries_per_shard, shard_count, device_count};
140  hash_table_fill_func(one_to_one_args, shard_info);
141  }
142  } else {
    // NOTE(review): the false branch of this conditional (listing line 145)
    // is missing.
143  decltype(&fill_hash_join_buff_on_device) const hash_table_fill_func =
144  use_bucketization ? fill_hash_join_buff_on_device_bucketized
146  hash_table_fill_func(one_to_one_args);
147  }
148  } else { // layout == HashType::OneToMany
    // NOTE(review): the declaration that opens this argument list (listing
    // line 149) is missing; later code refers to it as one_to_many_args.
150  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
151  hash_entry_info,
152  join_column,
153  type_info,
154  nullptr,
155  -1,
156  hash_entry_info.bucket_normalization,
157  join_type == JoinType::WINDOW_FUNCTION_FRAMING};
158  if (shard_count) {
159  const size_t entries_per_shard = get_entries_per_shard(
160  hash_entry_info.bucketized_hash_entry_count, shard_count);
161  CHECK_GT(device_count, 0);
    // Same shard stride as the one-to-one path.
162  for (size_t shard = device_id; shard < shard_count; shard += device_count) {
163  auto const shard_info =
164  ShardInfo{shard, entries_per_shard, shard_count, device_count};
165  fill_one_to_many_hash_table_on_device_sharded(one_to_many_args, shard_info);
166  }
167  } else {
    // NOTE(review): the right-hand side of this initializer (listing lines
    // 169-170) is missing.
168  decltype(&fill_one_to_many_hash_table_on_device) const hash_table_fill_func =
171  hash_table_fill_func(one_to_many_args);
172  }
173  }
    // Read the error flag back from the device.
174  allocator->copyFromDevice(&err, dev_err_buff, sizeof(err));
175  if (err) {
    // A failed one-to-one fill raises NeedsOneToManyHash; any other layout
    // reports the raw error code.
176  if (hash_table_entry_info.getHashTableLayout() == HashType::OneToOne) {
177  throw NeedsOneToManyHash();
178  } else {
179  throw std::runtime_error("Unexpected error when building perfect hash table: " +
180  std::to_string(err));
181  }
182  }
183  }
184 #endif
185 
187  const JoinColumn& join_column,
188  const ExpressionRange& col_range,
189  const bool is_bitwise_eq,
190  const InnerOuter& cols,
191  const StringDictionaryProxy::IdMap* str_proxy_translation_map,
192  const JoinType join_type,
193  const BucketizedHashEntryInfo hash_entry_info,
194  const PerfectHashTableEntryInfo hash_table_entry_info,
195  const int32_t hash_join_invalid_val,
196  const Executor* executor) {
197  auto timer = DEBUG_TIMER(__func__);
198  const auto inner_col = cols.first;
199  CHECK(inner_col);
200  const auto& ti = inner_col->get_type_info();
201  CHECK(!hash_table_);
202  hash_table_ = std::make_unique<PerfectHashTable>(ExecutorDeviceType::CPU,
203  hash_table_entry_info);
204  if (hash_table_entry_info.getNumKeys() == 0) {
205  VLOG(1) << "Stop building a hash table based on a column: an input table is empty";
206  return;
207  }
208  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
209  const int thread_count = cpu_threads();
210  {
211  DEBUG_TIMER("Initialize CPU One-To-One Perfect Hash Table");
212 #ifdef HAVE_TBB
213  init_hash_join_buff_tbb(cpu_hash_table_buff,
214  hash_entry_info.getNormalizedHashEntryCount(),
215  hash_join_invalid_val);
216 #else // #ifdef HAVE_TBB
217  std::vector<std::thread> init_cpu_buff_threads;
218  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
219  init_cpu_buff_threads.emplace_back([hash_entry_info,
220  hash_join_invalid_val,
221  thread_idx,
222  thread_count,
223  cpu_hash_table_buff] {
224  init_hash_join_buff(cpu_hash_table_buff,
225  hash_entry_info.getNormalizedHashEntryCount(),
226  hash_join_invalid_val,
227  thread_idx,
228  thread_count);
229  });
230  }
231  for (auto& t : init_cpu_buff_threads) {
232  t.join();
233  }
234  init_cpu_buff_threads.clear();
235 #endif // !HAVE_TBB
236  }
237  auto const for_semi_join = for_semi_anti_join(join_type);
238  auto const use_bucketization = inner_col->get_type_info().get_type() == kDATE;
239  auto translated_null_val = col_range.getIntMax() + 1;
240  if (col_range.getIntMax() < col_range.getIntMin()) {
241  translated_null_val = col_range.getIntMin() - 1;
242  }
243  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
244  col_range.getIntMin(),
245  col_range.getIntMax(),
247  is_bitwise_eq,
248  translated_null_val,
250  DEBUG_TIMER("Fill CPU One-To-One Perfect Hash Table");
252  cpu_hash_table_buff,
253  nullptr,
254  hash_join_invalid_val,
255  for_semi_join,
256  join_column,
257  type_info,
258  str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
259  str_proxy_translation_map ? str_proxy_translation_map->domainStart()
260  : 0, // 0 is dummy value
261  hash_entry_info.bucket_normalization};
262  decltype(&fill_hash_join_buff) const hash_table_fill_func =
263  use_bucketization ? fill_hash_join_buff_bucketized
264  : type_info.uses_bw_eq ? fill_hash_join_buff_bitwise_eq
266 
267  std::vector<std::future<int>> fill_threads;
268  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
269  fill_threads.emplace_back(std::async(
270  std::launch::async, hash_table_fill_func, args, thread_idx, thread_count));
271  }
272  for (auto& child : fill_threads) {
273  child.wait();
274  }
275  for (auto& child : fill_threads) {
276  if (child.get()) { // see if task returns an error code
277  // Too many hash entries, need to retry with a 1:many table
278  hash_table_ = nullptr; // clear the hash table buffer
279  throw NeedsOneToManyHash();
280  }
281  }
282  }
283 
285  const JoinColumn& join_column,
286  const ExpressionRange& col_range,
287  const bool is_bitwise_eq,
288  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
289  const StringDictionaryProxy::IdMap* str_proxy_translation_map,
290  const JoinType join_type,
291  const BucketizedHashEntryInfo hash_entry_info,
292  const PerfectHashTableEntryInfo hash_table_entry_info,
293  const int32_t hash_join_invalid_val,
294  const Executor* executor) {
295  auto timer = DEBUG_TIMER(__func__);
296  const auto inner_col = cols.first;
297  CHECK(inner_col);
298  const auto& ti = inner_col->get_type_info();
299  CHECK(!hash_table_);
300  hash_table_ = std::make_unique<PerfectHashTable>(ExecutorDeviceType::CPU,
301  hash_table_entry_info);
302  if (hash_table_entry_info.getNumKeys() == 0) {
303  VLOG(1) << "Stop building a hash table based on a column: an input table is empty";
304  return;
305  }
306  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
307  int thread_count = cpu_threads();
308  {
309  auto timer_init = DEBUG_TIMER("Initialize CPU One-To-Many Perfect Hash Table");
310 #ifdef HAVE_TBB
311  init_hash_join_buff_tbb(cpu_hash_table_buff,
312  hash_entry_info.getNormalizedHashEntryCount(),
313  hash_join_invalid_val);
314 #else // #ifdef HAVE_TBB
315  std::vector<std::future<void>> init_threads;
316  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
317  init_threads.emplace_back(
320  cpu_hash_table_buff,
321  hash_entry_info.getNormalizedHashEntryCount(),
322  hash_join_invalid_val,
323  thread_idx,
324  thread_count));
325  }
326  for (auto& child : init_threads) {
327  child.wait();
328  }
329  for (auto& child : init_threads) {
330  child.get();
331  }
332 #endif // !HAVE_TBB
333  }
334  auto timer_build = DEBUG_TIMER("Fill CPU One-To-Many Perfect Hash Table");
335  auto const use_bucketization = inner_col->get_type_info().get_type() == kDATE;
336  auto translated_null_val = col_range.getIntMax() + 1;
337  if (col_range.getIntMax() < col_range.getIntMin()) {
338  translated_null_val = col_range.getIntMin() - 1;
339  }
340  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
341  col_range.getIntMin(),
342  col_range.getIntMax(),
344  is_bitwise_eq,
345  translated_null_val,
348  cpu_hash_table_buff,
349  hash_entry_info,
350  join_column,
351  type_info,
352  str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
353  str_proxy_translation_map ? str_proxy_translation_map->domainStart()
354  : 0 /*dummy*/,
355  hash_entry_info.bucket_normalization,
356  join_type == JoinType::WINDOW_FUNCTION_FRAMING};
357  decltype(&fill_one_to_many_hash_table) const hash_table_fill_func =
358  use_bucketization ? fill_one_to_many_hash_table_bucketized
360  hash_table_fill_func(args, thread_count);
361  }
362 
363  std::unique_ptr<PerfectHashTable> getHashTable() {
364  return std::move(hash_table_);
365  }
366 
367  static size_t get_entries_per_shard(const size_t total_entry_count,
368  const size_t shard_count) {
369  CHECK_NE(size_t(0), shard_count);
370  return (total_entry_count + shard_count - 1) / shard_count;
371  }
372 
373  const bool for_semi_anti_join(const JoinType join_type) {
374  return join_type == JoinType::SEMI || join_type == JoinType::ANTI;
375  }
376 
377  private:
378  std::unique_ptr<PerfectHashTable> hash_table_;
379 };
int64_t getIntMin() const
std::vector< int > ChunkKey
Definition: types.h:36
void fill_hash_join_buff_on_device_sharded(OneToOnePerfectJoinHashTableFillFuncArgs const args, ShardInfo const shard_info)
JoinType
Definition: sqldefs.h:174
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:105
void fill_one_to_many_hash_table_on_device(OneToManyPerfectJoinHashTableFillFuncArgs const args)
void fill_hash_join_buff_on_device(OneToOnePerfectJoinHashTableFillFuncArgs const args)
virtual int8_t * getMemoryPtr()=0
#define UNREACHABLE()
Definition: Logger.h:338
DEVICE int SUFFIX() fill_hash_join_buff_bitwise_eq(OneToOnePerfectJoinHashTableFillFuncArgs const args, int32_t const cpu_thread_idx, int32_t const cpu_thread_count)
const bool for_semi_anti_join(const JoinType join_type)
static size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count)
void allocateDeviceMemory(BucketizedHashEntryInfo hash_entry_info, PerfectHashTableEntryInfo hash_table_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
#define CHECK_GT(x, y)
Definition: Logger.h:305
void fill_one_to_many_hash_table(OneToManyPerfectJoinHashTableFillFuncArgs const args, const int32_t cpu_thread_count)
std::string to_string(char const *&&v)
DEVICE int SUFFIX() fill_hash_join_buff(OneToOnePerfectJoinHashTableFillFuncArgs const args, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
future< Result > async(Fn &&fn, Args &&...args)
int64_t bucket_normalization
HashType getHashTableLayout() const
Definition: HashTable.h:53
#define CHECK_NE(x, y)
Definition: Logger.h:302
An AbstractBuffer is a unit of data management for a data manager.
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
std::unique_ptr< PerfectHashTable > getHashTable()
size_t getNumKeys() const
Definition: HashTable.h:51
Definition: sqltypes.h:80
size_t getNormalizedHashEntryCount() const
void fill_one_to_many_hash_table_bucketized(OneToManyPerfectJoinHashTableFillFuncArgs const args, const int32_t cpu_thread_count)
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
void fill_hash_join_buff_on_device_bucketized(OneToOnePerfectJoinHashTableFillFuncArgs const args)
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
void setNumHashEntries(size_t num_hash_entries)
Definition: HashTable.h:54
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(OneToOnePerfectJoinHashTableFillFuncArgs const args, int32_t const cpu_thread_idx, int32_t const cpu_thread_count)
std::unique_ptr< PerfectHashTable > hash_table_
void fill_one_to_many_hash_table_on_device_sharded(OneToManyPerfectJoinHashTableFillFuncArgs const args, ShardInfo const shard_info)
int cpu_threads()
Definition: thread_count.h:25
void fill_hash_join_buff_on_device_sharded_bucketized(OneToOnePerfectJoinHashTableFillFuncArgs const args, ShardInfo const shard_info)
size_t bucketized_hash_entry_count
#define VLOG(n)
Definition: Logger.h:388
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const InnerOuter &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const JoinType join_type, const BucketizedHashEntryInfo hash_entry_info, const PerfectHashTableEntryInfo hash_table_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const JoinType join_type, const BucketizedHashEntryInfo hash_entry_info, const PerfectHashTableEntryInfo hash_table_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
void fill_one_to_many_hash_table_on_device_bucketized(OneToManyPerfectJoinHashTableFillFuncArgs const args)