OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PerfectHashTableBuilder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
21 
22 #include "Shared/scope.h"
23 
25  public:
27 
 // Sizes and allocates the GPU-side buffer for a perfect hash table on one
 // device. A OneToOne layout needs one int32 slot per normalized hash entry;
 // a OneToMany layout needs the offset and count sections (2 * entry count)
 // plus one payload slot per column element (see total_count below). When the
 // table is sharded, the per-device entry count is first recomputed from the
 // number of shards assigned to this device.
 // NOTE(review): this listing came from a generated page; original listing
 // lines 47 and 51 were lost in extraction, so the PerfectHashTable
 // constructor call below is missing at least one argument line (presumably
 // the device type / device id) — restore from the upstream source.
 28  void allocateDeviceMemory(const size_t num_column_elems,
 29  const HashType layout,
 30  HashEntryInfo hash_entry_info,
 31  const size_t shard_count,
 32  const int device_id,
 33  const int device_count,
 34  const Executor* executor) {
 35 #ifdef HAVE_CUDA
 36  if (shard_count) {
 // Ceil-divide shards over devices, then scale the entry count to the
 // shards this device owns.
 37  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
 38  CHECK_GT(shards_per_device, 0u);
 39  const size_t entries_per_shard =
 40  get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count);
 41  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
 42  }
 // OneToOne: one slot per entry. OneToMany: offsets + counts sections
 // (2 * entries) followed by one payload slot per column element.
 43  const size_t total_count =
 44  layout == HashType::OneToOne
 45  ? hash_entry_info.getNormalizedHashEntryCount()
 46  : 2 * hash_entry_info.getNormalizedHashEntryCount() + num_column_elems;
 48  hash_table_ =
 49  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
 50  layout,
 52  hash_entry_info.getNormalizedHashEntryCount(),
 53  num_column_elems);
 54  hash_table_->allocateGpuMemory(total_count, device_id);
 55 #else
 // Compiled without CUDA support: device allocation must never be reached.
 56  UNREACHABLE();
 57 #endif // HAVE_CUDA
 58  }
