BaselineJoinHashTable.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "BaselineJoinHashTable.h"
18 #include "CodeGenerator.h"
19 #include "ColumnFetcher.h"
20 #include "Execute.h"
21 #include "ExpressionRewrite.h"
22 #include "HashJoinKeyHandlers.h"
23 #include "JoinHashTableGpuUtils.h"
24 
25 #include "Shared/Logger.h"
26 
27 #include <future>
28 
29 std::vector<std::pair<BaselineJoinHashTable::HashTableCacheKey,
30                       BaselineJoinHashTable::HashTableCacheValue>>
31     BaselineJoinHashTable::hash_table_cache_;
32 std::mutex BaselineJoinHashTable::hash_table_cache_mutex_;
33 
35 std::shared_ptr<BaselineJoinHashTable> BaselineJoinHashTable::getInstance(
36  const std::shared_ptr<Analyzer::BinOper> condition,
37  const std::vector<InputTableInfo>& query_infos,
38  const Data_Namespace::MemoryLevel memory_level,
39  const HashType preferred_hash_type,
40  const int device_count,
41  ColumnCacheMap& column_cache,
42  Executor* executor) {
43  decltype(std::chrono::steady_clock::now()) ts1, ts2;
44 
45  if (VLOGGING(1)) {
46  VLOG(1) << "Building keyed hash table " << getHashTypeString(preferred_hash_type)
47  << " for qual: " << condition->toString();
48  ts1 = std::chrono::steady_clock::now();
49  }
50  auto inner_outer_pairs = normalize_column_pairs(
51  condition.get(), *executor->getCatalog(), executor->getTemporaryTables());
52 
53  const auto& query_info =
54  get_inner_query_info(getInnerTableId(inner_outer_pairs), query_infos).info;
55  const auto total_entries = 2 * query_info.getNumTuplesUpperBound();
56  if (total_entries > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
57  throw TooManyHashEntries();
58  }
 59  const auto shard_count = memory_level == Data_Namespace::GPU_LEVEL
 60  ? BaselineJoinHashTable::getShardCountForCondition(
 61  condition.get(), executor, inner_outer_pairs)
 62  : 0;
63  const auto entries_per_device =
64  get_entries_per_device(total_entries, shard_count, device_count, memory_level);
65  auto join_hash_table = std::shared_ptr<BaselineJoinHashTable>(
66  new BaselineJoinHashTable(condition,
67  query_infos,
68  memory_level,
69  preferred_hash_type,
70  entries_per_device,
71  column_cache,
72  executor,
73  inner_outer_pairs,
74  device_count));
75  join_hash_table->checkHashJoinReplicationConstraint(getInnerTableId(inner_outer_pairs));
76  try {
77  join_hash_table->reify();
78  } catch (const TableMustBeReplicated& e) {
79  // Throw a runtime error to abort the query
80  join_hash_table->freeHashBufferMemory();
81  throw std::runtime_error(e.what());
82  } catch (const HashJoinFail& e) {
83  // HashJoinFail exceptions log an error and trigger a retry with a join loop (if
84  // possible)
85  join_hash_table->freeHashBufferMemory();
86  throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
87  "involved in equijoin | ") +
88  e.what());
89  } catch (const ColumnarConversionNotSupported& e) {
90  throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
91  e.what());
92  } catch (const OutOfMemory& e) {
93  throw HashJoinFail(
94  std::string("Ran out of memory while building hash tables for equijoin | ") +
95  e.what());
96  } catch (const std::exception& e) {
97  throw std::runtime_error(
98  std::string("Fatal error while attempting to build hash tables for join: ") +
99  e.what());
100  }
101  if (VLOGGING(1)) {
102  ts2 = std::chrono::steady_clock::now();
103  VLOG(1) << "Built keyed hash table "
104  << getHashTypeString(join_hash_table->getHashType()) << " in "
105  << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
106  << " ms";
107  }
108  return join_hash_table;
109 }
110 
 111 BaselineJoinHashTable::BaselineJoinHashTable(
 112  const std::shared_ptr<Analyzer::BinOper> condition,
113  const std::vector<InputTableInfo>& query_infos,
114  const Data_Namespace::MemoryLevel memory_level,
115  const HashType preferred_hash_type,
116  const size_t entry_count,
117  ColumnCacheMap& column_cache,
118  Executor* executor,
119  const std::vector<InnerOuter>& inner_outer_pairs,
120  const int device_count)
121  : condition_(condition)
122  , query_infos_(query_infos)
123  , memory_level_(memory_level)
124  , layout_(preferred_hash_type)
125  , entry_count_(entry_count)
126  , emitted_keys_count_(0)
127  , executor_(executor)
128  , column_cache_(column_cache)
129  , inner_outer_pairs_(inner_outer_pairs)
130  , catalog_(executor->getCatalog())
131  , device_count_(device_count)
132 #ifdef HAVE_CUDA
133  , block_size_(memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
134  ? executor->blockSize()
135  : 0)
136  , grid_size_(memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
137  ? executor->gridSize()
 138  : 0) {
 139  CHECK_GT(device_count_, 0);
 140 }
141 #else
142 {
143  CHECK_GT(device_count_, 0);
144 }
145 #endif // HAVE_CUDA
146 
 147 size_t BaselineJoinHashTable::getShardCountForCondition(
 148  const Analyzer::BinOper* condition,
149  const Executor* executor,
150  const std::vector<InnerOuter>& inner_outer_pairs) {
151  for (const auto& inner_outer_pair : inner_outer_pairs) {
152  const auto pair_shard_count = get_shard_count(inner_outer_pair, executor);
153  if (pair_shard_count) {
154  return pair_shard_count;
155  }
156  }
157  return 0;
158 }
159 
 160 int64_t BaselineJoinHashTable::getJoinHashBuffer(const ExecutorDeviceType device_type,
 161  const int device_id) const noexcept {
162  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
163  return 0;
164  }
165 #ifdef HAVE_CUDA
166  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
167  if (device_type == ExecutorDeviceType::CPU) {
168  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
169  } else {
170  return gpu_hash_table_buff_[device_id]
171  ? reinterpret_cast<CUdeviceptr>(
172  gpu_hash_table_buff_[device_id]->getMemoryPtr())
173  : reinterpret_cast<CUdeviceptr>(nullptr);
174  }
175 #else
176  CHECK(device_type == ExecutorDeviceType::CPU);
177  return reinterpret_cast<int64_t>(&(*cpu_hash_table_buff_)[0]);
178 #endif
179 }
180 
 181 size_t BaselineJoinHashTable::getJoinHashBufferSize(const ExecutorDeviceType device_type,
 182  const int device_id) const noexcept {
183  if (device_type == ExecutorDeviceType::CPU && !cpu_hash_table_buff_) {
184  return 0;
185  }
186 #ifdef HAVE_CUDA
187  CHECK_LT(static_cast<size_t>(device_id), gpu_hash_table_buff_.size());
188  if (device_type == ExecutorDeviceType::CPU) {
189  return cpu_hash_table_buff_->size() *
190  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
191  } else {
192  return gpu_hash_table_buff_[device_id]
193  ? gpu_hash_table_buff_[device_id]->reservedSize()
194  : 0;
195  }
196 #else
197  CHECK(device_type == ExecutorDeviceType::CPU);
198  return cpu_hash_table_buff_->size() *
199  sizeof(decltype(cpu_hash_table_buff_)::element_type::value_type);
200 #endif
201 }
202 
 203 std::string BaselineJoinHashTable::toString(const ExecutorDeviceType device_type,
 204  const int device_id,
205  bool raw) const {
206  auto buffer = getJoinHashBuffer(device_type, device_id);
207  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
208 #ifdef HAVE_CUDA
209  std::unique_ptr<int8_t[]> buffer_copy;
210  if (device_type == ExecutorDeviceType::GPU) {
211  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
212 
 213  copy_from_gpu(&catalog_->getDataMgr(),
 214  buffer_copy.get(),
215  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
216  buffer_size,
217  device_id);
218  }
219  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
220 #else
221  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
222 #endif // HAVE_CUDA
223  auto ptr2 = ptr1 + offsetBufferOff();
224  auto ptr3 = ptr1 + countBufferOff();
225  auto ptr4 = ptr1 + payloadBufferOff();
227  !condition_->is_overlaps_oper() ? "keyed" : "geo",
232  entry_count_,
233  ptr1,
234  ptr2,
235  ptr3,
236  ptr4,
237  buffer_size,
238  raw);
239 }
240 
241 std::set<DecodedJoinHashBufferEntry> BaselineJoinHashTable::toSet(
242  const ExecutorDeviceType device_type,
243  const int device_id) const {
244  auto buffer = getJoinHashBuffer(device_type, device_id);
245  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
246 #ifdef HAVE_CUDA
247  std::unique_ptr<int8_t[]> buffer_copy;
248  if (device_type == ExecutorDeviceType::GPU) {
249  buffer_copy = std::make_unique<int8_t[]>(buffer_size);
250 
 251  copy_from_gpu(&catalog_->getDataMgr(),
 252  buffer_copy.get(),
253  reinterpret_cast<CUdeviceptr>(reinterpret_cast<int8_t*>(buffer)),
254  buffer_size,
255  device_id);
256  }
257  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
258 #else
259  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
260 #endif // HAVE_CUDA
261  auto ptr2 = ptr1 + offsetBufferOff();
262  auto ptr3 = ptr1 + countBufferOff();
263  auto ptr4 = ptr1 + payloadBufferOff();
268  entry_count_,
269  ptr1,
270  ptr2,
271  ptr3,
272  ptr4,
273  buffer_size);
274 }
275 
 276 BaselineJoinHashTable::CompositeKeyInfo BaselineJoinHashTable::getCompositeKeyInfo()
 277  const {
278  std::vector<const void*> sd_inner_proxy_per_key;
279  std::vector<const void*> sd_outer_proxy_per_key;
280  std::vector<ChunkKey> cache_key_chunks; // used for the cache key
281  for (const auto& inner_outer_pair : inner_outer_pairs_) {
282  const auto inner_col = inner_outer_pair.first;
283  const auto outer_col = inner_outer_pair.second;
284  const auto& inner_ti = inner_col->get_type_info();
285  const auto& outer_ti = outer_col->get_type_info();
286  ChunkKey cache_key_chunks_for_column{catalog_->getCurrentDB().dbId,
287  inner_col->get_table_id(),
288  inner_col->get_column_id()};
289  if (inner_ti.is_string() &&
290  !(inner_ti.get_comp_param() == outer_ti.get_comp_param())) {
291  CHECK(outer_ti.is_string());
292  CHECK(inner_ti.get_compression() == kENCODING_DICT &&
293  outer_ti.get_compression() == kENCODING_DICT);
294  const auto sd_inner_proxy = executor_->getStringDictionaryProxy(
295  inner_ti.get_comp_param(), executor_->getRowSetMemoryOwner(), true);
296  const auto sd_outer_proxy = executor_->getStringDictionaryProxy(
297  outer_ti.get_comp_param(), executor_->getRowSetMemoryOwner(), true);
298  CHECK(sd_inner_proxy && sd_outer_proxy);
299  sd_inner_proxy_per_key.push_back(sd_inner_proxy);
300  sd_outer_proxy_per_key.push_back(sd_outer_proxy);
301  cache_key_chunks_for_column.push_back(sd_outer_proxy->getGeneration());
302  } else {
303  sd_inner_proxy_per_key.emplace_back();
304  sd_outer_proxy_per_key.emplace_back();
305  }
306  cache_key_chunks.push_back(cache_key_chunks_for_column);
307  }
308  return {sd_inner_proxy_per_key, sd_outer_proxy_per_key, cache_key_chunks};
309 }
310 
 311 void BaselineJoinHashTable::reify() {
 312  auto timer = DEBUG_TIMER(__func__);
314 #ifdef HAVE_CUDA
315  gpu_hash_table_buff_.resize(device_count_);
316 #endif // HAVE_CUDA
317  const auto composite_key_info = getCompositeKeyInfo();
318  const auto type_and_found = HashTypeCache::get(composite_key_info.cache_key_chunks);
319  const auto layout = type_and_found.second ? type_and_found.first : layout_;
320 
321  if (condition_->is_overlaps_oper()) {
322  CHECK_EQ(inner_outer_pairs_.size(), size_t(1));
324 
325  if (inner_outer_pairs_[0].second->get_type_info().is_array()) {
327  } else {
329  }
330  try {
331  reifyWithLayout(layout);
332  return;
333  } catch (const std::exception& e) {
334  VLOG(1) << "Caught exception while building overlaps baseline hash table: "
335  << e.what();
336  throw;
337  }
338  }
339 
340  try {
341  reifyWithLayout(layout);
342  } catch (const std::exception& e) {
343  VLOG(1) << "Caught exception while building baseline hash table: " << e.what();
 344  freeHashBufferMemory();
 345  HashTypeCache::set(composite_key_info.cache_key_chunks,
 346  JoinHashTableInterface::HashType::OneToMany);
 347  reifyWithLayout(JoinHashTableInterface::HashType::OneToMany);
 348  }
349 }
350 
 351 void BaselineJoinHashTable::reifyWithLayout(
 352  const JoinHashTableInterface::HashType layout) {
353  layout_ = layout;
354  const auto& query_info = get_inner_query_info(getInnerTableId(), query_infos_).info;
355  if (query_info.fragments.empty()) {
356  return;
357  }
358  auto& data_mgr = catalog_->getDataMgr();
359  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
 360  if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
 361  for (int device_id = 0; device_id < device_count_; ++device_id) {
362  dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(&data_mgr, device_id));
363  }
364  }
365  std::vector<BaselineJoinHashTable::ColumnsForDevice> columns_per_device;
366  const auto shard_count = shardCount();
367  for (int device_id = 0; device_id < device_count_; ++device_id) {
368  const auto fragments =
369  shard_count
370  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
371  : query_info.fragments;
372  const auto columns_for_device =
373  fetchColumnsForDevice(fragments,
374  device_id,
 375  memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
 376  ? dev_buff_owners[device_id].get()
377  : nullptr);
378  columns_per_device.push_back(columns_for_device);
379  }
 380  if (layoutRequiresAdditionalBuffers(layout)) {
 381  CHECK(!columns_per_device.front().join_columns.empty());
382  emitted_keys_count_ = columns_per_device.front().join_columns.front().num_elems;
383  size_t tuple_count;
384  std::tie(tuple_count, std::ignore) = approximateTupleCount(columns_per_device);
385  const auto entry_count = 2 * std::max(tuple_count, size_t(1));
386 
387  entry_count_ =
388  get_entries_per_device(entry_count, shard_count, device_count_, memory_level_);
389  }
390  std::vector<std::future<void>> init_threads;
391  for (int device_id = 0; device_id < device_count_; ++device_id) {
392  const auto fragments =
393  shard_count
394  ? only_shards_for_device(query_info.fragments, device_id, device_count_)
395  : query_info.fragments;
396  init_threads.push_back(std::async(std::launch::async,
 397  &BaselineJoinHashTable::reifyForDevice,
 398  this,
399  columns_per_device[device_id],
400  layout,
401  device_id,
402  logger::thread_id()));
403  }
404  for (auto& init_thread : init_threads) {
405  init_thread.wait();
406  }
407  for (auto& init_thread : init_threads) {
408  init_thread.get();
409  }
410 }
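// Editorial note (not part of the original file): reifyWithLayout() above sizes
// the table from an approximate distinct-tuple count and doubles it
// (entry_count = 2 * std::max(tuple_count, size_t(1)), i.e. roughly a 50% load
// factor) before get_entries_per_device() splits the entries across shards and
// devices.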
411 
 412 std::pair<size_t, size_t> BaselineJoinHashTable::approximateTupleCount(
 413  const std::vector<ColumnsForDevice>& columns_per_device) const {
414  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
415  CountDistinctDescriptor count_distinct_desc{
 416  CountDistinctImplType::Bitmap,
 417  0,
418  11,
419  true,
420  effective_memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
 421  ? ExecutorDeviceType::GPU
 422  : ExecutorDeviceType::CPU,
 423  1};
424  const auto padded_size_bytes = count_distinct_desc.bitmapPaddedSizeBytes();
425 
426  CHECK(!columns_per_device.empty() && !columns_per_device.front().join_columns.empty());
427 
428  if (effective_memory_level == Data_Namespace::MemoryLevel::CPU_LEVEL) {
429  const auto composite_key_info = getCompositeKeyInfo();
430  HashTableCacheKey cache_key{columns_per_device.front().join_columns.front().num_elems,
431  composite_key_info.cache_key_chunks,
432  condition_->get_optype()};
433  const auto cached_count_info = getApproximateTupleCountFromCache(cache_key);
434  if (cached_count_info.first >= 0) {
435  return std::make_pair(cached_count_info.first, cached_count_info.second);
436  }
437  int thread_count = cpu_threads();
438  std::vector<uint8_t> hll_buffer_all_cpus(thread_count * padded_size_bytes);
439  auto hll_result = &hll_buffer_all_cpus[0];
440 
441  approximate_distinct_tuples(hll_result,
442  count_distinct_desc.bitmap_sz_bits,
443  padded_size_bytes,
444  columns_per_device.front().join_columns,
445  columns_per_device.front().join_column_types,
446  thread_count);
447  for (int i = 1; i < thread_count; ++i) {
448  hll_unify(hll_result,
449  hll_result + i * padded_size_bytes,
450  1 << count_distinct_desc.bitmap_sz_bits);
451  }
452  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits), 0);
453  }
454 #ifdef HAVE_CUDA
455  auto& data_mgr = catalog_->getDataMgr();
456  std::vector<std::vector<uint8_t>> host_hll_buffers(device_count_);
457  for (auto& host_hll_buffer : host_hll_buffers) {
458  host_hll_buffer.resize(count_distinct_desc.bitmapPaddedSizeBytes());
459  }
460  std::vector<std::future<void>> approximate_distinct_device_threads;
461  for (int device_id = 0; device_id < device_count_; ++device_id) {
462  approximate_distinct_device_threads.emplace_back(std::async(
463  std::launch::async,
464  [device_id,
465  &columns_per_device,
466  &count_distinct_desc,
467  &data_mgr,
468  &host_hll_buffers,
469  this] {
470  CudaAllocator allocator(&data_mgr, device_id);
471  auto device_hll_buffer =
472  allocator.alloc(count_distinct_desc.bitmapPaddedSizeBytes());
473  data_mgr.getCudaMgr()->zeroDeviceMem(
474  device_hll_buffer, count_distinct_desc.bitmapPaddedSizeBytes(), device_id);
475  const auto& columns_for_device = columns_per_device[device_id];
476  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(
477  columns_for_device.join_columns, allocator);
478  auto join_column_types_gpu = transfer_vector_of_flat_objects_to_gpu(
479  columns_for_device.join_column_types, allocator);
480  const auto key_handler =
481  GenericKeyHandler(columns_for_device.join_columns.size(),
482  true,
483  join_columns_gpu,
484  join_column_types_gpu,
485  nullptr,
486  nullptr);
487  const auto key_handler_gpu =
488  transfer_flat_object_to_gpu(key_handler, allocator);
 489  approximate_distinct_tuples_on_device(
 490  reinterpret_cast<uint8_t*>(device_hll_buffer),
491  count_distinct_desc.bitmap_sz_bits,
492  key_handler_gpu,
493  columns_for_device.join_columns[0].num_elems,
494  block_size_,
495  grid_size_);
496 
497  auto& host_hll_buffer = host_hll_buffers[device_id];
498  copy_from_gpu(&data_mgr,
499  &host_hll_buffer[0],
500  reinterpret_cast<CUdeviceptr>(device_hll_buffer),
501  count_distinct_desc.bitmapPaddedSizeBytes(),
502  device_id);
503  }));
504  }
505  for (auto& child : approximate_distinct_device_threads) {
506  child.get();
507  }
508  CHECK_EQ(Data_Namespace::MemoryLevel::GPU_LEVEL, effective_memory_level);
509  auto& result_hll_buffer = host_hll_buffers.front();
510  auto hll_result = reinterpret_cast<int32_t*>(&result_hll_buffer[0]);
511  for (int device_id = 1; device_id < device_count_; ++device_id) {
512  auto& host_hll_buffer = host_hll_buffers[device_id];
513  hll_unify(hll_result,
514  reinterpret_cast<int32_t*>(&host_hll_buffer[0]),
515  1 << count_distinct_desc.bitmap_sz_bits);
516  }
517  return std::make_pair(hll_size(hll_result, count_distinct_desc.bitmap_sz_bits), 0);
518 #else
519  UNREACHABLE();
520  return {0, 0};
521 #endif // HAVE_CUDA
522 }
523 
 524 BaselineJoinHashTable::ColumnsForDevice BaselineJoinHashTable::fetchColumnsForDevice(
 525  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
526  const int device_id,
527  DeviceAllocator* dev_buff_owner) {
528  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
529 
530  std::vector<JoinColumn> join_columns;
531  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
532  std::vector<JoinColumnTypeInfo> join_column_types;
533  std::vector<JoinBucketInfo> join_bucket_info;
534  std::vector<std::shared_ptr<void>> malloc_owner;
535  for (const auto& inner_outer_pair : inner_outer_pairs_) {
536  const auto inner_col = inner_outer_pair.first;
537  const auto inner_cd = get_column_descriptor_maybe(
538  inner_col->get_column_id(), inner_col->get_table_id(), *catalog_);
539  if (inner_cd && inner_cd->isVirtualCol) {
 540  throw FailedToJoinOnVirtualColumn();
 541  }
542  join_columns.emplace_back(fetchJoinColumn(inner_col,
543  fragments,
544  effective_memory_level,
545  device_id,
546  chunks_owner,
547  dev_buff_owner,
548  malloc_owner,
549  executor_,
550  &column_cache_));
551  const auto& ti = inner_col->get_type_info();
552  join_column_types.emplace_back(JoinColumnTypeInfo{static_cast<size_t>(ti.get_size()),
553  0,
 554  0,
 555  inline_fixed_encoding_null_val(ti),
 556  isBitwiseEq(),
 557  0,
 558  get_join_column_type_kind(ti)});
 559  }
560  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
561 }
562 
 563 void BaselineJoinHashTable::reifyForDevice(const ColumnsForDevice& columns_for_device,
 564  const JoinHashTableInterface::HashType layout,
 565  const int device_id,
566  const logger::ThreadId parent_thread_id) {
567  DEBUG_TIMER_NEW_THREAD(parent_thread_id);
568  const auto effective_memory_level = getEffectiveMemoryLevel(inner_outer_pairs_);
569  const auto err = initHashTableForDevice(columns_for_device.join_columns,
570  columns_for_device.join_column_types,
571  columns_for_device.join_buckets,
572  layout,
573  effective_memory_level,
574  device_id);
575  if (err) {
576  switch (err) {
 577  case ERR_FAILED_TO_FETCH_COLUMN:
 578  throw FailedToFetchColumn();
 579  case ERR_FAILED_TO_JOIN_ON_VIRTUAL_COLUMN:
 580  throw FailedToJoinOnVirtualColumn();
 581  default:
582  throw HashJoinFail(
583  std::string("Unrecognized error when initializing baseline hash table (") +
584  std::to_string(err) + std::string(")"));
585  }
586  }
587 }
588 
 589 size_t BaselineJoinHashTable::shardCount() const {
 590  if (memory_level_ != Data_Namespace::GPU_LEVEL) {
 591  return 0;
 592  }
 593  return BaselineJoinHashTable::getShardCountForCondition(
 594  condition_.get(), executor_, inner_outer_pairs_);
 595 }
596 
 597 size_t BaselineJoinHashTable::getKeyComponentWidth() const {
 598  for (const auto& inner_outer_pair : inner_outer_pairs_) {
599  const auto inner_col = inner_outer_pair.first;
600  const auto& inner_col_ti = inner_col->get_type_info();
601  if (inner_col_ti.get_logical_size() > 4) {
602  CHECK_EQ(8, inner_col_ti.get_logical_size());
603  return 8;
604  }
605  }
606  return 4;
607 }
608 
 609 size_t BaselineJoinHashTable::getKeyComponentCount() const {
 610  return inner_outer_pairs_.size();
611 }
612 
 613 Data_Namespace::MemoryLevel BaselineJoinHashTable::getEffectiveMemoryLevel(
 614  const std::vector<InnerOuter>& inner_outer_pairs) const {
 615  for (const auto& inner_outer_pair : inner_outer_pairs) {
 616  if (needs_dictionary_translation(
 617  inner_outer_pair.first, inner_outer_pair.second, executor_)) {
 618  return Data_Namespace::CPU_LEVEL;
 619  }
620  }
621  return memory_level_;
622 }
623 
 624 int BaselineJoinHashTable::initHashTableOnCpu(
 625  const std::vector<JoinColumn>& join_columns,
626  const std::vector<JoinColumnTypeInfo>& join_column_types,
627  const std::vector<JoinBucketInfo>& join_bucket_info,
628  const JoinHashTableInterface::HashType layout) {
629  auto timer = DEBUG_TIMER(__func__);
630  const auto composite_key_info = getCompositeKeyInfo();
631  CHECK(!join_columns.empty());
632  HashTableCacheKey cache_key{join_columns.front().num_elems,
633  composite_key_info.cache_key_chunks,
634  condition_->get_optype()};
635  initHashTableOnCpuFromCache(cache_key);
636  if (cpu_hash_table_buff_) {
637  return 0;
638  }
639  const auto key_component_width = getKeyComponentWidth();
640  const auto key_component_count = getKeyComponentCount();
641  const auto entry_size =
642  (key_component_count +
643  (layout == JoinHashTableInterface::HashType::OneToOne ? 1 : 0)) *
644  key_component_width;
645  const auto keys_for_all_rows = join_columns.front().num_elems;
646  const size_t one_to_many_hash_entries =
 647  layoutRequiresAdditionalBuffers(layout)
 648  ? 2 * entry_count_ + keys_for_all_rows
649  : 0;
650  const size_t hash_table_size =
651  entry_size * entry_count_ + one_to_many_hash_entries * sizeof(int32_t);
652 
653  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
654  if (hash_table_size > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
655  throw TooManyHashEntries();
656  }
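// Editorial note with illustrative numbers (not taken from the original file):
// for a OneToOne layout with key_component_width = 8, key_component_count = 2
// and entry_count_ = 1,000,000, entry_size = (2 + 1) * 8 = 24 bytes and
// hash_table_size = 24 * 1,000,000 bytes, about 24 MB. A layout that needs the
// additional buffers also reserves (2 * entry_count_ + keys_for_all_rows) *
// sizeof(int32_t) bytes, and the check above rejects any total past the
// int32_t limit (~2GB).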
657 
658  VLOG(1) << "Initializing CPU Join Hash Table with " << entry_count_
659  << " hash entries and " << one_to_many_hash_entries
660  << " entries in the one to many buffer";
661  VLOG(1) << "Total hash table size: " << hash_table_size << " Bytes";
662 
663  cpu_hash_table_buff_.reset(new std::vector<int8_t>(hash_table_size));
664  int thread_count = cpu_threads();
665  std::vector<std::future<void>> init_cpu_buff_threads;
666  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
667  init_cpu_buff_threads.emplace_back(
668  std::async(std::launch::async,
669  [this,
670  key_component_count,
671  key_component_width,
672  thread_idx,
673  thread_count,
674  layout] {
675  switch (key_component_width) {
676  case 4:
 677  init_baseline_hash_join_buff_32(
 678  &(*cpu_hash_table_buff_)[0],
679  entry_count_,
680  key_component_count,
 681  layout == JoinHashTableInterface::HashType::OneToOne,
 682  -1,
683  thread_idx,
684  thread_count);
685  break;
686  case 8:
 687  init_baseline_hash_join_buff_64(
 688  &(*cpu_hash_table_buff_)[0],
689  entry_count_,
690  key_component_count,
 691  layout == JoinHashTableInterface::HashType::OneToOne,
 692  -1,
693  thread_idx,
694  thread_count);
695  break;
696  default:
697  CHECK(false);
698  }
699  }));
700  }
701  for (auto& child : init_cpu_buff_threads) {
702  child.get();
703  }
704  std::vector<std::future<int>> fill_cpu_buff_threads;
705  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
706  fill_cpu_buff_threads.emplace_back(std::async(
707  std::launch::async,
708  [this,
709  &composite_key_info,
710  &join_columns,
711  &join_column_types,
712  key_component_count,
713  key_component_width,
714  layout,
715  thread_idx,
716  thread_count] {
717  switch (key_component_width) {
718  case 4: {
719  const auto key_handler =
720  GenericKeyHandler(key_component_count,
721  true,
722  &join_columns[0],
723  &join_column_types[0],
724  &composite_key_info.sd_inner_proxy_per_key[0],
725  &composite_key_info.sd_outer_proxy_per_key[0]);
 726  return fill_baseline_hash_join_buff_32(
 727  &(*cpu_hash_table_buff_)[0],
728  entry_count_,
729  -1,
730  key_component_count,
 731  layout == JoinHashTableInterface::HashType::OneToOne,
 732  &key_handler,
733  join_columns[0].num_elems,
734  thread_idx,
735  thread_count);
736  break;
737  }
738  case 8: {
739  const auto key_handler =
740  GenericKeyHandler(key_component_count,
741  true,
742  &join_columns[0],
743  &join_column_types[0],
744  &composite_key_info.sd_inner_proxy_per_key[0],
745  &composite_key_info.sd_outer_proxy_per_key[0]);
 746  return fill_baseline_hash_join_buff_64(
 747  &(*cpu_hash_table_buff_)[0],
748  entry_count_,
749  -1,
750  key_component_count,
 751  layout == JoinHashTableInterface::HashType::OneToOne,
 752  &key_handler,
753  join_columns[0].num_elems,
754  thread_idx,
755  thread_count);
756  break;
757  }
758  default:
759  CHECK(false);
760  }
761  return -1;
762  }));
763  }
764  int err = 0;
765  for (auto& child : fill_cpu_buff_threads) {
766  int partial_err = child.get();
767  if (partial_err) {
768  err = partial_err;
769  }
770  }
771  if (err) {
772  cpu_hash_table_buff_.reset();
773  return err;
774  }
 775  if (layoutRequiresAdditionalBuffers(layout)) {
 776  auto one_to_many_buff = reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0] +
777  entry_count_ * entry_size);
778  init_hash_join_buff(one_to_many_buff, entry_count_, -1, 0, 1);
779  switch (key_component_width) {
780  case 4: {
781  const auto composite_key_dict =
782  reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0]);
 783  fill_one_to_many_baseline_hash_table_32(one_to_many_buff,
 784  composite_key_dict,
785  entry_count_,
786  -1,
787  key_component_count,
788  join_columns,
789  join_column_types,
790  join_bucket_info,
791  composite_key_info.sd_inner_proxy_per_key,
792  composite_key_info.sd_outer_proxy_per_key,
793  thread_count);
794  break;
795  }
796  case 8: {
797  const auto composite_key_dict =
798  reinterpret_cast<int64_t*>(&(*cpu_hash_table_buff_)[0]);
 799  fill_one_to_many_baseline_hash_table_64(one_to_many_buff,
 800  composite_key_dict,
801  entry_count_,
802  -1,
803  key_component_count,
804  join_columns,
805  join_column_types,
806  join_bucket_info,
807  composite_key_info.sd_inner_proxy_per_key,
808  composite_key_info.sd_outer_proxy_per_key,
809  thread_count);
810  break;
811  }
812  default:
813  CHECK(false);
814  }
815  }
816  if (!err && getInnerTableId() > 0) {
817  putHashTableOnCpuToCache(cache_key);
818  }
819  return err;
820 }
821 
 822 int BaselineJoinHashTable::initHashTableOnGpu(
 823  const std::vector<JoinColumn>& join_columns,
824  const std::vector<JoinColumnTypeInfo>& join_column_types,
825  const std::vector<JoinBucketInfo>& join_bucket_info,
 826  const JoinHashTableInterface::HashType layout,
 827  const size_t key_component_width,
828  const size_t key_component_count,
829  const int device_id) {
830  auto timer = DEBUG_TIMER(__func__);
831  int err = 0;
832 #ifdef HAVE_CUDA
833  auto& data_mgr = catalog_->getDataMgr();
834  CudaAllocator allocator(&data_mgr, device_id);
835  auto dev_err_buff = reinterpret_cast<CUdeviceptr>(allocator.alloc(sizeof(int)));
836  copy_to_gpu(&data_mgr, dev_err_buff, &err, sizeof(err), device_id);
837  switch (key_component_width) {
838  case 4:
 839  init_baseline_hash_join_buff_on_device_32(
 840  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
841  entry_count_,
842  key_component_count,
 843  layout == JoinHashTableInterface::HashType::OneToOne,
 844  -1,
845  block_size_,
846  grid_size_);
847  break;
848  case 8:
 849  init_baseline_hash_join_buff_on_device_64(
 850  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
851  entry_count_,
852  key_component_count,
 853  layout == JoinHashTableInterface::HashType::OneToOne,
 854  -1,
855  block_size_,
856  grid_size_);
857  break;
858  default:
859  UNREACHABLE();
860  }
861  auto join_columns_gpu = transfer_vector_of_flat_objects_to_gpu(join_columns, allocator);
862  auto hash_buff =
863  reinterpret_cast<int8_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
864  auto join_column_types_gpu =
865  transfer_vector_of_flat_objects_to_gpu(join_column_types, allocator);
866 
867  const auto key_handler = GenericKeyHandler(key_component_count,
868  true,
869  join_columns_gpu,
870  join_column_types_gpu,
871  nullptr,
872  nullptr);
873  const auto key_handler_gpu = transfer_flat_object_to_gpu(key_handler, allocator);
874  switch (key_component_width) {
875  case 4: {
 876  fill_baseline_hash_join_buff_on_device_32(
 877  hash_buff,
878  entry_count_,
879  -1,
880  key_component_count,
 881  layout == JoinHashTableInterface::HashType::OneToOne,
 882  reinterpret_cast<int*>(dev_err_buff),
883  key_handler_gpu,
884  join_columns.front().num_elems,
885  block_size_,
886  grid_size_);
887  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
888  break;
889  }
890  case 8: {
 891  fill_baseline_hash_join_buff_on_device_64(
 892  hash_buff,
893  entry_count_,
894  -1,
895  key_component_count,
 896  layout == JoinHashTableInterface::HashType::OneToOne,
 897  reinterpret_cast<int*>(dev_err_buff),
898  key_handler_gpu,
899  join_columns.front().num_elems,
900  block_size_,
901  grid_size_);
902  copy_from_gpu(&data_mgr, &err, dev_err_buff, sizeof(err), device_id);
903  break;
904  }
905  default:
906  UNREACHABLE();
907  }
908  if (err) {
909  return err;
910  }
 911  if (layoutRequiresAdditionalBuffers(layout)) {
 912  const auto entry_size = key_component_count * key_component_width;
913  auto one_to_many_buff = reinterpret_cast<int32_t*>(
914  gpu_hash_table_buff_[device_id]->getMemoryPtr() + entry_count_ * entry_size);
915  switch (key_component_width) {
916  case 4: {
917  const auto composite_key_dict =
918  reinterpret_cast<int32_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
 919  init_hash_join_buff_on_device(
 920  one_to_many_buff, entry_count_, -1, block_size_, grid_size_);
 921  fill_one_to_many_baseline_hash_table_on_device_32(one_to_many_buff,
 922  composite_key_dict,
923  entry_count_,
924  -1,
925  key_component_count,
926  key_handler_gpu,
927  join_columns.front().num_elems,
928  block_size_,
929  grid_size_);
930  break;
931  }
932  case 8: {
933  const auto composite_key_dict =
934  reinterpret_cast<int64_t*>(gpu_hash_table_buff_[device_id]->getMemoryPtr());
 935  init_hash_join_buff_on_device(
 936  one_to_many_buff, entry_count_, -1, block_size_, grid_size_);
 937  fill_one_to_many_baseline_hash_table_on_device_64(one_to_many_buff,
 938  composite_key_dict,
939  entry_count_,
940  -1,
941  key_handler_gpu,
942  join_columns.front().num_elems,
943  block_size_,
944  grid_size_);
945  break;
946  }
947  default:
948  UNREACHABLE();
949  }
950  }
951 #else
952  UNREACHABLE();
953 #endif
954  return err;
955 }
956 
 957 int BaselineJoinHashTable::initHashTableForDevice(
 958  const std::vector<JoinColumn>& join_columns,
959  const std::vector<JoinColumnTypeInfo>& join_column_types,
960  const std::vector<JoinBucketInfo>& join_bucket_info,
 961  const JoinHashTableInterface::HashType layout,
 962  const Data_Namespace::MemoryLevel effective_memory_level,
963  const int device_id) {
964  auto timer = DEBUG_TIMER(__func__);
965  const auto key_component_width = getKeyComponentWidth();
966  const auto key_component_count = getKeyComponentCount();
967  int err = 0;
968 #ifdef HAVE_CUDA
969  auto& data_mgr = catalog_->getDataMgr();
 970  if (memory_level_ == Data_Namespace::GPU_LEVEL) {
 971  const auto entry_size =
972  (key_component_count +
973  (layout == JoinHashTableInterface::HashType::OneToOne ? 1 : 0)) *
974  key_component_width;
975  const auto keys_for_all_rows = emitted_keys_count_;
976  const size_t one_to_many_hash_entries = layoutRequiresAdditionalBuffers(layout)
977  ? 2 * entry_count_ + keys_for_all_rows
978  : 0;
979  const size_t hash_table_size =
980  entry_size * entry_count_ + one_to_many_hash_entries * sizeof(int32_t);
981 
982  // We can't allocate more than 2GB contiguous memory on GPU and each entry is 4 bytes.
983  if (hash_table_size > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
984  throw TooManyHashEntries();
985  }
986 
987  VLOG(1) << "Initializing GPU Hash Table for device " << device_id << " with "
988  << entry_count_ << " hash entries and " << one_to_many_hash_entries
989  << " entries in the one to many buffer";
990  VLOG(1) << "Total hash table size: " << hash_table_size << " Bytes";
991  gpu_hash_table_buff_[device_id] =
992  CudaAllocator::allocGpuAbstractBuffer(&data_mgr, hash_table_size, device_id);
993  }
994 #else
995  CHECK_EQ(Data_Namespace::CPU_LEVEL, effective_memory_level);
996 #endif
997  if (effective_memory_level == Data_Namespace::CPU_LEVEL) {
998  std::lock_guard<std::mutex> cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_);
999  err = initHashTableOnCpu(join_columns, join_column_types, join_bucket_info, layout);
1000  // Transfer the hash table on the GPU if we've only built it on CPU
1001  // but the query runs on GPU (join on dictionary encoded columns).
1002  // Don't transfer the buffer if there was an error since we'll bail anyway.
1003  if (memory_level_ == Data_Namespace::GPU_LEVEL && !err) {
1004 #ifdef HAVE_CUDA
1005  copy_to_gpu(
1006  &data_mgr,
1007  reinterpret_cast<CUdeviceptr>(gpu_hash_table_buff_[device_id]->getMemoryPtr()),
1008  &(*cpu_hash_table_buff_)[0],
1009  cpu_hash_table_buff_->size() * sizeof((*cpu_hash_table_buff_)[0]),
1010  device_id);
1011 #else
1012  CHECK(false);
1013 #endif
1014  }
1015  } else {
1016  err = initHashTableOnGpu(join_columns,
1017  join_column_types,
1018  join_bucket_info,
1019  layout,
1020  key_component_width,
1021  key_component_count,
1022  device_id);
1023  }
1024  return err;
1025 }
1026 
1027 #define LL_CONTEXT executor_->cgen_state_->context_
1028 #define LL_BUILDER executor_->cgen_state_->ir_builder_
1029 #define LL_INT(v) executor_->cgen_state_->llInt(v)
1030 #define LL_FP(v) executor_->cgen_state_->llFp(v)
1031 #define ROW_FUNC executor_->cgen_state_->row_func_
1032 
1033 llvm::Value* BaselineJoinHashTable::codegenSlot(const CompilationOptions& co,
1034  const size_t index) {
1036  const auto key_component_width = getKeyComponentWidth();
1037  CHECK(key_component_width == 4 || key_component_width == 8);
1038  auto key_buff_lv = codegenKey(co);
1039  const auto hash_ptr = hashPtr(index);
1040  const auto key_ptr_lv =
1041  LL_BUILDER.CreatePointerCast(key_buff_lv, llvm::Type::getInt8PtrTy(LL_CONTEXT));
1042  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
1043  return executor_->cgen_state_->emitExternalCall(
1044  "baseline_hash_join_idx_" + std::to_string(key_component_width * 8),
1045  get_int_type(64, LL_CONTEXT),
1046  {hash_ptr, key_ptr_lv, key_size_lv, LL_INT(entry_count_)});
1047 }
1048 
1049 HashJoinMatchingSet BaselineJoinHashTable::codegenMatchingSet(
1050  const CompilationOptions& co,
1051  const size_t index) {
1052  const auto key_component_width = getKeyComponentWidth();
1053  CHECK(key_component_width == 4 || key_component_width == 8);
1054  auto key_buff_lv = codegenKey(co);
1056  auto hash_ptr = JoinHashTable::codegenHashTableLoad(index, executor_);
1057  const auto composite_dict_ptr_type =
1058  llvm::Type::getIntNPtrTy(LL_CONTEXT, key_component_width * 8);
1059  const auto composite_key_dict =
1060  hash_ptr->getType()->isPointerTy()
1061  ? LL_BUILDER.CreatePointerCast(hash_ptr, composite_dict_ptr_type)
1062  : LL_BUILDER.CreateIntToPtr(hash_ptr, composite_dict_ptr_type);
1063  const auto key_component_count = getKeyComponentCount();
1064  const auto key = executor_->cgen_state_->emitExternalCall(
1065  "get_composite_key_index_" + std::to_string(key_component_width * 8),
1066  get_int_type(64, LL_CONTEXT),
1067  {key_buff_lv,
1068  LL_INT(key_component_count),
1069  composite_key_dict,
1070  LL_INT(entry_count_)});
1071  auto one_to_many_ptr = hash_ptr;
1072  if (one_to_many_ptr->getType()->isPointerTy()) {
1073  one_to_many_ptr =
1074  LL_BUILDER.CreatePtrToInt(hash_ptr, llvm::Type::getInt64Ty(LL_CONTEXT));
1075  } else {
1076  CHECK(one_to_many_ptr->getType()->isIntegerTy(64));
1077  }
1078  const auto composite_key_dict_size = offsetBufferOff();
1079  one_to_many_ptr =
1080  LL_BUILDER.CreateAdd(one_to_many_ptr, LL_INT(composite_key_dict_size));
1081  return JoinHashTable::codegenMatchingSet(
1082  {one_to_many_ptr, key, LL_INT(int64_t(0)), LL_INT(entry_count_ - 1)},
1083  false,
1084  false,
1085  false,
1086  getComponentBufferSize(),
1087  executor_);
1088 }
1089 
1090 size_t BaselineJoinHashTable::offsetBufferOff() const noexcept {
1091  return getKeyBufferSize();
1092 }
1093 
1094 size_t BaselineJoinHashTable::countBufferOff() const noexcept {
1095  if (layoutRequiresAdditionalBuffers(layout_)) {
1096  return offsetBufferOff() + getComponentBufferSize();
1097  } else {
1098  return getKeyBufferSize();
1099  }
1100 }
1101 
1102 size_t BaselineJoinHashTable::payloadBufferOff() const noexcept {
1103  if (layoutRequiresAdditionalBuffers(layout_)) {
1104  return countBufferOff() + getComponentBufferSize();
1105  } else {
1106  return getKeyBufferSize();
1107  }
1108 }
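// Editorial note (layout inferred from offsetBufferOff()/countBufferOff()/
// payloadBufferOff() and getComponentBufferSize() below, not stated in the
// original file): the hash table buffer is laid out as
//   [ keys : getKeyBufferSize() | offsets | counts | payload ]
// where each of the last three sections spans entry_count_ * sizeof(int32_t)
// bytes and exists only for layouts that need the extra one-to-many buffers;
// for OneToOne tables the count/payload offsets simply fall back to
// getKeyBufferSize().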
1109 
1110 BaselineJoinHashTable::~BaselineJoinHashTable() {
1111  // TODO: use freeHashBufferMemory?
1112 #ifdef HAVE_CUDA
1113  CHECK(executor_);
1114  CHECK(executor_->catalog_);
1115  auto& data_mgr = executor_->catalog_->getDataMgr();
1116  for (auto& gpu_buffer : gpu_hash_table_buff_) {
1117  if (gpu_buffer) {
1118  data_mgr.free(gpu_buffer);
1119  }
1120  }
1121 #endif
1122 }
1123 
1124 size_t BaselineJoinHashTable::getKeyBufferSize() const noexcept {
1125  const auto key_component_width = getKeyComponentWidth();
1126  CHECK(key_component_width == 4 || key_component_width == 8);
1127  const auto key_component_count = getKeyComponentCount();
1128  if (layoutRequiresAdditionalBuffers(layout_)) {
1129  return entry_count_ * key_component_count * key_component_width;
1130  } else {
1131  return entry_count_ * (key_component_count + 1) * key_component_width;
1132  }
1133 }
1134 
1135 size_t BaselineJoinHashTable::getComponentBufferSize() const noexcept {
1136  return entry_count_ * sizeof(int32_t);
1137 }
1138 
1139 llvm::Value* BaselineJoinHashTable::codegenKey(const CompilationOptions& co) {
1140  const auto key_component_width = getKeyComponentWidth();
1141  CHECK(key_component_width == 4 || key_component_width == 8);
1142  const auto key_size_lv = LL_INT(getKeyComponentCount() * key_component_width);
1143  llvm::Value* key_buff_lv{nullptr};
1144  switch (key_component_width) {
1145  case 4:
1146  key_buff_lv =
1147  LL_BUILDER.CreateAlloca(llvm::Type::getInt32Ty(LL_CONTEXT), key_size_lv);
1148  break;
1149  case 8:
1150  key_buff_lv =
1151  LL_BUILDER.CreateAlloca(llvm::Type::getInt64Ty(LL_CONTEXT), key_size_lv);
1152  break;
1153  default:
1154  CHECK(false);
1155  }
1156 
1157  CodeGenerator code_generator(executor_);
1158  for (size_t i = 0; i < getKeyComponentCount(); ++i) {
1159  const auto key_comp_dest_lv = LL_BUILDER.CreateGEP(key_buff_lv, LL_INT(i));
1160  const auto& inner_outer_pair = inner_outer_pairs_[i];
1161  const auto outer_col = inner_outer_pair.second;
1162  const auto col_lvs = code_generator.codegen(outer_col, true, co);
1163  CHECK_EQ(size_t(1), col_lvs.size());
1164  const auto col_lv = LL_BUILDER.CreateSExt(
1165  col_lvs.front(), get_int_type(key_component_width * 8, LL_CONTEXT));
1166  LL_BUILDER.CreateStore(col_lv, key_comp_dest_lv);
1167  }
1168  return key_buff_lv;
1169 }
1170 
1171 llvm::Value* BaselineJoinHashTable::hashPtr(const size_t index) {
1172  auto hash_ptr = JoinHashTable::codegenHashTableLoad(index, executor_);
1173  const auto pi8_type = llvm::Type::getInt8PtrTy(LL_CONTEXT);
1174  return hash_ptr->getType()->isPointerTy()
1175  ? LL_BUILDER.CreatePointerCast(hash_ptr, pi8_type)
1176  : LL_BUILDER.CreateIntToPtr(hash_ptr, pi8_type);
1177 }
1178 
1179 #undef ROW_FUNC
1180 #undef LL_INT
1181 #undef LL_BUILDER
1182 #undef LL_CONTEXT
1183 
1184 int BaselineJoinHashTable::getInnerTableId() const noexcept {
1185  try {
1186  return getInnerTableId(inner_outer_pairs_);
1187  } catch (...) {
1188  CHECK(false);
1189  }
1190  return 0;
1191 }
1192 
1193 int BaselineJoinHashTable::getInnerTableRteIdx() const noexcept {
1194  CHECK(!inner_outer_pairs_.empty());
1195  const auto first_inner_col = inner_outer_pairs_.front().first;
1196  return first_inner_col->get_rte_idx();
1197 }
1198 
1199 JoinHashTableInterface::HashType BaselineJoinHashTable::getHashType() const noexcept {
1200  return layout_;
1201 }
1202 
1203 int BaselineJoinHashTable::getInnerTableId(
1204  const std::vector<InnerOuter>& inner_outer_pairs) {
1205  CHECK(!inner_outer_pairs.empty());
1206  const auto first_inner_col = inner_outer_pairs.front().first;
1207  return first_inner_col->get_table_id();
1208 }
1209 
1210 void BaselineJoinHashTable::checkHashJoinReplicationConstraint(const int table_id) const {
1211  if (!g_cluster) {
1212  return;
1213  }
1214  if (table_id >= 0) {
1215  const auto inner_td = catalog_->getMetadataForTable(table_id);
1216  CHECK(inner_td);
1217  const auto shard_count = shardCount();
1218  if (!shard_count && !table_is_replicated(inner_td)) {
1219  throw TableMustBeReplicated(inner_td->tableName);
1220  }
1221  }
1222 }
1223 
1224 const BaselineJoinHashTable::HashTableCacheValue*
1225 BaselineJoinHashTable::findHashTableOnCpuInCache(const HashTableCacheKey& key) {
1226  std::lock_guard<std::mutex> hash_table_cache_lock(hash_table_cache_mutex_);
1227  for (const auto& kv : hash_table_cache_) {
1228  if (kv.first == key) {
1229  return &kv.second;
1230  }
1231  }
1232  return nullptr;
1233 }
1234 
1235 void BaselineJoinHashTable::initHashTableOnCpuFromCache(const HashTableCacheKey& key) {
1236  auto timer = DEBUG_TIMER(__func__);
1237  VLOG(1) << "Checking CPU hash table cache.";
1238  std::lock_guard<std::mutex> hash_table_cache_lock(hash_table_cache_mutex_);
1239  if (hash_table_cache_.size() == 0) {
1240  VLOG(1) << "CPU hash table cache was empty.";
1241  }
1242  for (const auto& kv : hash_table_cache_) {
1243  if (kv.first == key) {
1244  VLOG(1) << "Found a suitable hash table in the cache.";
1245  cpu_hash_table_buff_ = kv.second.buffer;
1246  layout_ = kv.second.type;
1247  entry_count_ = kv.second.entry_count;
1248  emitted_keys_count_ = kv.second.emitted_keys_count;
1249  break;
1250  } else {
1251  VLOG(1) << hash_table_cache_.size()
1252  << " hash tables found in cache. None were suitable for this query.";
1253  }
1254  }
1255 }
1256 
1257 void BaselineJoinHashTable::putHashTableOnCpuToCache(const HashTableCacheKey& key) {
1258  for (auto chunk_key : key.chunk_keys) {
1259  CHECK_GE(chunk_key.size(), size_t(2));
1260  if (chunk_key[1] < 0) {
1261  return;
1262  }
1263  }
1264 
1265  std::lock_guard<std::mutex> hash_table_cache_lock(hash_table_cache_mutex_);
1266  VLOG(1) << "Storing hash table in cache.";
1267  for (const auto& kv : hash_table_cache_) {
1268  if (std::get<0>(kv) == key) {
1269  return;
1270  }
1271  }
1272  hash_table_cache_.emplace_back(
1273  key,
1274  HashTableCacheValue{
1275  cpu_hash_table_buff_, layout_, entry_count_, emitted_keys_count_});
1276 }
1277 
1278 std::pair<ssize_t, size_t> BaselineJoinHashTable::getApproximateTupleCountFromCache(
1279  const HashTableCacheKey& key) const {
1280  for (auto chunk_key : key.chunk_keys) {
1281  CHECK_GE(chunk_key.size(), size_t(2));
1282  if (chunk_key[1] < 0) {
1283  return std::make_pair(-1, 0);
1284  ;
1285  }
1286  }
1287 
1288  std::lock_guard<std::mutex> hash_table_cache_lock(hash_table_cache_mutex_);
1289  for (const auto& kv : hash_table_cache_) {
1290  if (kv.first == key) {
1291  return std::make_pair(kv.second.entry_count / 2, kv.second.emitted_keys_count);
1292  }
1293  }
1294  return std::make_pair(-1, 0);
1295 }
1296 
1297 bool BaselineJoinHashTable::isBitwiseEq() const {
1298  return condition_->get_optype() == kBW_EQ;
1299 }
1300 
1301 void BaselineJoinHashTable::freeHashBufferMemory() {
1302 #ifdef HAVE_CUDA
1303  freeHashBufferGpuMemory();
1304 #endif
1305  freeHashBufferCpuMemory();
1306 }
1307 
1308 void BaselineJoinHashTable::freeHashBufferGpuMemory() {
1309 #ifdef HAVE_CUDA
1310  auto& data_mgr = catalog_->getDataMgr();
1311  for (auto& buf : gpu_hash_table_buff_) {
1312  if (buf) {
1313  CudaAllocator::freeGpuAbstractBuffer(&data_mgr, buf);
1314  buf = nullptr;
1315  }
1316  }
1317 #else
1318  CHECK(false);
1319 #endif // HAVE_CUDA
1320 }
1321 
1322 void BaselineJoinHashTable::freeHashBufferCpuMemory() {
1323  cpu_hash_table_buff_.reset();
1324 }
1325 
1326 std::map<std::vector<ChunkKey>, JoinHashTableInterface::HashType>
1327  HashTypeCache::hash_type_cache_;
1328 std::mutex HashTypeCache::hash_type_cache_mutex_;
1329 
1330 void HashTypeCache::set(const std::vector<ChunkKey>& key,
1331  const JoinHashTableInterface::HashType hash_type) {
1332  for (auto chunk_key : key) {
1333  CHECK_GE(chunk_key.size(), size_t(2));
1334  if (chunk_key[1] < 0) {
1335  return;
1336  }
1337  }
1338  std::lock_guard<std::mutex> hash_type_cache_lock(hash_type_cache_mutex_);
1339  hash_type_cache_[key] = hash_type;
1340 }
1341 
1342 std::pair<JoinHashTableInterface::HashType, bool> HashTypeCache::get(
1343  const std::vector<ChunkKey>& key) {
1344  std::lock_guard<std::mutex> hash_type_cache_lock(hash_type_cache_mutex_);
1345  const auto it = hash_type_cache_.find(key);
1346  if (it == hash_type_cache_.end()) {
1347  return {JoinHashTableInterface::HashType::OneToOne, false};
1348  }
1349  return {it->second, true};
1350 }