OmniSciDB  91042dcc5b
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PerfectHashTableBuilder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <tbb/parallel_for.h>
20 
22 
23 #include "Shared/scope.h"
24 
26  public:
28 
// Allocates the GPU buffer for a perfect hash table on `device_id` and stores
// the newly created PerfectHashTable in hash_table_.
//
// hash_entry_info is an in/out parameter: when shard_count is non-zero its
// hash_entry_count is overwritten with entries_per_shard * shards_per_device,
// i.e. just the entries for this device's share of the shards.
//
// Size passed to allocateGpuMemory: the normalized entry count for a OneToOne
// layout; for any other layout, twice the normalized entry count plus
// join_column.num_elems (presumably offset/count sections followed by row ids
// -- TODO confirm against PerfectHashTable).
//
// Only meaningful in CUDA builds; without HAVE_CUDA it hits UNREACHABLE().
29  void allocateDeviceMemory(const JoinColumn& join_column,
30  const HashType layout,
31  HashEntryInfo& hash_entry_info,
32  const size_t shard_count,
33  const int device_id,
34  const int device_count,
35  const Executor* executor) {
36 #ifdef HAVE_CUDA
37  if (shard_count) {
// Rounded-up number of shards handled by each device.
// NOTE(review): device_count is a divisor here with no positivity check;
// initHashTableOnGpu guards the same value with CHECK_GT(device_count, 0).
38  const auto shards_per_device = (shard_count + device_count - 1) / device_count;
39  CHECK_GT(shards_per_device, 0u);
40  const size_t entries_per_shard =
41  get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count);
// Rescale the entry count to cover only the shards owned by this device.
42  hash_entry_info.hash_entry_count = entries_per_shard * shards_per_device;
43  }
44  const size_t total_count =
45  layout == HashType::OneToOne
46  ? hash_entry_info.getNormalizedHashEntryCount()
47  : 2 * hash_entry_info.getNormalizedHashEntryCount() + join_column.num_elems;
49  hash_table_ =
50  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
51  layout,
53  hash_entry_info.getNormalizedHashEntryCount(),
54  join_column.num_elems);
55  hash_table_->allocateGpuMemory(total_count, device_id);
56 #else
57  UNREACHABLE();
58 #endif // HAVE_CUDA
59  }
60 
61 #ifdef HAVE_CUDA
// Populates the perfect hash table already held in hash_table_ on GPU
// `device_id`. hash_table_ is dereferenced below without a null check, so
// allocateDeviceMemory (or an equivalent allocation) must have run first.
//
// Steps: zero a one-int device error flag, fill the whole table with
// hash_join_invalid_val, and -- unless chunk_key is empty -- insert the rows
// of join_column, either shard by shard or for the whole column at once.
//
// Throws NeedsOneToManyHash when the device error flag is non-zero for a
// OneToOne layout, and std::runtime_error (carrying the code) for any other
// layout. The device error buffer is released by a ScopeGuard on every exit.
62  void initHashTableOnGpu(const ChunkKey& chunk_key,
63  const JoinColumn& join_column,
64  const ExpressionRange& col_range,
65  const bool is_bitwise_eq,
66  const InnerOuter& cols,
67  const JoinType join_type,
68  const HashType layout,
69  const HashEntryInfo hash_entry_info,
70  const size_t shard_count,
71  const int32_t hash_join_invalid_val,
72  const int device_id,
73  const int device_count,
74  const Executor* executor) {
75  auto timer = DEBUG_TIMER(__func__);
76  auto data_mgr = executor->getDataMgr();
// One device-side int used by the fill routines to report failure.
77  Data_Namespace::AbstractBuffer* gpu_hash_table_err_buff =
78  CudaAllocator::allocGpuAbstractBuffer(data_mgr, sizeof(int), device_id);
// NOTE(review): the guard is armed before the CHECK below, so a null buffer
// would still be passed to data_mgr->free().
79  ScopeGuard cleanup_error_buff = [&data_mgr, gpu_hash_table_err_buff]() {
80  data_mgr->free(gpu_hash_table_err_buff);
81  };
82  CHECK(gpu_hash_table_err_buff);
83  auto dev_err_buff = gpu_hash_table_err_buff->getMemoryPtr();
84  int err{0};
85 
// Clear the device error flag before any fill runs.
86  auto allocator = data_mgr->createGpuAllocator(device_id);
87  allocator->copyToDevice(dev_err_buff, &err, sizeof(err));
88 
90  auto gpu_hash_table_buff = hash_table_->getGpuBuffer();
91 
92  init_hash_join_buff_on_device(reinterpret_cast<int32_t*>(gpu_hash_table_buff),
93  hash_entry_info.getNormalizedHashEntryCount(),
94  hash_join_invalid_val);
// With no chunk key there is nothing to insert: leave the table all-invalid.
95  if (chunk_key.empty()) {
96  return;
97  }
98 
99  // TODO: pass this in? duplicated in JoinHashTable currently
100  const auto inner_col = cols.first;
101  CHECK(inner_col);
102  const auto& ti = inner_col->get_type_info();
103 
104  JoinColumnTypeInfo type_info{static_cast<size_t>(ti.get_size()),
105  col_range.getIntMin(),
106  col_range.getIntMax(),
108  is_bitwise_eq,
109  col_range.getIntMax() + 1,
// DATE inner columns select the bucketized variant in the non-sharded
// one-to-many branch below.
111  auto use_bucketization = inner_col->get_type_info().get_type() == kDATE;
112  if (shard_count) {
113  const size_t entries_per_shard =
114  get_entries_per_shard(hash_entry_info.hash_entry_count, shard_count);
115  CHECK_GT(device_count, 0);
// Round-robin: this device handles shards device_id, device_id + device_count, ...
116  for (size_t shard = device_id; shard < shard_count; shard += device_count) {
117  ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count};
118  if (layout == HashType::OneToOne) {
120  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
121  hash_join_invalid_val,
122  for_semi_anti_join(join_type),
123  reinterpret_cast<int*>(dev_err_buff),
124  join_column,
125  type_info,
126  shard_info,
127  hash_entry_info.bucket_normalization);
128  } else {
130  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
131  hash_entry_info,
132  hash_join_invalid_val,
133  join_column,
134  type_info,
135  shard_info);
136  }
137  }
138  } else {
139  if (layout == HashType::OneToOne) {
141  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
142  hash_join_invalid_val,
143  for_semi_anti_join(join_type),
144  reinterpret_cast<int*>(dev_err_buff),
145  join_column,
146  type_info,
147  hash_entry_info.bucket_normalization);
148  } else {
149  if (use_bucketization) {
151  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
152  hash_entry_info,
153  hash_join_invalid_val,
154  join_column,
155  type_info);
156  } else {
158  reinterpret_cast<int32_t*>(gpu_hash_table_buff),
159  hash_entry_info,
160  hash_join_invalid_val,
161  join_column,
162  type_info);
163  }
164  }
165  }
// Read back the device error flag. Only the OneToOne fill calls above are
// handed dev_err_buff; the one-to-many calls never receive it.
166  allocator->copyFromDevice(&err, dev_err_buff, sizeof(err));
167  if (err) {
168  if (layout == HashType::OneToOne) {
169  throw NeedsOneToManyHash();
170  } else {
// NOTE(review): since the one-to-many fills are not given dev_err_buff,
// this branch looks unreachable -- confirm before relying on it.
171  throw std::runtime_error("Unexpected error when building perfect hash table: " +
172  std::to_string(err));
173  }
174  }
175  }
176 #endif
177 
// initOneToOneHashTableOnCpu: builds a one-to-one perfect hash table on the
// CPU. It allocates hash_table_ (which must be empty on entry) and fills every
// slot with hash_join_invalid_val. It then starts cpu_threads() std::threads,
// each calling fill_hash_join_buff_bucketized with its own thread_idx and the
// shared thread_count (presumably each handles a slice of join_column --
// confirm in the fill routine).
// If any thread reports a non-zero result, the table is discarded and
// NeedsOneToManyHash is thrown so the caller can retry with a 1:many layout.
179  const JoinColumn& join_column,
180  const ExpressionRange& col_range,
181  const bool is_bitwise_eq,
182  const InnerOuter& cols,
183  const StringDictionaryProxyTranslationMap* str_proxy_translation_map,
184  const JoinType join_type,
185  const HashType hash_type,
186  const HashEntryInfo hash_entry_info,
187  const int32_t hash_join_invalid_val,
188  const Executor* executor) {
189  auto timer = DEBUG_TIMER(__func__);
190  const auto inner_col = cols.first;
191  CHECK(inner_col);
192  const auto& ti = inner_col->get_type_info();
193 
194  CHECK(!hash_table_);
195  hash_table_ =
196  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
197  hash_type,
199  hash_entry_info.getNormalizedHashEntryCount(),
200  0);
201 
202  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
203 
204  // We always expect a non-null translation map (PerfectJoinHashTable uses it to
205  // decide whether the map needs to be fetched). If the map is invalid (i.e. we
206  // have no string columns, or we do but the dictionaries are the same),
207  // isEmpty() returns true.
208  CHECK(str_proxy_translation_map);
209 
210  {
211  auto timer_init =
212  DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff_tbb");
213  init_hash_join_buff_tbb(cpu_hash_table_buff,
214  hash_entry_info.getNormalizedHashEntryCount(),
215  hash_join_invalid_val);
216  }
217 
218  const bool for_semi_join = for_semi_anti_join(join_type);
// Shared across workers; holds the first non-zero error any of them reports.
219  std::atomic<int> err{0};
220 
221  int thread_count = cpu_threads();
222  std::vector<std::thread> init_cpu_buff_threads;
223  {
224  auto timer_fill =
225  DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
// NOTE(review): if creating a std::thread throws partway through this loop, the
// vector destroys still-joinable threads, which calls std::terminate. An
// exception escaping a worker also terminates the process.
226  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
// By-reference captures are safe here: every thread is joined before
// this scope ends.
227  init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
228  &join_column,
229  str_proxy_translation_map,
230  thread_idx,
231  thread_count,
232  &ti,
233  &err,
234  &col_range,
235  &is_bitwise_eq,
236  &for_semi_join,
237  cpu_hash_table_buff,
238  hash_entry_info] {
239  int partial_err =
240  fill_hash_join_buff_bucketized(cpu_hash_table_buff,
241  hash_join_invalid_val,
242  for_semi_join,
243  join_column,
244  {static_cast<size_t>(ti.get_size()),
245  col_range.getIntMin(),
246  col_range.getIntMax(),
248  is_bitwise_eq,
249  col_range.getIntMax() + 1,
251  str_proxy_translation_map->dataPtr(),
252  str_proxy_translation_map->domainStart(),
253  thread_idx,
254  thread_count,
255  hash_entry_info.bucket_normalization);
// Only the first non-zero error sticks: the exchange succeeds only
// while err still holds 0.
256  int zero{0};
257  err.compare_exchange_strong(zero, partial_err);
258  });
259  }
260  for (auto& t : init_cpu_buff_threads) {
261  t.join();
262  }
263  }
264  if (err) {
265  // Too many hash entries, need to retry with a 1:many table
266  hash_table_ = nullptr; // clear the hash table buffer
267  throw NeedsOneToManyHash();
268  }
269  }
270 
// initOneToManyHashTableOnCpu: builds a one-to-many perfect hash table on the
// CPU. It allocates hash_table_ (which must be empty on entry) sized with
// join_column.num_elems and fills the first getNormalizedHashEntryCount()
// slots with hash_join_invalid_val. It then runs one fill pass that is handed
// cpu_threads() as its thread count. DATE inner columns take a separate fill
// call from other types (presumably the bucketized variant -- confirm).
// Unlike the one-to-one builder, no error status is checked after the fill.
272  const JoinColumn& join_column,
273  const ExpressionRange& col_range,
274  const bool is_bitwise_eq,
275  const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
276  const StringDictionaryProxyTranslationMap* str_proxy_translation_map,
277  const HashEntryInfo hash_entry_info,
278  const int32_t hash_join_invalid_val,
279  const Executor* executor) {
280  auto timer = DEBUG_TIMER(__func__);
281  const auto inner_col = cols.first;
282  CHECK(inner_col);
283  const auto& ti = inner_col->get_type_info();
284  CHECK(!hash_table_);
285  hash_table_ =
286  std::make_unique<PerfectHashTable>(executor->getDataMgr(),
289  hash_entry_info.getNormalizedHashEntryCount(),
290  join_column.num_elems);
291 
292  auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
293 
294  // str_proxy_translation_map.is_valid() is used to control
295  // validity and whether the pointer exists or is nullptr
296  CHECK(str_proxy_translation_map);
297 
298  int thread_count = cpu_threads();
299  {
300  auto timer_init = DEBUG_TIMER(
301  "CPU One-To-Many Perfect Hash Table Builder: init_hash_join_buff_tbb");
302  init_hash_join_buff_tbb(cpu_hash_table_buff,
303  hash_entry_info.getNormalizedHashEntryCount(),
304  hash_join_invalid_val);
305  }
306  {
// NOTE(review): this timer label names fill_hash_join_buff_bucketized
// for both branches below, which may mislead when reading timing logs.
307  auto timer_fill = DEBUG_TIMER(
308  "CPU One-To-Many Perfect Hash Table Builder: fill_hash_join_buff_bucketized");
309  if (ti.get_type() == kDATE) {
311  cpu_hash_table_buff,
312  hash_entry_info,
313  hash_join_invalid_val,
314  join_column,
315  {static_cast<size_t>(ti.get_size()),
316  col_range.getIntMin(),
317  col_range.getIntMax(),
319  is_bitwise_eq,
320  col_range.getIntMax() + 1,
322  str_proxy_translation_map->dataPtr(), // will return nullptr if !is_valid()
323  str_proxy_translation_map->domainStart(),
324  thread_count);
325  } else {
327  cpu_hash_table_buff,
328  hash_entry_info,
329  hash_join_invalid_val,
330  join_column,
331  {static_cast<size_t>(ti.get_size()),
332  col_range.getIntMin(),
333  col_range.getIntMax(),
335  is_bitwise_eq,
336  col_range.getIntMax() + 1,
338  str_proxy_translation_map->dataPtr(), // will return nullptr if !is_valid()
339  str_proxy_translation_map->domainStart(),
340  thread_count);
341  }
342  }
343  }
344 
345  std::unique_ptr<PerfectHashTable> getHashTable() { return std::move(hash_table_); }
346 
347  static size_t get_entries_per_shard(const size_t total_entry_count,
348  const size_t shard_count) {
349  CHECK_NE(size_t(0), shard_count);
350  return (total_entry_count + shard_count - 1) / shard_count;
351  }
352 
353  const bool for_semi_anti_join(const JoinType join_type) {
354  return join_type == JoinType::SEMI || join_type == JoinType::ANTI;
355  }
356 
357  private:
358  std::unique_ptr<PerfectHashTable> hash_table_;
359 };
int64_t getIntMin() const
std::vector< int > ChunkKey
Definition: types.h:37
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const StringDictionaryProxyTranslationMap *str_proxy_translation_map, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
JoinType
Definition: sqldefs.h:108
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int64_t bucket_normalization)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
Definition: HashJoin.h:77
size_t num_elems
void allocateDeviceMemory(const JoinColumn &join_column, const HashType layout, HashEntryInfo &hash_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
virtual int8_t * getMemoryPtr()=0
#define UNREACHABLE()
Definition: Logger.h:255
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const InnerOuter &cols, const StringDictionaryProxyTranslationMap *str_proxy_translation_map, const JoinType join_type, const HashType hash_type, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
const bool for_semi_anti_join(const JoinType join_type)
static size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count)
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info)
#define CHECK_GT(x, y)
Definition: Logger.h:223
std::string to_string(char const *&&v)
void fill_one_to_many_hash_table_on_device(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
#define CHECK_NE(x, y)
Definition: Logger.h:220
int64_t bucket_normalization
An AbstractBuffer is a unit of data management for a data manager.
size_t hash_entry_count
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
std::unique_ptr< PerfectHashTable > getHashTable()
Definition: sqltypes.h:53
Data_Namespace::DataMgr * getDataMgr() const
Definition: Execute.h:501
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
size_t getNormalizedHashEntryCount() const
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
char * t
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
std::unique_ptr< PerfectHashTable > hash_table_
int cpu_threads()
Definition: thread_count.h:24
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const unsigned cpu_thread_count)
void SUFFIX() init_hash_join_buff_tbb(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val)
HashType
Definition: HashTable.h:19
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int64_t bucket_normalization)
void fill_one_to_many_hash_table(int32_t *buff, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const unsigned cpu_thread_count)