59 
60  void allocateDeviceMemory(const JoinColumn& join_column,
61  const HashType layout,
62  HashEntryInfo& hash_entry_info,
63  const size_t shard_count,
64  const int device_id,
65  const int device_count,
66  const Executor* executor) {
67  allocateDeviceMemory(join_column.num_elems,
68  layout,
69  hash_entry_info,
70  shard_count,
71  device_id,
72  device_count,
73  executor);
74  }
75 
76 #ifdef HAVE_CUDA
 // Builds the perfect hash table on the GPU for one device:
 //  1. Allocates a single-int error buffer on the device and zeroes it
 //    (freed by the ScopeGuard on every exit path).
 //  2. Initializes the device hash buffer to hash_join_invalid_val.
 //  3. Fills the table, either per-shard (sharded inner table) or over the
 //    whole column, via the bucketized device fill routines.
 //  4. Copies the error flag back; a nonzero error on a OneToOne layout means
 //    a duplicate key was found and the caller must retry with a
 //    one-to-many table (NeedsOneToManyHash).
 // An empty chunk_key means there is no data to insert, so the function
 // returns right after buffer initialization.
 // NOTE(review): this generated listing lost several lines in extraction.
 // The callee names for the five argument lists below (original listing
 // lines 135, 145, 155, 165 and 171) are missing; judging from this page's
 // cross-reference index they are likely
 // fill_hash_join_buff_on_device_sharded_bucketized,
 // fill_one_to_many_hash_table_on_device_sharded,
 // fill_hash_join_buff_on_device_bucketized,
 // fill_one_to_many_hash_table_on_device_bucketized and
 // fill_one_to_many_hash_table_on_device — confirm against upstream before
 // reusing this text as source.
 77  void initHashTableOnGpu(const ChunkKey& chunk_key,
 78  const JoinColumn& join_column,
 79  const ExpressionRange& col_range,
 80  const bool is_bitwise_eq,
 81  const InnerOuter& cols,
 82  const JoinType join_type,
 83  const HashType layout,
 84  const HashEntryInfo hash_entry_info,
 85  const size_t shard_count,
 86  const int32_t hash_join_invalid_val,
 87  const int device_id,
 88  const int device_count,
 89  const Executor* executor) {
 90  auto timer = DEBUG_TIMER(__func__);
 91  auto data_mgr = executor->getDataMgr();
 // Device-resident int used by the fill kernels to report failure.
 92  Data_Namespace::AbstractBuffer* gpu_hash_table_err_buff =
 93  CudaAllocator::allocGpuAbstractBuffer(data_mgr, sizeof(int), device_id);
 // Guarantees the error buffer is released on every return/throw path.
 94  ScopeGuard cleanup_error_buff = [&data_mgr, gpu_hash_table_err_buff]() {
 95  data_mgr->free(gpu_hash_table_err_buff);
 96  };
 97  CHECK(gpu_hash_table_err_buff);
 98  auto dev_err_buff = gpu_hash_table_err_buff->getMemoryPtr();
 99  int err{0};
 100 
 // Zero the device error flag before launching any fill kernels.
 101  auto allocator = std::make_unique<CudaAllocator>(
 102  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
 103  allocator->copyToDevice(dev_err_buff, &err, sizeof(err));
 104 
 106  auto gpu_hash_table_buff = hash_table_->getGpuBuffer();
 107 
 // Mark every slot invalid before insertion.
 108  init_hash_join_buff_on_device(reinterpret_cast<int32_t*>(gpu_hash_table_buff),
 109  hash_entry_info.getNormalizedHashEntryCount(),
 110  hash_join_invalid_val);
 // No chunk key => no rows to insert; an all-invalid table is the result.
 111  if (chunk_key.empty()) {
 112  return;
 113  }
 114 
 115  // TODO: pass this in? duplicated in JoinHashTable currently
 116  const auto inner_col = cols.first;
 117  CHECK(inner_col);
 118  const auto& ti = inner_col->get_type_info();
 119 
 // NOTE(review): two initializer lines of this JoinColumnTypeInfo (original
 // listing lines 123 and 126 — presumably the null value and the column
 // type kind) were lost in extraction; restore from upstream.
 120  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
 121  col_range.getIntMin(),
 122  col_range.getIntMax(),
 124  is_bitwise_eq,
 125  col_range.getIntMax() + 1,
 // DATE columns are bucketized (range compressed to days).
 127  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
 128  if (shard_count) {
 129  const size_t entries_per_shard =
 130  get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count);
 131  CHECK_GT(device_count, 0);
 // Each device fills the shards assigned to it (round-robin by device id).
 132  for (size_t shard = device_id; shard < shard_count; shard += device_count) {
 133  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count};
 134  if (layout == HashType::OneToOne) {
 // NOTE(review): callee name lost (listing line 135); see header note.
 136  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
 137  hash_join_invalid_val,
 138  for_semi_anti_join(join_type),
 139  reinterpret_cast<int*>(dev_err_buff),
 140  join_column,
 141  type_info,
 142  shard_info,
 143  hash_entry_info.bucket_normalization);
 144  } else {
 // NOTE(review): callee name lost (listing line 145); see header note.
 146  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
 147  hash_entry_info,
 148  join_column,
 149  type_info,
 150  shard_info);
 151  }
 152  }
 153  } else {
 154  if (layout == HashType::OneToOne) {
 // NOTE(review): callee name lost (listing line 155); see header note.
 156  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
 157  hash_join_invalid_val,
 158  for_semi_anti_join(join_type),
 159  reinterpret_cast<int*>(dev_err_buff),
 160  join_column,
 161  type_info,
 162  hash_entry_info.bucket_normalization);
 163  } else {
 164  if (use_bucketization) {
 // NOTE(review): callee name lost (listing line 165); see header note.
 166  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
 167  hash_entry_info,
 168  join_column,
 169  type_info);
 170  } else {
 // NOTE(review): callee name lost (listing line 171); see header note.
 172  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
 173  hash_entry_info,
 174  join_column,
 175  type_info);
 176  }
 177  }
 178  }
 // Pull the device error flag back to the host and translate it.
 179  allocator->copyFromDevice(&err, dev_err_buff, sizeof(err));
 180  if (err) {
 // Duplicate key in a OneToOne table: caller retries with 1:many layout.
 181  if (layout == HashType::OneToOne) {
 182  throw NeedsOneToManyHash();
 183  } else {
 184  throw std::runtime_error("Unexpected error when building perfect hash table: " +
 185  std::to_string(err));
 186  }
 187  }
 188  }
