OmniSciDB  1dac507f6e
OverlapsJoinHashTable.cpp
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "OverlapsJoinHashTable.h"
18 #include "CodeGenerator.h"
19 #include "ExpressionRewrite.h"
20 #include "HashJoinKeyHandlers.h"
21 #include "JoinHashTableGpuUtils.h"
22 #include "JoinHashTableInterface.h"
23 
24 #include "Execute.h"
25 
26 std::map<OverlapsJoinHashTable::HashTableCacheKey, double>
27  OverlapsJoinHashTable::auto_tuner_cache_;
28 std::mutex OverlapsJoinHashTable::auto_tuner_cache_mutex_;
29 
30 //! Make hash table from an in-flight SQL query's parse tree etc.
31 std::shared_ptr<OverlapsJoinHashTable> OverlapsJoinHashTable::getInstance(
32  const std::shared_ptr<Analyzer::BinOper> condition,
33  const std::vector<InputTableInfo>& query_infos,
34  const Data_Namespace::MemoryLevel memory_level,
35  const int device_count,
36  ColumnCacheMap& column_cache,
37  Executor* executor) {
38  auto inner_outer_pairs = normalize_column_pairs(
39  condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
40  const auto& query_info =
41  get_inner_query_info(getInnerTableId(inner_outer_pairs), query_infos).info;
42  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
43  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
44  throw TooManyHashEntries();
45  }
46  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
47  ? BaselineJoinHashTable::getShardCountForCondition(
48  condition.get(), executor, inner_outer_pairs)
49  : 0;
50  const auto entries_per_device =
51  get_entries_per_device(total_entries, shard_count, device_count, memory_level);
52  auto join_hash_table = std::make_shared<OverlapsJoinHashTable>(condition,
53  query_infos,
54  memory_level,
55  entries_per_device,
56  column_cache,
57  executor,
58  inner_outer_pairs);
59  join_hash_table->checkHashJoinReplicationConstraint(getInnerTableId(inner_outer_pairs));
60  try {
61  join_hash_table->reify(device_count);
62  } catch (const HashJoinFail& e) {
63  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
64  "involved in equijoin | ") +
65  e.what());
66  } catch (const ColumnarConversionNotSupported& e) {
67  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
68  e.what());
69  } catch (const std::exception& e) {
70  LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
71  << e.what();
72  }
73  return join_hash_table;
74 }
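For orientation, a minimal standalone sketch (not part of this file) of the entry budgeting performed by getInstance() above: two hash slots per inner tuple, a hard cap at what int32_t can index, then the budget split across partitions. get_entries_per_device() is approximated here by a plain ceiling division, which is an assumption about its rounding.

#include <cstddef>
#include <cstdint>
#include <limits>
#include <stdexcept>

// Illustrative sketch only; helper names and rounding are assumptions.
size_t entries_per_device_sketch(const size_t num_inner_tuples,
                                 const size_t shard_count,
                                 const size_t device_count) {
  const size_t total_entries = 2 * num_inner_tuples;
  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw std::runtime_error("too many hash entries");
  }
  // Split across shards if sharded, otherwise across devices.
  const size_t partitions = shard_count ? shard_count : device_count;
  return (total_entries + partitions - 1) / partitions;
}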
75 
76 //! Make hash table from named tables and columns (such as for testing).
77 std::shared_ptr<OverlapsJoinHashTable> OverlapsJoinHashTable::getSyntheticInstance(
78  std::string_view table1,
79  std::string_view column1,
80  std::string_view table2,
81  std::string_view column2,
82  const Data_Namespace::MemoryLevel memory_level,
83  const int device_count,
84  ColumnCacheMap& column_cache,
85  Executor* executor) {
86  auto catalog = executor->getCatalog();
87  CHECK(catalog);
88 
89  auto tmeta1 = catalog->getMetadataForTable(std::string(table1));
90  auto tmeta2 = catalog->getMetadataForTable(std::string(table2));
91 
92  CHECK(tmeta1);
93  CHECK(tmeta2);
94 
95  auto cmeta1 = catalog->getMetadataForColumn(tmeta1->tableId, std::string(column1));
96  auto cmeta2 = catalog->getMetadataForColumn(tmeta2->tableId, std::string(column2));
97 
98  CHECK(cmeta1);
99  CHECK(cmeta2);
100 
101  auto ti1 = cmeta1->columnType;
102  auto ti2 = cmeta2->columnType;
103 
104  CHECK(ti1.is_geometry());
105  CHECK(ti2.is_geometry());
106 
107  int targetColumnId = 0;
108  switch (ti2.get_type()) {
109  case kLINESTRING: {
110  targetColumnId = cmeta2->columnId + 2;
111  break;
112  }
113  case kPOLYGON: {
114  targetColumnId = cmeta2->columnId + 3;
115  break;
116  }
117  case kMULTIPOLYGON: {
118  targetColumnId = cmeta2->columnId + 4;
119  break;
120  }
121  default:
122  CHECK(false);
123  }
124 
125  auto cmeta3 = catalog->getMetadataForColumn(tmeta2->tableId, targetColumnId);
126  CHECK(cmeta3);
127  auto ti3 = cmeta3->columnType;
128 
129  auto a1 = std::make_shared<Analyzer::ColumnVar>(ti1, tmeta1->tableId, 1, 0);
130  auto a2 =
131  std::make_shared<Analyzer::ColumnVar>(ti3, tmeta2->tableId, cmeta3->columnId, 1);
132 
133  auto op = std::make_shared<Analyzer::BinOper>(kBOOLEAN, kOVERLAPS, kONE, a1, a2);
134 
135  size_t number_of_join_tables{2};
136  std::vector<InputTableInfo> query_infos(number_of_join_tables);
137  query_infos[0].table_id = tmeta1->tableId;
138  query_infos[0].info = tmeta1->fragmenter->getFragmentsForQuery();
139  query_infos[1].table_id = tmeta2->tableId;
140  query_infos[1].info = tmeta2->fragmenter->getFragmentsForQuery();
141 
142  return OverlapsJoinHashTable::getInstance(
143  op, query_infos, memory_level, device_count, column_cache, executor);
144 }
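The switch above maps the second table's geometry type to a physical column id offset; those offsets appear to select the bounds array that trails each geometry's coords (and, for polygons, ring metadata). That layout is an assumption about the physical geo columns, stated here as a self-contained sketch.

#include <stdexcept>

// Assumed physical layout (not shown in this file):
//   LINESTRING:   coords, bounds                      -> +2
//   POLYGON:      coords, ring_sizes, bounds          -> +3
//   MULTIPOLYGON: coords, ring_sizes, poly_rings, bounds -> +4
enum class GeoKind { LineString, Polygon, MultiPolygon };

int bounds_column_offset_sketch(const GeoKind kind) {
  switch (kind) {
    case GeoKind::LineString:
      return 2;
    case GeoKind::Polygon:
      return 3;
    case GeoKind::MultiPolygon:
      return 4;
  }
  throw std::logic_error("unsupported geometry kind");
}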
145 
146 void OverlapsJoinHashTable::reifyWithLayout(
147  const int device_count,
148  const JoinHashTableInterface::HashType layout) {
149  CHECK(layout == JoinHashTableInterface::HashType::OneToMany);
150  layout_ = layout;
151  const auto& query_info = get_inner_query_info(getInnerTableId(), query_infos_).info;
152  if (query_info.fragments.empty()) {
153  return;
154  }
155  std::vector<BaselineJoinHashTable::ColumnsForDevice> columns_per_device;
156  const auto shard_count = shardCount();
157 
158  // Prepare to calculate the size of the hash table.
160  calculateCounts(shard_count,
161  query_info,
162  device_count,
163  columns_per_device); // called only to populate columns_per_device
164  const auto composite_key_info = getCompositeKeyInfo();
165  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
166  composite_key_info.cache_key_chunks,
167  condition_->get_optype()};
168  columns_per_device.clear();
169  bucket_sizes_for_dimension_.clear();
170 
171  // Auto-tuner: Pre-calculate some possible hash table sizes.
172  std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
173  auto atc = auto_tuner_cache_.find(cache_key);
174  if (atc != auto_tuner_cache_.end()) {
175  overlaps_hashjoin_bucket_threshold_ = atc->second;
176  VLOG(1) << "Auto tuner using cached overlaps hash table size of: "
177  << overlaps_hashjoin_bucket_threshold_;
178  } else {
179  VLOG(1) << "Auto tuning for the overlaps hash table size:";
180  const double min_threshold{0.00001};
181  const double max_threshold{0.1};
182  double good_threshold{max_threshold};
183  for (double threshold = max_threshold; threshold >= min_threshold;
184  threshold /= 10.0) {
185  overlaps_hashjoin_bucket_threshold_ = threshold;
186  size_t entry_count;
187  size_t emitted_keys_count;
188  std::tie(entry_count, emitted_keys_count) =
189  calculateCounts(shard_count, query_info, device_count, columns_per_device);
190  size_t hash_table_size = calculateHashTableSize(
191  bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
192  columns_per_device.clear();
193  bucket_sizes_for_dimension_.clear();
194  VLOG(1) << "Calculated bin threshold of " << std::fixed << threshold
195  << " giving: entry count " << entry_count << " hash table size "
196  << hash_table_size;
197  if (hash_table_size <= g_overlaps_max_table_size_bytes) {
198  good_threshold = overlaps_hashjoin_bucket_threshold_;
199  } else {
200  VLOG(1) << "Rejected bin threshold of " << std::fixed << threshold;
201  break;
202  }
203  }
204  overlaps_hashjoin_bucket_threshold_ = good_threshold;
205  auto_tuner_cache_[cache_key] = overlaps_hashjoin_bucket_threshold_;
206  }
207 
208  // Calculate the final size of the hash table.
209  VLOG(1) << "Accepted bin threshold of " << std::fixed
210  << overlaps_hashjoin_bucket_threshold_;
211  // NOTE: Setting entry_count_ here overrides when entry_count_ was set in getInstance()
212  // from entries_per_device.
213  std::tie(entry_count_, emitted_keys_count_) =
214  calculateCounts(shard_count, query_info, device_count, columns_per_device);
215  size_t hash_table_size = calculateHashTableSize(
216  bucket_sizes_for_dimension_.size(), emitted_keys_count_, entry_count_);
217  VLOG(1) << "Finalized overlaps hashjoin bucket threshold of " << std::fixed
218  << overlaps_hashjoin_bucket_threshold_ << " giving: entry count "
219  << entry_count_ << " hash table size " << hash_table_size;
220 
221  std::vector<std::future<void>> init_threads;
222  for (int device_id = 0; device_id < device_count; ++device_id) {
223  const auto fragments =
224  shard_count
225  ? only_shards_for_device(query_info.fragments, device_id, device_count)
226  : query_info.fragments;
227  init_threads.push_back(std::async(std::launch::async,
228  &OverlapsJoinHashTable::reifyForDevice,
229  this,
230  columns_per_device[device_id],
231  layout,
232  device_id));
233  }
234  for (auto& init_thread : init_threads) {
235  init_thread.wait();
236  }
237  for (auto& init_thread : init_threads) {
238  init_thread.get();
239  }
240 }
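The else-branch above sweeps candidate bucket thresholds from coarse to fine. A compact, self-contained restatement of that sweep follows; the size callback is only a stand-in for the calculateCounts()/calculateHashTableSize() pair used in the real loop.

#include <cstddef>
#include <functional>

// Illustrative sketch of the auto-tuner in reifyWithLayout(): start at the
// coarsest threshold, shrink by 10x per step, keep the last threshold whose
// projected table still fits the byte budget, and stop at the first overflow
// (finer buckets can only make the table larger).
double tune_bucket_threshold_sketch(
    const std::function<size_t(double)>& projected_size_bytes,
    const size_t max_table_size_bytes) {
  const double min_threshold{0.00001};
  const double max_threshold{0.1};
  double good_threshold{max_threshold};
  for (double threshold = max_threshold; threshold >= min_threshold;
       threshold /= 10.0) {
    if (projected_size_bytes(threshold) <= max_table_size_bytes) {
      good_threshold = threshold;
    } else {
      break;
    }
  }
  return good_threshold;
}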
241 
242 std::pair<size_t, size_t> OverlapsJoinHashTable::calculateCounts(
243  size_t shard_count,
244  const Fragmenter_Namespace::TableInfo& query_info,
245  const int device_count,
246  std::vector<BaselineJoinHashTable::ColumnsForDevice>& columns_per_device) {
247  for (int device_id = 0; device_id < device_count; ++device_id) {
248  const auto fragments =
249  shard_count
250  ? only_shards_for_device(query_info.fragments, device_id, device_count)
251  : query_info.fragments;
252  const auto columns_for_device = fetchColumnsForDevice(fragments, device_id);
253  columns_per_device.push_back(columns_for_device);
254  }
255 
256  size_t tuple_count;
257  size_t emitted_keys_count;
258  std::tie(tuple_count, emitted_keys_count) = approximateTupleCount(columns_per_device);
259  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
260 
261  return std::make_pair(
262  get_entries_per_device(entry_count, shard_count, device_count, memory_level_),
263  emitted_keys_count);
264 }
265 
266 size_t OverlapsJoinHashTable::calculateHashTableSize(size_t number_of_dimensions,
267  size_t emitted_keys_count,
268  size_t entry_count) const {
269  const auto key_component_width = getKeyComponentWidth();
270  const auto key_component_count = number_of_dimensions;
271  const auto entry_size = key_component_count * key_component_width;
272  const auto keys_for_all_rows = emitted_keys_count;
273  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
274  const size_t hash_table_size =
275  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
276  return hash_table_size;
277 }
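A worked instance of the formula above, with purely illustrative numbers (8-byte key components, 2 bucket dimensions, 1,000,000 entries, 3,000,000 emitted keys):

#include <cstddef>
#include <cstdint>
#include <iostream>

int main() {
  // Illustrative numbers only; not taken from any real table.
  const size_t key_component_width = 8;  // getKeyComponentWidth()
  const size_t key_component_count = 2;  // one per bucket dimension
  const size_t entry_count = 1000000;
  const size_t emitted_keys_count = 3000000;
  const size_t entry_size = key_component_count * key_component_width;  // 16 bytes
  const size_t one_to_many_hash_entries = 2 * entry_count + emitted_keys_count;
  const size_t hash_table_size =
      entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
  std::cout << hash_table_size << " bytes\n";  // 16,000,000 + 20,000,000 = 36,000,000
}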
278 
279 BaselineJoinHashTable::ColumnsForDevice OverlapsJoinHashTable::fetchColumnsForDevice(
280  const std::deque<Fragmenter_Namespace::FragmentInfo>& fragments,
281  const int device_id) {
282  const auto& catalog = *executor_->getCatalog();
283  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
284 
285  std::vector<JoinColumn> join_columns;
286  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
287  std::vector<JoinColumnTypeInfo> join_column_types;
288  std::vector<JoinBucketInfo> join_bucket_info;
289  for (const auto& inner_outer_pair : inner_outer_pairs_) {
290  const auto inner_col = inner_outer_pair.first;
291  const auto inner_cd = get_column_descriptor_maybe(
292  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
293  if (inner_cd && inner_cd->isVirtualCol) {
294  throw FailedToJoinOnVirtualColumn();
295  }
296  const auto join_column_info = fetchColumn(
297  inner_col, effective_memory_level, fragments, chunks_owner, device_id);
298  join_columns.emplace_back(
299  JoinColumn{join_column_info.col_buff, join_column_info.num_elems});
300  const auto& ti = inner_col->get_type_info();
301  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
302  0,
303  inline_int_null_value<int64_t>(),
304  isBitwiseEq(),
305  0,
306  get_join_column_type_kind(ti)});
307  CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
308 
309  if (bucket_sizes_for_dimension_.empty()) {
310  computeBucketSizes(
311  bucket_sizes_for_dimension_, join_columns.back(), inner_outer_pairs_);
312  }
313  const auto elem_ti = ti.get_elem_type();
314  CHECK(elem_ti.is_fp());
315  join_bucket_info.emplace_back(
316  JoinBucketInfo{bucket_sizes_for_dimension_, elem_ti.get_type() == kDOUBLE});
317  }
318  return {join_columns, join_column_types, chunks_owner, join_bucket_info};
319 }
320 
321 std::pair<size_t, size_t> OverlapsJoinHashTable::approximateTupleCount(
322  const std::vector<ColumnsForDevice>& columns_per_device) const {
323  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
324  CountDistinctDescriptor count_distinct_desc{
325  CountDistinctImplType::Bitmap,
326  0,
327  11,
328  true,
329  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
330  ? ExecutorDeviceType::GPU
331  : ExecutorDeviceType::CPU,
332  1};
333  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
334 
335  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
336  // Number of keys must match dimension of buckets
337  CHECK_EQ(columns_per_device.front().join_columns.size(),
338  columns_per_device.front().join_buckets.size());
339  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
340  const auto composite_key_info = getCompositeKeyInfo();
341  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
342  composite_key_info.cache_key_chunks,
343  condition_->get_optype(),
344  overlaps_hashjoin_bucket_threshold_};
345  const auto cached_count_info = getApproximateTupleCountFromCache(cache_key);
346  if (cached_count_info.first >= 0) {
347  VLOG(1) << "Using a cached tuple count: " << cached_count_info.first
348  << ", emitted keys count: " << cached_count_info.second;
349  return std::make_pair(cached_count_info.first, cached_count_info.second);
350  }
351  int thread_count = cpu_threads();
352  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
353  auto hll_result = &hll_buffer_all_cpus[0];
354 
355  std::vector<int32_t> num_keys_for_row;
356  // TODO(adb): support multi-column overlaps join
357  CHECK_EQ(columns_per_device.size(), 1u);
358  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
359  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
360  approximate_distinct_tuples_overlaps(hll_result,
361  num_keys_for_row,
362  count_distinct_desc.bitmap_sz_bits,
363  padded_size_bytes,
364  columns_per_device.front().join_columns,
365  columns_per_device.front().join_column_types,
366  columns_per_device.front().join_buckets,
367  thread_count);
368  for (int i = 1; i < thread_count; ++i) {
369  hll_unify(hll_result,
370  hll_result + i * padded_size_bytes,
371  1 << count_distinct_desc.bitmap_sz_bits);
372  }
373  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
374  num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
375  }
376 #ifdef HAVE_CUDA
377  const int device_count = columns_per_device.size();
378  auto& data_mgr = executor_->getCatalog()->getDataMgr();
379  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count);
380  for (auto& host_hll_buffer : host_hll_buffers) {
381  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
382  }
383  std::vector<size_t> emitted_keys_count_device_threads(device_count, 0);
384  std::vector<std::future<void>> approximate_distinct_device_threads;
385  for (int device_id = 0; device_id < device_count; ++device_id) {
386  approximate_distinct_device_threads.emplace_back(std::async(
387  std::launch::async,
388  [device_id,
389  &columns_per_device,
390  &count_distinct_desc,
391  &data_mgr,
392  &host_hll_buffers,
393  &emitted_keys_count_device_threads,
394  this] {
395  ThrustAllocator allocator(&data_mgr, device_id);
396  auto device_hll_buffer =
397  allocator.allocateScopedBuffer(count_distinct_desc.bitmapPaddedSizeBytes());
398  data_mgr.getCudaMgr()->zeroDeviceMem(
399  device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
400  const auto& columns_for_device = columns_per_device[device_id];
401  auto join_columns_gpu =
402  transfer_pod_vector_to_gpu(columns_for_device.join_columns, allocator);
403 
404  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
405  const auto& bucket_sizes_for_dimension =
406  columns_for_device.join_buckets[0].bucket_sizes_for_dimension;
407  auto bucket_sizes_gpu = allocator.allocateScopedBuffer(
408  bucket_sizes_for_dimension.size() * sizeof(double));
409  copy_to_gpu(&data_mgr,
410  reinterpret_cast<CUdeviceptr>(bucket_sizes_gpu),
411  bucket_sizes_for_dimension.data(),
412  bucket_sizes_for_dimension.size() * sizeof(double),
413  device_id);
414  const size_t row_counts_buffer_sz =
415  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
416  auto row_counts_buffer = allocator.allocateScopedBuffer(row_counts_buffer_sz);
417  data_mgr.getCudaMgr()->zeroDeviceMem(
418  row_counts_buffer, row_counts_buffer_sz, device_id);
419  const auto key_handler =
420  OverlapsKeyHandler(bucket_sizes_for_dimension.size(),
421  join_columns_gpu,
422  reinterpret_cast<double*>(bucket_sizes_gpu));
423  const auto key_handler_gpu = transfer_object_to_gpu(key_handler, allocator);
424  approximate_distinct_tuples_on_device_overlaps(
425  reinterpret_cast<uint8_t*>(device_hll_buffer),
426  count_distinct_desc.bitmap_sz_bits,
427  reinterpret_cast<int32_t*>(row_counts_buffer),
428  key_handler_gpu,
429  columns_for_device.join_columns[0].num_elems,
430  executor_->blockSize(),
431  executor_->gridSize());
432 
433  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
434  copy_from_gpu(&data_mgr,
435  &host_emitted_keys_count,
436  reinterpret_cast<CUdeviceptr>(
437  row_counts_buffer +
438  (columns_per_device.front().join_columns[0].num_elems - 1) *
439  sizeof(int32_t)),
440  sizeof(int32_t),
441  device_id);
442 
443  auto& host_hll_buffer = host_hll_buffers[device_id];
444  copy_from_gpu(&data_mgr,
445  &host_hll_buffer[0],
446  reinterpret_cast<CUdeviceptr>(device_hll_buffer),
447  count_distinct_desc.bitmapPaddedSizeBytes(),
448  device_id);
449  }));
450  }
451  for (auto& child : approximate_distinct_device_threads) {
452  child.get();
453  }
454  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
455  auto& result_hll_buffer = host_hll_buffers.front();
456  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
457  for (int device_id = 1; device_id < device_count; ++device_id) {
458  auto& host_hll_buffer = host_hll_buffers[device_id];
459  hll_unify(hll_result,
460  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
461  1 << count_distinct_desc.bitmap_sz_bits);
462  }
463  size_t emitted_keys_count = 0;
464  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
465  emitted_keys_count += emitted_keys_count_device;
466  }
467  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
468  emitted_keys_count);
469 #else
470  UNREACHABLE();
471  return {0, 0};
472 #endif // HAVE_CUDA
473 }
474 
475 size_t OverlapsJoinHashTable::getKeyComponentWidth() const {
476  return 8;
477 }
478 
479 size_t OverlapsJoinHashTable::getKeyComponentCount() const {
480  return bucket_sizes_for_dimension_.size();
481 }
482 
483 int OverlapsJoinHashTable::initHashTableOnCpu(
484  const std::vector<JoinColumn>& join_columns,
485  const std::vector<JoinColumnTypeInfo>& join_column_types,
486  const std::vector<JoinBucketInfo>& join_bucket_info,
487  const JoinHashTableInterface::HashType layout) {
488  const auto composite_key_info = getCompositeKeyInfo();
489  CHECK(!join_columns.empty());
490  CHECK(!join_bucket_info.empty());
491  HashTableCacheKey cache_key{join_columns.front().num_elems,
492  composite_key_info.cache_key_chunks,
493  condition_->get_optype(),
494  overlaps_hashjoin_bucket_threshold_};
495  initHashTableOnCpuFromCache(cache_key);
496  if (cpu_hash_table_buff_) {
497  return 0;
498  }
500  const auto key_component_width = getKeyComponentWidth();
501  const auto key_component_count = join_bucket_info[0].bucket_sizes_for_dimension.size();
502  const auto entry_size = key_component_count * key_component_width;
503  const auto keys_for_all_rows = emitted_keys_count_;
504  const size_t one_to_many_hash_entries = 2 * entry_count_ + keys_for_all_rows;
505  const size_t hash_table_size =
506  calculateHashTableSize(join_bucket_info[0].bucket_sizes_for_dimension.size(),
507  emitted_keys_count_,
508  entry_count_);
509 
510  VLOG(1) << "Initializing CPU Overlaps Join Hash Table with " << entry_count_
511  << " hash entries and " << one_to_many_hash_entries
512  << " entries in the one to many buffer";
513  VLOG(1) << "Total hash table size: " << hash_table_size << " Bytes";
514 
515  cpu_hash_table_buff_.reset(new std::vector<int8_t>(hash_table_size));
516  int thread_count = cpu_threads();
517  std::vector<std::future<void>> init_cpu_buff_threads;
518  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
519  init_cpu_buff_threads.emplace_back(std::async(
520  std::launch::async,
521  [this, key_component_count, key_component_width, thread_idx, thread_count] {
522  switch (key_component_width) {
523  case 4:
524  init_baseline_hash_join_buff_32(&(*cpu_hash_table_buff_)[0],
525  entry_count_,
526  key_component_count,
527  false,
528  -1,
529  thread_idx,
530  thread_count);
531  break;
532  case 8:
533  init_baseline_hash_join_buff_64(&(*cpu_hash_table_buff_)[0],
534  entry_count_,
535  key_component_count,
536  false,
537  -1,
538  thread_idx,
539  thread_count);
540  break;
541  default:
542  CHECK(false);
543  }
544  }));
545  }
546  for (auto& child : init_cpu_buff_threads) {
547  child.get();
548  }
549  std::vector<std::future<int>> fill_cpu_buff_threads;
550  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
551  fill_cpu_buff_threads.emplace_back(std::async(
552  std::launch::async,
553  [this,
554  &join_columns,
555  &join_bucket_info,
556  key_component_count,
557  key_component_width,
558  thread_idx,
559  thread_count] {
560  switch (key_component_width) {
561  case 4: {
562  const auto key_handler = OverlapsKeyHandler(
563  key_component_count,
564  &join_columns[0],
565  join_bucket_info[0].bucket_sizes_for_dimension.data());
566  return overlaps_fill_baseline_hash_join_buff_32(&(*cpu_hash_table_buff_)[0],
567  entry_count_,
568  -1,
569  key_component_count,
570  false,
571  &key_handler,
572  join_columns[0].num_elems,
573  thread_idx,
574  thread_count);
575  }
576  case 8: {
577  const auto key_handler = OverlapsKeyHandler(
578  key_component_count,
579  &join_columns[0],
580  join_bucket_info[0].bucket_sizes_for_dimension.data());
581  return overlaps_fill_baseline_hash_join_buff_64(&(*cpu_hash_table_buff_)[0],
582  entry_count_,
583  -1,
584  key_component_count,
585  false,
586  &key_handler,
587  join_columns[0].num_elems,
588  thread_idx,
589  thread_count);
590  }
591  default:
592  CHECK(false);
593  }
594  return -1;
595  }));
596  }
597  int err = 0;
598  for (auto& child : fill_cpu_buff_threads) {
599  int partial_err = child.get();
600  if (partial_err) {
601  err = partial_err;
602  }
603  }
604  if (err) {
605  cpu_hash_table_buff_.reset();
606  return err;
607  }
608  auto one_to_many_buff =
609  reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0] + entry_count_ * entry_size);
610  init_hash_join_buff(one_to_many_buff, entry_count_, -1, 0, 1);
611  switch (key_component_width) {
612  case 4: {
613  const auto composite_key_dict =
614  reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0]);
615  fill_one_to_many_baseline_hash_table_32(one_to_many_buff,
616  composite_key_dict,
617  entry_count_,
618  -1,
619  key_component_count,
620  join_columns,
621  join_column_types,
622  join_bucket_info,
623  composite_key_info.sd_inner_proxy_per_key,
624  composite_key_info.sd_outer_proxy_per_key,
625  thread_count);
626  break;
627  }
628  case 8: {
629  const auto composite_key_dict =
630  reinterpret_cast<int64_t*>(&(*cpu_hash_table_buff_)[0]);
631  fill_one_to_many_baseline_hash_table_64(one_to_many_buff,
632  composite_key_dict,
633  entry_count_,
634  -1,
635  key_component_count,
636  join_columns,
637  join_column_types,
638  join_bucket_info,
639  composite_key_info.sd_inner_proxy_per_key,
640  composite_key_info.sd_outer_proxy_per_key,
641  thread_count);
642  break;
643  }
644  default:
645  CHECK(false);
646  }
647  if (!err && getInnerTableId() > 0) {
648  putHashTableOnCpuToCache(cache_key);
649  }
650  return err;
651 }
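For orientation, the byte layout implied by the offsets used above, mirroring calculateHashTableSize() and the one_to_many_buff pointer arithmetic. Reading the trailing int32_t region as offsets/counts plus one id per emitted key is an assumption about the one-to-many representation.

#include <cstddef>
#include <cstdint>

// Sketch of the CPU buffer layout used by initHashTableOnCpu():
//   [ composite key dictionary : entry_count * entry_size bytes ]
//   [ one-to-many section      : (2 * entry_count + emitted_keys_count) * sizeof(int32_t) ]
struct OverlapsCpuLayoutSketch {
  size_t entry_count;
  size_t entry_size;  // key_component_count * key_component_width
  size_t emitted_keys_count;

  size_t key_dict_bytes() const { return entry_count * entry_size; }
  size_t one_to_many_bytes() const {
    return (2 * entry_count + emitted_keys_count) * sizeof(int32_t);
  }
  size_t total_bytes() const { return key_dict_bytes() + one_to_many_bytes(); }
};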
652 
653 int OverlapsJoinHashTable::initHashTableOnGpu(
654  const std::vector<JoinColumn>& join_columns,
655  const std::vector<JoinColumnTypeInfo>& join_column_types,
656  const std::vector<JoinBucketInfo>& join_bucket_info,
657  const JoinHashTableInterface::HashType layout,
658  const size_t key_component_width,
659  const size_t key_component_count,
660  const int device_id) {
661  int err = 0;
662  // TODO(adb): 4 byte keys
663  CHECK_EQ(key_component_width, size_t(8));
665 #ifdef HAVE_CUDA
666  const auto catalog = executor_->getCatalog();
667  auto& data_mgr = catalog->getDataMgr();
668  ThrustAllocator allocator(&data_mgr, device_id);
669  auto dev_err_buff =
670  reinterpret_cast<CUdeviceptr>(allocator.allocateScopedBuffer(sizeof(int)));
671  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
672  switch (key_component_width) {
673  case 4:
674  init_baseline_hash_join_buff_on_device_32(
675  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
676  entry_count_,
677  key_component_count,
678  false,
679  -1,
680  executor_->blockSize(),
681  executor_->gridSize());
682  break;
683  case 8:
684  init_baseline_hash_join_buff_on_device_64(
685  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
686  entry_count_,
687  key_component_count,
688  false,
689  -1,
690  executor_->blockSize(),
691  executor_->gridSize());
692  break;
693  default:
694  CHECK(false);
695  }
696  auto join_columns_gpu = transfer_pod_vector_to_gpu(join_columns, allocator);
697  auto hash_buff =
698  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
699  CHECK_EQ(join_columns.size(), 1u);
700  auto& bucket_sizes_for_dimension = join_bucket_info[0].bucket_sizes_for_dimension;
701  auto bucket_sizes_gpu =
702  transfer_pod_vector_to_gpu(bucket_sizes_for_dimension, allocator);
703  const auto key_handler = OverlapsKeyHandler(
704  bucket_sizes_for_dimension.size(), join_columns_gpu, bucket_sizes_gpu);
705  const auto key_handler_gpu = transfer_object_to_gpu(key_handler, allocator);
706  switch (key_component_width) {
707  case 8: {
708  overlaps_fill_baseline_hash_join_buff_on_device_64(
709  hash_buff,
710  entry_count_,
711  -1,
712  key_component_count,
713  false,
714  reinterpret_cast<int*>(dev_err_buff),
715  key_handler_gpu,
716  join_columns.front().num_elems,
717  executor_->blockSize(),
718  executor_->gridSize());
719  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
720  break;
721  }
722  default:
723  UNREACHABLE();
724  }
725  if (err) {
726  return err;
727  }
728  const auto entry_size = key_component_count * key_component_width;
729  auto one_to_many_buff = reinterpret_cast<int32_t*>(
730  gpu_hash_table_buff_[device_id]->getMemoryPtr() + entry_count_ * entry_size);
731  switch (key_component_width) {
732  case 8: {
733  const auto composite_key_dict =
734  reinterpret_cast<int64_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
735  init_hash_join_buff_on_device(one_to_many_buff,
736  entry_count_,
737  -1,
738  executor_->blockSize(),
739  executor_->gridSize());
740  overlaps_fill_one_to_many_baseline_hash_table_on_device_64(
741  one_to_many_buff,
742  composite_key_dict,
743  entry_count_,
744  -1,
745  key_handler_gpu,
746  join_columns.front().num_elems,
747  executor_->blockSize(),
748  executor_->gridSize());
749  break;
750  }
751  default:
752  UNREACHABLE();
753  }
754 #else
755  UNREACHABLE();
756 #endif
757  return err;
758 }
759 
760 #define LL_CONTEXT executor_->cgen_state_->context_
761 #define LL_BUILDER executor_->cgen_state_->ir_builder_
762 #define LL_INT(v) executor_->cgen_state_->llInt(v)
763 #define LL_FP(v) executor_->cgen_state_->llFp(v)
764 #define ROW_FUNC executor_->cgen_state_->row_func_
765 
766 llvm::Value* OverlapsJoinHashTable::codegenKey(const CompilationOptions& co) {
767  const auto key_component_width = getKeyComponentWidth();
768  CHECK(key_component_width == 4 || key_component_width == 8);
769  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
770  llvm::Value* key_buff_lv{nullptr};
771  switch (key_component_width) {
772  case 4:
773  key_buff_lv =
774  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
775  break;
776  case 8:
777  key_buff_lv =
778  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
779  break;
780  default:
781  CHECK(false);
782  }
783 
784  const auto& inner_outer_pair = inner_outer_pairs_[0];
785  const auto outer_col = inner_outer_pair.second;
786  const auto outer_col_ti = outer_col->get_type_info();
787 
788  if (outer_col_ti.is_geometry()) {
789  CodeGenerator code_generator(executor_);
790  // TODO(adb): for points we will use the coords array, but for other geometries we
791  // will need to use the bounding box. For now only support points.
792  CHECK_EQ(outer_col_ti.get_type(), kPOINT);
793  CHECK_EQ(bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
794 
795  const auto col_lvs = code_generator.codegen(outer_col, true, co);
796  CHECK_EQ(col_lvs.size(), size_t(1));
797 
798  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
799  CHECK(outer_col_var);
800  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
801  outer_col_var->get_table_id(), outer_col_var->get_column_id() + 1);
802  CHECK(coords_cd);
803 
804  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
805  "array_buff",
806  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
807  {col_lvs.front(), code_generator.posArg(outer_col)});
808  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
809  << "Only TINYINT coordinates columns are supported in geo overlaps hash join.";
810  const auto arr_ptr =
811  code_generator.castArrayPointer(array_ptr, coords_cd->columnType.get_elem_type());
812 
813  for (size_t i = 0; i < 2; i++) {
814  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));
815 
816  // Note that get_bucket_key_for_range_compressed will need to be specialized for
817  // future compression schemes
818  auto bucket_key =
819  outer_col_ti.get_compression() == kENCODING_GEOINT
820  ? executor_->cgen_state_->emitExternalCall(
821  "get_bucket_key_for_range_compressed",
822  get_int_type(64, LL_CONTEXT),
823  {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])})
824  : executor_->cgen_state_->emitExternalCall(
825  "get_bucket_key_for_range_double",
826  get_int_type(64, LL_CONTEXT),
827  {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])});
828  const auto col_lv = LL_BUILDER.CreateSExt(
829  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
830  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
831  }
832  } else {
833  LOG(FATAL) << "Overlaps key currently only supported for geospatial types.";
834  }
835  return key_buff_lv;
836 }
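The emitted calls to get_bucket_key_for_range_double / get_bucket_key_for_range_compressed are defined elsewhere. Below is a hedged sketch of the uncompressed case, assuming a bucket key is simply the coordinate scaled by the stored inverse bucket size and truncated toward negative infinity (computeBucketSizes() below stores 1.0 / bucket_size per dimension).

#include <cmath>
#include <cstddef>
#include <cstdint>

// Hypothetical illustration only; the real runtime function is not part of this
// file. `inverse_bucket_size` corresponds to what bucket_sizes_for_dimension_
// holds, so the result is the index of the bucket the coordinate falls into.
int64_t bucket_key_for_range_double_sketch(const double* point,
                                           const size_t dim,
                                           const double inverse_bucket_size) {
  return static_cast<int64_t>(std::floor(point[dim] * inverse_bucket_size));
}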
837 
838 void OverlapsJoinHashTable::computeBucketSizes(
839  std::vector<double>& bucket_sizes_for_dimension,
840  const JoinColumn& join_column,
841  const std::vector<InnerOuter>& inner_outer_pairs) {
842  // No coalesced keys for overlaps joins yet
843  CHECK_EQ(inner_outer_pairs.size(), 1u);
844 
845  const auto col = inner_outer_pairs[0].first;
846  CHECK(col);
847  const auto col_ti = col->get_type_info();
848  CHECK(col_ti.is_array());
849 
850  // Compute the number of dimensions for this overlaps key
851  int num_dims{-1};
852  if (col_ti.is_fixlen_array()) {
853  num_dims = col_ti.get_size() / col_ti.get_elem_type().get_size();
854  num_dims /= 2;
855  } else {
856  CHECK(col_ti.is_varlen_array());
857  num_dims = 2;
858  // TODO(adb): how can we pick the number of dims in the varlen case? e.g.
859  // backwards compatibility with existing bounds cols or generic range joins
860  }
861  CHECK_GT(num_dims, 0);
862  std::vector<double> local_bucket_sizes(num_dims, std::numeric_limits<double>::max());
863 
864  VLOG(1) << "Computing bucketed hashjoin with minimum bucket size "
865  << std::to_string(overlaps_hashjoin_bucket_threshold_);
866 
867  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs);
868  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
869  const int thread_count = cpu_threads();
870  compute_bucket_sizes(local_bucket_sizes,
871  join_column,
872  overlaps_hashjoin_bucket_threshold_,
873  thread_count);
874  }
875 #ifdef HAVE_CUDA
876  else {
877  // Note that we compute the bucket sizes using only a single GPU
878  const int device_id = 0;
879  auto& data_mgr = executor_->getCatalog()->getDataMgr();
880  ThrustAllocator allocator(&data_mgr, device_id);
881  auto device_bucket_sizes_gpu =
882  transfer_pod_vector_to_gpu(local_bucket_sizes, allocator);
883  auto join_columns_gpu = transfer_object_to_gpu(join_column, allocator);
884 
885  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
886  join_columns_gpu,
887  overlaps_hashjoin_bucket_threshold_,
888  executor_->blockSize(),
889  executor_->gridSize());
890  copy_from_gpu(&data_mgr,
891  local_bucket_sizes.data(),
892  reinterpret_cast<CUdeviceptr>(device_bucket_sizes_gpu),
893  local_bucket_sizes.size() * sizeof(double),
894  device_id);
895  }
896 #endif
897 
898  size_t ctr = 0;
899  for (auto& bucket_sz : local_bucket_sizes) {
900  VLOG(1) << "Computed bucket size for dim[" << ctr++ << "]: " << bucket_sz;
901  bucket_sizes_for_dimension.push_back(1.0 / bucket_sz);
902  }
903 
904  return;
905 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
std::vector< double > bucket_sizes_for_dimension_
std::deque< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
int overlaps_fill_baseline_hash_join_buff_32(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const OverlapsKeyHandler *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
const int8_t * col_buff
int initHashTableOnGpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout, const size_t key_component_width, const size_t key_component_count, const int device_id) override
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
void fill_one_to_many_baseline_hash_table_64(int32_t *buff, const int64_t *composite_key_dict, const size_t hash_entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_bucket_info, const std::vector< const void * > &sd_inner_proxy_per_key, const std::vector< const void * > &sd_outer_proxy_per_key, const int32_t cpu_thread_count)
void overlaps_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const OverlapsKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
void putHashTableOnCpuToCache(const HashTableCacheKey &)
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
std::shared_ptr< std::vector< int8_t > > cpu_hash_table_buff_
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int32_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
static std::mutex auto_tuner_cache_mutex_
void init_baseline_hash_join_buff_64(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
#define LOG(tag)
Definition: Logger.h:185
static std::shared_ptr< OverlapsJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
void hll_unify(T1 *lhs, T2 *rhs, const size_t m)
Definition: HyperLogLog.h:109
std::pair< size_t, size_t > calculateCounts(size_t shard_count, const Fragmenter_Namespace::TableInfo &query_info, const int device_count, std::vector< BaselineJoinHashTable::ColumnsForDevice > &columns_per_device)
unsigned long long CUdeviceptr
Definition: nocuda.h:27
#define LL_FP(v)
llvm::Value * posArg(const Analyzer::Expr *) const
Definition: ColumnIR.cpp:503
#define UNREACHABLE()
Definition: Logger.h:234
void compute_bucket_sizes(std::vector< double > &bucket_sizes_for_dimension, const JoinColumn &join_column, const double bucket_size_threshold, const int thread_count)
std::pair< ssize_t, size_t > getApproximateTupleCountFromCache(const HashTableCacheKey &) const
int initHashTableOnCpu(const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const JoinHashTableInterface::HashType layout) override
std::deque< FragmentInfo > fragments
Definition: Fragmenter.h:167
JoinHashTableInterface::HashType layout_
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
T * transfer_pod_vector_to_gpu(const std::vector< T > &vec, ThrustAllocator &allocator)
llvm::Type * get_int_type(const int width, llvm::LLVMContext &context)
size_t hll_size(const T *M, const size_t bitmap_sz_bits)
Definition: HyperLogLog.h:90
#define CHECK_GT(x, y)
Definition: Logger.h:202
static std::map< HashTableCacheKey, double > auto_tuner_cache_
std::string to_string(char const *&&v)
size_t calculateHashTableSize(size_t number_of_dimensions, size_t emitted_keys_count, size_t entry_count) const
const std::vector< InputTableInfo > & query_infos_
llvm::Value * codegenKey(const CompilationOptions &) override
ColumnsForDevice fetchColumnsForDevice(const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id) override
std::vector< InnerOuter > inner_outer_pairs_
int8_t * allocateScopedBuffer(std::ptrdiff_t num_bytes)
T * transfer_object_to_gpu(const T &object, ThrustAllocator &allocator)
CHECK(cgen_state)
void copy_to_gpu(Data_Namespace::DataMgr *data_mgr, CUdeviceptr dst, const void *src, const size_t num_bytes, const int device_id)
Definition: GpuMemUtils.cpp:31
CompositeKeyInfo getCompositeKeyInfo() const
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
Definition: Execute.h:171
void copy_from_gpu(Data_Namespace::DataMgr *data_mgr, void *dst, const CUdeviceptr src, const size_t num_bytes, const int device_id)
int overlaps_fill_baseline_hash_join_buff_64(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const OverlapsKeyHandler *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void init_baseline_hash_join_buff_on_device_32(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
static std::shared_ptr< OverlapsJoinHashTable > getSyntheticInstance(std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
#define LL_BUILDER
void fill_one_to_many_baseline_hash_table_32(int32_t *buff, const int32_t *composite_key_dict, const size_t hash_entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_bucket_info, const std::vector< const void * > &sd_inner_proxy_per_key, const std::vector< const void * > &sd_outer_proxy_per_key, const int32_t cpu_thread_count)
size_t g_overlaps_max_table_size_bytes
Definition: Execute.cpp:89
void reifyWithLayout(const int device_count, const JoinHashTableInterface::HashType layout) override
void init_baseline_hash_join_buff_on_device_64(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
std::vector< llvm::Value * > codegen(const Analyzer::Expr *, const bool fetch_columns, const CompilationOptions &)
Definition: IRCodegen.cpp:25
int getInnerTableId() const noexcept override
void approximate_distinct_tuples_overlaps(uint8_t *hll_buffer_all_cpus, std::vector< int32_t > &row_counts, const uint32_t b, const size_t padded_size_bytes, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_buckets_per_key, const int thread_count)
Definition: sqldefs.h:69
#define LL_INT(v)
JoinColumn fetchColumn(const Analyzer::ColumnVar *inner_col, const Data_Namespace::MemoryLevel &effective_memory_level, const std::deque< Fragmenter_Namespace::FragmentInfo > &fragments, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, const int device_id)
void reifyForDevice(const ColumnsForDevice &columns_for_device, const JoinHashTableInterface::HashType layout, const int device_id)
void init_hash_join_buff_on_device(int32_t *buff, const int32_t entry_count, const int32_t invalid_slot_val, const size_t block_size_x, const size_t grid_size_x)
const Data_Namespace::MemoryLevel memory_level_
size_t getKeyComponentCount() const override
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
void initHashTableOnCpuFromCache(const HashTableCacheKey &)
void approximate_distinct_tuples_on_device_overlaps(uint8_t *hll_buffer, const uint32_t b, int32_t *row_counts_buffer, const OverlapsKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
std::pair< size_t, size_t > approximateTupleCount(const std::vector< ColumnsForDevice > &) const override
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
void overlaps_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const size_t hash_entry_count, const int32_t invalid_slot_val, const OverlapsKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
static size_t getShardCountForCondition(const Analyzer::BinOper *condition, const Executor *executor, const std::vector< InnerOuter > &inner_outer_pairs)
#define LL_CONTEXT
void compute_bucket_sizes_on_device(double *bucket_sizes_buffer, const JoinColumn *join_column_for_key, const double bucket_sz_threshold, const size_t block_size_x, const size_t grid_size_x)
int cpu_threads()
Definition: thread_count.h:25
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
void computeBucketSizes(std::vector< double > &bucket_sizes_for_dimension, const JoinColumn &join_column, const std::vector< InnerOuter > &inner_outer_pairs)
#define VLOG(n)
Definition: Logger.h:280
const std::shared_ptr< Analyzer::BinOper > condition_
void init_baseline_hash_join_buff_32(int8_t *hash_join_buff, const int32_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
size_t getKeyComponentWidth() const override