OmniSciDB  340b00dbf6
OverlapsJoinHashTable.cpp
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
20 #include "QueryEngine/Execute.h"
26 
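// auto_tuner_cache_ memoizes the bucket threshold chosen by the auto-tuner for a given
// set of join columns, so repeated overlaps joins over the same data can skip the tuning loop.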
27 std::map<HashTableCacheKey, double> OverlapsJoinHashTable::auto_tuner_cache_;
28 std::mutex OverlapsJoinHashTable::auto_tuner_cache_mutex_;
29 
30 //! Make hash table from an in-flight SQL query's parse tree etc.
31 std::shared_ptr<OverlapsJoinHashTable> OverlapsJoinHashTable::getInstance(
32  const std::shared_ptr<Analyzer::BinOper> condition,
33  const std::vector<InputTableInfo>& query_infos,
34  const Data_Namespace::MemoryLevel memory_level,
35  const int device_count,
36  ColumnCacheMap& column_cache,
37  Executor* executor) {
38  decltype(std::chrono::steady_clock::now()) ts1, ts2;
39  auto inner_outer_pairs = normalize_column_pairs(
40  condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
41 
42  const auto getHashTableType = [](const std::shared_ptr<Analyzer::BinOper> condition,
43  const std::vector<InnerOuter>& inner_outer_pairs)
44  -> JoinHashTableInterface::HashType {
45  auto layout = JoinHashTableInterface::HashType::OneToMany;
46  if (condition->is_overlaps_oper()) {
47  CHECK_EQ(inner_outer_pairs.size(), size_t(1));
48  if (inner_outer_pairs[0].first->get_type_info().is_array() &&
49  inner_outer_pairs[0].second->get_type_info().is_array()) {
50  layout = JoinHashTableInterface::HashType::ManyToMany;
51  }
52  }
53  return layout;
54  };
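// The layout defaults to a one-to-many geo hash table; ManyToMany is used only when both
// sides of the overlaps qual are array expressions (see codegenManyKey below).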
55 
56  auto layout = getHashTableType(condition, inner_outer_pairs);
57 
58  if (VLOGGING(1)) {
59  VLOG(1) << "Building geo hash table " << getHashTypeString(layout)
60  << " for qual: " << condition->toString();
61  ts1 = std::chrono::steady_clock::now();
62  }
63 
64  const auto qi_0 = query_infos[0].info.getNumTuplesUpperBound();
65  const auto qi_1 = query_infos[1].info.getNumTuplesUpperBound();
66 
67  VLOG(1) << "table_id = " << query_infos[0].table_id << " has " << qi_0 << " tuples.";
68  VLOG(1) << "table_id = " << query_infos[1].table_id << " has " << qi_1 << " tuples.";
69 
70  const auto& query_info =
71  get_inner_query_info(getInnerTableId(inner_outer_pairs), query_infos).info;
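// The table is sized at twice the inner table's upper-bound tuple count; entries are
// addressed with int32_t offsets, so anything past INT32_MAX cannot be represented.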
72  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
73  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
74  throw TooManyHashEntries();
75  }
76  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
77  ? BaselineJoinHashTable::getShardCountForCondition(
78  condition.get(), executor, inner_outer_pairs)
79  : 0;
80  const auto entries_per_device =
81  get_entries_per_device(total_entries, shard_count, device_count, memory_level);
82  auto join_hash_table = std::make_shared<OverlapsJoinHashTable>(condition,
83  query_infos,
84  memory_level,
85  layout,
86  entries_per_device,
87  column_cache,
88  executor,
89  inner_outer_pairs,
90  device_count);
91  join_hash_table->checkHashJoinReplicationConstraint(getInnerTableId(inner_outer_pairs));
92  try {
93  join_hash_table->reify(layout);
94  } catch (const HashJoinFail& e) {
95  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
96  "involved in equijoin | ") +
97  e.what());
98  } catch (const ColumnarConversionNotSupported& e) {
99  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
100  e.what());
101  } catch (const std::exception& e) {
102  LOG(FATAL) << "Fatal error while attempting to build hash tables for join: "
103  << e.what();
104  }
105  if (VLOGGING(1)) {
106  ts2 = std::chrono::steady_clock::now();
107  VLOG(1) << "Built geo hash table " << getHashTypeString(layout) << " in "
108  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
109  << " ms";
110  }
111  return join_hash_table;
112 }
113 
114 void OverlapsJoinHashTable::reifyWithLayout(
115  const JoinHashTableInterface::HashType layout) {
116  auto timer = DEBUG_TIMER(__func__);
118  const auto& query_info = get_inner_query_info(getInnerTableId(), query_infos_).info;
119  VLOG(1) << "Reify with layout " << getHashTypeString(layout)
120  << " for table_id: " << getInnerTableId();
121  if (query_info.fragments.empty()) {
122  return;
123  }
124  std::vector<BaselineJoinHashTable::ColumnsForDevice> columns_per_device;
125  auto& data_mgr = catalog_->getDataMgr();
126  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
127  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
128  for (int device_id = 0; device_id < device_count_; ++device_id) {
129  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(&data_mgr, device_id));
130  }
131  }
132  const auto shard_count = shardCount();
133  for (int device_id = 0; device_id < device_count_; ++device_id) {
134  const auto fragments =
135  shard_count
136  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
137  : query_info.fragments;
138  const auto columns_for_device =
139  fetchColumnsForDevice(fragments,
140  device_id,
141  memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
142  ? dev_buff_owners[device_id].get()
143  : nullptr);
144  columns_per_device.push_back(columns_for_device);
145  }
146 
147  // Prepare to calculate the size of the hash table.
148  const auto composite_key_info = getCompositeKeyInfo();
149  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
150  composite_key_info.cache_key_chunks,
151  condition_->get_optype()};
153 
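// Chunk keys with a negative table id refer to intermediate (temporary) tables, which are
// query-specific; tuned thresholds for those are not cached below.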
154  auto cache_key_contains_intermediate_table = [](const auto cache_key) {
155  for (auto key : cache_key.chunk_keys) {
156  CHECK_GE(key.size(), size_t(2));
157  if (key[1] < 0) {
158  return true;
159  }
160  }
161  return false;
162  };
163 
164  // Auto-tuner: Pre-calculate some possible hash table sizes.
165  std::lock_guard<std::mutex> guard(auto_tuner_cache_mutex_);
166  auto atc = auto_tuner_cache_.find(cache_key);
167  if (atc != auto_tuner_cache_.end()) {
168  overlaps_hashjoin_bucket_threshold_ = atc->second;
169  VLOG(1) << "Auto tuner using cached overlaps hash table size of: "
170  << overlaps_hashjoin_bucket_threshold_;
171  } else {
172  VLOG(1) << "Auto tuning for the overlaps hash table size:";
173  // TODO(jclay): Currently, joining on large poly sets
174  // will lead to lengthy construction times (and large hash tables)
175  // tune this to account for the characteristics of the data being joined.
176  const double min_threshold{1e-5};
177  const double max_threshold{1};
178  double good_threshold{max_threshold};
179  for (double threshold = max_threshold; threshold >= min_threshold;
180  threshold /= 10.0) {
181  overlaps_hashjoin_bucket_threshold_ = threshold;
182  size_t entry_count;
183  size_t emitted_keys_count;
184  std::tie(entry_count, emitted_keys_count) =
185  calculateCounts(shard_count, query_info, columns_per_device);
186  size_t hash_table_size = calculateHashTableSize(
187  bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
188  bucket_sizes_for_dimension_.clear();
189  VLOG(1) << "Calculated bin threshold of " << std::fixed << threshold
190  << " giving: entry count " << entry_count << " hash table size "
191  << hash_table_size;
192  if (hash_table_size <= g_overlaps_max_table_size_bytes) {
193  good_threshold = overlaps_hashjoin_bucket_threshold_;
194  } else {
195  VLOG(1) << "Rejected bin threshold of " << std::fixed << threshold;
196  break;
197  }
198  }
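// The tuner walks the threshold down from 1 to 1e-5 in powers of ten and keeps the
// smallest threshold whose projected table still fits within g_overlaps_max_table_size_bytes.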
199  overlaps_hashjoin_bucket_threshold_ = good_threshold;
200  if (!cache_key_contains_intermediate_table(cache_key)) {
201  auto_tuner_cache_[cache_key] = overlaps_hashjoin_bucket_threshold_;
202  }
203  }
204 
205  // Calculate the final size of the hash table.
206  VLOG(1) << "Accepted bin threshold of " << std::fixed
207  << overlaps_hashjoin_bucket_threshold_;
208  // NOTE: Setting entry_count_ here overrides when entry_count_ was set in getInstance()
209  // from entries_per_device.
210  std::tie(entry_count_, emitted_keys_count_) =
211  calculateCounts(shard_count, query_info, columns_per_device);
212  size_t hash_table_size = calculateHashTableSize(
213  bucket_sizes_for_dimension_.size(), emitted_keys_count_, entry_count_);
214  VLOG(1) << "Finalized overlaps hashjoin bucket threshold of " << std::fixed
215  << overlaps_hashjoin_bucket_threshold_ << " giving: entry count "
216  << entry_count_ << " hash table size " << hash_table_size;
217 
218  std::vector<std::future<void>> init_threads;
219  for (int device_id = 0; device_id < device_count_; ++device_id) {
220  const auto fragments =
221  shard_count
222  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
223  : query_info.fragments;
224  init_threads.push_back(std::async(std::launch::async,
225  &OverlapsJoinHashTable::reifyForDevice,
226  this,
227  columns_per_device[device_id],
228  layout,
229  device_id,
230  logger::thread_id()));
231  }
232  for (auto& init_thread : init_threads) {
233  init_thread.wait();
234  }
235  for (auto& init_thread : init_threads) {
236  init_thread.get();
237  }
238 }
239 
240 std::pair<size_t, size_t> OverlapsJoinHashTable::calculateCounts(
241  size_t shard_count,
242  const Fragmenter_Namespace::TableInfo& query_info,
243  std::vector<BaselineJoinHashTable::ColumnsForDevice>& columns_per_device) {
244  // re-compute bucket counts per device based on global bucket size
245  CHECK_EQ(columns_per_device.size(), size_t(device_count_));
246  for (int device_id = 0; device_id < device_count_; ++device_id) {
247  auto& columns_for_device = columns_per_device[device_id];
248  columns_for_device.join_buckets = computeBucketInfo(
249  columns_for_device.join_columns, columns_for_device.join_column_types, device_id);
250  }
251  size_t tuple_count;
252  size_t emitted_keys_count;
253  std::tie(tuple_count, emitted_keys_count) = approximateTupleCount(columns_per_device);
254  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
255 
256  return std::make_pair(
257  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
258  emitted_keys_count);
259 }
260 
261 size_t OverlapsJoinHashTable::calculateHashTableSize(size_t number_of_dimensions,
262  size_t emitted_keys_count,
263  size_t entry_count) const {
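  // Size = the key region (entry_count slots of key_component_count * key width bytes)
  // plus the one-to-many buffers: 2 * entry_count int32 offsets/counts and one int32
  // row id per emitted key.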
264  const auto key_component_width = getKeyComponentWidth();
265  const auto key_component_count = number_of_dimensions;
266  const auto entry_size = key_component_count * key_component_width;
267  const auto keys_for_all_rows = emitted_keys_count;
268  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
269  const size_t hash_table_size =
270  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
271  return hash_table_size;
272 }
273 
274 BaselineJoinHashTable::ColumnsForDevice OverlapsJoinHashTable::fetchColumnsForDevice(
275  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
276  const int device_id,
277  DeviceAllocator* dev_buff_owner) {
278  const auto& catalog = *executor_->getCatalog();
279  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
280 
281  std::vector<JoinColumn> join_columns;
282  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
283  std::vector<JoinColumnTypeInfo> join_column_types;
284  std::vector<std::shared_ptr<void>> malloc_owner;
285  for (const auto& inner_outer_pair : inner_outer_pairs_) {
286  const auto inner_col = inner_outer_pair.first;
287  const auto inner_cd = get_column_descriptor_maybe(
288  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
289  if (inner_cd && inner_cd->isVirtualCol) {
290  throw FailedToJoinOnVirtualColumn();
291  }
292  join_columns.emplace_back(fetchJoinColumn(inner_col,
293  fragments,
294  effective_memory_level,
295  device_id,
296  chunks_owner,
297  dev_buff_owner,
298  malloc_owner,
299  executor_,
300  &column_cache_));
301  const auto& ti = inner_col->get_type_info();
302  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
303  0,
304  0,
305  inline_int_null_value<int64_t>(),
306  isBitwiseEq(),
307  0,
308  get_join_column_type_kind(ti)});
309  CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
310  }
311  // compute initial bucket info
312  auto join_bucket_info = computeBucketInfo(join_columns, join_column_types, device_id);
313  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
314 }
315 
316 std::vector<JoinBucketInfo> OverlapsJoinHashTable::computeBucketInfo(
317  const std::vector<JoinColumn>& join_columns,
318  const std::vector<JoinColumnTypeInfo>& join_column_types,
319  const int device_id) {
320  std::vector<JoinBucketInfo> join_bucket_info;
321  CHECK_EQ(inner_outer_pairs_.size(), join_columns.size());
322  CHECK_EQ(join_columns.size(), join_column_types.size());
323  for (size_t i = 0; i < join_columns.size(); i++) {
324  const auto& inner_outer_pair = inner_outer_pairs_[i];
325  const auto inner_col = inner_outer_pair.first;
326  const auto& ti = inner_col->get_type_info();
327  const auto elem_ti = ti.get_elem_type();
328  CHECK(elem_ti.is_fp());
329 
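    // Bucket sizes are computed lazily on the first call and then shared across devices
    // and columns; computeBucketSizes stores the inverse (1 / bucket_size) of each size.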
330  if (bucket_sizes_for_dimension_.empty()) {
331  computeBucketSizes(bucket_sizes_for_dimension_,
332  join_columns[i],
333  join_column_types[i],
334  inner_outer_pairs_);
335  }
336  join_bucket_info.emplace_back(
337  JoinBucketInfo{bucket_sizes_for_dimension_, elem_ti.get_type() == kDOUBLE});
338  }
339  return join_bucket_info;
340 }
341 
342 std::pair<size_t, size_t> OverlapsJoinHashTable::approximateTupleCount(
343  const std::vector<ColumnsForDevice>& columns_per_device) const {
344  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
345  CountDistinctDescriptor count_distinct_desc{
346  CountDistinctImplType::Bitmap,
347  0,
348  11,
349  true,
350  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
351  ? ExecutorDeviceType::GPU
352  : ExecutorDeviceType::CPU,
353  1};
354  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
355 
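  // The 11-bit HyperLogLog sketch above estimates the number of distinct bucket keys
  // (the hash entry count); the per-row key counts accumulated alongside it supply the
  // emitted-key total.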
356  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
357  // Number of keys must match dimension of buckets
358  CHECK_EQ(columns_per_device.front().join_columns.size(),
359  columns_per_device.front().join_buckets.size());
360  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
361  const auto composite_key_info = getCompositeKeyInfo();
362  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
363  composite_key_info.cache_key_chunks,
364  condition_->get_optype(),
365  overlaps_hashjoin_bucket_threshold_};
366  const auto cached_count_info = getApproximateTupleCountFromCache(cache_key);
367  if (cached_count_info.first) {
368  VLOG(1) << "Using a cached tuple count: " << *cached_count_info.first
369  << ", emitted keys count: " << cached_count_info.second;
370  return std::make_pair(*cached_count_info.first, cached_count_info.second);
371  }
372  int thread_count = cpu_threads();
373  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
374  auto hll_result = &hll_buffer_all_cpus[0];
375 
376  std::vector<int32_t> num_keys_for_row;
377  // TODO(adb): support multi-column overlaps join
378  CHECK_EQ(columns_per_device.size(), 1u);
379  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
380 
381  approximate_distinct_tuples_overlaps(hll_result,
382  num_keys_for_row,
383  count_distinct_desc.bitmap_sz_bits,
384  padded_size_bytes,
385  columns_per_device.front().join_columns,
386  columns_per_device.front().join_column_types,
387  columns_per_device.front().join_buckets,
388  thread_count);
389  for (int i = 1; i < thread_count; ++i) {
390  hll_unify(hll_result,
391  hll_result + i * padded_size_bytes,
392  1 << count_distinct_desc.bitmap_sz_bits);
393  }
394  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
395  num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0);
396  }
397 #ifdef HAVE_CUDA
398  auto& data_mgr = executor_->getCatalog()->getDataMgr();
399  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
400  for (auto& host_hll_buffer : host_hll_buffers) {
401  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
402  }
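  // Each GPU builds its own HLL sketch and per-row key counts; the host unifies the
  // sketches and sums the per-device emitted-key totals once all devices finish.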
403  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
404  std::vector<std::future<void>> approximate_distinct_device_threads;
405  for (int device_id = 0; device_id < device_count_; ++device_id) {
406  approximate_distinct_device_threads.emplace_back(std::async(
407  std::launch::async,
408  [device_id,
409  &columns_per_device,
410  &count_distinct_desc,
411  &data_mgr,
412  &host_hll_buffers,
413  &emitted_keys_count_device_threads,
414  this] {
415  CudaAllocator allocator(&data_mgr, device_id);
416  auto device_hll_buffer =
417  allocator.alloc(count_distinct_desc.bitmapPaddedSizeBytes());
418  data_mgr.getCudaMgr()->zeroDeviceMem(
419  device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
420  const auto& columns_for_device = columns_per_device[device_id];
421  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
422  columns_for_device.join_columns, allocator);
423 
424  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
425  const auto& bucket_sizes_for_dimension =
426  columns_for_device.join_buckets[0].bucket_sizes_for_dimension;
427  auto bucket_sizes_gpu =
428  allocator.alloc(bucket_sizes_for_dimension.size() * sizeof(double));
429  copy_to_gpu(&data_mgr,
430  reinterpret_cast<CUdeviceptr>(bucket_sizes_gpu),
431  bucket_sizes_for_dimension.data(),
432  bucket_sizes_for_dimension.size() * sizeof(double),
433  device_id);
434  const size_t row_counts_buffer_sz =
435  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
436  auto row_counts_buffer = allocator.alloc(row_counts_buffer_sz);
437  data_mgr.getCudaMgr()->zeroDeviceMem(
438  row_counts_buffer, row_counts_buffer_sz, device_id);
439  const auto key_handler =
440  OverlapsKeyHandler(bucket_sizes_for_dimension.size(),
441  join_columns_gpu,
442  reinterpret_cast<double*>(bucket_sizes_gpu));
443  const auto key_handler_gpu =
444  transfer_flat_object_to_gpu(key_handler, allocator);
445  approximate_distinct_tuples_on_device_overlaps(
446  reinterpret_cast<uint8_t*>(device_hll_buffer),
447  count_distinct_desc.bitmap_sz_bits,
448  reinterpret_cast<int32_t*>(row_counts_buffer),
449  key_handler_gpu,
450  columns_for_device.join_columns[0].num_elems,
451  executor_->blockSize(),
452  executor_->gridSize());
453 
454  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
455  copy_from_gpu(&data_mgr,
456  &host_emitted_keys_count,
457  reinterpret_cast<CUdeviceptr>(
458  row_counts_buffer +
459  (columns_per_device.front().join_columns[0].num_elems - 1) *
460  sizeof(int32_t)),
461  sizeof(int32_t),
462  device_id);
463 
464  auto& host_hll_buffer = host_hll_buffers[device_id];
465  copy_from_gpu(&data_mgr,
466  &host_hll_buffer[0],
467  reinterpret_cast<CUdeviceptr>(device_hll_buffer),
468  count_distinct_desc.bitmapPaddedSizeBytes(),
469  device_id);
470  }));
471  }
472  for (auto& child : approximate_distinct_device_threads) {
473  child.get();
474  }
475  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
476  auto& result_hll_buffer = host_hll_buffers.front();
477  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
478  for (int device_id = 1; device_id < device_count_; ++device_id) {
479  auto& host_hll_buffer = host_hll_buffers[device_id];
480  hll_unify(hll_result,
481  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
482  1 << count_distinct_desc.bitmap_sz_bits);
483  }
484  size_t emitted_keys_count = 0;
485  for (auto& emitted_keys_count_device : emitted_keys_count_device_threads) {
486  emitted_keys_count += emitted_keys_count_device;
487  }
488  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
489  emitted_keys_count);
490 #else
491  UNREACHABLE();
492  return {0, 0};
493 #endif // HAVE_CUDA
494 }
495 
496 size_t OverlapsJoinHashTable::getKeyComponentWidth() const {
497  return 8;
498 }
499 
500 size_t OverlapsJoinHashTable::getKeyComponentCount() const {
501  return bucket_sizes_for_dimension_.size();
502 }
503 
504 void OverlapsJoinHashTable::reifyForDevice(const ColumnsForDevice& columns_for_device,
505  const JoinHashTableInterface::HashType layout,
506  const int device_id,
507  const logger::ThreadId parent_thread_id) {
508  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
509  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
510  const auto err = initHashTableForDevice(columns_for_device.join_columns,
511  columns_for_device.join_column_types,
512  columns_for_device.join_buckets,
513  layout,
514  effective_memory_level,
515  device_id);
516  if (err) {
517  switch (err) {
518  case ERR_FAILED_TO_FETCH_COLUMN:
519  throw FailedToFetchColumn();
520  case ERR_FAILED_TO_JOIN_ON_VIRTUAL_COLUMN:
521  throw FailedToJoinOnVirtualColumn();
522  default:
523  throw HashJoinFail(
524  std::string("Unrecognized error when initializing baseline hash table (") +
525  std::to_string(err) + std::string(")"));
526  }
527  }
528 }
529 
530 int OverlapsJoinHashTable::initHashTableForDevice(
531  const std::vector<JoinColumn>& join_columns,
532  const std::vector<JoinColumnTypeInfo>& join_column_types,
533  const std::vector<JoinBucketInfo>& join_buckets,
534  const JoinHashTableInterface::HashType layout,
535  const Data_Namespace::MemoryLevel effective_memory_level,
536  const int device_id) {
537  auto timer = DEBUG_TIMER(__func__);
538  CHECK_EQ(getKeyComponentWidth(), size_t(8));
540 
541  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
542  return initHashTableOnCpu(join_columns, join_column_types, join_buckets, layout);
543  } else {
544  int err = 0;
545  // TODO(adb): 4 byte keys
546 
547 #ifdef HAVE_CUDA
549 
550  auto& data_mgr = catalog_->getDataMgr();
551  CudaAllocator allocator(&data_mgr, device_id);
552  auto join_columns_gpu =
553  transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
554  CHECK_EQ(join_columns.size(), 1u);
555  CHECK(!join_buckets.empty());
556  auto& bucket_sizes_for_dimension = join_buckets[0].bucket_sizes_for_dimension;
557  auto bucket_sizes_gpu =
558  transfer_vector_of_flat_objects_to_gpu(bucket_sizes_for_dimension, allocator);
559  const auto key_handler = OverlapsKeyHandler(
560  bucket_sizes_for_dimension.size(), join_columns_gpu, bucket_sizes_gpu);
561 
562  err = builder.initHashTableOnGpu(&key_handler,
563  join_columns,
564  layout,
565  getKeyComponentWidth(),
566  getKeyComponentCount(),
567  entry_count_,
568  emitted_keys_count_,
569  device_id,
570  block_size_,
571  grid_size_);
572  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
573  hash_tables_for_device_[device_id] = builder.getHashTable();
574 #else
575  UNREACHABLE();
576 #endif
577  return err;
578  }
579 }
580 
581 int OverlapsJoinHashTable::initHashTableOnCpu(
582  const std::vector<JoinColumn>& join_columns,
583  const std::vector<JoinColumnTypeInfo>& join_column_types,
584  const std::vector<JoinBucketInfo>& join_bucket_info,
585  const JoinHashTableInterface::HashType layout) {
586  auto timer = DEBUG_TIMER(__func__);
587  const auto composite_key_info = getCompositeKeyInfo();
588  CHECK(!join_columns.empty());
589  CHECK(!join_bucket_info.empty());
590  HashTableCacheKey cache_key{join_columns.front().num_elems,
591  composite_key_info.cache_key_chunks,
592  condition_->get_optype(),
593  overlaps_hashjoin_bucket_threshold_};
594 
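  // The cache key includes the tuned bucket threshold, so a cached table built with a
  // different bucket size is never reused by mistake.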
595  if (auto hash_table = initHashTableOnCpuFromCache(cache_key)) {
596  // See if a hash table of a different layout was returned.
597  // If it was OneToMany, we can reuse it on ManyToMany.
598  if (layout == JoinHashTableInterface::HashType::ManyToMany &&
599  hash_table->getLayout() == JoinHashTableInterface::HashType::OneToMany) {
600  // use the cached hash table
601  layout_override_ = JoinHashTableInterface::HashType::ManyToMany;
602  CHECK_GT(hash_tables_for_device_.size(), size_t(0));
603  hash_tables_for_device_[0] = hash_table;
604  return 0;
605  }
606  }
608  const auto key_component_count = join_bucket_info[0].bucket_sizes_for_dimension.size();
609 
610  const auto key_handler =
611  OverlapsKeyHandler(key_component_count,
612  &join_columns[0],
613  join_bucket_info[0].bucket_sizes_for_dimension.data());
615  const auto err = builder.initHashTableOnCpu(&key_handler,
616  composite_key_info,
617  join_columns,
618  join_column_types,
619  join_bucket_info,
620  entry_count_,
622  layout,
625  CHECK(!hash_tables_for_device_.empty());
626  hash_tables_for_device_[0] = builder.getHashTable();
627 
628  if (!err && getInnerTableId() > 0) {
629  putHashTableOnCpuToCache(cache_key, hash_tables_for_device_[0]);
630  }
631  return err;
632 }
633 
634 #define LL_CONTEXT executor_->cgen_state_->context_
635 #define LL_BUILDER executor_->cgen_state_->ir_builder_
636 #define LL_INT(v) executor_->cgen_state_->llInt(v)
637 #define LL_FP(v) executor_->cgen_state_->llFp(v)
638 #define ROW_FUNC executor_->cgen_state_->row_func_
639 
640 llvm::Value* OverlapsJoinHashTable::codegenKey(const CompilationOptions& co) {
641  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
642  const auto key_component_width = getKeyComponentWidth();
643  CHECK(key_component_width == 4 || key_component_width == 8);
644  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
645  llvm::Value* key_buff_lv{nullptr};
646  switch (key_component_width) {
647  case 4:
648  key_buff_lv =
649  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
650  break;
651  case 8:
652  key_buff_lv =
653  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
654  break;
655  default:
656  CHECK(false);
657  }
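  // For a point column the overlaps key is one bucketed coordinate per dimension: each
  // coordinate is mapped to a bucket index via get_bucket_key_for_range_* using the
  // per-dimension bucket size computed at build time.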
658 
659  const auto& inner_outer_pair = inner_outer_pairs_[0];
660  const auto outer_col = inner_outer_pair.second;
661  const auto outer_col_ti = outer_col->get_type_info();
662 
663  if (outer_col_ti.is_geometry()) {
664  CodeGenerator code_generator(executor_);
665  // TODO(adb): for points we will use the coords array, but for other geometries we
666  // will need to use the bounding box. For now only support points.
667  CHECK_EQ(outer_col_ti.get_type(), kPOINT);
668  CHECK_EQ(bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
669 
670  const auto col_lvs = code_generator.codegen(outer_col, true, co);
671  CHECK_EQ(col_lvs.size(), size_t(1));
672 
673  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
674  CHECK(outer_col_var);
675  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
676  outer_col_var->get_table_id(), outer_col_var->get_column_id() + 1);
677  CHECK(coords_cd);
678 
679  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
680  "array_buff",
681  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
682  {col_lvs.front(), code_generator.posArg(outer_col)});
683  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
684  << "Only TINYINT coordinates columns are supported in geo overlaps hash join.";
685  const auto arr_ptr =
686  code_generator.castArrayPointer(array_ptr, coords_cd->columnType.get_elem_type());
687 
688  for (size_t i = 0; i < 2; i++) {
689  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));
690 
691  // Note that get_bucket_key_for_range_compressed will need to be specialized for
692  // future compression schemes
693  auto bucket_key =
694  outer_col_ti.get_compression() == kENCODING_GEOINT
695  ? executor_->cgen_state_->emitExternalCall(
696  "get_bucket_key_for_range_compressed",
697  get_int_type(64, LL_CONTEXT),
698  {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])})
699  : executor_->cgen_state_->emitExternalCall(
700  "get_bucket_key_for_range_double",
701  get_int_type(64, LL_CONTEXT),
702  {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])});
703  const auto col_lv = LL_BUILDER.CreateSExt(
704  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
705  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
706  }
707  } else {
708  LOG(FATAL) << "Overlaps key currently only supported for geospatial types.";
709  }
710  return key_buff_lv;
711 }
712 
713 std::vector<llvm::Value*> OverlapsJoinHashTable::codegenManyKey(
714  const CompilationOptions& co) {
715  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
716  const auto key_component_width = getKeyComponentWidth();
717  CHECK(key_component_width == 4 || key_component_width == 8);
718  auto hash_table = getHashTableForDevice(size_t(0));
719  CHECK(hash_table);
721 
722  VLOG(1) << "Performing codegen for ManyToMany";
723  const auto& inner_outer_pair = inner_outer_pairs_[0];
724  const auto outer_col = inner_outer_pair.second;
725 
726  CodeGenerator code_generator(executor_);
727  const auto col_lvs = code_generator.codegen(outer_col, true, co);
728  CHECK_EQ(col_lvs.size(), size_t(1));
729 
730  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
731  CHECK(outer_col_var);
732  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
733  outer_col_var->get_table_id(), outer_col_var->get_column_id());
734  CHECK(coords_cd);
735 
736  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
737  "array_buff",
738  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
739  {col_lvs.front(), code_generator.posArg(outer_col)});
740 
741  // TODO(jclay): this seems to cast to double, and causes the GPU build to fail.
742  // const auto arr_ptr =
743  // code_generator.castArrayPointer(array_ptr,
744  // coords_cd->columnType.get_elem_type());
745  array_ptr->setName("array_ptr");
746 
747  auto num_keys_lv =
748  executor_->cgen_state_->emitExternalCall("get_num_buckets_for_bounds",
749  get_int_type(32, LL_CONTEXT),
750  {array_ptr,
751  LL_INT(0),
752  LL_FP(bucket_sizes_for_dimension_[0]),
753  LL_FP(bucket_sizes_for_dimension_[1])});
754  num_keys_lv->setName("num_keys_lv");
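  // num_keys_lv (the number of buckets covered by the probe row's bounds) and the raw
  // coords pointer are consumed by codegenMatchingSet, which forwards them to
  // get_candidate_rows.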
755 
756  return {num_keys_lv, array_ptr};
757 }
758 
759 HashJoinMatchingSet OverlapsJoinHashTable::codegenMatchingSet(
760  const CompilationOptions& co,
761  const size_t index) {
762  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
763  if (getHashType() == JoinHashTableInterface::HashType::ManyToMany) {
764  VLOG(1) << "Building codegenMatchingSet for ManyToMany";
765  const auto key_component_width = getKeyComponentWidth();
766  CHECK(key_component_width == 4 || key_component_width == 8);
767  auto many_to_many_args = codegenManyKey(co);
768  auto hash_ptr = JoinHashTable::codegenHashTableLoad(index, executor_);
769  const auto composite_dict_ptr_type =
770  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
771  const auto composite_key_dict =
772  hash_ptr->getType()->isPointerTy()
773  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
774  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
775  const auto key_component_count = getKeyComponentCount();
776 
777  auto one_to_many_ptr = hash_ptr;
778 
779  if (one_to_many_ptr->getType()->isPointerTy()) {
780  one_to_many_ptr =
781  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
782  } else {
783  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
784  }
785 
786  const auto composite_key_dict_size = offsetBufferOff();
787  one_to_many_ptr =
788  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
789 
790  // NOTE(jclay): A fixed array of size 200 is allocated on the stack.
791  // this is likely the maximum value we can do that is safe to use across
792  // all supported GPU architectures.
793  const int max_array_size = 200;
794  const auto arr_type = get_int_array_type(32, max_array_size, LL_CONTEXT);
795  const auto out_arr_lv = LL_BUILDER.CreateAlloca(arr_type);
796  out_arr_lv->setName("out_arr");
797 
798  const auto casted_out_arr_lv =
799  LL_BUILDER.CreatePointerCast(out_arr_lv, arr_type->getPointerTo());
800 
801  const auto element_ptr = LL_BUILDER.CreateGEP(arr_type, casted_out_arr_lv, LL_INT(0));
802 
803  auto rowid_ptr_i32 =
804  LL_BUILDER.CreatePointerCast(element_ptr, llvm::Type::getInt32PtrTy(LL_CONTEXT));
805 
806  const auto candidate_count_lv = executor_->cgen_state_->emitExternalCall(
807  "get_candidate_rows",
808  llvm::Type::getInt64Ty(LL_CONTEXT),
809  {
810  rowid_ptr_i32,
811  LL_INT(max_array_size),
812  many_to_many_args[1],
813  LL_INT(0),
814  LL_FP(bucket_sizes_for_dimension_[0]),
815  LL_FP(bucket_sizes_for_dimension_[1]),
816  many_to_many_args[0],
817  LL_INT(key_component_count), // key_component_count
818  composite_key_dict, // ptr to hash table
819  LL_INT(entry_count_), // entry_count
820  LL_INT(composite_key_dict_size), // offset_buffer_ptr_offset
821  LL_INT(entry_count_ * sizeof(int32_t)) // sub_buff_size
822  });
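  // get_candidate_rows fills the stack array with matching row ids (capped at
  // max_array_size) and returns how many it found; the triple returned below mirrors the
  // usual (row id pointer, count, slot) shape of a one-to-many probe.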
823 
824  const auto slot_lv = LL_INT(int64_t(0));
825 
826  return {rowid_ptr_i32, candidate_count_lv, slot_lv};
827  } else {
828  VLOG(1) << "Building codegenMatchingSet for Baseline";
829  return BaselineJoinHashTable::codegenMatchingSet(co, index);
830  }
831  UNREACHABLE();
832  return HashJoinMatchingSet{};
833 }
834 
835 void OverlapsJoinHashTable::computeBucketSizes(
836  std::vector<double>& bucket_sizes_for_dimension,
837  const JoinColumn& join_column,
838  const JoinColumnTypeInfo& join_column_type,
839  const std::vector<InnerOuter>& inner_outer_pairs) {
840  // No coalesced keys for overlaps joins yet
841  CHECK_EQ(inner_outer_pairs.size(), 1u);
842 
843  const auto col = inner_outer_pairs[0].first;
844  CHECK(col);
845  const auto col_ti = col->get_type_info();
846  CHECK(col_ti.is_array());
847 
848  // TODO: Compute the number of dimensions for this overlaps key
849  const int num_dims = 2;
850  std::vector<double> local_bucket_sizes(num_dims, std::numeric_limits<double>::max());
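  // Bucket sizes start at DBL_MAX and are narrowed by compute_bucket_sizes (CPU threads)
  // or compute_bucket_sizes_on_device (a single GPU); the results are inverted below
  // before being stored.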
851 
852  VLOG(1) << "Computing bucketed hashjoin with minimum bucket size "
853  << overlaps_hashjoin_bucket_threshold_;
854 
855  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs);
856  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
857  const int thread_count = cpu_threads();
858  compute_bucket_sizes(local_bucket_sizes,
859  join_column,
860  join_column_type,
861  overlaps_hashjoin_bucket_threshold_,
862  thread_count);
863  }
864 #ifdef HAVE_CUDA
865  else {
866  // Note that we compute the bucket sizes using only a single GPU
867  const int device_id = 0;
868  auto& data_mgr = executor_->getCatalog()->getDataMgr();
869  CudaAllocator allocator(&data_mgr, device_id);
870  auto device_bucket_sizes_gpu =
871  transfer_vector_of_flat_objects_to_gpu(local_bucket_sizes, allocator);
872  auto join_column_gpu = transfer_flat_object_to_gpu(join_column, allocator);
873  auto join_column_type_gpu = transfer_flat_object_to_gpu(join_column_type, allocator);
874 
875  compute_bucket_sizes_on_device(device_bucket_sizes_gpu,
876  join_column_gpu,
877  join_column_type_gpu,
878  overlaps_hashjoin_bucket_threshold_,
879  executor_->blockSize(),
880  executor_->gridSize());
881  allocator.copyFromDevice(reinterpret_cast<int8_t*>(local_bucket_sizes.data()),
882  reinterpret_cast<int8_t*>(device_bucket_sizes_gpu),
883  local_bucket_sizes.size() * sizeof(double));
884  }
885 #endif
886 
887  size_t ctr = 0;
888  for (auto& bucket_sz : local_bucket_sizes) {
889  VLOG(1) << "Computed bucket size for dim[" << ctr++ << "]: " << bucket_sz;
890  bucket_sizes_for_dimension.push_back(1.0 / bucket_sz);
891  }
892 
893  return;
894 }