OmniSciDB  a667adc9c8
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
PerfectHashTableBuilder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
20 
21 #include "Shared/scope.h"
22 
24  public:
26  : catalog_(catalog) {}
27 
28  void allocateDeviceMemory(const JoinColumn& join_column,
29  const HashType layout,
30  HashEntryInfo& hash_entry_info,
31  const size_t shard_count,
32  const int device_id,
33  const int device_count) {
34 #ifdef HAVE_CUDA
35  if (shard_count) {
36  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
37  CHECK_GT(shards_per_device, 0u);
38  const size_t entries_per_shard =
39  get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count);
40  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
41  }
42  const size_t total_count =
43  layout == HashType::OneToOne
44  ? hash_entry_info.getNormalizedHashEntryCount()
45  : 2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems;
47  hash_table_ =
48  std::make_unique<PerfectHashTable>(catalog_,
49  layout,
51  hash_entry_info.getNormalizedHashEntryCount(),
52  join_column.num_elems);
53  hash_table_->allocateGpuMemory(total_count, device_id);
54 #else
55  UNREACHABLE();
56 #endif // HAVE_CUDA
57  }
58 
59 #ifdef HAVE_CUDA
/// Initializes, and optionally fills, the GPU buffer of hash_table_ on device_id.
/// hash_table_ must already exist with GPU memory; allocateDeviceMemory creates it.
///
/// Steps:
///  1. Allocate a one-int device error flag and zero it. A ScopeGuard frees it
///     on every exit path.
///  2. Set every normalized entry of the table to hash_join_invalid_val.
///  3. If chunk_key is empty, return without filling.
///  4. Fill from join_column. Sharded builds visit shards device_id,
///     device_id + device_count, ... Unsharded one-to-many builds pick the
///     bucketized fill for kDATE inner columns.
///  5. Copy the error flag back. A non-zero value throws NeedsOneToManyHash for
///     OneToOne layouts, and std::runtime_error otherwise.
///
/// NOTE(review): the visible one-to-many fill calls are not passed dev_err_buff,
/// so the std::runtime_error branch appears unreachable from them. Confirm.
/// NOTE(review): CHECK(gpu_hash_table_err_buff) runs after the cleanup guard is
/// already armed.
60  void initHashTableOnGpu(const ChunkKey& chunk_key,
61  const JoinColumn& join_column,
62  const ExpressionRange& col_range,
63  const bool is_bitwise_eq,
64  const InnerOuter& cols,
65  const HashType layout,
66  const HashEntryInfo hash_entry_info,
67  const size_t shard_count,
68  const int32_t hash_join_invalid_val,
69  const int device_id,
70  const int device_count,
71  const Executor* executor) {
72  auto catalog = executor->getCatalog();
73  auto& data_mgr = catalog->getDataMgr();
  // Device-side error flag written by the one-to-one fill kernels.
74  Data_Namespace::AbstractBuffer* gpu_hash_table_err_buff =
75  CudaAllocator::allocGpuAbstractBuffer(&data_mgr, sizeof(int), device_id);
76  ScopeGuard cleanup_error_buff = [&data_mgr, gpu_hash_table_err_buff]() {
77  data_mgr.free(gpu_hash_table_err_buff);
78  };
79  CHECK(gpu_hash_table_err_buff);
80  auto dev_err_buff =
81  reinterpret_cast<CUdeviceptr>(gpu_hash_table_err_buff->getMemoryPtr());
82  int err{0};
83  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
84 
86  auto gpu_hash_table_buff = hash_table_->getGpuBuffer();
87 
  // Mark every normalized entry as empty before any fill.
88  init_hash_join_buff_on_device(reinterpret_cast<int32_t*>(gpu_hash_table_buff),
89  hash_entry_info.getNormalizedHashEntryCount(),
90  hash_join_invalid_val);
  // No chunk to read from: leave the table initialized but empty.
91  if (chunk_key.empty()) {
92  return;
93  }
94 
95  // TODO: pass this in? duplicated in JoinHashTable currently
96  const auto inner_col = cols.first;
97  CHECK(inner_col);
98  const auto& ti = inner_col->get_type_info();
99 
100  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
101  col_range.getIntMin(),
102  col_range.getIntMax(),
104  is_bitwise_eq,
105  col_range.getIntMax() + 1,
107  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
  // Sharded: this device fills only the shards congruent to device_id modulo
  // device_count.
108  if (shard_count) {
109  const size_t entries_per_shard =
110  get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count);
111  CHECK_GT(device_count, 0);
112  for (size_t shard = device_id; shard < shard_count; shard += device_count) {
113  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count};
114  if (layout == HashType::OneToOne) {
116  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
117  hash_join_invalid_val,
118  reinterpret_cast<int*>(dev_err_buff),
119  join_column,
120  type_info,
121  shard_info,
122  hash_entry_info.bucket_normalization);
123  } else {
125  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
126  hash_entry_info,
127  hash_join_invalid_val,
128  join_column,
129  type_info,
130  shard_info);
131  }
132  }
133  } else {
134  if (layout == HashType::OneToOne) {
136  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
137  hash_join_invalid_val,
138  reinterpret_cast<int*>(dev_err_buff),
139  join_column,
140  type_info,
141  hash_entry_info.bucket_normalization);
142  } else {
  // use_bucketization (kDATE inner column) is only consulted on this path.
143  if (use_bucketization) {
145  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
146  hash_entry_info,
147  hash_join_invalid_val,
148  join_column,
149  type_info);
150  } else {
152  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
153  hash_entry_info,
154  hash_join_invalid_val,
155  join_column,
156  type_info);
157  }
158  }
159  }
  // Read back the error flag. For OneToOne, a non-zero value means the data
  // does not fit a one-to-one table.
