OmniSciDB  eb3a3d0a03
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PerfectHashTableBuilder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
20 
21 #include "Shared/scope.h"
22 
24  public:
26 
// Sizes and allocates the per-device GPU buffer backing a perfect join hash
// table. For sharded joins the entry count is first rounded up so each of
// this device's shards gets an equal, ceil-divided slice. A one-to-one
// layout needs one slot per (normalized) entry; one-to-many needs
// 2 * entries + num_elems slots (presumably offset/count arrays plus a
// payload region -- confirm against PerfectHashTable).
// UNREACHABLE when built without CUDA support.
// NOTE(review): this listing is an HTML extraction; original source line 50
// (one make_unique argument, between `layout` and the entry count) is
// missing here.
27  void allocateDeviceMemory(const JoinColumn& join_column,
28  const HashType layout,
29  HashEntryInfo& hash_entry_info,
30  const size_t shard_count,
31  const int device_id,
32  const int device_count,
33  const Executor* executor) {
34 #ifdef HAVE_CUDA
35  if (shard_count) {
// Max number of shards owned by any single device (ceil division).
36  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
37  CHECK_GT(shards_per_device, 0u);
38  const size_t entries_per_shard =
39  get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count);
// Round the device's total up to a whole number of equal-size shards;
// note this mutates the caller-visible hash_entry_info.
40  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
41  }
// Total slot count to allocate, dependent on layout (see header comment).
42  const size_t total_count =
43  layout == HashType::OneToOne
44  ? hash_entry_info.getNormalizedHashEntryCount()
45  : 2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems;
47  hash_table_ =
48  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
49  layout,
51  hash_entry_info.getNormalizedHashEntryCount(),
52  join_column.num_elems);
53  hash_table_->allocateGpuMemory(total_count, device_id);
54 #else
55  UNREACHABLE();
56 #endif // HAVE_CUDA
57  }
58 
59 #ifdef HAVE_CUDA
// Fills the previously allocated GPU buffer (see allocateDeviceMemory) for
// this device: every slot is initialized to hash_join_invalid_val, then the
// table is populated from join_column, dispatching on layout (one-to-one
// vs. one-to-many) and on whether the table is sharded. A non-zero
// device-side error code on a one-to-one layout throws NeedsOneToManyHash
// so the caller can rebuild as one-to-many; any other error is fatal.
// NOTE(review): this listing is an HTML extraction; several hyperlinked
// identifiers (the device fill-function names and two JoinColumnTypeInfo
// fields) were stripped. The names suggested in comments below are matched
// against the declarations in the index at the bottom of this file --
// confirm against the repository before relying on them.
60  void initHashTableOnGpu(const ChunkKey& chunk_key,
61  const JoinColumn& join_column,
62  const ExpressionRange& col_range,
63  const bool is_bitwise_eq,
64  const InnerOuter& cols,
65  const JoinType join_type,
66  const HashType layout,
67  const HashEntryInfo hash_entry_info,
68  const size_t shard_count,
69  const int32_t hash_join_invalid_val,
70  const int device_id,
71  const int device_count,
72  const Executor* executor) {
// A single device-resident int collects error codes from the fill kernels.
73  auto data_mgr = executor->getDataMgr();
74  Data_Namespace::AbstractBuffer* gpu_hash_table_err_buff =
75  CudaAllocator::allocGpuAbstractBuffer(data_mgr, sizeof(int), device_id);
// The guard frees the error buffer on every exit path, including throws.
76  ScopeGuard cleanup_error_buff = [&data_mgr, gpu_hash_table_err_buff]() {
77  data_mgr->free(gpu_hash_table_err_buff);
78  };
79  CHECK(gpu_hash_table_err_buff);
80  auto dev_err_buff =
81  reinterpret_cast<CUdeviceptr>(gpu_hash_table_err_buff->getMemoryPtr());
// Zero the device-side error flag before launching any fill kernels.
82  int err{0};
83  copy_to_gpu(data_mgr, dev_err_buff, &err, sizeof(err), device_id);
84 
// NOTE(review): original line 85 is missing from this extraction.
86  auto gpu_hash_table_buff = hash_table_->getGpuBuffer();
87 
// Set every slot to the invalid sentinel so unmatched probes miss cleanly.
88  init_hash_join_buff_on_device(reinterpret_cast<int32_t*>(gpu_hash_table_buff),
89  hash_entry_info.getNormalizedHashEntryCount(),
90  hash_join_invalid_val);
// An empty chunk key means no column data to insert; leave the table
// all-invalid.
91  if (chunk_key.empty()) {
92  return;
93  }
94 
95  // TODO: pass this in? duplicated in JoinHashTable currently
96  const auto inner_col = cols.first;
97  CHECK(inner_col);
98  const auto& ti = inner_col->get_type_info();
99 
// NOTE(review): original lines 103 and 106 (two JoinColumnTypeInfo fields,
// presumably a null sentinel and the column-type kind -- cf.
// inline_fixed_encoding_null_val and get_join_column_type_kind in the index
// below) are missing from this extraction.
100  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
101  col_range.getIntMin(),
102  col_range.getIntMax(),
104  is_bitwise_eq,
105  col_range.getIntMax() + 1,
// DATE columns take the bucketized fill path.
107  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
108  if (shard_count) {
109  const size_t entries_per_shard =
110  get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count);
111  CHECK_GT(device_count, 0);
// Shards are distributed round-robin: this device owns every
// device_count-th shard starting at its own id.
112  for (size_t shard = device_id; shard < shard_count; shard += device_count) {
113  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count};
114  if (layout == HashType::OneToOne) {
// Call target stripped by extraction; the argument list matches
// fill_hash_join_buff_on_device_sharded_bucketized (see index below).
116  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
117  hash_join_invalid_val,
118  for_semi_anti_join(join_type),
119  reinterpret_cast<int*>(dev_err_buff),
120  join_column,
121  type_info,
122  shard_info,
123  hash_entry_info.bucket_normalization);
124  } else {
// Call target stripped by extraction; the argument list matches
// fill_one_to_many_hash_table_on_device_sharded (see index below).
126  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
127  hash_entry_info,
128  hash_join_invalid_val,
129  join_column,
130  type_info,
131  shard_info);
132  }
133  }
134  } else {
135  if (layout == HashType::OneToOne) {
// Call target stripped by extraction; the argument list matches
// fill_hash_join_buff_on_device_bucketized (see index below).
137  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
138  hash_join_invalid_val,
139  for_semi_anti_join(join_type),
140  reinterpret_cast<int*>(dev_err_buff),
141  join_column,
142  type_info,
143  hash_entry_info.bucket_normalization);
144  } else {
145  if (use_bucketization) {
// Call target stripped by extraction; the argument list matches
// fill_one_to_many_hash_table_on_device_bucketized (see index below).
147  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
148  hash_entry_info,
149  hash_join_invalid_val,
150  join_column,
151  type_info);
152  } else {
// Call target stripped by extraction; the argument list matches
// fill_one_to_many_hash_table_on_device (see index below).
154  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
155  hash_entry_info,
156  hash_join_invalid_val,
157  join_column,
158  type_info);
159  }
160  }
161  }
// Read back the kernel-reported error code (synchronizing copy).
162  copy_from_gpu(data_mgr, &err, dev_err_buff, sizeof(err), device_id);
163  if (err) {
164  if (layout == HashType::OneToOne) {
// One-to-one fill failed -- caller should retry with a one-to-many table.
165  throw NeedsOneToManyHash();
166  } else {
167  throw std::runtime_error("Unexpected error when building perfect hash table: " +
168  std::to_string(err));
169  }
170  }
171  }
172 #endif
173 
// Builds a one-to-one perfect hash table in CPU memory. Phase 1 initializes
// every slot to hash_join_invalid_val across cpu_threads() worker threads;
// phase 2 fills the table in parallel via fill_hash_join_buff_bucketized,
// keeping the first non-zero error via compare_exchange. On error the table
// is discarded and NeedsOneToManyHash is thrown so the caller can rebuild
// as one-to-many.
// NOTE(review): this listing is an HTML extraction; original lines 192 (one
// make_unique argument between hash_type and the entry count) and 254/257
// (two JoinColumnTypeInfo fields, presumably a null sentinel and the
// column-type kind) are missing here.
174  void initOneToOneHashTableOnCpu(const JoinColumn& join_column,
175  const ExpressionRange& col_range,
176  const bool is_bitwise_eq,
177  const InnerOuter& cols,
178  const JoinType join_type,
179  const HashType hash_type,
180  const HashEntryInfo hash_entry_info,
181  const int32_t hash_join_invalid_val,
182  const Executor* executor) {
183  auto timer = DEBUG_TIMER(__func__);
184  const auto inner_col = cols.first;
185  CHECK(inner_col);
186  const auto& ti = inner_col->get_type_info();
187 
// Builder must not already own a table.
188  CHECK(!hash_table_);
189  hash_table_ =
190  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
191  hash_type,
193  hash_entry_info.getNormalizedHashEntryCount(),
194  0);
195 
196  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
197  const StringDictionaryProxy* sd_inner_proxy{nullptr};
198  const StringDictionaryProxy* sd_outer_proxy{nullptr};
199  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
200  const bool for_semi_join = for_semi_anti_join(join_type);
// Dictionary proxies are only needed when the two string columns use
// different dictionaries (different comp params) and so need translation.
201  if (ti.is_string() &&
202  (outer_col && !(inner_col->get_comp_param() == outer_col->get_comp_param()))) {
203  CHECK_EQ(kENCODING_DICT, ti.get_compression());
204  sd_inner_proxy =
205  executor->getStringDictionaryProxy(inner_col->get_comp_param(), true);
206  CHECK(sd_inner_proxy);
207  CHECK(outer_col);
208  sd_outer_proxy =
209  executor->getStringDictionaryProxy(outer_col->get_comp_param(), true);
210  CHECK(sd_outer_proxy);
211  }
// Phase 1: initialize all slots to the invalid sentinel, one slice per
// thread.
212  int thread_count = cpu_threads();
213  std::vector<std::thread> init_cpu_buff_threads;
214  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
215  init_cpu_buff_threads.emplace_back([hash_entry_info,
216  hash_join_invalid_val,
217  thread_idx,
218  thread_count,
219  cpu_hash_table_buff] {
220  init_hash_join_buff(cpu_hash_table_buff,
221  hash_entry_info.getNormalizedHashEntryCount(),
222  hash_join_invalid_val,
223  thread_idx,
224  thread_count);
225  });
226  }
227  for (auto& t : init_cpu_buff_threads) {
228  t.join();
229  }
230  init_cpu_buff_threads.clear();
// Phase 2: parallel fill; err records the first non-zero per-thread result.
231  std::atomic<int> err{0};
232  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
233  init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
234  &join_column,
235  sd_inner_proxy,
236  sd_outer_proxy,
237  thread_idx,
238  thread_count,
239  &ti,
240  &err,
241  &col_range,
242  &is_bitwise_eq,
243  &for_semi_join,
244  cpu_hash_table_buff,
245  hash_entry_info] {
246  int partial_err =
247  fill_hash_join_buff_bucketized(cpu_hash_table_buff,
248  hash_join_invalid_val,
249  for_semi_join,
250  join_column,
251  {static_cast<size_t>(ti.get_size()),
252  col_range.getIntMin(),
253  col_range.getIntMax(),
255  is_bitwise_eq,
256  col_range.getIntMax() + 1,
258  sd_inner_proxy,
259  sd_outer_proxy,
260  thread_idx,
261  thread_count,
262  hash_entry_info.bucket_normalization);
// Only the first failing thread's code wins; later failures are ignored.
263  int zero{0};
264  err.compare_exchange_strong(zero, partial_err);
265  });
266  }
267  for (auto& t : init_cpu_buff_threads) {
268  t.join();
269  }
270  if (err) {
271  // Too many hash entries, need to retry with a 1:many table
272  hash_table_ = nullptr; // clear the hash table buffer
273  throw NeedsOneToManyHash();
274  }
275  }
276 
// Builds a one-to-many perfect hash table in CPU memory: initializes all
// slots to hash_join_invalid_val with async tasks, then fills the table
// single-pass via the bucketized (kDATE) or plain one-to-many fill routine.
// NOTE(review): the opening line of this method's signature (original line
// 277, `void initOneToManyHashTableOnCpu(`) was lost in extraction; the
// full signature appears in the index at the bottom of this listing.
278  const JoinColumn& join_column,
279  const ExpressionRange& col_range,
280  const bool is_bitwise_eq,
281  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
282  const HashEntryInfo hash_entry_info,
283  const int32_t hash_join_invalid_val,
284  const Executor* executor) {
285  auto timer = DEBUG_TIMER(__func__);
286  const auto inner_col = cols.first;
287  CHECK(inner_col);
288  const auto& ti = inner_col->get_type_info();
289 
// Builder must not already own a table.
290  CHECK(!hash_table_);
// NOTE(review): original lines 293-294 (two make_unique arguments between
// the data manager and the entry count) are missing from this extraction.
291  hash_table_ =
292  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
295  hash_entry_info.getNormalizedHashEntryCount(),
296  join_column.num_elems);
297 
298  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
299  const StringDictionaryProxy* sd_inner_proxy{nullptr};
300  const StringDictionaryProxy* sd_outer_proxy{nullptr};
// Dictionary-encoded string joins fetch proxies for both sides so outer ids
// can be translated into the inner dictionary space.
301  if (ti.is_string()) {
302  CHECK_EQ(kENCODING_DICT, ti.get_compression());
303  sd_inner_proxy =
304  executor->getStringDictionaryProxy(inner_col->get_comp_param(), true);
305  CHECK(sd_inner_proxy);
306  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
307  CHECK(outer_col);
308  sd_outer_proxy =
309  executor->getStringDictionaryProxy(outer_col->get_comp_param(), true);
310  CHECK(sd_outer_proxy);
311  }
// Initialize the buffer to the invalid sentinel in parallel slices.
// NOTE(review): original line 316 (the callable passed to std::async) is
// missing from this extraction; the argument list matches
// init_hash_join_buff (see index below).
312  int thread_count = cpu_threads();
313  std::vector<std::future<void>> init_threads;
314  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
315  init_threads.emplace_back(std::async(std::launch::async,
317  cpu_hash_table_buff,
318  hash_entry_info.getNormalizedHashEntryCount(),
319  hash_join_invalid_val,
320  thread_idx,
321  thread_count));
322  }
// wait() lets every task run to completion; the following get() pass then
// rethrows any exception a task raised.
323  for (auto& child : init_threads) {
324  child.wait();
325  }
326  for (auto& child : init_threads) {
327  child.get();
328  }
329 
// DATE columns take the bucketized fill; everything else the plain fill.
// NOTE(review): original lines 338/341 and 353/356 (two JoinColumnTypeInfo
// fields in each braced initializer, presumably a null sentinel and the
// column-type kind) are missing from this extraction.
330  if (ti.get_type() == kDATE) {
331  fill_one_to_many_hash_table_bucketized(cpu_hash_table_buff,
332  hash_entry_info,
333  hash_join_invalid_val,
334  join_column,
335  {static_cast<size_t>(ti.get_size()),
336  col_range.getIntMin(),
337  col_range.getIntMax(),
339  is_bitwise_eq,
340  col_range.getIntMax() + 1,
342  sd_inner_proxy,
343  sd_outer_proxy,
344  thread_count);
345  } else {
346  fill_one_to_many_hash_table(cpu_hash_table_buff,
347  hash_entry_info,
348  hash_join_invalid_val,
349  join_column,
350  {static_cast<size_t>(ti.get_size()),
351  col_range.getIntMin(),
352  col_range.getIntMax(),
354  is_bitwise_eq,
355  col_range.getIntMax() + 1,
357  sd_inner_proxy,
358  sd_outer_proxy,
359  thread_count);
360  }
361  }
362 
363  std::unique_ptr<PerfectHashTable> getHashTable() { return std::move(hash_table_); }
364 
365  static size_t get_entries_per_shard(const size_t total_entry_count,
366  const size_t shard_count) {
367  CHECK_NE(size_t(0), shard_count);
368  return (total_entry_count + shard_count - 1) / shard_count;
369  }
370 
371  const bool for_semi_anti_join(const JoinType join_type) {
372  return join_type == JoinType::SEMI || join_type == JoinType::ANTI;
373  }
374 
375  private:
376  std::unique_ptr<PerfectHashTable> hash_table_;
377 };
int64_t getIntMin() const
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::vector< int > ChunkKey
Definition: types.h:37
JoinType
Definition: sqldefs.h:108
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int64_t bucket_normalization)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:77
size_t num_elems
void allocateDeviceMemory(const JoinColumn &join_column, const HashType layout, HashEntryInfo &hash_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
virtual int8_t * getMemoryPtr()=0
unsigned long long CUdeviceptr
Definition: nocuda.h:27
#define UNREACHABLE()
Definition: Logger.h:253
const bool for_semi_anti_join(const JoinType join_type)
static size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count)
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info)
#define CHECK_GT(x, y)
Definition: Logger.h:221
std::string to_string(char const *&&v)
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
future< Result > async(Fn &&fn, Args &&...args)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:30
void fill_one_to_many_hash_table_on_device(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
#define CHECK_NE(x, y)
Definition: Logger.h:218
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
StringDictionaryProxy * getStringDictionaryProxy(const int dict_id, const bool with_generation) const
Definition: Execute.h:421
int64_t bucket_normalization
An AbstractBuffer is a unit of data management for a data manager.
size_t hash_entry_count
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
std::unique_ptr< PerfectHashTable > getHashTable()
Definition: sqltypes.h:53
void fill_one_to_many_hash_table(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
Data_Namespace::DataMgr * getDataMgr() const
Definition: Execute.h:448
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
size_t getNormalizedHashEntryCount() const
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
char * t
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
std::unique_ptr< PerfectHashTable > hash_table_
int cpu_threads()
Definition: thread_count.h:24
HashType
Definition: HashTable.h:19
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int64_t bucket_normalization)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const InnerOuter &cols, const JoinType join_type, const HashType hash_type, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)