189 #endif
190 
 // Builds a one-to-one perfect hash table on the CPU:
 //  1. Allocates the PerfectHashTable and initializes every slot to
 //    hash_join_invalid_val (TBB parallel init when available, otherwise one
 //    std::thread per CPU core).
 //  2. Fills the table in parallel via fill_hash_join_buff_bucketized; each
 //    worker publishes its first nonzero status into a shared atomic.
 //  3. A nonzero status means a duplicate key was found: the table is
 //    discarded and NeedsOneToManyHash tells the caller to retry 1:many.
 // NOTE(review): this generated listing lost the function's opening
 // signature line (original listing line 191, presumably
 // "void initOneToOneHashTableOnCpu(") and constructor-argument line 211
 // (presumably the CPU device type); restore from upstream.
 192  const JoinColumn& join_column,
 193  const ExpressionRange& col_range,
 194  const bool is_bitwise_eq,
 195  const InnerOuter& cols,
 196  const StringDictionaryProxy::IdMap* str_proxy_translation_map,
 197  const JoinType join_type,
 198  const HashType hash_type,
 199  const HashEntryInfo hash_entry_info,
 200  const int32_t hash_join_invalid_val,
 201  const Executor* executor) {
 202  auto timer = DEBUG_TIMER(__func__);
 203  const auto inner_col = cols.first;
 204  CHECK(inner_col);
 205  const auto& ti = inner_col->get_type_info();
 206 
 // The builder is single-use: a table must not already exist.
 207  CHECK(!hash_table_);
 208  hash_table_ =
 209  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
 210  hash_type,
 212  hash_entry_info.getNormalizedHashEntryCount(),
 213  0);
 214 
 215  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
 216  const int thread_count = cpu_threads();
 217  std::vector<std::thread> init_cpu_buff_threads;
 218 
 219  {
 220  auto timer_init = DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff");
 221 #ifdef HAVE_TBB
 // TBB available: parallel-initialize slots to the invalid sentinel.
 222  init_hash_join_buff_tbb(cpu_hash_table_buff,
 223  hash_entry_info.getNormalizedHashEntryCount(),
 224  hash_join_invalid_val);
 225 #else // #ifdef HAVE_TBB
 // Fallback: strided init across thread_count plain std::threads.
 226  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
 227  init_cpu_buff_threads.emplace_back([hash_entry_info,
 228  hash_join_invalid_val,
 229  thread_idx,
 230  thread_count,
 231  cpu_hash_table_buff] {
 232  init_hash_join_buff(cpu_hash_table_buff,
 233  hash_entry_info.getNormalizedHashEntryCount(),
 234  hash_join_invalid_val,
 235  thread_idx,
 236  thread_count);
 237  });
 238  }
 239  for (auto& t : init_cpu_buff_threads) {
 240  t.join();
 241  }
 // Vector is reused for the fill phase below.
 242  init_cpu_buff_threads.clear();
 243 #endif // !HAVE_TBB
 244  }
 245  const bool for_semi_join = for_semi_anti_join(join_type);
 // First nonzero per-worker status wins (compare_exchange below).
 246  std::atomic<int> err{0};
 247  {
 248  auto timer_fill =
 249  DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
 250  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
 251  init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
 252  &join_column,
 253  str_proxy_translation_map,
 254  thread_idx,
 255  thread_count,
 256  &ti,
 257  &err,
 258  &col_range,
 259  &is_bitwise_eq,
 260  &for_semi_join,
 261  cpu_hash_table_buff,
 262  hash_entry_info] {
 263  int partial_err = fill_hash_join_buff_bucketized(
 264  cpu_hash_table_buff,
 265  hash_join_invalid_val,
 266  for_semi_join,
 267  join_column,
 // NOTE(review): two initializer lines of this JoinColumnTypeInfo
 // (original listing lines 271 and 274 — presumably the null value
 // and the column type kind) were lost in extraction.
 268  {static_cast<size_t>(ti.get_size()),
 269  col_range.getIntMin(),
 270  col_range.getIntMax(),
 272  is_bitwise_eq,
 273  col_range.getIntMax() + 1,
 // Dictionary-encoded strings: pass the inner->outer id translation
 // map (nullptr and a dummy domain start when none is needed).
 275  str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
 276  str_proxy_translation_map ? str_proxy_translation_map->domainStart()
 277  : 0, // 0 is dummy value
 278  thread_idx,
 279  thread_count,
 280  hash_entry_info.bucket_normalization);
 // Record only the first failure; later workers must not overwrite it.
 281  int zero{0};
 282  err.compare_exchange_strong(zero, partial_err);
 283  });
 284  }
 285  for (auto& t : init_cpu_buff_threads) {
 286  t.join();
 287  }
 288  }
 289  if (err) {
 290  // Too many hash entries, need to retry with a 1:many table
 291  hash_table_ = nullptr; // clear the hash table buffer
 292  throw NeedsOneToManyHash();
 293  }
 294  }
