OmniSciDB  8a228a1076
HashJoinRuntime.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "HashJoinRuntime.h"

#include "../../Shared/shard_key.h"
#include "../CompareKeysInl.h"
#include "../HyperLogLogRank.h"
#include "../JoinHashTable/HashJoinKeyHandlers.h"
#include "../JoinHashTable/JoinColumnIterator.h"
#include "../MurmurHash1Inl.h"
#ifdef __CUDACC__
#include "../DecodersImpl.h"
#include "../GpuRtConstants.h"
#include "../JoinHashTable/JoinHashImpl.h"
#else
#include "Logger/Logger.h"

#include "Shared/likely.h"
#include "StringDictionary/StringDictionary.h"
#include "StringDictionary/StringDictionaryProxy.h"

#include <future>
#endif

#if HAVE_CUDA
#include <thrust/scan.h>
#endif
#include "../../Shared/funcannotations.h"

#include <cmath>
#include <numeric>
#ifndef __CUDACC__
namespace {
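// Joins between two dictionary-encoded string columns without a shared
// dictionary translate inner-side ids to the outer dictionary by string
// value. Ids falling outside the outer column's [min_elem, max_elem] range
// map to StringDictionary::INVALID_STR_ID so the caller can skip the row.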
inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
                                              const int64_t min_elem,
                                              const int64_t max_elem,
                                              const void* sd_inner_proxy,
                                              const void* sd_outer_proxy) {
  CHECK(sd_outer_proxy);
  const auto sd_inner_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
  const auto sd_outer_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
  const auto elem_str = sd_inner_dict_proxy->getString(elem);
  const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str);
  if (outer_id > max_elem || outer_id < min_elem) {
    return StringDictionary::INVALID_STR_ID;
  }
  return outer_id;
}

}  // namespace
#endif
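// Initialize a one-to-one hash table by writing invalid_slot_val into every
// slot. On the GPU the work is distributed via a grid-stride loop; on the
// CPU each worker strides by cpu_thread_count starting at cpu_thread_idx.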
DEVICE void SUFFIX(init_hash_join_buff)(int32_t* groups_buffer,
                                        const int32_t hash_entry_count,
                                        const int32_t invalid_slot_val,
                                        const int32_t cpu_thread_idx,
                                        const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  for (int32_t i = start; i < hash_entry_count; i += step) {
    groups_buffer[i] = invalid_slot_val;
  }
}

#ifdef __CUDACC__
#define mapd_cas(address, compare, val) atomicCAS(address, compare, val)
#else
#define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
#endif
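// One-to-one fill: each row claims the slot chosen by slot_sel via
// compare-and-swap against invalid_slot_val. A lost CAS means two rows map
// to the same slot under the one-to-one layout, so the fill aborts with -1.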
template <typename SLOT_SELECTOR>
DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
                                     const int32_t invalid_slot_val,
                                     const JoinColumn join_column,
                                     const JoinColumnTypeInfo type_info,
                                     const void* sd_inner_proxy,
                                     const void* sd_outer_proxy,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count,
                                     SLOT_SELECTOR slot_sel) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = slot_sel(elem);
    if (mapd_cas(entry_ptr, invalid_slot_val, index) != invalid_slot_val) {
      return -1;
    }
  }
  return 0;
}
DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(int32_t* buff,
                                                  const int32_t invalid_slot_val,
                                                  const JoinColumn join_column,
                                                  const JoinColumnTypeInfo type_info,
                                                  const void* sd_inner_proxy,
                                                  const void* sd_outer_proxy,
                                                  const int32_t cpu_thread_idx,
                                                  const int32_t cpu_thread_count,
                                                  const int64_t bucket_normalization) {
  auto slot_selector = [&](auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        buff, elem, type_info.min_val, bucket_normalization);
  };
  return fill_hash_join_buff_impl(buff,
                                  invalid_slot_val,
                                  join_column,
                                  type_info,
                                  sd_inner_proxy,
                                  sd_outer_proxy,
                                  cpu_thread_idx,
                                  cpu_thread_count,
                                  slot_selector);
}
DEVICE int SUFFIX(fill_hash_join_buff)(int32_t* buff,
                                       const int32_t invalid_slot_val,
                                       const JoinColumn join_column,
                                       const JoinColumnTypeInfo type_info,
                                       const void* sd_inner_proxy,
                                       const void* sd_outer_proxy,
                                       const int32_t cpu_thread_idx,
                                       const int32_t cpu_thread_count) {
  auto slot_selector = [&](auto elem) {
    return SUFFIX(get_hash_slot)(buff, elem, type_info.min_val);
  };
  return fill_hash_join_buff_impl(buff,
                                  invalid_slot_val,
                                  join_column,
                                  type_info,
                                  sd_inner_proxy,
                                  sd_outer_proxy,
                                  cpu_thread_idx,
                                  cpu_thread_count,
                                  slot_selector);
}
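// Sharded variant of the one-to-one fill: rows whose key does not belong to
// this shard (per SHARD_FOR_KEY) are skipped; otherwise the logic matches
// fill_hash_join_buff_impl above.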
template <typename SLOT_SELECTOR>
DEVICE int fill_hash_join_buff_sharded_impl(int32_t* buff,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn join_column,
                                            const JoinColumnTypeInfo type_info,
                                            const ShardInfo shard_info,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const int32_t cpu_thread_idx,
                                            const int32_t cpu_thread_count,
                                            SLOT_SELECTOR slot_sel) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    size_t shard = SHARD_FOR_KEY(elem, shard_info.num_shards);
    if (shard != shard_info.shard) {
      continue;
    }
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = slot_sel(elem, shard);
    if (mapd_cas(entry_ptr, invalid_slot_val, index) != invalid_slot_val) {
      return -1;
    }
  }
  return 0;
}
DEVICE int SUFFIX(fill_hash_join_buff_sharded_bucketized)(
    int32_t* buff,
    const int32_t invalid_slot_val,
    const JoinColumn join_column,
    const JoinColumnTypeInfo type_info,
    const ShardInfo shard_info,
    const void* sd_inner_proxy,
    const void* sd_outer_proxy,
    const int32_t cpu_thread_idx,
    const int32_t cpu_thread_count,
    const int64_t bucket_normalization) {
  auto slot_selector = [&](auto elem, auto shard) -> auto {
    return SUFFIX(get_bucketized_hash_slot_sharded_opt)(buff,
                                                        elem,
                                                        type_info.min_val,
                                                        shard_info.entry_count_per_shard,
                                                        shard,
                                                        shard_info.num_shards,
                                                        shard_info.device_count,
                                                        bucket_normalization);
  };

  return fill_hash_join_buff_sharded_impl(buff,
                                          invalid_slot_val,
                                          join_column,
                                          type_info,
                                          shard_info,
                                          sd_inner_proxy,
                                          sd_outer_proxy,
                                          cpu_thread_idx,
                                          cpu_thread_count,
                                          slot_selector);
}
DEVICE int SUFFIX(fill_hash_join_buff_sharded)(int32_t* buff,
                                               const int32_t invalid_slot_val,
                                               const JoinColumn join_column,
                                               const JoinColumnTypeInfo type_info,
                                               const ShardInfo shard_info,
                                               const void* sd_inner_proxy,
                                               const void* sd_outer_proxy,
                                               const int32_t cpu_thread_idx,
                                               const int32_t cpu_thread_count) {
  auto slot_selector = [&](auto elem, auto shard) {
    return SUFFIX(get_hash_slot_sharded_opt)(buff,
                                             elem,
                                             type_info.min_val,
                                             shard_info.entry_count_per_shard,
                                             shard,
                                             shard_info.num_shards,
                                             shard_info.device_count);
  };
  return fill_hash_join_buff_sharded_impl(buff,
                                          invalid_slot_val,
                                          join_column,
                                          type_info,
                                          shard_info,
                                          sd_inner_proxy,
                                          sd_outer_proxy,
                                          cpu_thread_idx,
                                          cpu_thread_count,
                                          slot_selector);
}
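// Baseline (composite-key) hash table initialization. Each entry holds
// key_component_count key slots, optionally followed by one value slot; the
// key slots are reset to the sentinel returned by get_invalid_key<T>().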
template <typename T>
DEVICE void SUFFIX(init_baseline_hash_join_buff)(int8_t* hash_buff,
                                                 const size_t entry_count,
                                                 const size_t key_component_count,
                                                 const bool with_val_slot,
                                                 const int32_t invalid_slot_val,
                                                 const int32_t cpu_thread_idx,
                                                 const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  auto hash_entry_size = (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
  const T empty_key = SUFFIX(get_invalid_key)<T>();
  for (uint32_t h = start; h < entry_count; h += step) {
    uint32_t off = h * hash_entry_size;
    auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
    for (size_t i = 0; i < key_component_count; ++i) {
      row_ptr[i] = empty_key;
    }
    if (with_val_slot) {
      row_ptr[key_component_count] = invalid_slot_val;
    }
  }
}
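// Claim-or-match a baseline hash slot at position h. The GPU path CASes the
// first key component and spins until the winning thread finishes writing
// the rest of the key; the CPU path uses seq-cst atomics with a distinct
// "write pending" sentinel. Both return a pointer just past the key (the
// value slot) on a match, or nullptr if the slot holds a different key.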
#ifdef __CUDACC__
template <typename T>
__device__ T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
                                                 const uint32_t h,
                                                 const T* key,
                                                 const size_t key_component_count,
                                                 const size_t hash_entry_size) {
  uint32_t off = h * hash_entry_size;
  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
  const T empty_key = SUFFIX(get_invalid_key)<T>();
  {
    const T old = atomicCAS(row_ptr, empty_key, *key);
    if (empty_key == old && key_component_count > 1) {
      for (size_t i = 1; i <= key_component_count - 1; ++i) {
        atomicExch(row_ptr + i, key[i]);
      }
    }
  }
  if (key_component_count > 1) {
    while (atomicAdd(row_ptr + key_component_count - 1, 0) == empty_key) {
      // spin until the winning thread has finished writing the entire key and
      // the init value
    }
  }
  bool match = true;
  for (uint32_t i = 0; i < key_component_count; ++i) {
    if (row_ptr[i] != key[i]) {
      match = false;
      break;
    }
  }

  if (match) {
    return reinterpret_cast<T*>(row_ptr + key_component_count);
  }
  return nullptr;
}
#else

#define cas_cst(ptr, expected, desired) \
  __atomic_compare_exchange_n(          \
      ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
#define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)

template <typename T>
T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
                                      const uint32_t h,
                                      const T* key,
                                      const size_t key_component_count,
                                      const size_t hash_entry_size) {
  uint32_t off = h * hash_entry_size;
  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
  T empty_key = SUFFIX(get_invalid_key)<T>();
  T write_pending = SUFFIX(get_invalid_key)<T>() - 1;
  if (UNLIKELY(*key == write_pending)) {
    // Address the singularity case where the first column contains the pending
    // write special value. Should never happen, but avoid doing wrong things.
    return nullptr;
  }
  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
  if (success) {
    if (key_component_count > 1) {
      memcpy(row_ptr + 1, key + 1, (key_component_count - 1) * sizeof(T));
    }
    store_cst(row_ptr, *key);
    return reinterpret_cast<T*>(row_ptr + key_component_count);
  }
  while (load_cst(row_ptr) == write_pending) {
    // spin until the winning thread has finished writing the entire key
  }
  for (size_t i = 0; i < key_component_count; ++i) {
    if (load_cst(row_ptr + i) != key[i]) {
      return nullptr;
    }
  }
  return reinterpret_cast<T*>(row_ptr + key_component_count);
}

#undef load_cst
#undef store_cst
#undef cas_cst

#endif  // __CUDACC__
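// Insert a key into the baseline table with linear probing. Returns 0 on
// success, -2 when the table is full (every entry probed without a match or
// a free slot), and -1 when the one-to-one value slot was already taken.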
template <typename T>
DEVICE int write_baseline_hash_slot(const int32_t val,
                                    int8_t* hash_buff,
                                    const size_t entry_count,
                                    const T* key,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const int32_t invalid_slot_val,
                                    const size_t key_size_in_bytes,
                                    const size_t hash_entry_size) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  T* matching_group = get_matching_baseline_hash_slot_at(
      hash_buff, h, key, key_component_count, hash_entry_size);
  if (!matching_group) {
    uint32_t h_probe = (h + 1) % entry_count;
    while (h_probe != h) {
      matching_group = get_matching_baseline_hash_slot_at(
          hash_buff, h_probe, key, key_component_count, hash_entry_size);
      if (matching_group) {
        break;
      }
      h_probe = (h_probe + 1) % entry_count;
    }
  }
  if (!matching_group) {
    return -2;
  }
  if (!with_val_slot) {
    return 0;
  }
  if (mapd_cas(matching_group, invalid_slot_val, val) != invalid_slot_val) {
    return -1;
  }
  return 0;
}
template <typename T, typename FILL_HANDLER>
DEVICE int fill_baseline_hash_join_buff(int8_t* hash_buff,
                                        const size_t entry_count,
                                        const int32_t invalid_slot_val,
                                        const size_t key_component_count,
                                        const bool with_val_slot,
                                        const FILL_HANDLER* f,
                                        const size_t num_elems,
                                        const int32_t cpu_thread_idx,
                                        const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  T key_scratch_buff[g_maximum_conditions_to_coalesce];
  const size_t key_size_in_bytes = key_component_count * sizeof(T);
  const size_t hash_entry_size =
      (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
  auto key_buff_handler = [hash_buff,
                           entry_count,
                           with_val_slot,
                           invalid_slot_val,
                           key_size_in_bytes,
                           hash_entry_size](const size_t entry_idx,
                                            const T* key_scratch_buffer,
                                            const size_t key_component_count) {
    return write_baseline_hash_slot<T>(entry_idx,
                                       hash_buff,
                                       entry_count,
                                       key_scratch_buffer,
                                       key_component_count,
                                       with_val_slot,
                                       invalid_slot_val,
                                       key_size_in_bytes,
                                       hash_entry_size);
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    const auto err = (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
    if (err) {
      return err;
    }
  }
  return 0;
}
#undef mapd_cas

#ifdef __CUDACC__
#define mapd_add(address, val) atomicAdd(address, val)
#else
#define mapd_add(address, val) __sync_fetch_and_add(address, val)
#endif
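// First pass of the one-to-many build: count how many rows land in each hash
// slot. The second pass (fill_row_ids_impl below) turns the counts into bin
// offsets and writes the row ids.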
template <typename SLOT_SELECTOR>
DEVICE void count_matches_impl(int32_t* count_buff,
                               const int32_t invalid_slot_val,
                               const JoinColumn join_column,
                               const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                               ,
                               const void* sd_inner_proxy,
                               const void* sd_outer_proxy,
                               const int32_t cpu_thread_idx,
                               const int32_t cpu_thread_count
#endif
                               ,
                               SLOT_SELECTOR slot_selector) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto* entry_ptr = slot_selector(count_buff, elem);
    mapd_add(entry_ptr, int32_t(1));
  }
}
GLOBAL void SUFFIX(count_matches)(int32_t* count_buff,
                                  const int32_t invalid_slot_val,
                                  const JoinColumn join_column,
                                  const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                  ,
                                  const void* sd_inner_proxy,
                                  const void* sd_outer_proxy,
                                  const int32_t cpu_thread_idx,
                                  const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info](auto count_buff, auto elem) {
    return SUFFIX(get_hash_slot)(count_buff, elem, type_info.min_val);
  };
  count_matches_impl(count_buff,
                     invalid_slot_val,
                     join_column,
                     type_info
#ifndef __CUDACC__
                     ,
                     sd_inner_proxy,
                     sd_outer_proxy,
                     cpu_thread_idx,
                     cpu_thread_count
#endif
                     ,
                     slot_sel);
}
GLOBAL void SUFFIX(count_matches_bucketized)(int32_t* count_buff,
                                             const int32_t invalid_slot_val,
                                             const JoinColumn join_column,
                                             const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                             ,
                                             const void* sd_inner_proxy,
                                             const void* sd_outer_proxy,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count
#endif
                                             ,
                                             const int64_t bucket_normalization) {
  auto slot_sel = [bucket_normalization, &type_info](auto count_buff, auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        count_buff, elem, type_info.min_val, bucket_normalization);
  };
  count_matches_impl(count_buff,
                     invalid_slot_val,
                     join_column,
                     type_info
#ifndef __CUDACC__
                     ,
                     sd_inner_proxy,
                     sd_outer_proxy,
                     cpu_thread_idx,
                     cpu_thread_count
#endif
                     ,
                     slot_sel);
}
GLOBAL void SUFFIX(count_matches_sharded)(int32_t* count_buff,
                                          const int32_t invalid_slot_val,
                                          const JoinColumn join_column,
                                          const JoinColumnTypeInfo type_info,
                                          const ShardInfo shard_info
#ifndef __CUDACC__
                                          ,
                                          const void* sd_inner_proxy,
                                          const void* sd_outer_proxy,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = SUFFIX(get_hash_slot_sharded)(count_buff,
                                                       elem,
                                                       type_info.min_val,
                                                       shard_info.entry_count_per_shard,
                                                       shard_info.num_shards,
                                                       shard_info.device_count);
    mapd_add(entry_ptr, int32_t(1));
  }
}
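// Read-only probe of the composite-key dictionary built by the baseline
// fill: hash the key, then probe linearly until the matching entry is found.
// The key is expected to exist; a failed lookup trips CHECK/assert.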
template <typename T>
DEVICE NEVER_INLINE const T* SUFFIX(get_matching_baseline_hash_slot_readonly)(
    const T* key,
    const size_t key_component_count,
    const T* composite_key_dict,
    const size_t entry_count,
    const size_t key_size_in_bytes) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  uint32_t off = h * key_component_count;
  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
    return &composite_key_dict[off];
  }
  uint32_t h_probe = (h + 1) % entry_count;
  while (h_probe != h) {
    off = h_probe * key_component_count;
    if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
      return &composite_key_dict[off];
    }
    h_probe = (h_probe + 1) % entry_count;
  }
#ifndef __CUDACC__
  CHECK(false);
#else
  assert(false);
#endif
  return nullptr;
}
template <typename T, typename KEY_HANDLER>
GLOBAL void SUFFIX(count_matches_baseline)(int32_t* count_buff,
                                           const T* composite_key_dict,
                                           const size_t entry_count,
                                           const KEY_HANDLER* f,
                                           const size_t num_elems
#ifndef __CUDACC__
                                           ,
                                           const int32_t cpu_thread_idx,
                                           const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
#ifdef __CUDACC__
  assert(composite_key_dict);
#endif
  T key_scratch_buff[g_maximum_conditions_to_coalesce];
  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
  auto key_buff_handler = [composite_key_dict,
                           entry_count,
                           count_buff,
                           key_size_in_bytes](const size_t row_entry_idx,
                                              const T* key_scratch_buff,
                                              const size_t key_component_count) {
    const auto matching_group =
        SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
                                                         key_component_count,
                                                         composite_key_dict,
                                                         entry_count,
                                                         key_size_in_bytes);
    const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
    mapd_add(&count_buff[entry_idx], int32_t(1));
    return 0;
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}
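// Second pass of the one-to-many build. The buffer layout is
// [pos_buff | count_buff | id_buff], each hash_entry_count wide: pos_buff
// holds each bin's start offset, count_buff is reused as a running cursor
// via atomic add, and id_buff receives the row ids.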
template <typename SLOT_SELECTOR>
DEVICE void fill_row_ids_impl(int32_t* buff,
                              const int32_t hash_entry_count,
                              const int32_t invalid_slot_val,
                              const JoinColumn join_column,
                              const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                              ,
                              const void* sd_inner_proxy,
                              const void* sd_outer_proxy,
                              const int32_t cpu_thread_idx,
                              const int32_t cpu_thread_count
#endif
                              ,
                              SLOT_SELECTOR slot_selector) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;

#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto pos_ptr = slot_selector(pos_buff, elem);
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(index);
  }
}
GLOBAL void SUFFIX(fill_row_ids)(int32_t* buff,
                                 const int32_t hash_entry_count,
                                 const int32_t invalid_slot_val,
                                 const JoinColumn join_column,
                                 const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                 ,
                                 const void* sd_inner_proxy,
                                 const void* sd_outer_proxy,
                                 const int32_t cpu_thread_idx,
                                 const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info](auto pos_buff, auto elem) {
    return SUFFIX(get_hash_slot)(pos_buff, elem, type_info.min_val);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}
GLOBAL void SUFFIX(fill_row_ids_bucketized)(int32_t* buff,
                                            const int32_t hash_entry_count,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn join_column,
                                            const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                            ,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const int32_t cpu_thread_idx,
                                            const int32_t cpu_thread_count
#endif
                                            ,
                                            const int64_t bucket_normalization) {
  auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        pos_buff, elem, type_info.min_val, bucket_normalization);
  };
  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}
template <typename SLOT_SELECTOR>
DEVICE void fill_row_ids_sharded_impl(int32_t* buff,
                                      const int32_t hash_entry_count,
                                      const int32_t invalid_slot_val,
                                      const JoinColumn join_column,
                                      const JoinColumnTypeInfo type_info,
                                      const ShardInfo shard_info
#ifndef __CUDACC__
                                      ,
                                      const void* sd_inner_proxy,
                                      const void* sd_outer_proxy,
                                      const int32_t cpu_thread_idx,
                                      const int32_t cpu_thread_count
#endif
                                      ,
                                      SLOT_SELECTOR slot_selector) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;

#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto* pos_ptr = slot_selector(pos_buff, elem);
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(index);
  }
}
GLOBAL void SUFFIX(fill_row_ids_sharded)(int32_t* buff,
                                         const int32_t hash_entry_count,
                                         const int32_t invalid_slot_val,
                                         const JoinColumn join_column,
                                         const JoinColumnTypeInfo type_info,
                                         const ShardInfo shard_info
#ifndef __CUDACC__
                                         ,
                                         const void* sd_inner_proxy,
                                         const void* sd_outer_proxy,
                                         const int32_t cpu_thread_idx,
                                         const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info, &shard_info](auto pos_buff, auto elem) {
    return SUFFIX(get_hash_slot_sharded)(pos_buff,
                                         elem,
                                         type_info.min_val,
                                         shard_info.entry_count_per_shard,
                                         shard_info.num_shards,
                                         shard_info.device_count);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}
GLOBAL void SUFFIX(fill_row_ids_sharded_bucketized)(int32_t* buff,
                                                    const int32_t hash_entry_count,
                                                    const int32_t invalid_slot_val,
                                                    const JoinColumn join_column,
                                                    const JoinColumnTypeInfo type_info,
                                                    const ShardInfo shard_info
#ifndef __CUDACC__
                                                    ,
                                                    const void* sd_inner_proxy,
                                                    const void* sd_outer_proxy,
                                                    const int32_t cpu_thread_idx,
                                                    const int32_t cpu_thread_count
#endif
                                                    ,
                                                    const int64_t bucket_normalization) {
  auto slot_sel = [&shard_info, &type_info, bucket_normalization](auto pos_buff,
                                                                  auto elem) {
    return SUFFIX(get_bucketized_hash_slot_sharded)(pos_buff,
                                                    elem,
                                                    type_info.min_val,
                                                    shard_info.entry_count_per_shard,
                                                    shard_info.num_shards,
                                                    shard_info.device_count,
                                                    bucket_normalization);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}
template <typename T, typename KEY_HANDLER>
GLOBAL void SUFFIX(fill_row_ids_baseline)(int32_t* buff,
                                          const T* composite_key_dict,
                                          const size_t hash_entry_count,
                                          const int32_t invalid_slot_val,
                                          const KEY_HANDLER* f,
                                          const size_t num_elems
#ifndef __CUDACC__
                                          ,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count
#endif
) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  T key_scratch_buff[g_maximum_conditions_to_coalesce];
#ifdef __CUDACC__
  assert(composite_key_dict);
#endif
  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
  auto key_buff_handler = [composite_key_dict,
                           hash_entry_count,
                           pos_buff,
                           invalid_slot_val,
                           count_buff,
                           id_buff,
                           key_size_in_bytes](const size_t row_index,
                                              const T* key_scratch_buff,
                                              const size_t key_component_count) {
    const T* matching_group =
        SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
                                                         key_component_count,
                                                         composite_key_dict,
                                                         hash_entry_count,
                                                         key_size_in_bytes);
    const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
    int32_t* pos_ptr = pos_buff + entry_idx;
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(row_index);
    return 0;
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
  return;
}
#undef mapd_add
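// Per-thread HyperLogLog update used for cardinality estimation: the top b
// bits of the 64-bit Murmur hash pick the register, and the rank of the
// remaining bits is the candidate register value, merged with max.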
template <typename KEY_HANDLER>
GLOBAL void SUFFIX(approximate_distinct_tuples_impl)(uint8_t* hll_buffer,
                                                     int32_t* row_count_buffer,
                                                     const uint32_t b,
                                                     const size_t num_elems,
                                                     const KEY_HANDLER* f
#ifndef __CUDACC__
                                                     ,
                                                     const int32_t cpu_thread_idx,
                                                     const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  auto key_buff_handler = [b, hll_buffer, row_count_buffer](
                              const size_t entry_idx,
                              const int64_t* key_scratch_buff,
                              const size_t key_component_count) {
    if (row_count_buffer) {
      row_count_buffer[entry_idx] += 1;
    }

    const uint64_t hash =
        MurmurHash64AImpl(key_scratch_buff, key_component_count * sizeof(int64_t), 0);
    const uint32_t index = hash >> (64 - b);
    const auto rank = get_rank(hash << b, 64 - b);
#ifdef __CUDACC__
    atomicMax(reinterpret_cast<int32_t*>(hll_buffer) + index, rank);
#else
    hll_buffer[index] = std::max(hll_buffer[index], rank);
#endif

    return 0;
  };

  int64_t key_scratch_buff[g_maximum_conditions_to_coalesce];

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}
#ifdef __CUDACC__
namespace {
// TODO(adb): put these in a header file so they are not duplicated between here and
// cuda_mapd_rt.cu
__device__ double atomicMin(double* address, double val) {
  unsigned long long int* address_as_ull = (unsigned long long int*)address;
  unsigned long long int old = *address_as_ull, assumed;

  do {
    assumed = old;
    old = atomicCAS(address_as_ull,
                    assumed,
                    __double_as_longlong(min(val, __longlong_as_double(assumed))));
  } while (assumed != old);

  return __longlong_as_double(old);
}
}  // namespace
#endif
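// Compute per-dimension bucket sizes for overlaps joins: for each bounds
// row, fold each dimension's extent (max - min) into bucket_sizes_for_thread
// with a min-reduction, ignoring extents at or below bucket_sz_threshold.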
template <size_t N>
GLOBAL void SUFFIX(compute_bucket_sizes_impl)(double* bucket_sizes_for_thread,
                                              const JoinColumn* join_column,
                                              const JoinColumnTypeInfo* type_info,
                                              const double bucket_sz_threshold,
                                              const int32_t cpu_thread_idx,
                                              const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnIterator it(join_column, type_info, start, step);
  for (; it; ++it) {
    // We expect the bounds column to be (min, max) e.g. (x_min, y_min, x_max, y_max)
    double bounds[2 * N];
    for (size_t j = 0; j < 2 * N; j++) {
      bounds[j] = SUFFIX(fixed_width_double_decode_noinline)(it.ptr(), j);
    }

    for (size_t j = 0; j < N; j++) {
      const auto diff = bounds[j + N] - bounds[j];
#ifdef __CUDACC__
      if (diff > bucket_sz_threshold) {
        atomicMin(&bucket_sizes_for_thread[j], diff);
      }
#else
      if (diff > bucket_sz_threshold && diff < bucket_sizes_for_thread[j]) {
        bucket_sizes_for_thread[j] = diff;
      }
#endif
    }
  }
}
#ifndef __CUDACC__
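// Multithreaded inclusive prefix sum over [first, last): each thread scans
// one chunk, the per-chunk totals are combined serially, then the partial
// sums are added back to every chunk but the first in parallel. Small
// inputs (under 10000 elements) or a single thread fall back to a serial scan.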
template <typename InputIterator, typename OutputIterator>
void inclusive_scan(InputIterator first,
                    InputIterator last,
                    OutputIterator out,
                    const size_t thread_count) {
  using ElementType = typename InputIterator::value_type;
  using OffsetType = typename InputIterator::difference_type;
  const OffsetType elem_count = last - first;
  if (elem_count < 10000 || thread_count <= 1) {
    ElementType sum = 0;
    for (auto iter = first; iter != last; ++iter, ++out) {
      *out = sum += *iter;
    }
    return;
  }

  const OffsetType step = (elem_count + thread_count - 1) / thread_count;
  OffsetType start_off = 0;
  OffsetType end_off = std::min(step, elem_count);
  std::vector<ElementType> partial_sums(thread_count);
  std::vector<std::future<void>> counter_threads;
  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx,
              start_off = std::min(start_off + step, elem_count),
              end_off = std::min(start_off + step, elem_count)) {
    counter_threads.push_back(std::async(
        std::launch::async,
        [first, out](
            ElementType& partial_sum, const OffsetType start, const OffsetType end) {
          ElementType sum = 0;
          for (auto in_iter = first + start, out_iter = out + start;
               in_iter != (first + end);
               ++in_iter, ++out_iter) {
            *out_iter = sum += *in_iter;
          }
          partial_sum = sum;
        },
        std::ref(partial_sums[thread_idx]),
        start_off,
        end_off));
  }
  for (auto& child : counter_threads) {
    child.get();
  }

  ElementType sum = 0;
  for (auto& s : partial_sums) {
    s += sum;
    sum = s;
  }

  counter_threads.clear();
  start_off = std::min(step, elem_count);
  end_off = std::min(start_off + step, elem_count);
  for (size_t thread_idx = 0; thread_idx < thread_count - 1; ++thread_idx,
              start_off = std::min(start_off + step, elem_count),
              end_off = std::min(start_off + step, elem_count)) {
    counter_threads.push_back(std::async(
        std::launch::async,
        [out](const ElementType prev_sum, const OffsetType start, const OffsetType end) {
          for (auto iter = out + start; iter != (out + end); ++iter) {
            *iter += prev_sum;
          }
        },
        partial_sums[thread_idx],
        start_off,
        end_off));
  }
  for (auto& child : counter_threads) {
    child.get();
  }
}
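// Host-side driver for the two-pass one-to-many build: zero the count
// buffer, launch count_matches_func on every CPU worker, prefix-sum the
// counts into bin offsets, then reset the counts and launch
// fill_row_ids_func to scatter the row ids.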
template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
void fill_one_to_many_hash_table_impl(int32_t* buff,
                                      const int32_t hash_entry_count,
                                      const int32_t invalid_slot_val,
                                      const JoinColumn& join_column,
                                      const JoinColumnTypeInfo& type_info,
                                      const void* sd_inner_proxy,
                                      const void* sd_outer_proxy,
                                      const unsigned cpu_thread_count,
                                      COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_func,
                                      FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_func) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (unsigned cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    counter_threads.push_back(std::async(
        std::launch::async, count_matches_func, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int32_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
#if HAVE_CUDA
  thrust::inclusive_scan(count_copy.begin(), count_copy.end(), count_copy.begin());
#else
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
#endif
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](size_t thread_idx) {
          for (size_t i = thread_idx; i < static_cast<size_t>(hash_entry_count);
               i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    rowid_threads.push_back(std::async(
        std::launch::async, fill_row_ids_func, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}
void fill_one_to_many_hash_table(int32_t* buff,
                                 const HashEntryInfo hash_entry_info,
                                 const int32_t invalid_slot_val,
                                 const JoinColumn& join_column,
                                 const JoinColumnTypeInfo& type_info,
                                 const void* sd_inner_proxy,
                                 const void* sd_outer_proxy,
                                 const unsigned cpu_thread_count) {
  auto launch_count_matches = [count_buff = buff + hash_entry_info.hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               sd_inner_proxy,
                               sd_outer_proxy](auto cpu_thread_idx,
                                               auto cpu_thread_count) {
    SUFFIX(count_matches)
    (count_buff,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count);
  };
  auto launch_fill_row_ids = [hash_entry_count = hash_entry_info.hash_entry_count,
                              buff,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              sd_inner_proxy,
                              sd_outer_proxy](auto cpu_thread_idx,
                                              auto cpu_thread_count) {
    SUFFIX(fill_row_ids)
    (buff,
     hash_entry_count,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count);
  };

  fill_one_to_many_hash_table_impl(buff,
                                   hash_entry_info.hash_entry_count,
                                   invalid_slot_val,
                                   join_column,
                                   type_info,
                                   sd_inner_proxy,
                                   sd_outer_proxy,
                                   cpu_thread_count,
                                   launch_count_matches,
                                   launch_fill_row_ids);
}
void fill_one_to_many_hash_table_bucketized(int32_t* buff,
                                            const HashEntryInfo hash_entry_info,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn& join_column,
                                            const JoinColumnTypeInfo& type_info,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const unsigned cpu_thread_count) {
  auto bucket_normalization = hash_entry_info.bucket_normalization;
  auto hash_entry_count = hash_entry_info.getNormalizedHashEntryCount();
  auto launch_count_matches = [bucket_normalization,
                               count_buff = buff + hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               sd_inner_proxy,
                               sd_outer_proxy](auto cpu_thread_idx,
                                               auto cpu_thread_count) {
    SUFFIX(count_matches_bucketized)
    (count_buff,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count,
     bucket_normalization);
  };
  auto launch_fill_row_ids = [bucket_normalization,
                              hash_entry_count,
                              buff,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              sd_inner_proxy,
                              sd_outer_proxy](auto cpu_thread_idx,
                                              auto cpu_thread_count) {
    SUFFIX(fill_row_ids_bucketized)
    (buff,
     hash_entry_count,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count,
     bucket_normalization);
  };

  fill_one_to_many_hash_table_impl(buff,
                                   hash_entry_count,
                                   invalid_slot_val,
                                   join_column,
                                   type_info,
                                   sd_inner_proxy,
                                   sd_outer_proxy,
                                   cpu_thread_count,
                                   launch_count_matches,
                                   launch_fill_row_ids);
}
template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
void fill_one_to_many_hash_table_sharded_impl(
    int32_t* buff,
    const int32_t hash_entry_count,
    const int32_t invalid_slot_val,
    const JoinColumn& join_column,
    const JoinColumnTypeInfo& type_info,
    const ShardInfo& shard_info,
    const void* sd_inner_proxy,
    const void* sd_outer_proxy,
    const unsigned cpu_thread_count,
    COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_launcher,
    FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_launcher) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    counter_threads.push_back(std::async(
        std::launch::async, count_matches_launcher, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int32_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](const unsigned thread_idx) {
          for (size_t i = thread_idx; i < static_cast<size_t>(hash_entry_count);
               i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    rowid_threads.push_back(std::async(
        std::launch::async, fill_row_ids_launcher, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}
void fill_one_to_many_hash_table_sharded(int32_t* buff,
                                         const int32_t hash_entry_count,
                                         const int32_t invalid_slot_val,
                                         const JoinColumn& join_column,
                                         const JoinColumnTypeInfo& type_info,
                                         const ShardInfo& shard_info,
                                         const void* sd_inner_proxy,
                                         const void* sd_outer_proxy,
                                         const unsigned cpu_thread_count) {
  auto launch_count_matches = [count_buff = buff + hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               &shard_info
#ifndef __CUDACC__
                               ,
                               sd_inner_proxy,
                               sd_outer_proxy
#endif
  ](auto cpu_thread_idx, auto cpu_thread_count) {
    return SUFFIX(count_matches_sharded)(count_buff,
                                         invalid_slot_val,
                                         join_column,
                                         type_info,
                                         shard_info
#ifndef __CUDACC__
                                         ,
                                         sd_inner_proxy,
                                         sd_outer_proxy,
                                         cpu_thread_idx,
                                         cpu_thread_count
#endif
    );
  };

  auto launch_fill_row_ids = [buff,
                              hash_entry_count,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              &shard_info
#ifndef __CUDACC__
                              ,
                              sd_inner_proxy,
                              sd_outer_proxy
#endif
  ](auto cpu_thread_idx, auto cpu_thread_count) {
    return SUFFIX(fill_row_ids_sharded)(buff,
                                        hash_entry_count,
                                        invalid_slot_val,
                                        join_column,
                                        type_info,
                                        shard_info
#ifndef __CUDACC__
                                        ,
                                        sd_inner_proxy,
                                        sd_outer_proxy,
                                        cpu_thread_idx,
                                        cpu_thread_count);
#endif
  };

  fill_one_to_many_hash_table_sharded_impl(buff,
                                           hash_entry_count,
                                           invalid_slot_val,
                                           join_column,
                                           type_info,
                                           shard_info
#ifndef __CUDACC__
                                           ,
                                           sd_inner_proxy,
                                           sd_outer_proxy,
                                           cpu_thread_count
#endif
                                           ,
                                           launch_count_matches,
                                           launch_fill_row_ids);
}
void init_baseline_hash_join_buff_32(int8_t* hash_join_buff,
                                     const int32_t entry_count,
                                     const size_t key_component_count,
                                     const bool with_val_slot,
                                     const int32_t invalid_slot_val,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count) {
  init_baseline_hash_join_buff<int32_t>(hash_join_buff,
                                        entry_count,
                                        key_component_count,
                                        with_val_slot,
                                        invalid_slot_val,
                                        cpu_thread_idx,
                                        cpu_thread_count);
}

void init_baseline_hash_join_buff_64(int8_t* hash_join_buff,
                                     const int32_t entry_count,
                                     const size_t key_component_count,
                                     const bool with_val_slot,
                                     const int32_t invalid_slot_val,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count) {
  init_baseline_hash_join_buff<int64_t>(hash_join_buff,
                                        entry_count,
                                        key_component_count,
                                        with_val_slot,
                                        invalid_slot_val,
                                        cpu_thread_idx,
                                        cpu_thread_count);
}
int fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                    const size_t entry_count,
                                    const int32_t invalid_slot_val,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const GenericKeyHandler* key_handler,
                                    const size_t num_elems,
                                    const int32_t cpu_thread_idx,
                                    const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int overlaps_fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                             const size_t entry_count,
                                             const int32_t invalid_slot_val,
                                             const size_t key_component_count,
                                             const bool with_val_slot,
                                             const OverlapsKeyHandler* key_handler,
                                             const size_t num_elems,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                    const size_t entry_count,
                                    const int32_t invalid_slot_val,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const GenericKeyHandler* key_handler,
                                    const size_t num_elems,
                                    const int32_t cpu_thread_idx,
                                    const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int overlaps_fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                             const size_t entry_count,
                                             const int32_t invalid_slot_val,
                                             const size_t key_component_count,
                                             const bool with_val_slot,
                                             const OverlapsKeyHandler* key_handler,
                                             const size_t num_elems,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}
template <typename T>
void fill_one_to_many_baseline_hash_table(
    int32_t* buff,
    const T* composite_key_dict,
    const size_t hash_entry_count,
    const int32_t invalid_slot_val,
    const size_t key_component_count,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_buckets_per_key,
    const std::vector<const void*>& sd_inner_proxy_per_key,
    const std::vector<const void*>& sd_outer_proxy_per_key,
    const size_t cpu_thread_count) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    if (join_buckets_per_key.size() > 0) {
      counter_threads.push_back(
          std::async(std::launch::async,
                     [count_buff,
                      composite_key_dict,
                      &hash_entry_count,
                      &join_buckets_per_key,
                      &join_column_per_key,
                      cpu_thread_idx,
                      cpu_thread_count] {
                       const auto key_handler = OverlapsKeyHandler(
                           join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
                           &join_column_per_key[0],
                           join_buckets_per_key[0].bucket_sizes_for_dimension.data());
                       count_matches_baseline(count_buff,
                                              composite_key_dict,
                                              hash_entry_count,
                                              &key_handler,
                                              join_column_per_key[0].num_elems,
                                              cpu_thread_idx,
                                              cpu_thread_count);
                     }));
    } else {
      counter_threads.push_back(std::async(
          std::launch::async,
          [count_buff,
           composite_key_dict,
           &key_component_count,
           &hash_entry_count,
           &join_column_per_key,
           &type_info_per_key,
           &sd_inner_proxy_per_key,
           &sd_outer_proxy_per_key,
           cpu_thread_idx,
           cpu_thread_count] {
            const auto key_handler = GenericKeyHandler(key_component_count,
                                                       true,
                                                       &join_column_per_key[0],
                                                       &type_info_per_key[0],
                                                       &sd_inner_proxy_per_key[0],
                                                       &sd_outer_proxy_per_key[0]);
            count_matches_baseline(count_buff,
                                   composite_key_dict,
                                   hash_entry_count,
                                   &key_handler,
                                   join_column_per_key[0].num_elems,
                                   cpu_thread_idx,
                                   cpu_thread_count);
          }));
    }
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, 0u);
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](const int thread_idx) {
          for (size_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    if (join_buckets_per_key.size() > 0) {
      rowid_threads.push_back(
          std::async(std::launch::async,
                     [buff,
                      composite_key_dict,
                      hash_entry_count,
                      invalid_slot_val,
                      &join_column_per_key,
                      &join_buckets_per_key,
                      cpu_thread_idx,
                      cpu_thread_count] {
                       const auto key_handler = OverlapsKeyHandler(
                           join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
                           &join_column_per_key[0],
                           join_buckets_per_key[0].bucket_sizes_for_dimension.data());
                       SUFFIX(fill_row_ids_baseline)
                       (buff,
                        composite_key_dict,
                        hash_entry_count,
                        invalid_slot_val,
                        &key_handler,
                        join_column_per_key[0].num_elems,
                        cpu_thread_idx,
                        cpu_thread_count);
                     }));
    } else {
      rowid_threads.push_back(std::async(std::launch::async,
                                         [buff,
                                          composite_key_dict,
                                          hash_entry_count,
                                          invalid_slot_val,
                                          key_component_count,
                                          &join_column_per_key,
                                          &type_info_per_key,
                                          &sd_inner_proxy_per_key,
                                          &sd_outer_proxy_per_key,
                                          cpu_thread_idx,
                                          cpu_thread_count] {
                                           const auto key_handler = GenericKeyHandler(
                                               key_component_count,
                                               true,
                                               &join_column_per_key[0],
                                               &type_info_per_key[0],
                                               &sd_inner_proxy_per_key[0],
                                               &sd_outer_proxy_per_key[0]);
                                           SUFFIX(fill_row_ids_baseline)
                                           (buff,
                                            composite_key_dict,
                                            hash_entry_count,
                                            invalid_slot_val,
                                            &key_handler,
                                            join_column_per_key[0].num_elems,
                                            cpu_thread_idx,
                                            cpu_thread_count);
                                         }));
    }
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}
1893  int32_t* buff,
1894  const int32_t* composite_key_dict,
1895  const size_t hash_entry_count,
1896  const int32_t invalid_slot_val,
1897  const size_t key_component_count,
1898  const std::vector<JoinColumn>& join_column_per_key,
1899  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1900  const std::vector<JoinBucketInfo>& join_bucket_info,
1901  const std::vector<const void*>& sd_inner_proxy_per_key,
1902  const std::vector<const void*>& sd_outer_proxy_per_key,
1903  const int32_t cpu_thread_count) {
1904  fill_one_to_many_baseline_hash_table<int32_t>(buff,
1905  composite_key_dict,
1906  hash_entry_count,
1907  invalid_slot_val,
1908  key_component_count,
1909  join_column_per_key,
1910  type_info_per_key,
1911  join_bucket_info,
1912  sd_inner_proxy_per_key,
1913  sd_outer_proxy_per_key,
1914  cpu_thread_count);
1915 }
1916 
1917 void fill_one_to_many_baseline_hash_table_64(
1918  int32_t* buff,
1919  const int64_t* composite_key_dict,
1920  const size_t hash_entry_count,
1921  const int32_t invalid_slot_val,
1922  const size_t key_component_count,
1923  const std::vector<JoinColumn>& join_column_per_key,
1924  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1925  const std::vector<JoinBucketInfo>& join_bucket_info,
1926  const std::vector<const void*>& sd_inner_proxy_per_key,
1927  const std::vector<const void*>& sd_outer_proxy_per_key,
1928  const int32_t cpu_thread_count) {
1929  fill_one_to_many_baseline_hash_table<int64_t>(buff,
1930  composite_key_dict,
1931  hash_entry_count,
1932  invalid_slot_val,
1933  key_component_count,
1934  join_column_per_key,
1935  type_info_per_key,
1936  join_bucket_info,
1937  sd_inner_proxy_per_key,
1938  sd_outer_proxy_per_key,
1939  cpu_thread_count);
1940 }
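// The _32/_64 entry points above differ only in the width of the composite key
// dictionary entries; both forward to the same template, so the count, scan,
// and row-id passes are shared across key widths.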
1941 
1942 void approximate_distinct_tuples(uint8_t* hll_buffer_all_cpus,
1943  const uint32_t b,
1944  const size_t padded_size_bytes,
1945  const std::vector<JoinColumn>& join_column_per_key,
1946  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1947  const int thread_count) {
1948  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
1949  CHECK(!join_column_per_key.empty());
1950 
1951  std::vector<std::future<void>> approx_distinct_threads;
1952  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
1953  approx_distinct_threads.push_back(std::async(
1954  std::launch::async,
1955  [&join_column_per_key,
1956  &type_info_per_key,
1957  b,
1958  hll_buffer_all_cpus,
1959  padded_size_bytes,
1960  thread_idx,
1961  thread_count] {
1962  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
1963 
1964  const auto key_handler = GenericKeyHandler(join_column_per_key.size(),
1965  false,
1966  &join_column_per_key[0],
1967  &type_info_per_key[0],
1968  nullptr,
1969  nullptr);
1970  approximate_distinct_tuples_impl(hll_buffer,
1971  nullptr,
1972  b,
1973  join_column_per_key[0].num_elems,
1974  &key_handler,
1975  thread_idx,
1976  thread_count);
1977  }));
1978  }
1979  for (auto& child : approx_distinct_threads) {
1980  child.get();
1981  }
1982 }
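/*
 * Each thread filled its own HyperLogLog sketch of 2^b one-byte registers, so
 * the buffers combine by a register-wise max. A minimal sketch of how a caller
 * might merge the per-CPU buffers into one estimate-ready register array (the
 * merge loop is an assumption about the caller, not code from this file):
 *
 *   std::vector<uint8_t> merged(size_t(1) << b, 0);
 *   for (int t = 0; t < thread_count; ++t) {
 *     const uint8_t* regs = hll_buffer_all_cpus + t * padded_size_bytes;
 *     for (size_t i = 0; i < merged.size(); ++i) {
 *       merged[i] = std::max(merged[i], regs[i]);
 *     }
 *   }
 */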
1983 
1984 void approximate_distinct_tuples_overlaps(
1985  uint8_t* hll_buffer_all_cpus,
1986  std::vector<int32_t>& row_counts,
1987  const uint32_t b,
1988  const size_t padded_size_bytes,
1989  const std::vector<JoinColumn>& join_column_per_key,
1990  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1991  const std::vector<JoinBucketInfo>& join_buckets_per_key,
1992  const int thread_count) {
1993  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
1994  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
1995  CHECK(!join_column_per_key.empty());
1996 
1997  std::vector<std::future<void>> approx_distinct_threads;
1998  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
1999  approx_distinct_threads.push_back(std::async(
2000  std::launch::async,
2001  [&join_column_per_key,
2002  &join_buckets_per_key,
2003  &row_counts,
2004  b,
2005  hll_buffer_all_cpus,
2006  padded_size_bytes,
2007  thread_idx,
2008  thread_count] {
2009  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2010 
2011  const auto key_handler = OverlapsKeyHandler(
2012  join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
2013  &join_column_per_key[0],
2014  join_buckets_per_key[0].bucket_sizes_for_dimension.data());
2015  approximate_distinct_tuples_impl(hll_buffer,
2016  row_counts.data(),
2017  b,
2018  join_column_per_key[0].num_elems,
2019  &key_handler,
2020  thread_idx,
2021  thread_count);
2022  }));
2023  }
2024  for (auto& child : approx_distinct_threads) {
2025  child.get();
2026  }
2027 
2028  inclusive_scan(
2029  row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
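  // After the scan, row_counts[i] holds the cumulative number of hash entries
  // emitted by input rows 0..i, so the final element gives the total entry
  // count and adjacent differences give each row's contribution.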
2030 }
2031 
2032 void compute_bucket_sizes(std::vector<double>& bucket_sizes_for_dimension,
2033  const JoinColumn& join_column,
2034  const JoinColumnTypeInfo& type_info,
2035  const double bucket_size_threshold,
2036  const int thread_count) {
2037  std::vector<std::vector<double>> bucket_sizes_for_threads;
2038  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2039  bucket_sizes_for_threads.emplace_back(bucket_sizes_for_dimension.size(),
2040  std::numeric_limits<double>::max());
2041  }
2042  std::vector<std::future<void>> threads;
2043  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2044  threads.push_back(std::async(std::launch::async,
2045  compute_bucket_sizes_impl<2>,
2046  bucket_sizes_for_threads[thread_idx].data(),
2047  &join_column,
2048  &type_info,
2049  bucket_size_threshold,
2050  thread_idx,
2051  thread_count));
2052  }
2053  for (auto& child : threads) {
2054  child.get();
2055  }
2056 
2057  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2058  for (size_t i = 0; i < bucket_sizes_for_dimension.size(); i++) {
2059  if (bucket_sizes_for_threads[thread_idx][i] < bucket_sizes_for_dimension[i]) {
2060  bucket_sizes_for_dimension[i] = bucket_sizes_for_threads[thread_idx][i];
2061  }
2062  }
2063  }
2064 }
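// Each worker starts from numeric_limits<double>::max() and shrinks its
// per-dimension bucket sizes independently; keeping the minimum across threads
// above reproduces what a single-threaded pass over the whole column would
// compute, since min is associative and commutative. The <2> template argument
// presumably fixes the number of bucketed dimensions at two, matching the
// two-dimensional overlaps join keys handled elsewhere in this file.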
2065 
2066 #endif // ifndef __CUDACC__