160  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
161  if (err) {
162  if (layout == HashType::OneToOne) {
163  throw NeedsOneToManyHash();
164  } else {
165  throw std::runtime_error("Unexpected error when building perfect hash table: " +
166  std::to_string(err));
167  }
168  }
169  }
170 #endif
171 
/// Builds a one-to-one perfect hash table in host memory and stores it in
/// hash_table_, which must be empty on entry (CHECK).
///
/// Runs two parallel phases, each on cpu_threads() std::threads that are
/// joined before the next step:
///  1. init_hash_join_buff sets every normalized entry to hash_join_invalid_val.
///  2. fill_hash_join_buff_bucketized inserts join_column's values, using
///     hash_entry_info.bucket_normalization.
///
/// String dictionary proxies are fetched only when the inner column is a string
/// and the outer expression is a ColumnVar with a different comp_param
/// (dictionary id). Presumably this lets values be translated between the two
/// dictionaries; confirm in fill_hash_join_buff_bucketized.
///
/// @throws NeedsOneToManyHash if any thread reports a non-zero error; the
///         partially built table is discarded first.
/// NOTE(review): if std::thread construction throws mid-loop, the vector would
/// destroy joinable threads (std::terminate).
172  void initOneToOneHashTableOnCpu(const JoinColumn& join_column,
173  const ExpressionRange& col_range,
174  const bool is_bitwise_eq,
175  const InnerOuter& cols,
176  const HashEntryInfo hash_entry_info,
177  const int32_t hash_join_invalid_val,
178  const Executor* executor) {
179  auto timer = DEBUG_TIMER(__func__);
180  const auto inner_col = cols.first;
181  CHECK(inner_col);
182  const auto& ti = inner_col->get_type_info();
183 
184  CHECK(!hash_table_);
  // One-to-one: the trailing element-count argument is 0, unlike the
  // one-to-many builder, which passes join_column.num_elems.
185  hash_table_ =
186  std::make_unique<PerfectHashTable>(catalog_,
189  hash_entry_info.getNormalizedHashEntryCount(),
190  0);
191 
192  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
193  const StringDictionaryProxy* sd_inner_proxy{nullptr};
194  const StringDictionaryProxy* sd_outer_proxy{nullptr};
195  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
  // Proxies are needed only when the two sides use different dictionaries.
196  if (ti.is_string() &&
197  (outer_col && !(inner_col->get_comp_param() == outer_col->get_comp_param()))) {
198  CHECK_EQ(kENCODING_DICT, ti.get_compression());
199  sd_inner_proxy =
200  executor->getStringDictionaryProxy(inner_col->get_comp_param(), true);
201  CHECK(sd_inner_proxy);
202  CHECK(outer_col);
203  sd_outer_proxy =
204  executor->getStringDictionaryProxy(outer_col->get_comp_param(), true);
205  CHECK(sd_outer_proxy);
206  }
  // Phase 1: parallel initialization; each thread receives its index and the
  // thread count.
207  int thread_count = cpu_threads();
208  std::vector<std::thread> init_cpu_buff_threads;
209  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
210  init_cpu_buff_threads.emplace_back([hash_entry_info,
211  hash_join_invalid_val,
212  thread_idx,
213  thread_count,
214  cpu_hash_table_buff] {
215  init_hash_join_buff(cpu_hash_table_buff,
216  hash_entry_info.getNormalizedHashEntryCount(),
217  hash_join_invalid_val,
218  thread_idx,
219  thread_count);
220  });
221  }
222  for (auto& t : init_cpu_buff_threads) {
223  t.join();
224  }
225  init_cpu_buff_threads.clear();
  // Phase 2: parallel fill. Reference captures are safe because every thread is
  // joined before this function returns.