295 
 // Builds a one-to-many perfect hash table on the CPU: allocates the table
 // (payload sized by join_column.num_elems), initializes all slots to
 // hash_join_invalid_val, then fills it — via the bucketized fill routine for
 // DATE columns, the plain one otherwise.
 // NOTE(review): this generated listing lost the function's opening
 // signature line (original listing line 296, presumably
 // "void initOneToManyHashTableOnCpu(") plus constructor-argument lines
 // 312-313, the async-launch line(s) 331-332 (presumably
 // std::async(std::launch::async, init_hash_join_buff, ...) given the
 // std::future usage below), and the fill callee names at listing lines 351
 // and 367 (per this page's cross-reference index, likely
 // fill_one_to_many_hash_table_bucketized and fill_one_to_many_hash_table);
 // restore from upstream before reusing as source.
 297  const JoinColumn& join_column,
 298  const ExpressionRange& col_range,
 299  const bool is_bitwise_eq,
 300  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
 301  const StringDictionaryProxy::IdMap* str_proxy_translation_map,
 302  const HashEntryInfo hash_entry_info,
 303  const int32_t hash_join_invalid_val,
 304  const Executor* executor) {
 305  auto timer = DEBUG_TIMER(__func__);
 306  const auto inner_col = cols.first;
 307  CHECK(inner_col);
 308  const auto& ti = inner_col->get_type_info();
 // The builder is single-use: a table must not already exist.
 309  CHECK(!hash_table_);
 310  hash_table_ =
 311  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
 314  hash_entry_info.getNormalizedHashEntryCount(),
 315  join_column.num_elems);
 316 
 317  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
 318 
 319  int thread_count = cpu_threads();
 320  {
 321  auto timer_init =
 322  DEBUG_TIMER("CPU One-To-Many Perfect Hash Table Builder: init_hash_join_buff");
 323 #ifdef HAVE_TBB
 // TBB available: parallel-initialize slots to the invalid sentinel.
 324  init_hash_join_buff_tbb(cpu_hash_table_buff,
 325  hash_entry_info.getNormalizedHashEntryCount(),
 326  hash_join_invalid_val);
 327 #else // #ifdef HAVE_TBB
 // Fallback: strided init via futures; wait first, then get() so any
 // exception from a worker propagates after all have finished.
 328  std::vector<std::future<void> > init_threads;
 329  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
 330  init_threads.emplace_back(
 333  cpu_hash_table_buff,
 334  hash_entry_info.getNormalizedHashEntryCount(),
 335  hash_join_invalid_val,
 336  thread_idx,
 337  thread_count));
 338  }
 339  for (auto& child : init_threads) {
 340  child.wait();
 341  }
 342  for (auto& child : init_threads) {
 343  child.get();
 344  }
 345 #endif // !HAVE_TBB
 346  }
 347  {
 348  auto timer_fill = DEBUG_TIMER(
 349  "CPU One-To-Many Perfect Hash Table Builder: fill_hash_join_buff_bucketized");
 // DATE columns take the bucketized fill path.
 350  if (ti.get_type() == kDATE) {
 // NOTE(review): callee name lost (listing line 351); see header note.
 // Also lost: JoinColumnTypeInfo initializer lines 358 and 361.
 352  cpu_hash_table_buff,
 353  hash_entry_info,
 354  join_column,
 355  {static_cast<size_t>(ti.get_size()),
 356  col_range.getIntMin(),
 357  col_range.getIntMax(),
 359  is_bitwise_eq,
 360  col_range.getIntMax() + 1,
 362  str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
 363  str_proxy_translation_map ? str_proxy_translation_map->domainStart()
 364  : 0 /*dummy*/,
 365  thread_count);
 366  } else {
 // NOTE(review): callee name lost (listing line 367); see header note.
 // Also lost: JoinColumnTypeInfo initializer lines 374 and 377.
 368  cpu_hash_table_buff,
 369  hash_entry_info,
 370  join_column,
 371  {static_cast<size_t>(ti.get_size()),
 372  col_range.getIntMin(),
 373  col_range.getIntMax(),
 375  is_bitwise_eq,
 376  col_range.getIntMax() + 1,
 378  str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
 379  str_proxy_translation_map ? str_proxy_translation_map->domainStart()
 380  : 0 /*dummy*/,
 381  thread_count);
 382  }
 383  }
 384  }
