OmniSciDB  bf83d84833
BaselineJoinHashTable.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <future>
20 
24 #include "QueryEngine/Execute.h"
31 
32 std::unique_ptr<HashTableCache<HashTableCacheKey, BaselineJoinHashTable::HashTableCacheValue>>
33     BaselineJoinHashTable::hash_table_cache_ = std::make_unique<
34         HashTableCache<HashTableCacheKey, BaselineJoinHashTable::HashTableCacheValue>>();
35 
36 
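// Entry point used by the executor: builds a keyed (baseline) hash table for the given
// equijoin condition. The keyspace is capped at 2x the inner table's tuple upper bound,
// entries are split across shards and devices, and build failures are translated into
// HashJoinFail so the executor can fall back to a loop join.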
38 std::shared_ptr<BaselineJoinHashTable> BaselineJoinHashTable::getInstance(
39  const std::shared_ptr<Analyzer::BinOper> condition,
40  const std::vector<InputTableInfo>& query_infos,
41  const Data_Namespace::MemoryLevel memory_level,
42  const HashType preferred_hash_type,
43  const int device_count,
44  ColumnCacheMap& column_cache,
45  Executor* executor) {
46  decltype(std::chrono::steady_clock::now()) ts1, ts2;
47 
48  if (VLOGGING(1)) {
49  VLOG(1) << "Building keyed hash table " << getHashTypeString(preferred_hash_type)
50  << " for qual: " << condition->toString();
51  ts1 = std::chrono::steady_clock::now();
52  }
53  auto inner_outer_pairs = normalize_column_pairs(
54  condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
55 
56  const auto& query_info =
57  get_inner_query_info(getInnerTableId(inner_outer_pairs), query_infos).info;
58  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
59  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
60  throw TooManyHashEntries();
61  }
62  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
63                               ? BaselineJoinHashTable::getShardCountForCondition(
64                                     condition.get(), executor, inner_outer_pairs)
65                               : 0;
66  const auto entries_per_device =
67  get_entries_per_device(total_entries, shard_count, device_count, memory_level);
68  auto join_hash_table =
69  std::shared_ptr<BaselineJoinHashTable>(new BaselineJoinHashTable(condition,
70  query_infos,
71  memory_level,
72  entries_per_device,
73  column_cache,
74  executor,
75  inner_outer_pairs,
76  device_count));
77  try {
78  join_hash_table->reify(preferred_hash_type);
79  } catch (const TableMustBeReplicated& e) {
80  // Throw a runtime error to abort the query
81  join_hash_table->freeHashBufferMemory();
82  throw std::runtime_error(e.what());
83  } catch (const HashJoinFail& e) {
84  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
85  // possible)
86  join_hash_table->freeHashBufferMemory();
87  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
88  "involved in equijoin | ") +
89  e.what());
90  } catch (const ColumnarConversionNotSupported& e) {
91  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
92  e.what());
93  } catch (const OutOfMemory& e) {
94  throw HashJoinFail(
95  std::string("Ran out of memory while building hash tables for equijoin | ") +
96  e.what());
97  } catch (const std::exception& e) {
98  throw std::runtime_error(
99  std::string("Fatal error while attempting to build hash tables for join: ") +
100  e.what());
101  }
102  if (VLOGGING(1)) {
103  ts2 = std::chrono::steady_clock::now();
104  VLOG(1) << "Built keyed hash table "
105  << getHashTypeString(join_hash_table->getHashType()) << " in "
106  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
107  << " ms";
108  }
109  return join_hash_table;
110 }
111 
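// The constructor only records the build parameters; the table itself is materialized by reify().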
112 BaselineJoinHashTable::BaselineJoinHashTable(
113  const std::shared_ptr<Analyzer::BinOper> condition,
114  const std::vector<InputTableInfo>& query_infos,
115  const Data_Namespace::MemoryLevel memory_level,
116  const size_t entry_count,
117  ColumnCacheMap& column_cache,
118  Executor* executor,
119  const std::vector<InnerOuter>& inner_outer_pairs,
120  const int device_count)
121  : condition_(condition)
122  , query_infos_(query_infos)
123  , memory_level_(memory_level)
124  , entry_count_(entry_count)
125  , emitted_keys_count_(0)
126  , executor_(executor)
127  , column_cache_(column_cache)
128  , inner_outer_pairs_(inner_outer_pairs)
129  , catalog_(executor->getCatalog())
130  , device_count_(device_count) {
131  CHECK_GT(device_count_, 0);
132  hash_tables_for_device_.resize(std::max(device_count_, 1));
133 }
134 
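// Returns the shard count of the first sharded inner/outer column pair, or 0 if the join
// does not involve any sharded column.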
135 size_t BaselineJoinHashTable::getShardCountForCondition(
136  const Analyzer::BinOper* condition,
137  const Executor* executor,
138  const std::vector<InnerOuter>& inner_outer_pairs) {
139  for (const auto& inner_outer_pair : inner_outer_pairs) {
140  const auto pair_shard_count = get_shard_count(inner_outer_pair, executor);
141  if (pair_shard_count) {
142  return pair_shard_count;
143  }
144  }
145  return 0;
146 }
147 
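// Decodes the hash table buffer into a human-readable dump (copying it back from the GPU
// first when the table lives in device memory).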
148 std::string BaselineJoinHashTable::toString(const ExecutorDeviceType device_type,
149  const int device_id,
150  bool raw) const {
151  auto buffer = getJoinHashBuffer(device_type, device_id);
152  CHECK_LT(device_id, hash_tables_for_device_.size());
153  auto hash_table = hash_tables_for_device_[device_id];
154  CHECK(hash_table);
155  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
156 #ifdef HAVE_CUDA
157  std::unique_ptr<int8_t[]> buffer_copy;
158  if (device_type == ExecutorDeviceType::GPU) {
159  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
160 
161  copy_from_gpu(&catalog_->getDataMgr(),
162  buffer_copy.get(),
163  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
164  buffer_size,
165  device_id);
166  }
167  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
168 #else
169  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
170 #endif // HAVE_CUDA
171  auto ptr2 = ptr1 + offsetBufferOff();
172  auto ptr3 = ptr1 + countBufferOff();
173  auto ptr4 = ptr1 + payloadBufferOff();
174  CHECK(hash_table);
175  const auto layout = getHashType();
176  return HashTable::toString(
177  "keyed",
178  getHashTypeString(layout),
179  getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
180  getKeyComponentWidth(),
181  entry_count_,
182  ptr1,
183  ptr2,
184  ptr3,
185  ptr4,
186  buffer_size,
187  raw);
188 }
189 
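// Decodes the hash table buffer into a std::set of entries for inspection and validation.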
190 std::set<DecodedJoinHashBufferEntry> BaselineJoinHashTable::toSet(
191  const ExecutorDeviceType device_type,
192  const int device_id) const {
193  auto buffer = getJoinHashBuffer(device_type, device_id);
194  auto hash_table = getHashTableForDevice(device_id);
195  CHECK(hash_table);
196  auto buffer_size = hash_table->getHashTableBufferSize(device_type);
197 #ifdef HAVE_CUDA
198  std::unique_ptr<int8_t[]> buffer_copy;
199  if (device_type == ExecutorDeviceType::GPU) {
200  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
201 
202  copy_from_gpu(&catalog_->getDataMgr(),
203  buffer_copy.get(),
204  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
205  buffer_size,
206  device_id);
207  }
208  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
209 #else
210  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
211 #endif // HAVE_CUDA
212  auto ptr2 = ptr1 + offsetBufferOff();
213  auto ptr3 = ptr1 + countBufferOff();
214  auto ptr4 = ptr1 + payloadBufferOff();
215  const auto layout = getHashType();
216  return HashTable::toSet(getKeyComponentCount() + (layout == HashType::OneToOne ? 1 : 0),
217  getKeyComponentWidth(),
218  entry_count_,
219  ptr1,
220  ptr2,
221  ptr3,
222  ptr4,
223  buffer_size);
224 }
225 
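// Top-level build: prefers a layout remembered by HashTypeCache over the caller's preference,
// forces a one-to-many (or many-to-many for array keys) layout for overlaps joins, and on
// failure records OneToMany in the cache and retries with that layout.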
226 void BaselineJoinHashTable::reify(const HashType preferred_layout) {
227  auto timer = DEBUG_TIMER(__func__);
228  CHECK_LT(0, device_count_);
229  const auto composite_key_info =
230      HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
231  const auto type_and_found = HashTypeCache::get(composite_key_info.cache_key_chunks);
232  const auto layout = type_and_found.second ? type_and_found.first : preferred_layout;
233 
234  checkHashJoinReplicationConstraint(
235      getInnerTableId(),
236      BaselineJoinHashTable::getShardCountForCondition(
237          condition_.get(), executor_, inner_outer_pairs_),
238      executor_);
239 
240  if (condition_->is_overlaps_oper()) {
241  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
242  HashType layout;
243 
244  if (inner_outer_pairs_[0].second->get_type_info().is_array()) {
245  layout = HashType::ManyToMany;
246  } else {
247  layout = HashType::OneToMany;
248  }
249  try {
250  reifyWithLayout(layout);
251  return;
252  } catch (const std::exception& e) {
253  VLOG(1) << "Caught exception while building overlaps baseline hash table: "
254  << e.what();
255  throw;
256  }
257  }
258 
259  try {
260  reifyWithLayout(layout);
261  } catch (const std::exception& e) {
262  VLOG(1) << "Caught exception while building baseline hash table: " << e.what();
263  freeHashBufferMemory();
264  HashTypeCache::set(composite_key_info.cache_key_chunks, HashType::OneToMany);
265  reifyWithLayout(HashType::OneToMany);
266  }
267 }
268 
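// Fetches the join columns for each device, sizes the table (for OneToMany the entry count is
// derived from an approximate distinct-tuple count), and then builds every device's table on
// its own async thread.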
269 void BaselineJoinHashTable::reifyWithLayout(const HashType layout) {
270  const auto& query_info = get_inner_query_info(getInnerTableId(), query_infos_).info;
271  if (query_info.fragments.empty()) {
272  return;
273  }
274  auto& data_mgr = catalog_->getDataMgr();
275  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
276  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
277  for (int device_id = 0; device_id < device_count_; ++device_id) {
278  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(&data_mgr, device_id));
279  }
280  }
281  std::vector<ColumnsForDevice> columns_per_device;
282  const auto shard_count = shardCount();
283  for (int device_id = 0; device_id < device_count_; ++device_id) {
284  const auto fragments =
285  shard_count
286  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
287  : query_info.fragments;
288  const auto columns_for_device =
289  fetchColumnsForDevice(fragments,
290  device_id,
291  memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
292    ? dev_buff_owners[device_id].get()
293  : nullptr);
294  columns_per_device.push_back(columns_for_device);
295  }
296  if (layout == HashType::OneToMany) {
297  CHECK(!columns_per_device.front().join_columns.empty());
298  emitted_keys_count_ = columns_per_device.front().join_columns.front().num_elems;
299  size_t tuple_count;
300  std::tie(tuple_count, std::ignore) = approximateTupleCount(columns_per_device);
301  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
302 
303  entry_count_ =
304  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_);
305  }
306  std::vector<std::future<void>> init_threads;
307  for (int device_id = 0; device_id < device_count_; ++device_id) {
308  const auto fragments =
309  shard_count
310  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
311  : query_info.fragments;
312  init_threads.push_back(std::async(std::launch::async,
313  &BaselineJoinHashTable::reifyForDevice,
314  this,
315  columns_per_device[device_id],
316  layout,
317  device_id,
318  logger::thread_id()));
319  }
320  for (auto& init_thread : init_threads) {
321  init_thread.wait();
322  }
323  for (auto& init_thread : init_threads) {
324  init_thread.get();
325  }
326 }
327 
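// Estimates the number of distinct composite keys with HyperLogLog (11-bit bitmaps). On CPU the
// hash table cache is consulted first and per-thread HLL buffers are unified; on GPU each device
// fills its own HLL buffer, which is copied back and unified on the host.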
328 std::pair<size_t, size_t> BaselineJoinHashTable::approximateTupleCount(
329  const std::vector<ColumnsForDevice>& columns_per_device) const {
330  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
331  CountDistinctDescriptor count_distinct_desc{
332   CountDistinctImplType::Bitmap,
333   0,
334   11,
335   true,
336   effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
337       ? ExecutorDeviceType::GPU
338       : ExecutorDeviceType::CPU,
339   1};
340  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
341 
342  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
343 
344  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
345  const auto composite_key_info =
346      HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
347  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
348  composite_key_info.cache_key_chunks,
349  condition_->get_optype()};
350  const auto cached_count_info = getApproximateTupleCountFromCache(cache_key);
351  if (cached_count_info.first) {
352  VLOG(1) << "Using a cached tuple count: " << *cached_count_info.first
353  << ", emitted keys count: " << cached_count_info.second;
354  return std::make_pair(*cached_count_info.first, cached_count_info.second);
355  }
356  int thread_count = cpu_threads();
357  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
358  auto hll_result = &hll_buffer_all_cpus[0];
359 
360  approximate_distinct_tuples(hll_result,
361  count_distinct_desc.bitmap_sz_bits,
362  padded_size_bytes,
363  columns_per_device.front().join_columns,
364  columns_per_device.front().join_column_types,
365  thread_count);
366  for (int i = 1; i < thread_count; ++i) {
367  hll_unify(hll_result,
368  hll_result + i * padded_size_bytes,
369  1 << count_distinct_desc.bitmap_sz_bits);
370  }
371  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits), 0);
372  }
373 #ifdef HAVE_CUDA
374  auto& data_mgr = catalog_->getDataMgr();
375  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
376  for (auto& host_hll_buffer : host_hll_buffers) {
377  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
378  }
379  std::vector<std::future<void>> approximate_distinct_device_threads;
380  for (int device_id = 0; device_id < device_count_; ++device_id) {
381  approximate_distinct_device_threads.emplace_back(std::async(
382  std::launch::async,
383  [device_id,
384  &columns_per_device,
385  &count_distinct_desc,
386  &data_mgr,
387  &host_hll_buffers] {
388  CudaAllocator allocator(&data_mgr, device_id);
389  auto device_hll_buffer =
390  allocator.alloc(count_distinct_desc.bitmapPaddedSizeBytes());
391  data_mgr.getCudaMgr()->zeroDeviceMem(
392  device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
393  const auto& columns_for_device = columns_per_device[device_id];
394  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
395  columns_for_device.join_columns, allocator);
396  auto join_column_types_gpu = transfer_vector_of_flat_objects_to_gpu(
397  columns_for_device.join_column_types, allocator);
398  const auto key_handler =
399  GenericKeyHandler(columns_for_device.join_columns.size(),
400  true,
401  join_columns_gpu,
402  join_column_types_gpu,
403  nullptr,
404  nullptr);
405  const auto key_handler_gpu =
406  transfer_flat_object_to_gpu(key_handler, allocator);
407  approximate_distinct_tuples_on_device(
408      reinterpret_cast<uint8_t*>(device_hll_buffer),
409  count_distinct_desc.bitmap_sz_bits,
410  key_handler_gpu,
411  columns_for_device.join_columns[0].num_elems);
412 
413  auto& host_hll_buffer = host_hll_buffers[device_id];
414  copy_from_gpu(&data_mgr,
415  &host_hll_buffer[0],
416  reinterpret_cast<CUdeviceptr>(device_hll_buffer),
417  count_distinct_desc.bitmapPaddedSizeBytes(),
418  device_id);
419  }));
420  }
421  for (auto& child : approximate_distinct_device_threads) {
422  child.get();
423  }
424  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
425  auto& result_hll_buffer = host_hll_buffers.front();
426  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
427  for (int device_id = 1; device_id < device_count_; ++device_id) {
428  auto& host_hll_buffer = host_hll_buffers[device_id];
429  hll_unify(hll_result,
430  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
431  1 << count_distinct_desc.bitmap_sz_bits);
432  }
433  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits), 0);
434 #else
435  UNREACHABLE();
436  return {0, 0};
437 #endif // HAVE_CUDA
438 }
439 
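// Materializes the inner join columns for this device's fragments and records per-column type
// info (element size, null value, bitwise-equality semantics).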
440 ColumnsForDevice BaselineJoinHashTable::fetchColumnsForDevice(
441  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
442  const int device_id,
443  DeviceAllocator* dev_buff_owner) {
444  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
445 
446  std::vector<JoinColumn> join_columns;
447  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
448  std::vector<JoinColumnTypeInfo> join_column_types;
449  std::vector<JoinBucketInfo> join_bucket_info;
450  std::vector<std::shared_ptr<void>> malloc_owner;
451  for (const auto& inner_outer_pair : inner_outer_pairs_) {
452  const auto inner_col = inner_outer_pair.first;
453  const auto inner_cd = get_column_descriptor_maybe(
454  inner_col->get_column_id(), inner_col->get_table_id(), *catalog_);
455  if (inner_cd && inner_cd->isVirtualCol) {
456    throw FailedToJoinOnVirtualColumn();
457  }
458  join_columns.emplace_back(fetchJoinColumn(inner_col,
459  fragments,
460  effective_memory_level,
461  device_id,
462  chunks_owner,
463  dev_buff_owner,
464  malloc_owner,
465  executor_,
466  &column_cache_));
467  const auto& ti = inner_col->get_type_info();
468  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
469  0,
470  0,
471    inline_fixed_encoding_null_val(ti),
472    isBitwiseEq(),
473    0,
474    get_join_column_type_kind(ti)});
475  }
476  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
477 }
478 
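// Per-device build step, run on the threads spawned by reifyWithLayout().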
479 void BaselineJoinHashTable::reifyForDevice(const ColumnsForDevice& columns_for_device,
480  const HashType layout,
481  const int device_id,
482  const logger::ThreadId parent_thread_id) {
483  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
484  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
485  const auto err = initHashTableForDevice(columns_for_device.join_columns,
486  columns_for_device.join_column_types,
487  columns_for_device.join_buckets,
488  layout,
489  effective_memory_level,
490  device_id);
491  if (err) {
492  throw HashJoinFail(
493  std::string("Unrecognized error when initializing baseline hash table (") +
494  std::to_string(err) + std::string(")"));
495  }
496 }
497 
498 size_t BaselineJoinHashTable::shardCount() const {
499  if (memory_level_ != Data_Namespace::GPU_LEVEL) {
500    return 0;
501  }
502  return BaselineJoinHashTable::getShardCountForCondition(
503      condition_.get(), executor_, inner_outer_pairs_);
504 }
505 
506 size_t BaselineJoinHashTable::getKeyComponentWidth() const {
507  for (const auto& inner_outer_pair : inner_outer_pairs_) {
508  const auto inner_col = inner_outer_pair.first;
509  const auto& inner_col_ti = inner_col->get_type_info();
510  if (inner_col_ti.get_logical_size() > 4) {
511  CHECK_EQ(8, inner_col_ti.get_logical_size());
512  return 8;
513  }
514  }
515  return 4;
516 }
517 
518 size_t BaselineJoinHashTable::getKeyComponentCount() const {
519  return inner_outer_pairs_.size();
520 }
521 
522 Data_Namespace::MemoryLevel BaselineJoinHashTable::getEffectiveMemoryLevel(
523  const std::vector<InnerOuter>& inner_outer_pairs) const {
524  for (const auto& inner_outer_pair : inner_outer_pairs) {
525   if (needs_dictionary_translation(
526       inner_outer_pair.first, inner_outer_pair.second, executor_)) {
527    return Data_Namespace::CPU_LEVEL;
528   }
529  }
530  return memory_level_;
531 }
532 
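// Builds a single device's hash table. CPU builds go through the hash table cache; when the
// query runs on GPU but the table had to be built on CPU (dictionary-encoded keys), the
// finished buffer is copied to the device, otherwise the table is built directly in GPU memory.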
533 int BaselineJoinHashTable::initHashTableForDevice(
534  const std::vector<JoinColumn>& join_columns,
535  const std::vector<JoinColumnTypeInfo>& join_column_types,
536  const std::vector<JoinBucketInfo>& join_bucket_info,
537  const HashType layout,
538  const Data_Namespace::MemoryLevel effective_memory_level,
539  const int device_id) {
540  auto timer = DEBUG_TIMER(__func__);
541  const auto key_component_count = getKeyComponentCount();
542  int err = 0;
543 
544  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
545  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
546 
547  const auto composite_key_info =
548      HashJoin::getCompositeKeyInfo(inner_outer_pairs_, executor_);
549 
550  CHECK(!join_columns.empty());
551  HashTableCacheKey cache_key{join_columns.front().num_elems,
552  composite_key_info.cache_key_chunks,
553  condition_->get_optype()};
554 
555  if (memory_level_ == Data_Namespace::CPU_LEVEL) {
556   CHECK_EQ(device_id, size_t(0));
557  }
558  CHECK_LT(static_cast<size_t>(device_id), hash_tables_for_device_.size());
559 
560  auto hash_table = initHashTableOnCpuFromCache(cache_key);
561  if (hash_table) {
562  hash_tables_for_device_[device_id] = hash_table;
563  } else {
564   BaselineJoinHashTableBuilder builder(catalog_);
565 
566  const auto key_handler =
567  GenericKeyHandler(key_component_count,
568  true,
569  &join_columns[0],
570  &join_column_types[0],
571  &composite_key_info.sd_inner_proxy_per_key[0],
572  &composite_key_info.sd_outer_proxy_per_key[0]);
573  err = builder.initHashTableOnCpu(&key_handler,
574  composite_key_info,
575  join_columns,
576  join_column_types,
577  join_bucket_info,
578  entry_count_,
579  join_columns.front().num_elems,
580  layout,
581                                    getKeyComponentWidth(),
582                                    getKeyComponentCount());
583   hash_tables_for_device_[device_id] = builder.getHashTable();
584 
585  if (!err) {
586  if (getInnerTableId() > 0) {
587  putHashTableOnCpuToCache(cache_key, hash_tables_for_device_[device_id]);
588  }
589  }
590  }
591  // Transfer the hash table on the GPU if we've only built it on CPU
592  // but the query runs on GPU (join on dictionary encoded columns).
593  // Don't transfer the buffer if there was an error since we'll bail anyway.
594  if (memory_level_ == Data_Namespace::GPU_LEVEL && !err) {
595 #ifdef HAVE_CUDA
596   BaselineJoinHashTableBuilder builder(catalog_);
597 
598   builder.allocateDeviceMemory(layout,
599                                getKeyComponentWidth(),
600                                getKeyComponentCount(),
601                                entry_count_,
602                                emitted_keys_count_,
603                                device_id);
604 
605  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
606  auto cpu_source_hash_table = hash_tables_for_device_[device_id];
607  CHECK(cpu_source_hash_table);
608  auto gpu_target_hash_table = builder.getHashTable();
609  CHECK(gpu_target_hash_table);
610 
611  const auto gpu_buff = gpu_target_hash_table->getGpuBuffer();
612  CHECK(gpu_buff);
613  auto& data_mgr = catalog_->getDataMgr();
614  copy_to_gpu(&data_mgr,
615  reinterpret_cast<CUdeviceptr>(gpu_buff),
616  cpu_source_hash_table->getCpuBuffer(),
617  cpu_source_hash_table->getHashTableBufferSize(ExecutorDeviceType::CPU),
618  device_id);
619  hash_tables_for_device_[device_id] = std::move(gpu_target_hash_table);
620 #else
621  CHECK(false);
622 #endif
623  }
624  } else {
625 #ifdef HAVE_CUDA
626   BaselineJoinHashTableBuilder builder(catalog_);
627 
628  auto& data_mgr = catalog_->getDataMgr();
629  CudaAllocator allocator(&data_mgr, device_id);
630  auto join_column_types_gpu =
631  transfer_vector_of_flat_objects_to_gpu(join_column_types, allocator);
632  auto join_columns_gpu =
633  transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
634  const auto key_handler = GenericKeyHandler(key_component_count,
635  true,
636  join_columns_gpu,
637  join_column_types_gpu,
638  nullptr,
639  nullptr);
640 
641  err = builder.initHashTableOnGpu(&key_handler,
642  join_columns,
643  layout,
644                                    getKeyComponentWidth(),
645                                    getKeyComponentCount(),
646                                    entry_count_,
647                                    emitted_keys_count_,
648                                    device_id);
649  CHECK_LT(size_t(device_id), hash_tables_for_device_.size());
650  hash_tables_for_device_[device_id] = builder.getHashTable();
651 #else
652  UNREACHABLE();
653 #endif
654  }
655  return err;
656 }
657 
658 #define LL_CONTEXT executor_->cgen_state_->context_
659 #define LL_BUILDER executor_->cgen_state_->ir_builder_
660 #define LL_INT(v) executor_->cgen_state_->llInt(v)
661 #define LL_FP(v) executor_->cgen_state_->llFp(v)
662 #define ROW_FUNC executor_->cgen_state_->row_func_
663 
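// Codegen for one-to-one probes: emits a call to baseline_hash_join_idx_{32,64} with the probe
// key buffer, its size and the entry count.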
664 llvm::Value* BaselineJoinHashTable::codegenSlot(const CompilationOptions& co,
665  const size_t index) {
666  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
667  CHECK(getHashType() == HashType::OneToOne);
668  const auto key_component_width = getKeyComponentWidth();
669  CHECK(key_component_width == 4 || key_component_width == 8);
670  auto key_buff_lv = codegenKey(co);
671  const auto hash_ptr = hashPtr(index);
672  const auto key_ptr_lv =
673  LL_BUILDER.CreatePointerCast(key_buff_lv, llvm::Type::getInt8PtrTy(LL_CONTEXT));
674  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
675  return executor_->cgen_state_->emitExternalCall(
676  "baseline_hash_join_idx_" + std::to_string(key_component_width * 8),
677  get_int_type(64, LL_CONTEXT),
678  {hash_ptr, key_ptr_lv, key_size_lv, LL_INT(entry_count_)});
679 }
680 
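// Codegen for one-to-many probes: resolves the composite key to an index in the key "dictionary"
// at the front of the buffer, then walks the offset/count buffers that follow it.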
681 HashJoinMatchingSet BaselineJoinHashTable::codegenMatchingSet(
682  const CompilationOptions& co,
683  const size_t index) {
684  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
685  const auto key_component_width = getKeyComponentWidth();
686  CHECK(key_component_width == 4 || key_component_width == 8);
687  auto key_buff_lv = codegenKey(co);
688  CHECK(getHashType() == HashType::OneToMany);
689  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
690  const auto composite_dict_ptr_type =
691  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
692  const auto composite_key_dict =
693  hash_ptr->getType()->isPointerTy()
694  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
695  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
696  const auto key_component_count = getKeyComponentCount();
697  const auto key = executor_->cgen_state_->emitExternalCall(
698  "get_composite_key_index_" + std::to_string(key_component_width * 8),
699  get_int_type(64, LL_CONTEXT),
700  {key_buff_lv,
701  LL_INT(key_component_count),
702  composite_key_dict,
703  LL_INT(entry_count_)});
704  auto one_to_many_ptr = hash_ptr;
705  if (one_to_many_ptr->getType()->isPointerTy()) {
706  one_to_many_ptr =
707  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
708  } else {
709  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
710  }
711  const auto composite_key_dict_size = offsetBufferOff();
712  one_to_many_ptr =
713  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
714  return HashJoin::codegenMatchingSet(
715  {one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(entry_count_ - 1)},
716  false,
717  false,
718  false,
719  getComponentBufferSize(),
720  executor_);
721 }
722 
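// Buffer layout: the composite keys occupy the first getKeyBufferSize() bytes; for one-to-many
// layouts they are followed by offset, count and payload buffers of getComponentBufferSize()
// bytes each.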
723 size_t BaselineJoinHashTable::offsetBufferOff() const noexcept {
724  return getKeyBufferSize();
725 }
726 
727 size_t BaselineJoinHashTable::countBufferOff() const noexcept {
728  if (layoutRequiresAdditionalBuffers(getHashType())) {
729   return offsetBufferOff() + getComponentBufferSize();
730  } else {
731  return getKeyBufferSize();
732  }
733 }
734 
735 size_t BaselineJoinHashTable::payloadBufferOff() const noexcept {
736  if (layoutRequiresAdditionalBuffers(getHashType())) {
737   return countBufferOff() + getComponentBufferSize();
738  } else {
739  return getKeyBufferSize();
740  }
741 }
742 
743 size_t BaselineJoinHashTable::getKeyBufferSize() const noexcept {
744  const auto key_component_width = getKeyComponentWidth();
745  CHECK(key_component_width == 4 || key_component_width == 8);
746  const auto key_component_count = getKeyComponentCount();
747  if (layoutRequiresAdditionalBuffers(getHashType())) {
748   return entry_count_ * key_component_count * key_component_width;
749  } else {
750  return entry_count_ * (key_component_count + 1) * key_component_width;
751  }
752 }
753 
754 size_t BaselineJoinHashTable::getComponentBufferSize() const noexcept {
755  return entry_count_ * sizeof(int32_t);
756 }
757 
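// Emits code that materializes the probe key on the stack: each outer key component is generated,
// sign-extended to the key component width and stored into the key buffer. Self-join patterns not
// covered by the left-deep tree are rejected here.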
758 llvm::Value* BaselineJoinHashTable::codegenKey(const CompilationOptions& co) {
759  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
760  const auto key_component_width = getKeyComponentWidth();
761  CHECK(key_component_width == 4 || key_component_width == 8);
762  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
763  llvm::Value* key_buff_lv{nullptr};
764  switch (key_component_width) {
765  case 4:
766  key_buff_lv =
767  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
768  break;
769  case 8:
770  key_buff_lv =
771  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
772  break;
773  default:
774  CHECK(false);
775  }
776 
777  CodeGenerator code_generator(executor_);
778  for (size_t i = 0; i < getKeyComponentCount(); ++i) {
779  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));
780  const auto& inner_outer_pair = inner_outer_pairs_[i];
781  const auto outer_col = inner_outer_pair.second;
782  const auto key_col_var = dynamic_cast<const Analyzer::ColumnVar*>(outer_col);
783  const auto val_col_var =
784  dynamic_cast<const Analyzer::ColumnVar*>(inner_outer_pair.first);
785  if (key_col_var && val_col_var &&
786  self_join_not_covered_by_left_deep_tree(
787   key_col_var,
788  val_col_var,
789  get_max_rte_scan_table(executor_->cgen_state_->scan_idx_to_hash_pos_))) {
790  throw std::runtime_error(
791  "Query execution fails because the query contains not supported self-join "
792  "pattern. We suspect the query requires multiple left-deep join tree due to "
793  "the join condition of the self-join and is not supported for now. Please "
794  "consider rewriting table order in "
795  "FROM clause.");
796  }
797  const auto col_lvs = code_generator.codegen(outer_col, true, co);
798  CHECK_EQ(size_t(1), col_lvs.size());
799  const auto col_lv = LL_BUILDER.CreateSExt(
800  col_lvs.front(), get_int_type(key_component_width * 8, LL_CONTEXT));
801  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
802  }
803  return key_buff_lv;
804 }
805 
806 llvm::Value* BaselineJoinHashTable::hashPtr(const size_t index) {
807  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
808  auto hash_ptr = HashJoin::codegenHashTableLoad(index, executor_);
809  const auto pi8_type = llvm::Type::getInt8PtrTy(LL_CONTEXT);
810  return hash_ptr->getType()->isPointerTy()
811  ? LL_BUILDER.CreatePointerCast(hash_ptr, pi8_type)
812  : LL_BUILDER.CreateIntToPtr(hash_ptr, pi8_type);
813 }
814 
815 #undef ROW_FUNC
816 #undef LL_INT
817 #undef LL_BUILDER
818 #undef LL_CONTEXT
819 
820 int BaselineJoinHashTable::getInnerTableId() const noexcept {
821  try {
822   return getInnerTableId(inner_outer_pairs_);
823  } catch (...) {
824  CHECK(false);
825  }
826  return 0;
827 }
828 
829 int BaselineJoinHashTable::getInnerTableRteIdx() const noexcept {
830  CHECK(!inner_outer_pairs_.empty());
831  const auto first_inner_col = inner_outer_pairs_.front().first;
832  return first_inner_col->get_rte_idx();
833 }
834 
835 HashType BaselineJoinHashTable::getHashType() const noexcept {
836  auto hash_table = getHashTableForDevice(size_t(0));
837  CHECK(hash_table);
838  if (layout_override_) {
839  return *layout_override_;
840  } else {
841  return hash_table->getLayout();
842  }
843 }
844 
845 int BaselineJoinHashTable::getInnerTableId(
846  const std::vector<InnerOuter>& inner_outer_pairs) {
847  CHECK(!inner_outer_pairs.empty());
848  const auto first_inner_col = inner_outer_pairs.front().first;
849  return first_inner_col->get_table_id();
850 }
851 
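// CPU hash table cache helpers. Tables are keyed by entry count, chunk keys and join operator;
// entries whose chunk keys reference a negative table id are never inserted.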
852 std::shared_ptr<HashTable> BaselineJoinHashTable::initHashTableOnCpuFromCache(
853  const HashTableCacheKey& key) {
854  auto timer = DEBUG_TIMER(__func__);
855  VLOG(1) << "Checking CPU hash table cache.";
856  CHECK(hash_table_cache_);
857  return hash_table_cache_->get(key);
858 }
859 
860 void BaselineJoinHashTable::putHashTableOnCpuToCache(
861  const HashTableCacheKey& key,
862  std::shared_ptr<HashTable>& hash_table) {
863  for (auto chunk_key : key.chunk_keys) {
864  CHECK_GE(chunk_key.size(), size_t(2));
865  if (chunk_key[1] < 0) {
866  return;
867  }
868  }
869  CHECK(hash_table_cache_);
870  hash_table_cache_->insert(key, hash_table);
871 }
872 
873 std::pair<std::optional<size_t>, size_t>
874 BaselineJoinHashTable::getApproximateTupleCountFromCache(
875  const HashTableCacheKey& key) const {
876  for (auto chunk_key : key.chunk_keys) {
877  CHECK_GE(chunk_key.size(), size_t(2));
878  if (chunk_key[1] < 0) {
879  return std::make_pair(std::nullopt, 0);
880  ;
881  }
882  }
883 
884  CHECK(hash_table_cache_);
885  auto hash_table = hash_table_cache_->get(key);
886  if (hash_table) {
887  return std::make_pair(hash_table->getEntryCount() / 2,
888  hash_table->getEmittedKeysCount());
889  }
890  return std::make_pair(std::nullopt, 0);
891 }
892 
893 bool BaselineJoinHashTable::isBitwiseEq() const {
894  return condition_->get_optype() == kBW_EQ;
895 }
896 
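// HashTypeCache remembers, per set of chunk keys, which layout a previous build ended up with,
// letting later builds skip a one-to-one attempt that is known to fail.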
897 std::map<std::vector<ChunkKey>, HashType> HashTypeCache::hash_type_cache_;
898 std::mutex HashTypeCache::hash_type_cache_mutex_;
899 
900 void HashTypeCache::set(const std::vector<ChunkKey>& key, const HashType hash_type) {
901  for (auto chunk_key : key) {
902  CHECK_GE(chunk_key.size(), size_t(2));
903  if (chunk_key[1] < 0) {
904  return;
905  }
906  }
907  std::lock_guard<std::mutex> hash_type_cache_lock(hash_type_cache_mutex_);
908  hash_type_cache_[key] = hash_type;
909 }
910 
911 std::pair<HashType, bool> HashTypeCache::get(const std::vector<ChunkKey>& key) {
912  std::lock_guard<std::mutex> hash_type_cache_lock(hash_type_cache_mutex_);
913  const auto it = hash_type_cache_.find(key);
914  if (it == hash_type_cache_.end()) {
915  return {HashType::OneToOne, false};
916  }
917  return {it->second, true};
918 }
919 
920 void HashTypeCache::clear() {
921  std::lock_guard<std::mutex> hash_type_cache_lock(hash_type_cache_mutex_);
922  hash_type_cache_.clear();
923 }