226  std::atomic<int> err{0};
227  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
228  init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
229  &join_column,
230  sd_inner_proxy,
231  sd_outer_proxy,
232  thread_idx,
233  thread_count,
234  &ti,
235  &err,
236  &col_range,
237  &is_bitwise_eq,
238  cpu_hash_table_buff,
239  hash_entry_info] {
240  int partial_err =
241  fill_hash_join_buff_bucketized(cpu_hash_table_buff,
242  hash_join_invalid_val,
243  join_column,
244  {static_cast<size_t>(ti.get_size()),
245  col_range.getIntMin(),
246  col_range.getIntMax(),
248  is_bitwise_eq,
249  col_range.getIntMax() + 1,
251  sd_inner_proxy,
252  sd_outer_proxy,
253  thread_idx,
254  thread_count,
255  hash_entry_info.bucket_normalization);
  // Record this thread's error only if err is still 0, so the first
  // stored value wins.
256  int zero{0};
257  err.compare_exchange_strong(zero, partial_err);
258  });
259  }
260  for (auto& t : init_cpu_buff_threads) {
261  t.join();
262  }
263  if (err) {
264  // Too many hash entries, need to retry with a 1:many table
265  hash_table_ = nullptr; // clear the hash table buffer
266  throw NeedsOneToManyHash();
267  }
268  }
269 
271  const JoinColumn& join_column,
272  const ExpressionRange& col_range,
273  const bool is_bitwise_eq,
274  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
275  const HashEntryInfo hash_entry_info,
276  const int32_t hash_join_invalid_val,
277  const Executor* executor) {
  // Builds a one-to-many perfect hash table in host memory and stores it in
  // hash_table_, which must be empty on entry (CHECK).
  //   Phase 1: cpu_threads() std::async tasks initialize the buffer. Each task
  //            receives the normalized entry count, hash_join_invalid_val, its
  //            index and the thread count. The callee name is not visible here;
  //            the arguments match init_hash_join_buff (confirm).
  //   Phase 2: a single call to fill_one_to_many_hash_table_bucketized for kDATE
  //            inner columns, or to fill_one_to_many_hash_table otherwise. Each
  //            receives thread_count; how it parallelizes is up to the callee.
  // Unlike the one-to-one builder, string proxies are fetched for every string
  // inner column, even when both sides share a dictionary.
278  auto timer = DEBUG_TIMER(__func__);
279  const auto inner_col = cols.first;
280  CHECK(inner_col);
281  const auto& ti = inner_col->get_type_info();
282 
283  CHECK(!hash_table_);
  // One-to-many: join_column.num_elems is passed alongside the entry count.
284  hash_table_ =
285  std::make_unique<PerfectHashTable>(catalog_,
288  hash_entry_info.getNormalizedHashEntryCount(),
289  join_column.num_elems);
290 
291  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
292  const StringDictionaryProxy* sd_inner_proxy{nullptr};
293  const StringDictionaryProxy* sd_outer_proxy{nullptr};
294  if (ti.is_string()) {
295  CHECK_EQ(kENCODING_DICT, ti.get_compression());
296  sd_inner_proxy =
297  executor->getStringDictionaryProxy(inner_col->get_comp_param(), true);
298  CHECK(sd_inner_proxy);
299  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(cols.second);
300  CHECK(outer_col);
301  sd_outer_proxy =
302  executor->getStringDictionaryProxy(outer_col->get_comp_param(), true);
303  CHECK(sd_outer_proxy);
304  }
305  int thread_count = cpu_threads();
306  std::vector<std::future<void>> init_threads;
307  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
308  init_threads.emplace_back(std::async(std::launch::async,
310  cpu_hash_table_buff,
311  hash_entry_info.getNormalizedHashEntryCount(),
312  hash_join_invalid_val,
313  thread_idx,
314  thread_count));
315  }
  // get() blocks and rethrows any exception from an init task. The wait() loop
  // before it is redundant but harmless.