385 
386  std::unique_ptr<PerfectHashTable> getHashTable() { return std::move(hash_table_); }
387 
388  static size_t get_entries_per_shard(const size_t total_entry_count,
389  const size_t shard_count) {
390  CHECK_NE(size_t(0), shard_count);
391  return (total_entry_count + shard_count - 1) / shard_count;
392  }
393 
394  const bool for_semi_anti_join(const JoinType join_type) {
395  return join_type == JoinType::SEMI || join_type == JoinType::ANTI;
396  }
397 
398  private:
399  std::unique_ptr<PerfectHashTable> hash_table_;
400 };
int64_t getIntMin() const
std::vector< int > ChunkKey
Definition: types.h:36
JoinType
Definition: sqldefs.h:151
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int64_t bucket_normalization)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
void fill_one_to_many_hash_table_on_device(int32_t *buff, const HashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:95
size_t num_elems
void allocateDeviceMemory(const JoinColumn &join_column, const HashType layout, HashEntryInfo &hash_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
virtual int8_t * getMemoryPtr()=0
#define UNREACHABLE()
Definition: Logger.h:266
const bool for_semi_anti_join(const JoinType join_type)
static size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count)
#define CHECK_GT(x, y)
Definition: Logger.h:234
std::string to_string(char const *&&v)
future< Result > async(Fn &&fn, Args &&...args)
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const unsigned cpu_thread_count)
#define CHECK_NE(x, y)
Definition: Logger.h:231
int64_t bucket_normalization
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
An AbstractBuffer is a unit of data management for a data manager.
size_t hash_entry_count
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const HashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info)
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const InnerOuter &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const JoinType join_type, const HashType hash_type, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
std::unique_ptr< PerfectHashTable > getHashTable()
Definition: sqltypes.h:53
void allocateDeviceMemory(const size_t num_column_elems, const HashType layout, HashEntryInfo hash_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
void fill_one_to_many_hash_table(int32_t *buff, const HashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const unsigned cpu_thread_count)
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
Data_Namespace::DataMgr * getDataMgr() const
Definition: Execute.h:571
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
Definition: QueryEngine.cpp:7
size_t getNormalizedHashEntryCount() const
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
std::unique_ptr< PerfectHashTable > hash_table_
int cpu_threads()
Definition: thread_count.h:24
HashType
Definition: HashTable.h:19
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int64_t bucket_normalization)