OmniSciDB  94e8789169
OverlapsJoinHashTable.cpp
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
20 #include "QueryEngine/Execute.h"
27 
33 
34 std::unique_ptr<HashTableCache<HashTableCacheKey,
35                                std::pair<OverlapsJoinHashTable::BucketThreshold,
36                                          OverlapsJoinHashTable::BucketSizes>>>
37     OverlapsJoinHashTable::auto_tuner_cache_ =
38  std::make_unique<HashTableCache<HashTableCacheKey,
39                                  std::pair<OverlapsJoinHashTable::BucketThreshold,
40                                            OverlapsJoinHashTable::BucketSizes>>>();
41 
43 std::shared_ptr<OverlapsJoinHashTable> OverlapsJoinHashTable::getInstance(
44  const std::shared_ptr<Analyzer::BinOper> condition,
45  const std::vector<InputTableInfo>& query_infos,
46  const Data_Namespace::MemoryLevel memory_level,
47  const int device_count,
48  ColumnCacheMap& column_cache,
49  Executor* executor,
50  const QueryHint& query_hint) {
51  decltype(std::chrono::steady_clock::now()) ts1, ts2;
52  auto inner_outer_pairs = normalize_column_pairs(
53  condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
54 
55  const auto getHashTableType =
56  [](const std::shared_ptr<Analyzer::BinOper> condition,
57  const std::vector<InnerOuter>& inner_outer_pairs) -> HashType {
58  HashType layout = HashType::OneToMany;
59  if (condition->is_overlaps_oper()) {
60  CHECK_EQ(inner_outer_pairs.size(), size_t(1));
61  if (inner_outer_pairs[0].first->get_type_info().is_array() &&
62  inner_outer_pairs[0].second->get_type_info().is_array()) {
63  layout = HashType::ManyToMany;
64  }
65  }
66  return layout;
67  };
68 
69  auto layout = getHashTableType(condition, inner_outer_pairs);
70 
71  if (VLOGGING(1)) {
72  VLOG(1) << "Building geo hash table " << getHashTypeString(layout)
73  << " for qual: " << condition->toString();
74  ts1 = std::chrono::steady_clock::now();
75  }
76 
77  const auto qi_0 = query_infos[0].info.getNumTuplesUpperBound();
78  const auto qi_1 = query_infos[1].info.getNumTuplesUpperBound();
79 
80  VLOG(1) << "table_id = " << query_infos[0].table_id << " has " << qi_0 << " tuples.";
81  VLOG(1) << "table_id = " << query_infos[1].table_id << " has " << qi_1 << " tuples.";
82 
83  const auto& query_info =
84  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs), query_infos)
85  .info;
86  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
87  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
88  throw TooManyHashEntries();
89  }
90 
91  auto join_hash_table = std::make_shared<OverlapsJoinHashTable>(condition,
92  query_infos,
93  memory_level,
94  column_cache,
95  executor,
96  inner_outer_pairs,
97  device_count);
98  if (query_hint.hint_delivered) {
99  join_hash_table->registerQueryHint(query_hint);
100  }
101  try {
102  join_hash_table->reify(layout);
103  } catch (const HashJoinFail& e) {
104  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
105  "involved in overlaps join | ") +
106  e.what());
107  } catch (const ColumnarConversionNotSupported& e) {
108  throw HashJoinFail(std::string("Could not build hash tables for overlaps join | "
109  "Inner table too big. Attempt manual table reordering "
110  "or create a single fragment inner table. | ") +
111  e.what());
112  } catch (const std::exception& e) {
113  throw HashJoinFail(std::string("Failed to build hash tables for overlaps join | ") +
114  e.what());
115  }
116  if (VLOGGING(1)) {
117  ts2 = std::chrono::steady_clock::now();
118  VLOG(1) << "Built geo hash table " << getHashTypeString(layout) << " in "
119  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
120  << " ms";
121  }
122  return join_hash_table;
123 }
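// Summary of getInstance(): normalize the join qual into inner/outer column pairs, pick
// OneToMany vs ManyToMany depending on whether both sides are array (bucket-key) columns,
// bound the total entry count, then construct the table and reify it; build failures are
// rethrown as HashJoinFail so callers can presumably fall back to another join strategy.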
124 
125 namespace {
126 
127 void compute_bucket_sizes(std::vector<double>& bucket_sizes_for_dimension,
128  const double bucket_threshold,
129  const Data_Namespace::MemoryLevel effective_memory_level,
130  const JoinColumn& join_column,
131  const JoinColumnTypeInfo& join_column_type,
132  const std::vector<InnerOuter>& inner_outer_pairs,
133  const Executor* executor) {
134  // No coalesced keys for overlaps joins yet
135  CHECK_EQ(inner_outer_pairs.size(), 1u);
136 
137  const auto col = inner_outer_pairs[0].first;
138  CHECK(col);
139  const auto col_ti = col->get_type_info();
140  CHECK(col_ti.is_array());
141 
142  // TODO: Compute the number of dimensions for this overlaps key
143  const int num_dims = 2;
144  std::vector<double> local_bucket_sizes(num_dims, std::numeric_limits<double>::max());
145 
146  VLOG(1)
147  << "Computing x and y bucket sizes for overlaps hash join with minimum bucket size "
148  << std::to_string(bucket_threshold);
149 
150  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
151  const int thread_count = cpu_threads();
152  compute_bucket_sizes_on_cpu(local_bucket_sizes,
153  join_column,
154  join_column_type,
155  bucket_threshold,
156  thread_count);
157  }
158 #ifdef HAVE_CUDA
159  else {
160  // Note that we compute the bucket sizes using only a single GPU
161  const int device_id = 0;
162  auto& data_mgr = executor->getCatalog()->getDataMgr();
163  CudaAllocator allocator(&data_mgr, device_id);
164  auto device_bucket_sizes_gpu =
165  transfer_vector_of_flat_objects_to_gpu(local_bucket_sizes, allocator);
166  auto join_column_gpu = transfer_flat_object_to_gpu(join_column, allocator);
167  auto join_column_type_gpu = transfer_flat_object_to_gpu(join_column_type, allocator);
168 
169  compute_bucket_sizes_on_device(
170  device_bucket_sizes_gpu, join_column_gpu, join_column_type_gpu, bucket_threshold);
171  allocator.copyFromDevice(reinterpret_cast<int8_t*>(local_bucket_sizes.data()),
172  reinterpret_cast<int8_t*>(device_bucket_sizes_gpu),
173  local_bucket_sizes.size() * sizeof(double));
174  }
175 #endif
176 
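  // Note: the per-dimension widths computed above are stored inverted (1.0 / bucket_sz)
  // below; callers such as the "Final bucket sizes" logging in reifyWithLayout() undo
  // this with another 1.0 / x before printing.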
177  size_t ctr = 0;
178  for (auto& bucket_sz : local_bucket_sizes) {
179  VLOG(1) << "Computed bucket size for dim[" << ctr++ << "]: " << bucket_sz;
180  bucket_sizes_for_dimension.push_back(1.0 / bucket_sz);
181  }
182 
183  return;
184 }
185 
186 struct BucketSizeTuner {
187  BucketSizeTuner(const double bucket_threshold,
188  const double step,
189  const double min_threshold)
190  : bucket_threshold(bucket_threshold * step)
191  , step(step)
192  , min_threshold(min_threshold) {}
193 
194  bool operator()(const std::vector<double> last_iteration_bucket_sizes) {
195  bucket_threshold /= step;
196  if (!last_iteration_bucket_sizes.empty() &&
197  last_iteration_bucket_sizes == previous_bucket_sizes) {
198  // abort the tuning if the bucket size does not change. note that this will always
199  // run at least two steps, as the first step will have no computed bucket sizes
200  VLOG(1) << "Aborting overlaps tuning as bucket size is no longer changing.";
201  return false;
202  }
203  num_steps++;
204 
205  previous_bucket_sizes = last_iteration_bucket_sizes;
206 
207  return bucket_threshold >= min_threshold;
208  }
209 
210  std::vector<double> computeBucketSizes(
211  const Data_Namespace::MemoryLevel effective_memory_level,
212  std::vector<ColumnsForDevice>& columns_per_device,
213  const std::vector<InnerOuter>& inner_outer_pairs,
214  const size_t device_count,
215  const Executor* executor) {
216  // compute bucket info for the current threshold value
217  CHECK(!columns_per_device.empty());
218  std::vector<double> bucket_sizes_for_dimension;
219  compute_bucket_sizes(bucket_sizes_for_dimension,
220  bucket_threshold,
221  effective_memory_level,
222  columns_per_device.front().join_columns[0],
223  columns_per_device.front().join_column_types[0],
224  inner_outer_pairs,
225  executor);
226  return bucket_sizes_for_dimension;
227  }
228 
229  double bucket_threshold;
230  size_t num_steps{0};
231  const double step;
232  const double min_threshold;
233 
234  std::vector<double> previous_bucket_sizes;
235 };
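// A rough sketch of how the tuner is driven (see the auto-tuning branch of
// reifyWithLayout() below). The constructor pre-multiplies the threshold by `step`,
// operator() divides it back out before each iteration, and tuning stops when either the
// threshold drops below `min_threshold` or two consecutive iterations produce identical
// bucket sizes:
//
//   BucketSizeTuner tuner(/*bucket_threshold=*/1.0, /*step=*/2.0, /*min_threshold=*/1e-7);
//   std::vector<double> sizes;
//   while (tuner(sizes)) {   // evaluates thresholds 1.0, 0.5, 0.25, ...
//     sizes = tuner.computeBucketSizes(...);
//   }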
236 
237 std::ostream& operator<<(std::ostream& os, const BucketSizeTuner& tuner) {
238  os << "Step Num: " << tuner.num_steps << ", Threshold: " << std::fixed
239  << tuner.bucket_threshold << ", Step Size: " << std::fixed << tuner.step
240  << ", Min: " << std::fixed << tuner.min_threshold;
241  return os;
242 }
243 
244 } // namespace
245 
246 void OverlapsJoinHashTable::reifyWithLayout(const HashType layout) {
247  auto timer = DEBUG_TIMER(__func__);
249  const auto& query_info =
250  get_inner_query_info(HashJoin::getInnerTableId(inner_outer_pairs_), query_infos_)
251  .info;
252  VLOG(1) << "Reify with layout " << getHashTypeString(layout)
253  << "for table_id: " << HashJoin::getInnerTableId(inner_outer_pairs_);
254  if (query_info.fragments.empty()) {
255  return;
256  }
257 
258  const double default_overlaps_bucket_threshold = 0.1;
259  auto overlaps_max_table_size_bytes = g_overlaps_max_table_size_bytes;
260  std::optional<double> overlaps_threshold_override;
261  auto query_hint = getRegisteredQueryHint();
262  if (query_hint.hint_delivered) {
263  if (query_hint.overlaps_bucket_threshold != default_overlaps_bucket_threshold) {
264  VLOG(1) << "Setting overlaps bucket threshold "
265  "\'overlaps_hashjoin_bucket_threshold\' via "
266  "query hint: "
267  << query_hint.overlaps_bucket_threshold;
268  overlaps_threshold_override = query_hint.overlaps_bucket_threshold;
269  }
270  if (query_hint.overlaps_max_size != overlaps_max_table_size_bytes) {
271  std::ostringstream oss;
272  oss << "User requests to change a threshold \'overlaps_max_table_size_bytes\' via "
273  "query hint: "
274  << overlaps_max_table_size_bytes << " -> " << query_hint.overlaps_max_size;
275  if (!overlaps_threshold_override.has_value()) {
276  overlaps_max_table_size_bytes = query_hint.overlaps_max_size;
277  } else {
278  oss << ", but is skipped since the query hint also changes the threshold "
279  "\'overlaps_hashjoin_bucket_threshold\'";
280  }
281  VLOG(1) << oss.str();
282  }
283  }
284 
285  std::vector<ColumnsForDevice> columns_per_device;
286  const auto catalog = executor_->getCatalog();
287  CHECK(catalog);
288  auto& data_mgr = catalog->getDataMgr();
289  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
290  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
291  for (int device_id = 0; device_id < device_count_; ++device_id) {
292  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(&data_mgr, device_id));
293  }
294  }
295  const auto shard_count = shardCount();
296  for (int device_id = 0; device_id < device_count_; ++device_id) {
297  const auto fragments =
298  shard_count
299  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
300  : query_info.fragments;
301  const auto columns_for_device =
302  fetchColumnsForDevice(fragments,
303  device_id,
304  memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
305  ? dev_buff_owners[device_id].get()
306  : nullptr);
307  columns_per_device.push_back(columns_for_device);
308  }
309 
310  // Prepare to calculate the size of the hash table.
311  const auto composite_key_info =
312  HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
313 
314  auto cache_key_contains_intermediate_table = [](const auto cache_key) {
315  for (auto key : cache_key.chunk_keys) {
316  CHECK_GE(key.size(), size_t(2));
317  if (key[1] < 0) {
318  return true;
319  }
320  }
321  return false;
322  };
323 
324  if (overlaps_threshold_override) {
325  // compute bucket sizes based on the user provided threshold
326  BucketSizeTuner tuner(
327  *overlaps_threshold_override, /*step=*/1.0, /*min_threshold=*/0.0);
328  auto bucket_sizes =
329  tuner.computeBucketSizes(getEffectiveMemoryLevel(inner_outer_pairs_),
330  columns_per_device,
331  inner_outer_pairs_,
332  device_count_,
333  executor_);
334 
335  auto [entry_count, emitted_keys_count] =
336  computeHashTableCounts(shard_count, bucket_sizes, columns_per_device);
337  setBucketSizeInfo(bucket_sizes, columns_per_device, device_count_);
338  // reifyImpl will check the hash table cache for an appropriate hash table w/ those
339  // bucket sizes (or within tolerances) if a hash table exists use it, otherwise build
340  // one
341  reifyImpl(columns_per_device,
342  query_info,
343  layout,
344  shard_count,
345  entry_count,
346  emitted_keys_count);
347  } else {
348  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
349  composite_key_info.cache_key_chunks,
350  condition_->get_optype()};
351  double overlaps_bucket_threshold = -1;
352  auto cached_bucket_threshold_opt = auto_tuner_cache_->get(cache_key);
353  if (cached_bucket_threshold_opt) {
354  overlaps_bucket_threshold = cached_bucket_threshold_opt->first;
355  VLOG(1) << "Auto tuner using cached overlaps hash table size of: "
356  << overlaps_bucket_threshold;
357  auto bucket_sizes = cached_bucket_threshold_opt->second;
358 
359  OverlapsHashTableCacheKey hash_table_cache_key(cache_key, bucket_sizes);
360  if (auto hash_table_cache_opt =
361  hash_table_cache_->getWithKey(hash_table_cache_key)) {
362  // if we already have a built hash table, we can skip the scans required for
363  // computing bucket size and tuple count
364  auto key = hash_table_cache_opt->first;
365  // reset as the hash table sizes can vary a bit
366  setBucketSizeInfo(key.bucket_sizes, columns_per_device, device_count_);
367  auto hash_table = hash_table_cache_opt->second;
368  CHECK(hash_table);
369 
370  VLOG(1) << "Using cached hash table bucket size";
371 
372  reifyImpl(columns_per_device,
373  query_info,
374  layout,
375  shard_count,
376  hash_table->getEntryCount(),
377  hash_table->getEmittedKeysCount());
378  } else {
379  VLOG(1) << "Computing bucket size for cached bucket threshold";
380  // compute bucket size using our cached tuner value
381  BucketSizeTuner tuner(
382  overlaps_bucket_threshold, /*step=*/1, /*min_threshold=*/0.0);
383 
384  auto bucket_sizes =
385  tuner.computeBucketSizes(getEffectiveMemoryLevel(inner_outer_pairs_),
386  columns_per_device,
387  inner_outer_pairs_,
388  device_count_,
389  executor_);
390 
391  auto [entry_count, emitted_keys_count] =
392  computeHashTableCounts(shard_count, bucket_sizes, columns_per_device);
393  setBucketSizeInfo(bucket_sizes, columns_per_device, device_count_);
394 
395  reifyImpl(columns_per_device,
396  query_info,
397  layout,
398  shard_count,
399  entry_count,
400  emitted_keys_count);
401  }
402  } else {
403  // compute bucket size using the auto tuner
404 
405  // TODO(jclay): Currently, joining on large poly sets
406  // will lead to lengthy construction times (and large hash tables)
407  // tune this to account for the characteristics of the data being joined.
408  size_t entry_count{0};
409  size_t emitted_keys_count{0};
410  double chosen_overlaps_threshold{-1};
411  double last_keys_per_bin{-1};
412  const size_t max_hash_table_size_for_uncapped_min_keys_per_bin{2097152}; // 2MB
413 
414  BucketSizeTuner tuner(
415  /*initial_threshold=*/1.0, /*step=*/2.0, /*min_threshold=*/1e-7);
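  // With step = 2.0 the candidate thresholds form the geometric sequence 1.0, 0.5,
  // 0.25, ...; reaching min_threshold = 1e-7 takes at most ~24 halvings, so the loop
  // below is bounded even if none of the early-termination conditions fires.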
416  VLOG(1) << "Running overlaps join size auto tune with parameters: " << tuner;
417 
418  std::vector<double> bucket_sizes;
419  while (tuner(bucket_sizes)) {
420  bucket_sizes =
421  tuner.computeBucketSizes(getEffectiveMemoryLevel(inner_outer_pairs_),
422  columns_per_device,
423  inner_outer_pairs_,
424  device_count_,
425  executor_);
426  const auto [crt_entry_count, crt_emitted_keys_count] =
427  computeHashTableCounts(shard_count, bucket_sizes, columns_per_device);
428 
429  const size_t hash_table_size = calculateHashTableSize(
430  bucket_sizes.size(), crt_emitted_keys_count, crt_entry_count);
431  const double keys_per_bin = crt_emitted_keys_count / (crt_entry_count / 2.0);
432  VLOG(1) << "Tuner output: " << tuner << " giving entry_count: " << crt_entry_count
433  << ", emitted_keys " << crt_emitted_keys_count << ", hash table size "
434  << hash_table_size << ", keys per bin " << keys_per_bin;
435 
436  // decision points for termination prior to reaching the minimum threshold
437  const bool previous_iteration_valid =
438  last_keys_per_bin > 0 && chosen_overlaps_threshold > 0;
439  const bool hash_table_too_big = hash_table_size > overlaps_max_table_size_bytes;
440  const bool keys_per_bin_increasing = keys_per_bin > last_keys_per_bin;
441  if (previous_iteration_valid && (hash_table_too_big || keys_per_bin_increasing)) {
442  if (hash_table_too_big) {
443  VLOG(1) << "Reached hash table size limit: " << overlaps_max_table_size_bytes
444  << " with " << hash_table_size << " byte hash table, " << keys_per_bin
445  << " keys per bin.";
446  } else if (keys_per_bin_increasing) {
447  VLOG(1) << "Keys per bin increasing from " << last_keys_per_bin << " to "
448  << keys_per_bin;
449  }
450  VLOG(1) << "Using previous threshold value " << chosen_overlaps_threshold;
451  // reset bucket size info, as it will get overwritten with calculate counts above
452  setBucketSizeInfo(
453  bucket_sizes_for_dimension_, columns_per_device, device_count_);
454  break;
455  }
456  chosen_overlaps_threshold = tuner.bucket_threshold;
457  last_keys_per_bin = keys_per_bin;
458  setBucketSizeInfo(bucket_sizes, columns_per_device, device_count_);
459  entry_count = crt_entry_count;
460  emitted_keys_count = crt_emitted_keys_count;
461  const bool keys_per_bin_under_threshold =
462  hash_table_size > max_hash_table_size_for_uncapped_min_keys_per_bin &&
463  keys_per_bin < g_overlaps_target_entries_per_bin;
464  if (keys_per_bin_under_threshold) {
465  VLOG(1) << "Hash table reached size " << hash_table_size << " over threshold "
466  << max_hash_table_size_for_uncapped_min_keys_per_bin
467  << " with keys per bin " << keys_per_bin << " under threshold "
468  << g_overlaps_target_entries_per_bin
469  << ". Terminating bucket size loop.";
470  break;
471  }
472  }
473  const size_t hash_table_size = calculateHashTableSize(
474  bucket_sizes_for_dimension_.size(), emitted_keys_count, entry_count);
475  const double keys_per_bin = emitted_keys_count / (entry_count / 2.0);
476  VLOG(1) << "Final tuner output: " << tuner << " giving entry_count: " << entry_count
477  << ", emitted_keys_count " << emitted_keys_count << " hash table size "
478  << hash_table_size << ", keys per bin " << keys_per_bin;
480  VLOG(1) << "Final bucket sizes: ";
481  for (size_t dim = 0; dim < bucket_sizes_for_dimension_.size(); dim++) {
482  VLOG(1) << "dim[" << dim << "]: " << 1.0 / bucket_sizes_for_dimension_[dim];
483  }
484  CHECK_GE(chosen_overlaps_threshold, double(0));
485  if (!cache_key_contains_intermediate_table(cache_key)) {
486  auto cache_value =
487  std::make_pair(chosen_overlaps_threshold, bucket_sizes_for_dimension_);
488  auto_tuner_cache_->insert(cache_key, cache_value);
489  }
490 
491  reifyImpl(columns_per_device,
492  query_info,
493  layout,
494  shard_count,
495  entry_count,
496  emitted_keys_count);
497  }
498  }
499 }
500 
501 size_t OverlapsJoinHashTable::calculateHashTableSize(size_t number_of_dimensions,
502  size_t emitted_keys_count,
503  size_t entry_count) const {
504  const auto key_component_width = getKeyComponentWidth();
505  const auto key_component_count = number_of_dimensions;
506  const auto entry_size = key_component_count * key_component_width;
507  const auto keys_for_all_rows = emitted_keys_count;
508  const size_t one_to_many_hash_entries = 2 * entry_count + keys_for_all_rows;
509  const size_t hash_table_size =
510  entry_size * entry_count + one_to_many_hash_entries * sizeof(int32_t);
511  return hash_table_size;
512 }
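// Illustrative arithmetic (hypothetical numbers, not from the source): with the 8-byte
// key components returned by getKeyComponentWidth(), 2 dimensions, entry_count = 1M and
// emitted_keys_count = 4M, the size is 2*8*1M + (2*1M + 4M)*sizeof(int32_t), i.e. roughly
// 16 MB of keys plus 24 MB for the one-to-many offset/count/payload sections, ~40 MB total.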
513 
514 ColumnsForDevice OverlapsJoinHashTable::fetchColumnsForDevice(
515  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
516  const int device_id,
517  DeviceAllocator* dev_buff_owner) {
518  const auto& catalog = *executor_->getCatalog();
519  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
520 
521  std::vector<JoinColumn> join_columns;
522  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
523  std::vector<JoinColumnTypeInfo> join_column_types;
524  std::vector<std::shared_ptr<void>> malloc_owner;
525  for (const auto& inner_outer_pair : inner_outer_pairs_) {
526  const auto inner_col = inner_outer_pair.first;
527  const auto inner_cd = get_column_descriptor_maybe(
528  inner_col->get_column_id(), inner_col->get_table_id(), catalog);
529  if (inner_cd && inner_cd->isVirtualCol) {
530  throw FailedToJoinOnVirtualColumn();
531  }
532  join_columns.emplace_back(fetchJoinColumn(inner_col,
533  fragments,
534  effective_memory_level,
535  device_id,
536  chunks_owner,
537  dev_buff_owner,
538  malloc_owner,
539  executor_,
540  &column_cache_));
541  const auto& ti = inner_col->get_type_info();
542  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
543  0,
544  0,
545  inline_int_null_value<int64_t>(),
546  false,
547  0,
548  get_join_column_type_kind(ti)});
549  CHECK(ti.is_array()) << "Overlaps join currently only supported for arrays.";
550  }
551  return {join_columns, join_column_types, chunks_owner, {}, malloc_owner};
552 }
553 
554 std::pair<size_t, size_t> OverlapsJoinHashTable::computeHashTableCounts(
555  const size_t shard_count,
556  const std::vector<double>& bucket_sizes_for_dimension,
557  std::vector<ColumnsForDevice>& columns_per_device) {
558  CHECK(!bucket_sizes_for_dimension.empty());
559  const auto [tuple_count, emitted_keys_count] =
560  approximateTupleCount(bucket_sizes_for_dimension, columns_per_device);
561  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
562 
563  return std::make_pair(
564  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_),
565  emitted_keys_count);
566 }
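// Note the 2x factor above: the keyspace is sized at twice the approximate number of
// distinct bucket keys, which is why reifyWithLayout() divides entry_count by 2 when
// computing its keys-per-bin metric.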
567 
568 std::pair<size_t, size_t> OverlapsJoinHashTable::approximateTupleCount(
569  const std::vector<double>& bucket_sizes_for_dimension,
570  std::vector<ColumnsForDevice>& columns_per_device) {
571  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
572  CountDistinctDescriptor count_distinct_desc{
573  CountDistinctImplType::Bitmap,
574  0,
575  11,
576  true,
577  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
578  ? ExecutorDeviceType::GPU
579  : ExecutorDeviceType::CPU,
580  1};
581  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
582 
583  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
584  if (columns_per_device.front().join_columns.front().num_elems == 0) {
585  return std::make_pair(0, 0);
586  }
587 
588  // TODO: state management in here should be revisited, but this should be safe enough
589  // for now
590  // re-compute bucket counts per device based on global bucket size
591  for (size_t device_id = 0; device_id < columns_per_device.size(); ++device_id) {
592  auto& columns_for_device = columns_per_device[device_id];
593  columns_for_device.setBucketInfo(bucket_sizes_for_dimension, inner_outer_pairs_);
594  }
595 
596  // Number of keys must match dimension of buckets
597  CHECK_EQ(columns_per_device.front().join_columns.size(),
598  columns_per_device.front().join_buckets.size());
599  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
600  // Note that this path assumes each device has the same hash table (for GPU hash join
601  // w/ hash table built on CPU)
602  const auto composite_key_info =
603  HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
604  OverlapsHashTableCacheKey cache_key{
605  columns_per_device.front().join_columns.front().num_elems,
606  composite_key_info.cache_key_chunks,
607  condition_->get_optype(),
608  bucket_sizes_for_dimension};
609  const auto cached_count_info = getApproximateTupleCountFromCache(cache_key);
610  if (cached_count_info) {
611  VLOG(1) << "Using a cached tuple count: " << cached_count_info->first
612  << ", emitted keys count: " << cached_count_info->second;
613  return *cached_count_info;
614  }
615  int thread_count = cpu_threads();
616  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
617  auto hll_result = &hll_buffer_all_cpus[0];
618 
619  std::vector<int32_t> num_keys_for_row;
620  // TODO(adb): support multi-column overlaps join
621  num_keys_for_row.resize(columns_per_device.front().join_columns[0].num_elems);
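  // num_keys_for_row is filled with per-row emitted-key counts that end up cumulative,
  // which is why the return below uses num_keys_for_row.back() as the total emitted-keys
  // count; the GPU path reads the last element of row_counts_buffer for the same reason.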
622 
623  approximate_distinct_tuples_overlaps(hll_result,
624  num_keys_for_row,
625  count_distinct_desc.bitmap_sz_bits,
626  padded_size_bytes,
627  columns_per_device.front().join_columns,
628  columns_per_device.front().join_column_types,
629  columns_per_device.front().join_buckets,
630  thread_count);
631  for (int i = 1; i < thread_count; ++i) {
632  hll_unify(hll_result,
633  hll_result + i * padded_size_bytes,
634  1 << count_distinct_desc.bitmap_sz_bits);
635  }
636  return std::make_pair(
637  hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
638  static_cast<size_t>(num_keys_for_row.size() > 0 ? num_keys_for_row.back() : 0));
639  }
640 #ifdef HAVE_CUDA
641  auto& data_mgr = executor_->getCatalog()->getDataMgr();
642  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
643  for (auto& host_hll_buffer : host_hll_buffers) {
644  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
645  }
646  std::vector<size_t> emitted_keys_count_device_threads(device_count_, 0);
647  std::vector<std::future<void>> approximate_distinct_device_threads;
648  for (int device_id = 0; device_id < device_count_; ++device_id) {
649  approximate_distinct_device_threads.emplace_back(std::async(
650  std::launch::async,
651  [device_id,
652  &columns_per_device,
653  &count_distinct_desc,
654  &data_mgr,
655  &host_hll_buffers,
656  &emitted_keys_count_device_threads] {
657  CudaAllocator allocator(&data_mgr, device_id);
658  auto device_hll_buffer =
659  allocator.alloc(count_distinct_desc.bitmapPaddedSizeBytes());
660  data_mgr.getCudaMgr()->zeroDeviceMem(
661  device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
662  const auto& columns_for_device = columns_per_device[device_id];
663  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
664  columns_for_device.join_columns, allocator);
665 
666  CHECK_GT(columns_for_device.join_buckets.size(), 0u);
667  const auto& bucket_sizes_for_dimension =
668  columns_for_device.join_buckets[0].bucket_sizes_for_dimension;
669  auto bucket_sizes_gpu =
670  allocator.alloc(bucket_sizes_for_dimension.size() * sizeof(double));
671  copy_to_gpu(&data_mgr,
672  reinterpret_cast<CUdeviceptr>(bucket_sizes_gpu),
673  bucket_sizes_for_dimension.data(),
674  bucket_sizes_for_dimension.size() * sizeof(double),
675  device_id);
676  const size_t row_counts_buffer_sz =
677  columns_per_device.front().join_columns[0].num_elems * sizeof(int32_t);
678  auto row_counts_buffer = allocator.alloc(row_counts_buffer_sz);
679  data_mgr.getCudaMgr()->zeroDeviceMem(
680  row_counts_buffer, row_counts_buffer_sz, device_id);
681  const auto key_handler =
682  OverlapsKeyHandler(bucket_sizes_for_dimension.size(),
683  join_columns_gpu,
684  reinterpret_cast<double*>(bucket_sizes_gpu));
685  const auto key_handler_gpu =
686  transfer_flat_object_to_gpu(key_handler, allocator);
687  approximate_distinct_tuples_on_device_overlaps(
688  reinterpret_cast<uint8_t*>(device_hll_buffer),
689  count_distinct_desc.bitmap_sz_bits,
690  reinterpret_cast<int32_t*>(row_counts_buffer),
691  key_handler_gpu,
692  columns_for_device.join_columns[0].num_elems);
693 
694  auto& host_emitted_keys_count = emitted_keys_count_device_threads[device_id];
695  copy_from_gpu(&data_mgr,
696  &host_emitted_keys_count,
697  reinterpret_cast<CUdeviceptr>(
698  row_counts_buffer +
699  (columns_per_device.front().join_columns[0].num_elems - 1) *
700  sizeof(int32_t)),
701  sizeof(int32_t),
702  device_id);
703 
704  auto& host_hll_buffer = host_hll_buffers[device_id];
705  copy_from_gpu(&data_mgr,
706  &host_hll_buffer[0],
707  reinterpret_cast<CUdeviceptr>(device_hll_buffer),
708  count_distinct_desc.bitmapPaddedSizeBytes(),
709  device_id);
710  }));
711  }
712  for (auto& child : approximate_distinct_device_threads) {
713  child.get();
714  }
715  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
716  auto& result_hll_buffer = host_hll_buffers.front();
717  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
718  for (int device_id = 1; device_id < device_count_; ++device_id) {
719  auto& host_hll_buffer = host_hll_buffers[device_id];
720  hll_unify(hll_result,
721  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
722  1 << count_distinct_desc.bitmap_sz_bits);
723  }
724  const size_t emitted_keys_count =
725  std::accumulate(emitted_keys_count_device_threads.begin(),
726  emitted_keys_count_device_threads.end(),
727  0);
728  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits),
729  emitted_keys_count);
730 #else
731  UNREACHABLE();
732  return {0, 0};
733 #endif // HAVE_CUDA
734 }
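// Both paths estimate the number of distinct bucket keys with HyperLogLog: per-thread or
// per-device HLL registers are built, merged with hll_unify(), and read out with
// hll_size(); the emitted-keys count comes from the final cumulative per-row key count.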
735 
736 void OverlapsJoinHashTable::setBucketSizeInfo(
737  const std::vector<double>& bucket_sizes,
738  std::vector<ColumnsForDevice>& columns_per_device,
739  const size_t device_count) {
740  // set global bucket size
741  bucket_sizes_for_dimension_ = bucket_sizes;
742 
743  // re-compute bucket counts per device based on global bucket size
744  CHECK_EQ(columns_per_device.size(), size_t(device_count));
745  for (size_t device_id = 0; device_id < device_count; ++device_id) {
746  auto& columns_for_device = columns_per_device[device_id];
747  columns_for_device.setBucketInfo(bucket_sizes_for_dimension_, inner_outer_pairs_);
748  }
749 }
750 
751 size_t OverlapsJoinHashTable::getKeyComponentWidth() const {
752  return 8;
753 }
754 
756 size_t OverlapsJoinHashTable::getKeyComponentCount() const {
757  return bucket_sizes_for_dimension_.size();
758 }
759 
760 void OverlapsJoinHashTable::reify(const HashType preferred_layout) {
761  auto timer = DEBUG_TIMER(__func__);
763  const auto composite_key_info =
764  HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
765 
766  CHECK(condition_->is_overlaps_oper());
767  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
768  HashType layout;
769  if (inner_outer_pairs_[0].second->get_type_info().is_array()) {
770  layout = HashType::ManyToMany;
771  } else {
772  layout = HashType::OneToMany;
773  }
774  try {
775  reifyWithLayout(layout);
776  return;
777  } catch (const std::exception& e) {
778  VLOG(1) << "Caught exception while building overlaps baseline hash table: "
779  << e.what();
780  throw;
781  }
782 }
783 
784 void OverlapsJoinHashTable::reifyImpl(std::vector<ColumnsForDevice>& columns_per_device,
785  const Fragmenter_Namespace::TableInfo& query_info,
786  const HashType layout,
787  const size_t shard_count,
788  const size_t entry_count,
789  const size_t emitted_keys_count) {
790  std::vector<std::future<void>> init_threads;
791  for (int device_id = 0; device_id < device_count_; ++device_id) {
792  const auto fragments =
793  shard_count
794  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
795  : query_info.fragments;
796  init_threads.push_back(std::async(std::launch::async,
797  &OverlapsJoinHashTable::reifyForDevice,
798  this,
799  columns_per_device[device_id],
800  layout,
801  entry_count,
802  emitted_keys_count,
803  device_id,
804  logger::thread_id()));
805  }
806  for (auto& init_thread : init_threads) {
807  init_thread.wait();
808  }
809  for (auto& init_thread : init_threads) {
810  init_thread.get();
811  }
812 }
813 
814 void OverlapsJoinHashTable::reifyForDevice(const ColumnsForDevice& columns_for_device,
815  const HashType layout,
816  const size_t entry_count,
817  const size_t emitted_keys_count,
818  const int device_id,
819  const logger::ThreadId parent_thread_id) {
820  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
821  CHECK_EQ(getKeyComponentWidth(), size_t(8));
823  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
824 
825  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
826  VLOG(1) << "Building overlaps join hash table on CPU.";
827  auto hash_table = initHashTableOnCpu(columns_for_device.join_columns,
828  columns_for_device.join_column_types,
829  columns_for_device.join_buckets,
830  layout,
831  entry_count,
832  emitted_keys_count);
833  CHECK(hash_table);
834 
835 #ifdef HAVE_CUDA
836  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
837  auto gpu_hash_table = copyCpuHashTableToGpu(
838  std::move(hash_table), layout, entry_count, emitted_keys_count, device_id);
839  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
840  hash_tables_for_device_[device_id] = std::move(gpu_hash_table);
841  } else {
842 #else
843  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
844 #endif
845  CHECK_EQ(hash_tables_for_device_.size(), size_t(1));
846  hash_tables_for_device_[0] = std::move(hash_table);
847 #ifdef HAVE_CUDA
848  }
849 #endif
850  } else {
851 #ifdef HAVE_CUDA
852  auto hash_table = initHashTableOnGpu(columns_for_device.join_columns,
853  columns_for_device.join_column_types,
854  columns_for_device.join_buckets,
855  layout,
856  entry_count,
857  emitted_keys_count,
858  device_id);
859  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
860  hash_tables_for_device_[device_id] = std::move(hash_table);
861 #else
862  UNREACHABLE();
863 #endif
864  }
865 }
866 
867 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::initHashTableOnCpu(
868  const std::vector<JoinColumn>& join_columns,
869  const std::vector<JoinColumnTypeInfo>& join_column_types,
870  const std::vector<JoinBucketInfo>& join_bucket_info,
871  const HashType layout,
872  const size_t entry_count,
873  const size_t emitted_keys_count) {
874  auto timer = DEBUG_TIMER(__func__);
875  const auto composite_key_info =
876  HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
877  CHECK(!join_columns.empty());
878  CHECK(!join_bucket_info.empty());
879  OverlapsHashTableCacheKey cache_key{join_columns.front().num_elems,
880  composite_key_info.cache_key_chunks,
881  condition_->get_optype(),
882  join_bucket_info[0].bucket_sizes_for_dimension};
883 
884  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
885  if (auto generic_hash_table = initHashTableOnCpuFromCache(cache_key)) {
886  if (auto hash_table =
887  std::dynamic_pointer_cast<BaselineHashTable>(generic_hash_table)) {
888  VLOG(1) << "Using cached CPU hash table for initialization.";
889  // See if a hash table of a different layout was returned.
890  // If it was OneToMany, we can reuse it on ManyToMany.
891  if (layout == HashType::ManyToMany &&
892  hash_table->getLayout() == HashType::OneToMany) {
893  // use the cached hash table
894  layout_override_ = HashType::ManyToMany;
895  }
896  return hash_table;
897  }
898  }
900  const auto key_component_count = join_bucket_info[0].bucket_sizes_for_dimension.size();
901 
902  const auto key_handler =
903  OverlapsKeyHandler(key_component_count,
904  &join_columns[0],
905  join_bucket_info[0].bucket_sizes_for_dimension.data());
906  const auto catalog = executor_->getCatalog();
907  BaselineJoinHashTableBuilder builder(catalog);
908  const auto err = builder.initHashTableOnCpu(&key_handler,
909  composite_key_info,
910  join_columns,
911  join_column_types,
912  join_bucket_info,
913  entry_count,
914  emitted_keys_count,
915  layout,
916  getKeyComponentWidth(),
917  getKeyComponentCount());
918  if (err) {
919  throw HashJoinFail(
920  std::string("Unrecognized error when initializing CPU overlaps hash table (") +
921  std::to_string(err) + std::string(")"));
922  }
923  std::shared_ptr<BaselineHashTable> hash_table = builder.getHashTable();
924  if (getInnerTableId() > 0) {
925  putHashTableOnCpuToCache(cache_key, hash_table);
926  }
927  return hash_table;
928 }
929 
930 #ifdef HAVE_CUDA
931 
932 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::initHashTableOnGpu(
933  const std::vector<JoinColumn>& join_columns,
934  const std::vector<JoinColumnTypeInfo>& join_column_types,
935  const std::vector<JoinBucketInfo>& join_bucket_info,
936  const HashType layout,
937  const size_t entry_count,
938  const size_t emitted_keys_count,
939  const size_t device_id) {
941 
942  VLOG(1) << "Building overlaps join hash table on GPU.";
943 
944  const auto catalog = executor_->getCatalog();
945  CHECK(catalog);
946  BaselineJoinHashTableBuilder builder(catalog);
947 
948  auto& data_mgr = catalog->getDataMgr();
949  CudaAllocator allocator(&data_mgr, device_id);
950  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
951  CHECK_EQ(join_columns.size(), 1u);
952  CHECK(!join_bucket_info.empty());
953  auto& bucket_sizes_for_dimension = join_bucket_info[0].bucket_sizes_for_dimension;
954  auto bucket_sizes_gpu =
955  transfer_vector_of_flat_objects_to_gpu(bucket_sizes_for_dimension, allocator);
956  const auto key_handler = OverlapsKeyHandler(
957  bucket_sizes_for_dimension.size(), join_columns_gpu, bucket_sizes_gpu);
958 
959  const auto err = builder.initHashTableOnGpu(&key_handler,
960  join_columns,
961  layout,
962  getKeyComponentWidth(),
963  getKeyComponentCount(),
964  entry_count,
965  emitted_keys_count,
966  device_id);
967  if (err) {
968  throw HashJoinFail(
969  std::string("Unrecognized error when initializing GPU overlaps hash table (") +
970  std::to_string(err) + std::string(")"));
971  }
972  return builder.getHashTable();
973 }
974 
975 std::shared_ptr<BaselineHashTable> OverlapsJoinHashTable::copyCpuHashTableToGpu(
976  std::shared_ptr<BaselineHashTable>&& cpu_hash_table,
977  const HashType layout,
978  const size_t entry_count,
979  const size_t emitted_keys_count,
980  const size_t device_id) {
982 
983  auto catalog = executor_->getCatalog();
984  CHECK(catalog);
985  auto& data_mgr = catalog->getDataMgr();
986 
987  // copy hash table to GPU
988  BaselineJoinHashTableBuilder gpu_builder(executor_->catalog_);
989  gpu_builder.allocateDeviceMemory(layout,
990  getKeyComponentWidth(),
991  getKeyComponentCount(),
992  entry_count,
993  emitted_keys_count,
994  device_id);
995  std::shared_ptr<BaselineHashTable> gpu_hash_table = gpu_builder.getHashTable();
996  CHECK(gpu_hash_table);
997  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
998  CHECK(gpu_buffer_ptr);
999 
1000  CHECK_LE(cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
1001  gpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::GPU));
1002  copy_to_gpu(&data_mgr,
1003  reinterpret_cast<CUdeviceptr>(gpu_buffer_ptr),
1004  cpu_hash_table->getCpuBuffer(),
1005  cpu_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
1006  device_id);
1007  return gpu_hash_table;
1008 }
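// The CPU-built buffer is copied verbatim into device memory sized by
// allocateDeviceMemory(); the CHECK_LE above guards against the GPU allocation being
// smaller than the CPU table it has to hold.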
1009 
1010 #endif // HAVE_CUDA
1011 
1012 #define LL_CONTEXT executor_->cgen_state_->context_
1013 #define LL_BUILDER executor_->cgen_state_->ir_builder_
1014 #define LL_INT(v) executor_->cgen_state_->llInt(v)
1015 #define LL_FP(v) executor_->cgen_state_->llFp(v)
1016 #define ROW_FUNC executor_->cgen_state_->row_func_
1017 
1018 llvm::Value* OverlapsJoinHashTable::codegenKey(const CompilationOptions& co) {
1019  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1020  const auto key_component_width = getKeyComponentWidth();
1021  CHECK(key_component_width == 4 || key_component_width == 8);
1022  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
1023  llvm::Value* key_buff_lv{nullptr};
1024  switch (key_component_width) {
1025  case 4:
1026  key_buff_lv =
1027  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
1028  break;
1029  case 8:
1030  key_buff_lv =
1031  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
1032  break;
1033  default:
1034  CHECK(false);
1035  }
1036 
1037  const auto& inner_outer_pair = inner_outer_pairs_[0];
1038  const auto outer_col = inner_outer_pair.second;
1039  const auto outer_col_ti = outer_col->get_type_info();
1040 
1041  if (outer_col_ti.is_geometry()) {
1042  CodeGenerator code_generator(executor_);
1043  // TODO(adb): for points we will use the coords array, but for other geometries we
1044  // will need to use the bounding box. For now only support points.
1045  CHECK_EQ(outer_col_ti.get_type(), kPOINT);
1046  CHECK_EQ(bucket_sizes_for_dimension_.size(), static_cast<size_t>(2));
1047 
1048  const auto col_lvs = code_generator.codegen(outer_col, true, co);
1049  CHECK_EQ(col_lvs.size(), size_t(1));
1050 
1051  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
1052  CHECK(outer_col_var);
1053  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
1054  outer_col_var->get_table_id(), outer_col_var->get_column_id() + 1);
1055  CHECK(coords_cd);
1056 
1057  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1058  "array_buff",
1059  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1060  {col_lvs.front(), code_generator.posArg(outer_col)});
1061  CHECK(coords_cd->columnType.get_elem_type().get_type() == kTINYINT)
1062  << "Only TINYINT coordinates columns are supported in geo overlaps hash join.";
1063  const auto arr_ptr =
1064  code_generator.castArrayPointer(array_ptr, coords_cd->columnType.get_elem_type());
1065 
1066  for (size_t i = 0; i < 2; i++) {
1067  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));
1068 
1069  // Note that get_bucket_key_for_range_compressed will need to be specialized for
1070  // future compression schemes
1071  auto bucket_key =
1072  outer_col_ti.get_compression() == kENCODING_GEOINT
1073  ? executor_->cgen_state_->emitExternalCall(
1074  "get_bucket_key_for_range_compressed",
1075  get_int_type(64, LL_CONTEXT),
1076  {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])})
1077  : executor_->cgen_state_->emitExternalCall(
1078  "get_bucket_key_for_range_double",
1079  get_int_type(64, LL_CONTEXT),
1080  {arr_ptr, LL_INT(i), LL_FP(bucket_sizes_for_dimension_[i])});
1081  const auto col_lv = LL_BUILDER.CreateSExt(
1082  bucket_key, get_int_type(key_component_width * 8, LL_CONTEXT));
1083  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
1084  }
1085  } else {
1086  LOG(FATAL) << "Overlaps key currently only supported for geospatial types.";
1087  }
1088  return key_buff_lv;
1089 }
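// In short: for a POINT outer column the generated code loads the coords array and, for
// each of the two dimensions, calls get_bucket_key_for_range_{compressed,double} with the
// stored inverse bucket size to produce one 64-bit key component, sign-extended to the
// key width. Other geometry types would need bounding-box handling (see the TODO above).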
1090 
1091 std::vector<llvm::Value*> OverlapsJoinHashTable::codegenManyKey(
1092  const CompilationOptions& co) {
1093  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1094  const auto key_component_width = getKeyComponentWidth();
1095  CHECK(key_component_width == 4 || key_component_width == 8);
1096  auto hash_table = getHashTableForDevice(size_t(0));
1097  CHECK(hash_table);
1099 
1100  VLOG(1) << "Performing codegen for ManyToMany";
1101  const auto& inner_outer_pair = inner_outer_pairs_[0];
1102  const auto outer_col = inner_outer_pair.second;
1103 
1104  CodeGenerator code_generator(executor_);
1105  const auto col_lvs = code_generator.codegen(outer_col, true, co);
1106  CHECK_EQ(col_lvs.size(), size_t(1));
1107 
1108  const auto outer_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
1109  CHECK(outer_col_var);
1110  const auto coords_cd = executor_->getCatalog()->getMetadataForColumn(
1111  outer_col_var->get_table_id(), outer_col_var->get_column_id());
1112  CHECK(coords_cd);
1113 
1114  const auto array_ptr = executor_->cgen_state_->emitExternalCall(
1115  "array_buff",
1116  llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_),
1117  {col_lvs.front(), code_generator.posArg(outer_col)});
1118 
1119  // TODO(jclay): this seems to cast to double, and causes the GPU build to fail.
1120  // const auto arr_ptr =
1121  // code_generator.castArrayPointer(array_ptr,
1122  // coords_cd->columnType.get_elem_type());
1123  array_ptr->setName("array_ptr");
1124 
1125  auto num_keys_lv =
1126  executor_->cgen_state_->emitExternalCall("get_num_buckets_for_bounds",
1127  get_int_type(32, LL_CONTEXT),
1128  {array_ptr,
1129  LL_INT(0),
1130  LL_FP(bucket_sizes_for_dimension_[0]),
1131  LL_FP(bucket_sizes_for_dimension_[1])});
1132  num_keys_lv->setName("num_keys_lv");
1133 
1134  return {num_keys_lv, array_ptr};
1135 }
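// codegenManyKey() returns {num_keys_lv, array_ptr}: the number of buckets touched by the
// probe-side bounds (via get_num_buckets_for_bounds) and a pointer to the raw coords
// buffer; codegenMatchingSet() below passes both into get_candidate_rows().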
1136 
1137 HashJoinMatchingSet OverlapsJoinHashTable::codegenMatchingSet(
1138  const CompilationOptions& co,
1139  const size_t index) {
1140  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
1141  if (getHashType() == HashType::ManyToMany) {
1142  VLOG(1) << "Building codegenMatchingSet for ManyToMany";
1143  const auto key_component_width = getKeyComponentWidth();
1144  CHECK(key_component_width == 4 || key_component_width == 8);
1145  auto many_to_many_args = codegenManyKey(co);
1146  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1147  const auto composite_dict_ptr_type =
1148  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1149  const auto composite_key_dict =
1150  hash_ptr->getType()->isPointerTy()
1151  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1152  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1153  const auto key_component_count = getKeyComponentCount();
1154 
1155  auto one_to_many_ptr = hash_ptr;
1156 
1157  if (one_to_many_ptr->getType()->isPointerTy()) {
1158  one_to_many_ptr =
1159  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1160  } else {
1161  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1162  }
1163 
1164  const auto composite_key_dict_size = offsetBufferOff();
1165  one_to_many_ptr =
1166  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1167 
1168  // NOTE(jclay): A fixed array of size 200 is allocated on the stack.
1169  // this is likely the maximum value we can do that is safe to use across
1170  // all supported GPU architectures.
1171  const int max_array_size = 200;
1172  const auto arr_type = get_int_array_type(32, max_array_size, LL_CONTEXT);
1173  const auto out_arr_lv = LL_BUILDER.CreateAlloca(arr_type);
1174  out_arr_lv->setName("out_arr");
1175 
1176  const auto casted_out_arr_lv =
1177  LL_BUILDER.CreatePointerCast(out_arr_lv, arr_type->getPointerTo());
1178 
1179  const auto element_ptr = LL_BUILDER.CreateGEP(arr_type, casted_out_arr_lv, LL_INT(0));
1180 
1181  auto rowid_ptr_i32 =
1182  LL_BUILDER.CreatePointerCast(element_ptr, llvm::Type::getInt32PtrTy(LL_CONTEXT));
1183 
1184  const auto candidate_count_lv = executor_->cgen_state_->emitExternalCall(
1185  "get_candidate_rows",
1186  llvm::Type::getInt64Ty(LL_CONTEXT),
1187  {
1188  rowid_ptr_i32,
1189  LL_INT(max_array_size),
1190  many_to_many_args[1],
1191  LL_INT(0),
1192  LL_FP(bucket_sizes_for_dimension_[0]),
1193  LL_FP(bucket_sizes_for_dimension_[1]),
1194  many_to_many_args[0],
1195  LL_INT(key_component_count), // key_component_count
1196  composite_key_dict, // ptr to hash table
1197  LL_INT(getEntryCount()), // entry_count
1198  LL_INT(composite_key_dict_size), // offset_buffer_ptr_offset
1199  LL_INT(getEntryCount() * sizeof(int32_t)) // sub_buff_size
1200  });
1201 
1202  const auto slot_lv = LL_INT(int64_t(0));
1203 
1204  return {rowid_ptr_i32, candidate_count_lv, slot_lv};
1205  } else {
1206  VLOG(1) << "Building codegenMatchingSet for Baseline";
1207  // TODO: duplicated w/ BaselineJoinHashTable -- push into the hash table builder?
1208  const auto key_component_width = getKeyComponentWidth();
1209  CHECK(key_component_width == 4 || key_component_width == 8);
1210  auto key_buff_lv = codegenKey(co);
1212  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
1213  const auto composite_dict_ptr_type =
1214  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1215  const auto composite_key_dict =
1216  hash_ptr->getType()->isPointerTy()
1217  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1218  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1219  const auto key_component_count = getKeyComponentCount();
1220  const auto key = executor_->cgen_state_->emitExternalCall(
1221  "get_composite_key_index_" + std::to_string(key_component_width * 8),
1222  get_int_type(64, LL_CONTEXT),
1223  {key_buff_lv,
1224  LL_INT(key_component_count),
1225  composite_key_dict,
1226  LL_INT(getEntryCount())});
1227  auto one_to_many_ptr = hash_ptr;
1228  if (one_to_many_ptr->getType()->isPointerTy()) {
1229  one_to_many_ptr =
1230  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1231  } else {
1232  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1233  }
1234  const auto composite_key_dict_size = offsetBufferOff();
1235  one_to_many_ptr =
1236  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1237  return HashJoin::codegenMatchingSet(
1238  std::vector<llvm::Value*>{
1239  one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(getEntryCount() - 1)},
1240  false,
1241  false,
1242  false,
1243  getComponentBufferSize(),
1244  executor_);
1245  }
1246  UNREACHABLE();
1247  return HashJoinMatchingSet{};
1248 }
1249 
1250 std::string OverlapsJoinHashTable::toString(const ExecutorDeviceType device_type,
1251  const int device_id,
1252  bool raw) const {
1253  auto buffer = getJoinHashBuffer(device_type, device_id);
1254  CHECK_LT(device_id, hash_tables_for_device_.size());
1255  auto hash_table = hash_tables_for_device_[device_id];
1256  CHECK(hash_table);
1257  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1258 #ifdef HAVE_CUDA
1259  std::unique_ptr<int8_t[]> buffer_copy;
1260  if (device_type == ExecutorDeviceType::GPU) {
1261  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1262  CHECK(executor_);
1263  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1264 
1265  copy_from_gpu(&data_mgr,
1266  buffer_copy.get(),
1267  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1268  buffer_size,
1269  device_id);
1270  }
1271  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1272 #else
1273  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1274 #endif // HAVE_CUDA
1275  auto ptr2 = ptr1 + offsetBufferOff();
1276  auto ptr3 = ptr1 + countBufferOff();
1277  auto ptr4 = ptr1 + payloadBufferOff();
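  // ptr1 points at the composite key section; ptr2/ptr3/ptr4 are the offset, count, and
  // row-id payload sections of the one-to-many buffer, located via offsetBufferOff(),
  // countBufferOff() and payloadBufferOff().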
1278  CHECK(hash_table);
1279  const auto layout = getHashType();
1280  return HashTable::toString(
1281  "geo",
1282  getHashTypeString(layout),
1283  getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1284  getKeyComponentWidth(),
1285  hash_table->getEntryCount(),
1286  ptr1,
1287  ptr2,
1288  ptr3,
1289  ptr4,
1290  buffer_size,
1291  raw);
1292 }
1293 
1294 std::set<DecodedJoinHashBufferEntry> OverlapsJoinHashTable::toSet(
1295  const ExecutorDeviceType device_type,
1296  const int device_id) const {
1297  auto buffer = getJoinHashBuffer(device_type, device_id);
1298  auto hash_table = getHashTableForDevice(device_id);
1299  CHECK(hash_table);
1300  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
1301 #ifdef HAVE_CUDA
1302  std::unique_ptr<int8_t[]> buffer_copy;
1303  if (device_type == ExecutorDeviceType::GPU) {
1304  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1305  CHECK(executor_);
1306  auto& data_mgr = executor_->getCatalog()->getDataMgr();
1307  copy_from_gpu(&data_mgr,
1308  buffer_copy.get(),
1309  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
1310  buffer_size,
1311  device_id);
1312  }
1313  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
1314 #else
1315  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
1316 #endif // HAVE_CUDA
1317  auto ptr2 = ptr1 + offsetBufferOff();
1318  auto ptr3 = ptr1 + countBufferOff();
1319  auto ptr4 = ptr1 + payloadBufferOff();
1320  const auto layout = getHashType();
1321  return HashTable::toSet(getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
1322  getKeyComponentWidth(),
1323  hash_table->getEntryCount(),
1324  ptr1,
1325  ptr2,
1326  ptr3,
1327  ptr4,
1328  buffer_size);
1329 }
1330 
1331 Data_Namespace::MemoryLevel OverlapsJoinHashTable::getEffectiveMemoryLevel(
1332  const std::vector<InnerOuter>& inner_outer_pairs) const {
1333  // always build on CPU, unless a query hint explicitly allows a GPU build
1334  if (getRegisteredQueryHint().hint_delivered && getRegisteredQueryHint().overlaps_allow_gpu_build) {
1335  return memory_level_;
1336  }
1337  return Data_Namespace::MemoryLevel::CPU_LEVEL;
1338 }
1339 
1340 int OverlapsJoinHashTable::getInnerTableId() const noexcept {
1341  try {
1342  return HashJoin::getInnerTableId(inner_outer_pairs_);
1343  } catch (...) {
1344  CHECK(false);
1345  }
1346  return 0;
1347 }
1348 
1349 std::shared_ptr<HashTable> OverlapsJoinHashTable::initHashTableOnCpuFromCache(
1350  const OverlapsHashTableCacheKey& key) {
1351  auto timer = DEBUG_TIMER(__func__);
1352  VLOG(1) << "Checking CPU hash table cache.";
1354  auto hash_table_opt = hash_table_cache_->getWithKey(key);
1355  if (hash_table_opt) {
1356  CHECK(bucket_sizes_for_dimension_ == hash_table_opt->first.bucket_sizes);
1357  return hash_table_opt->second;
1358  }
1359  return nullptr;
1360 }
1361 
1362 std::optional<std::pair<size_t, size_t>>
1363 OverlapsJoinHashTable::getApproximateTupleCountFromCache(
1364  const OverlapsHashTableCacheKey& key) {
1365  for (auto chunk_key : key.chunk_keys) {
1366  CHECK_GE(chunk_key.size(), size_t(2));
1367  if (chunk_key[1] < 0) {
1368  return std::nullopt;
1370  }
1371  }
1372 
1374  auto hash_table_opt = hash_table_cache_->getWithKey(key);
1375  if (hash_table_opt) {
1376  auto hash_table = hash_table_opt->second;
1377  return std::make_pair(hash_table->getEntryCount() / 2,
1378  hash_table->getEmittedKeysCount());
1379  }
1380  return std::nullopt;
1381 }
1382 
1383 void OverlapsJoinHashTable::putHashTableOnCpuToCache(
1384  const OverlapsHashTableCacheKey& key,
1385  std::shared_ptr<HashTable> hash_table) {
1386  for (auto chunk_key : key.chunk_keys) {
1387  CHECK_GE(chunk_key.size(), size_t(2));
1388  if (chunk_key[1] < 0) {
1389  return;
1390  }
1391  }
1393  hash_table_cache_->insert(key, hash_table);
1394 }