316  for (auto& child : init_threads) {
317  child.wait();
318  }
319  for (auto& child : init_threads) {
320  child.get();
321  }
322 
  // DATE inner columns use the bucketized fill.
323  if (ti.get_type() == kDATE) {
324  fill_one_to_many_hash_table_bucketized(cpu_hash_table_buff,
325  hash_entry_info,
326  hash_join_invalid_val,
327  join_column,
328  {static_cast<size_t>(ti.get_size()),
329  col_range.getIntMin(),
330  col_range.getIntMax(),
332  is_bitwise_eq,
333  col_range.getIntMax() + 1,
335  sd_inner_proxy,
336  sd_outer_proxy,
337  thread_count);
338  } else {
339  fill_one_to_many_hash_table(cpu_hash_table_buff,
340  hash_entry_info,
341  hash_join_invalid_val,
342  join_column,
343  {static_cast<size_t>(ti.get_size()),
344  col_range.getIntMin(),
345  col_range.getIntMax(),
347  is_bitwise_eq,
348  col_range.getIntMax() + 1,
350  sd_inner_proxy,
351  sd_outer_proxy,
352  thread_count);
353  }
354  }
355 
356  std::unique_ptr<PerfectHashTable> getHashTable() { return std::move(hash_table_); }
357 
358  static size_t get_entries_per_shard(const size_t total_entry_count,
359  const size_t shard_count) {
360  CHECK_NE(size_t(0), shard_count);
361  return (total_entry_count + shard_count - 1) / shard_count;
362  }
363 
364  private:
366 
367  std::unique_ptr<PerfectHashTable> hash_table_;
368 };
int64_t getIntMin() const
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
void allocateDeviceMemory(const JoinColumn &join_column, const HashType layout, HashEntryInfo &hash_entry_info, const size_t shard_count, const int device_id, const int device_count)
const Catalog_Namespace::Catalog * catalog_
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:101
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:76
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const InnerOuter &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
size_t num_elems
virtual int8_t * getMemoryPtr()=0
unsigned long long CUdeviceptr
Definition: nocuda.h:27
#define UNREACHABLE()
Definition: Logger.h:241
static size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count)
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info)
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::string to_string(char const *&&v)
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int64_t bucket_normalization)
PerfectJoinHashTableBuilder(const Catalog_Namespace::Catalog *catalog)
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:30
void fill_one_to_many_hash_table_on_device(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
#define CHECK_NE(x, y)
Definition: Logger.h:206
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
StringDictionaryProxy * getStringDictionaryProxy(const int dict_id, const bool with_generation) const
Definition: Execute.h:401
int64_t bucket_normalization
An AbstractBuffer is a unit of data management for a data manager.
size_t hash_entry_count
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
const Catalog_Namespace::Catalog * getCatalog() const
Definition: Execute.cpp:276
std::unique_ptr< PerfectHashTable > getHashTable()
Definition: sqltypes.h:52
void fill_one_to_many_hash_table(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const unsigned cpu_thread_count)
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
size_t getNormalizedHashEntryCount() const
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
char * t
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int64_t bucket_normalization)
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
std::unique_ptr< PerfectHashTable > hash_table_
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const void *sd_inner_proxy, const void *sd_outer_proxy, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
int cpu_threads()
Definition: thread_count.h:24
HashType
Definition: HashTable.h:19