OmniSciDB  0b528656ed
HashJoinRuntime.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "HashJoinRuntime.h"
#include "../Shared/shard_key.h"
#include "CompareKeysInl.h"
#include "HashJoinKeyHandlers.h"
#include "HyperLogLogRank.h"
#include "JoinColumnIterator.h"
#include "MurmurHash1Inl.h"
#ifdef __CUDACC__
#include "DecodersImpl.h"
#include "GpuRtConstants.h"
#include "JoinHashImpl.h"
#else
#include "Shared/Logger.h"

#include "RuntimeFunctions.h"
#include "Shared/likely.h"
#include "StringDictionary/StringDictionary.h"
#include "StringDictionary/StringDictionaryProxy.h"

#include <future>
#endif

#if HAVE_CUDA
#include <thrust/scan.h>
#endif
#include "../Shared/funcannotations.h"

#include <cmath>
#include <numeric>

#ifndef __CUDACC__
namespace {

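// Maps an inner-dictionary string id to the matching id in the outer
// dictionary, returning StringDictionary::INVALID_STR_ID when the string has
// no counterpart in the [min_elem, max_elem] range on the outer side.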
inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
                                              const int64_t min_elem,
                                              const int64_t max_elem,
                                              const void* sd_inner_proxy,
                                              const void* sd_outer_proxy) {
  CHECK(sd_outer_proxy);
  const auto sd_inner_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
  const auto sd_outer_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
  const auto elem_str = sd_inner_dict_proxy->getString(elem);
  const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str);
  if (outer_id > max_elem || outer_id < min_elem) {
    return StringDictionary::INVALID_STR_ID;
  }
  return outer_id;
}

}  // namespace
#endif

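// Initializes a one-to-one hash table by writing invalid_slot_val into every
// slot. On the GPU each thread strides over the buffer by the grid size; on
// the CPU each worker strides by cpu_thread_count.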
DEVICE void SUFFIX(init_hash_join_buff)(int32_t* groups_buffer,
                                        const int32_t hash_entry_count,
                                        const int32_t invalid_slot_val,
                                        const int32_t cpu_thread_idx,
                                        const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  for (int32_t i = start; i < hash_entry_count; i += step) {
    groups_buffer[i] = invalid_slot_val;
  }
}

#ifdef __CUDACC__
#define mapd_cas(address, compare, val) atomicCAS(address, compare, val)
#else
#define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
#endif

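// mapd_cas resolves to a hardware compare-and-swap on both backends. The fill
// routines below claim a slot only if it still holds invalid_slot_val; a
// failed CAS means two keys map to the same slot, so the join is not
// one-to-one and the fill aborts with -1.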
template <typename SLOT_SELECTOR>
DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
                                     const int32_t invalid_slot_val,
                                     const JoinColumn join_column,
                                     const JoinColumnTypeInfo type_info,
                                     const void* sd_inner_proxy,
                                     const void* sd_outer_proxy,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count,
                                     SLOT_SELECTOR slot_sel) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = slot_sel(elem);
    if (mapd_cas(entry_ptr, invalid_slot_val, index) != invalid_slot_val) {
      return -1;
    }
  }
  return 0;
}

DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(int32_t* buff,
                                                  const int32_t invalid_slot_val,
                                                  const JoinColumn join_column,
                                                  const JoinColumnTypeInfo type_info,
                                                  const void* sd_inner_proxy,
                                                  const void* sd_outer_proxy,
                                                  const int32_t cpu_thread_idx,
                                                  const int32_t cpu_thread_count,
                                                  const int64_t bucket_normalization) {
  auto slot_selector = [&](auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        buff, elem, type_info.min_val, bucket_normalization);
  };
  return fill_hash_join_buff_impl(buff,
                                  invalid_slot_val,
                                  join_column,
                                  type_info,
                                  sd_inner_proxy,
                                  sd_outer_proxy,
                                  cpu_thread_idx,
                                  cpu_thread_count,
                                  slot_selector);
}

DEVICE int SUFFIX(fill_hash_join_buff)(int32_t* buff,
                                       const int32_t invalid_slot_val,
                                       const JoinColumn join_column,
                                       const JoinColumnTypeInfo type_info,
                                       const void* sd_inner_proxy,
                                       const void* sd_outer_proxy,
                                       const int32_t cpu_thread_idx,
                                       const int32_t cpu_thread_count) {
  auto slot_selector = [&](auto elem) {
    return SUFFIX(get_hash_slot)(buff, elem, type_info.min_val);
  };
  return fill_hash_join_buff_impl(buff,
                                  invalid_slot_val,
                                  join_column,
                                  type_info,
                                  sd_inner_proxy,
                                  sd_outer_proxy,
                                  cpu_thread_idx,
                                  cpu_thread_count,
                                  slot_selector);
}

template <typename SLOT_SELECTOR>
DEVICE int fill_hash_join_buff_sharded_impl(int32_t* buff,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn join_column,
                                            const JoinColumnTypeInfo type_info,
                                            const ShardInfo shard_info,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const int32_t cpu_thread_idx,
                                            const int32_t cpu_thread_count,
                                            SLOT_SELECTOR slot_sel) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    size_t shard = SHARD_FOR_KEY(elem, shard_info.num_shards);
    if (shard != shard_info.shard) {
      continue;
    }
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = slot_sel(elem);
    if (mapd_cas(entry_ptr, invalid_slot_val, index) != invalid_slot_val) {
      return -1;
    }
  }
  return 0;
}

DEVICE int SUFFIX(fill_hash_join_buff_sharded_bucketized)(
    int32_t* buff,
    const int32_t invalid_slot_val,
    const JoinColumn join_column,
    const JoinColumnTypeInfo type_info,
    const ShardInfo shard_info,
    const void* sd_inner_proxy,
    const void* sd_outer_proxy,
    const int32_t cpu_thread_idx,
    const int32_t cpu_thread_count,
    const int64_t bucket_normalization) {
  auto slot_selector = [&](auto elem) -> auto {
    return SUFFIX(get_bucketized_hash_slot_sharded)(buff,
                                                    elem,
                                                    type_info.min_val,
                                                    shard_info.entry_count_per_shard,
                                                    shard_info.num_shards,
                                                    shard_info.device_count,
                                                    bucket_normalization);
  };

  return fill_hash_join_buff_sharded_impl(buff,
                                          invalid_slot_val,
                                          join_column,
                                          type_info,
                                          shard_info,
                                          sd_inner_proxy,
                                          sd_outer_proxy,
                                          cpu_thread_idx,
                                          cpu_thread_count,
                                          slot_selector);
}

DEVICE int SUFFIX(fill_hash_join_buff_sharded)(int32_t* buff,
                                               const int32_t invalid_slot_val,
                                               const JoinColumn join_column,
                                               const JoinColumnTypeInfo type_info,
                                               const ShardInfo shard_info,
                                               const void* sd_inner_proxy,
                                               const void* sd_outer_proxy,
                                               const int32_t cpu_thread_idx,
                                               const int32_t cpu_thread_count) {
  auto slot_selector = [&](auto elem) {
    return SUFFIX(get_hash_slot_sharded)(buff,
                                         elem,
                                         type_info.min_val,
                                         shard_info.entry_count_per_shard,
                                         shard_info.num_shards,
                                         shard_info.device_count);
  };
  return fill_hash_join_buff_sharded_impl(buff,
                                          invalid_slot_val,
                                          join_column,
                                          type_info,
                                          shard_info,
                                          sd_inner_proxy,
                                          sd_outer_proxy,
                                          cpu_thread_idx,
                                          cpu_thread_count,
                                          slot_selector);
}

template <typename T>
DEVICE void SUFFIX(init_baseline_hash_join_buff)(int8_t* hash_buff,
                                                 const size_t entry_count,
                                                 const size_t key_component_count,
                                                 const bool with_val_slot,
                                                 const int32_t invalid_slot_val,
                                                 const int32_t cpu_thread_idx,
                                                 const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  auto hash_entry_size = (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
  const T empty_key = SUFFIX(get_invalid_key)<T>();
  for (uint32_t h = start; h < entry_count; h += step) {
    uint32_t off = h * hash_entry_size;
    auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
    for (size_t i = 0; i < key_component_count; ++i) {
      row_ptr[i] = empty_key;
    }
    if (with_val_slot) {
      row_ptr[key_component_count] = invalid_slot_val;
    }
  }
}

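// Claims or matches a composite-key slot. The GPU variant claims a row by
// CASing the first key component, publishes the remaining components with
// atomicExch, and makes readers spin until the last component is visible
// before comparing keys.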
#ifdef __CUDACC__
template <typename T>
__device__ T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
                                                 const uint32_t h,
                                                 const T* key,
                                                 const size_t key_component_count,
                                                 const size_t hash_entry_size) {
  uint32_t off = h * hash_entry_size;
  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
  const T empty_key = SUFFIX(get_invalid_key)<T>();
  {
    const T old = atomicCAS(row_ptr, empty_key, *key);
    if (empty_key == old && key_component_count > 1) {
      for (size_t i = 1; i <= key_component_count - 1; ++i) {
        atomicExch(row_ptr + i, key[i]);
      }
    }
  }
  if (key_component_count > 1) {
    while (atomicAdd(row_ptr + key_component_count - 1, 0) == empty_key) {
      // spin until the winning thread has finished writing the entire key and the init
      // value
    }
  }
  bool match = true;
  for (uint32_t i = 0; i < key_component_count; ++i) {
    if (row_ptr[i] != key[i]) {
      match = false;
      break;
    }
  }

  if (match) {
    return reinterpret_cast<T*>(row_ptr + key_component_count);
  }
  return nullptr;
}
#else

#define cas_cst(ptr, expected, desired) \
  __atomic_compare_exchange_n(          \
      ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
#define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)

template <typename T>
T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
                                      const uint32_t h,
                                      const T* key,
                                      const size_t key_component_count,
                                      const size_t hash_entry_size) {
  uint32_t off = h * hash_entry_size;
  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
  T empty_key = SUFFIX(get_invalid_key)<T>();
  T write_pending = SUFFIX(get_invalid_key)<T>() - 1;
  if (UNLIKELY(*key == write_pending)) {
    // Address the singularity case where the first column contains the pending
    // write special value. Should never happen, but avoid doing wrong things.
    return nullptr;
  }
  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
  if (success) {
    if (key_component_count > 1) {
      memcpy(row_ptr + 1, key + 1, (key_component_count - 1) * sizeof(T));
    }
    store_cst(row_ptr, *key);
    return reinterpret_cast<T*>(row_ptr + key_component_count);
  }
  while (load_cst(row_ptr) == write_pending) {
    // spin until the winning thread has finished writing the entire key
  }
  for (size_t i = 0; i < key_component_count; ++i) {
    if (load_cst(row_ptr + i) != key[i]) {
      return nullptr;
    }
  }
  return reinterpret_cast<T*>(row_ptr + key_component_count);
}

#undef load_cst
#undef store_cst
#undef cas_cst

#endif  // __CUDACC__

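// Hashes the composite key with MurmurHash1 and linearly probes from the home
// slot until a matching or empty entry is claimed. Returns -2 when the table
// is full, -1 when the value slot is already taken (not one-to-one), else 0.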
template <typename T>
DEVICE int write_baseline_hash_slot(const int32_t val,
                                    int8_t* hash_buff,
                                    const size_t entry_count,
                                    const T* key,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const int32_t invalid_slot_val,
                                    const size_t key_size_in_bytes,
                                    const size_t hash_entry_size) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  T* matching_group = get_matching_baseline_hash_slot_at(
      hash_buff, h, key, key_component_count, hash_entry_size);
  if (!matching_group) {
    uint32_t h_probe = (h + 1) % entry_count;
    while (h_probe != h) {
      matching_group = get_matching_baseline_hash_slot_at(
          hash_buff, h_probe, key, key_component_count, hash_entry_size);
      if (matching_group) {
        break;
      }
      h_probe = (h_probe + 1) % entry_count;
    }
  }
  if (!matching_group) {
    return -2;
  }
  if (!with_val_slot) {
    return 0;
  }
  if (mapd_cas(matching_group, invalid_slot_val, val) != invalid_slot_val) {
    return -1;
  }
  return 0;
}

template <typename T, typename FILL_HANDLER>
DEVICE int fill_baseline_hash_join_buff(int8_t* hash_buff,
                                        const size_t entry_count,
                                        const int32_t invalid_slot_val,
                                        const size_t key_component_count,
                                        const bool with_val_slot,
                                        const FILL_HANDLER* f,
                                        const size_t num_elems,
                                        const int32_t cpu_thread_idx,
                                        const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  T key_scratch_buff[g_maximum_conditions_to_coalesce];
  const size_t key_size_in_bytes = key_component_count * sizeof(T);
  const size_t hash_entry_size =
      (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
  auto key_buff_handler = [hash_buff,
                           entry_count,
                           with_val_slot,
                           invalid_slot_val,
                           key_size_in_bytes,
                           hash_entry_size](const size_t entry_idx,
                                            const T* key_scratch_buffer,
                                            const size_t key_component_count) {
    return write_baseline_hash_slot<T>(entry_idx,
                                       hash_buff,
                                       entry_count,
                                       key_scratch_buffer,
                                       key_component_count,
                                       with_val_slot,
                                       invalid_slot_val,
                                       key_size_in_bytes,
                                       hash_entry_size);
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    const auto err = (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
    if (err) {
      return err;
    }
  }
  return 0;
}

#undef mapd_cas

#ifdef __CUDACC__
#define mapd_add(address, val) atomicAdd(address, val)
#else
#define mapd_add(address, val) __sync_fetch_and_add(address, val)
#endif

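// First pass of the one-to-many build: atomically bump the match counter of
// the slot chosen by slot_selector for each non-null (and, on the CPU,
// dictionary-translated) element.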
template <typename SLOT_SELECTOR>
DEVICE void count_matches_impl(int32_t* count_buff,
                               const int32_t invalid_slot_val,
                               const JoinColumn join_column,
                               const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                               ,
                               const void* sd_inner_proxy,
                               const void* sd_outer_proxy,
                               const int32_t cpu_thread_idx,
                               const int32_t cpu_thread_count
#endif
                               ,
                               SLOT_SELECTOR slot_selector) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto* entry_ptr = slot_selector(count_buff, elem);
    mapd_add(entry_ptr, int32_t(1));
  }
}

GLOBAL void SUFFIX(count_matches)(int32_t* count_buff,
                                  const int32_t invalid_slot_val,
                                  const JoinColumn join_column,
                                  const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                  ,
                                  const void* sd_inner_proxy,
                                  const void* sd_outer_proxy,
                                  const int32_t cpu_thread_idx,
                                  const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info](auto count_buff, auto elem) {
    return SUFFIX(get_hash_slot)(count_buff, elem, type_info.min_val);
  };
  count_matches_impl(count_buff,
                     invalid_slot_val,
                     join_column,
                     type_info
#ifndef __CUDACC__
                     ,
                     sd_inner_proxy,
                     sd_outer_proxy,
                     cpu_thread_idx,
                     cpu_thread_count
#endif
                     ,
                     slot_sel);
}

GLOBAL void SUFFIX(count_matches_bucketized)(int32_t* count_buff,
                                             const int32_t invalid_slot_val,
                                             const JoinColumn join_column,
                                             const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                             ,
                                             const void* sd_inner_proxy,
                                             const void* sd_outer_proxy,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count
#endif
                                             ,
                                             const int64_t bucket_normalization) {
  auto slot_sel = [bucket_normalization, &type_info](auto count_buff, auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        count_buff, elem, type_info.min_val, bucket_normalization);
  };
  count_matches_impl(count_buff,
                     invalid_slot_val,
                     join_column,
                     type_info
#ifndef __CUDACC__
                     ,
                     sd_inner_proxy,
                     sd_outer_proxy,
                     cpu_thread_idx,
                     cpu_thread_count
#endif
                     ,
                     slot_sel);
}

GLOBAL void SUFFIX(count_matches_sharded)(int32_t* count_buff,
                                          const int32_t invalid_slot_val,
                                          const JoinColumn join_column,
                                          const JoinColumnTypeInfo type_info,
                                          const ShardInfo shard_info
#ifndef __CUDACC__
                                          ,
                                          const void* sd_inner_proxy,
                                          const void* sd_outer_proxy,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = SUFFIX(get_hash_slot_sharded)(count_buff,
                                                       elem,
                                                       type_info.min_val,
                                                       shard_info.entry_count_per_shard,
                                                       shard_info.num_shards,
                                                       shard_info.device_count);
    mapd_add(entry_ptr, int32_t(1));
  }
}

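// Read-only probe of a filled composite-key dictionary: linear probing with
// wrap-around, returning the slot whose key equals the probe key. The key is
// expected to exist; a miss trips the CHECK/assert.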
template <typename T>
DEVICE NEVER_INLINE const T* SUFFIX(get_matching_baseline_hash_slot_readonly)(
    const T* key,
    const size_t key_component_count,
    const T* composite_key_dict,
    const size_t entry_count,
    const size_t key_size_in_bytes) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  uint32_t off = h * key_component_count;
  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
    return &composite_key_dict[off];
  }
  uint32_t h_probe = (h + 1) % entry_count;
  while (h_probe != h) {
    off = h_probe * key_component_count;
    if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
      return &composite_key_dict[off];
    }
    h_probe = (h_probe + 1) % entry_count;
  }
#ifndef __CUDACC__
  CHECK(false);
#else
  assert(false);
#endif
  return nullptr;
}

template <typename T, typename KEY_HANDLER>
GLOBAL void SUFFIX(count_matches_baseline)(int32_t* count_buff,
                                           const T* composite_key_dict,
                                           const size_t entry_count,
                                           const KEY_HANDLER* f,
                                           const size_t num_elems
#ifndef __CUDACC__
                                           ,
                                           const int32_t cpu_thread_idx,
                                           const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
#ifdef __CUDACC__
  assert(composite_key_dict);
#endif
  T key_scratch_buff[g_maximum_conditions_to_coalesce];
  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
  auto key_buff_handler = [composite_key_dict,
                           entry_count,
                           count_buff,
                           key_size_in_bytes](const size_t row_entry_idx,
                                              const T* key_scratch_buff,
                                              const size_t key_component_count) {
    const auto matching_group =
        SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
                                                         key_component_count,
                                                         composite_key_dict,
                                                         entry_count,
                                                         key_size_in_bytes);
    const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
    mapd_add(&count_buff[entry_idx], int32_t(1));
    return 0;
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}

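// Second pass of the one-to-many build. The buffer holds three consecutive
// int32_t regions: pos_buff and count_buff (hash_entry_count entries each)
// followed by the row-id region. Each match lands at pos_buff[slot] plus the
// running count for that slot.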
template <typename SLOT_SELECTOR>
DEVICE void fill_row_ids_impl(int32_t* buff,
                              const int32_t hash_entry_count,
                              const int32_t invalid_slot_val,
                              const JoinColumn join_column,
                              const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                              ,
                              const void* sd_inner_proxy,
                              const void* sd_outer_proxy,
                              const int32_t cpu_thread_idx,
                              const int32_t cpu_thread_count
#endif
                              ,
                              SLOT_SELECTOR slot_selector) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;

#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto pos_ptr = slot_selector(pos_buff, elem);
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(index);
  }
}

GLOBAL void SUFFIX(fill_row_ids)(int32_t* buff,
                                 const int32_t hash_entry_count,
                                 const int32_t invalid_slot_val,
                                 const JoinColumn join_column,
                                 const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                 ,
                                 const void* sd_inner_proxy,
                                 const void* sd_outer_proxy,
                                 const int32_t cpu_thread_idx,
                                 const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info](auto pos_buff, auto elem) {
    return SUFFIX(get_hash_slot)(pos_buff, elem, type_info.min_val);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

GLOBAL void SUFFIX(fill_row_ids_bucketized)(int32_t* buff,
                                            const int32_t hash_entry_count,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn join_column,
                                            const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                            ,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const int32_t cpu_thread_idx,
                                            const int32_t cpu_thread_count
#endif
                                            ,
                                            const int64_t bucket_normalization) {
  auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        pos_buff, elem, type_info.min_val, bucket_normalization);
  };
  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

template <typename SLOT_SELECTOR>
DEVICE void fill_row_ids_sharded_impl(int32_t* buff,
                                      const int32_t hash_entry_count,
                                      const int32_t invalid_slot_val,
                                      const JoinColumn join_column,
                                      const JoinColumnTypeInfo type_info,
                                      const ShardInfo shard_info
#ifndef __CUDACC__
                                      ,
                                      const void* sd_inner_proxy,
                                      const void* sd_outer_proxy,
                                      const int32_t cpu_thread_idx,
                                      const int32_t cpu_thread_count
#endif
                                      ,
                                      SLOT_SELECTOR slot_selector) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;

#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto* pos_ptr = slot_selector(pos_buff, elem);
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(index);
  }
}

GLOBAL void SUFFIX(fill_row_ids_sharded)(int32_t* buff,
                                         const int32_t hash_entry_count,
                                         const int32_t invalid_slot_val,
                                         const JoinColumn join_column,
                                         const JoinColumnTypeInfo type_info,
                                         const ShardInfo shard_info
#ifndef __CUDACC__
                                         ,
                                         const void* sd_inner_proxy,
                                         const void* sd_outer_proxy,
                                         const int32_t cpu_thread_idx,
                                         const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info, &shard_info](auto pos_buff, auto elem) {
    return SUFFIX(get_hash_slot_sharded)(pos_buff,
                                         elem,
                                         type_info.min_val,
                                         shard_info.entry_count_per_shard,
                                         shard_info.num_shards,
                                         shard_info.device_count);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

GLOBAL void SUFFIX(fill_row_ids_sharded_bucketized)(int32_t* buff,
                                                    const int32_t hash_entry_count,
                                                    const int32_t invalid_slot_val,
                                                    const JoinColumn join_column,
                                                    const JoinColumnTypeInfo type_info,
                                                    const ShardInfo shard_info
#ifndef __CUDACC__
                                                    ,
                                                    const void* sd_inner_proxy,
                                                    const void* sd_outer_proxy,
                                                    const int32_t cpu_thread_idx,
                                                    const int32_t cpu_thread_count
#endif
                                                    ,
                                                    const int64_t bucket_normalization) {
  auto slot_sel = [&shard_info, &type_info, bucket_normalization](auto pos_buff,
                                                                  auto elem) {
    return SUFFIX(get_bucketized_hash_slot_sharded)(pos_buff,
                                                    elem,
                                                    type_info.min_val,
                                                    shard_info.entry_count_per_shard,
                                                    shard_info.num_shards,
                                                    shard_info.device_count,
                                                    bucket_normalization);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

template <typename T, typename KEY_HANDLER>
GLOBAL void SUFFIX(fill_row_ids_baseline)(int32_t* buff,
                                          const T* composite_key_dict,
                                          const size_t hash_entry_count,
                                          const int32_t invalid_slot_val,
                                          const KEY_HANDLER* f,
                                          const size_t num_elems
#ifndef __CUDACC__
                                          ,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count
#endif
) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  T key_scratch_buff[g_maximum_conditions_to_coalesce];
#ifdef __CUDACC__
  assert(composite_key_dict);
#endif
  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
  auto key_buff_handler = [composite_key_dict,
                           hash_entry_count,
                           pos_buff,
                           invalid_slot_val,
                           count_buff,
                           id_buff,
                           key_size_in_bytes](const size_t row_index,
                                              const T* key_scratch_buff,
                                              const size_t key_component_count) {
    const T* matching_group =
        SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
                                                         key_component_count,
                                                         composite_key_dict,
                                                         hash_entry_count,
                                                         key_size_in_bytes);
    const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
    int32_t* pos_ptr = pos_buff + entry_idx;
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(row_index);
    return 0;
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
  return;
}

#undef mapd_add

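// Feeds every composite key into a HyperLogLog sketch: the top b bits of the
// 64-bit Murmur hash pick a register and the rank of the remaining bits
// updates it. When row_count_buffer is non-null, per-row match counts are
// tallied as well.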
template <typename KEY_HANDLER>
GLOBAL void SUFFIX(approximate_distinct_tuples_impl)(uint8_t* hll_buffer,
                                                     int32_t* row_count_buffer,
                                                     const uint32_t b,
                                                     const size_t num_elems,
                                                     const KEY_HANDLER* f
#ifndef __CUDACC__
                                                     ,
                                                     const int32_t cpu_thread_idx,
                                                     const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  auto key_buff_handler = [b, hll_buffer, row_count_buffer](
                              const size_t entry_idx,
                              const int64_t* key_scratch_buff,
                              const size_t key_component_count) {
    if (row_count_buffer) {
      row_count_buffer[entry_idx] += 1;
    }

    const uint64_t hash =
        MurmurHash64AImpl(key_scratch_buff, key_component_count * sizeof(int64_t), 0);
    const uint32_t index = hash >> (64 - b);
    const auto rank = get_rank(hash << b, 64 - b);
#ifdef __CUDACC__
    atomicMax(reinterpret_cast<int32_t*>(hll_buffer) + index, rank);
#else
    hll_buffer[index] = std::max(hll_buffer[index], rank);
#endif

    return 0;
  };

  int64_t key_scratch_buff[g_maximum_conditions_to_coalesce];

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}

#ifdef __CUDACC__
namespace {
// TODO(adb): put these in a header file so they are not duplicated between here and
// cuda_mapd_rt.cu
__device__ double atomicMin(double* address, double val) {
  unsigned long long int* address_as_ull = (unsigned long long int*)address;
  unsigned long long int old = *address_as_ull, assumed;

  do {
    assumed = old;
    old = atomicCAS(address_as_ull,
                    assumed,
                    __double_as_longlong(min(val, __longlong_as_double(assumed))));
  } while (assumed != old);

  return __longlong_as_double(old);
}
}  // namespace
#endif

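// Scans a bounds column laid out as (x_min, y_min, ..., x_max, y_max, ...)
// and keeps, per dimension, the smallest extent above bucket_sz_threshold as
// this thread's candidate overlaps-join bucket size.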
template <size_t N>
GLOBAL void SUFFIX(compute_bucket_sizes_impl)(double* bucket_sizes_for_thread,
                                              const JoinColumn* join_column,
                                              const JoinColumnTypeInfo* type_info,
                                              const double bucket_sz_threshold,
                                              const int32_t cpu_thread_idx,
                                              const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnIterator it(join_column, type_info, start, step);
  for (; it; ++it) {
    // We expect the bounds column to be (min, max) e.g. (x_min, y_min, x_max, y_max)
    double bounds[2 * N];
    for (size_t j = 0; j < 2 * N; j++) {
      bounds[j] = SUFFIX(fixed_width_double_decode_noinline)(it.ptr(), j);
    }

    for (size_t j = 0; j < N; j++) {
      const auto diff = bounds[j + N] - bounds[j];
#ifdef __CUDACC__
      if (diff > bucket_sz_threshold) {
        atomicMin(&bucket_sizes_for_thread[j], diff);
      }
#else
      if (diff > bucket_sz_threshold && diff < bucket_sizes_for_thread[j]) {
        bucket_sizes_for_thread[j] = diff;
      }
#endif
    }
  }
}

#ifndef __CUDACC__

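// Multithreaded inclusive prefix sum: each thread scans its own chunk, the
// per-chunk totals are combined serially, then each thread adds the prefix of
// the chunks before it. Small inputs fall back to a serial scan.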
template <typename InputIterator, typename OutputIterator>
void inclusive_scan(InputIterator first,
                    InputIterator last,
                    OutputIterator out,
                    const size_t thread_count) {
  using ElementType = typename InputIterator::value_type;
  using OffsetType = typename InputIterator::difference_type;
  const OffsetType elem_count = last - first;
  if (elem_count < 10000 || thread_count <= 1) {
    ElementType sum = 0;
    for (auto iter = first; iter != last; ++iter, ++out) {
      *out = sum += *iter;
    }
    return;
  }

  const OffsetType step = (elem_count + thread_count - 1) / thread_count;
  OffsetType start_off = 0;
  OffsetType end_off = std::min(step, elem_count);
  std::vector<ElementType> partial_sums(thread_count);
  std::vector<std::future<void>> counter_threads;
  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx,
              start_off = std::min(start_off + step, elem_count),
              end_off = std::min(start_off + step, elem_count)) {
    counter_threads.push_back(std::async(
        std::launch::async,
        [first, out](
            ElementType& partial_sum, const OffsetType start, const OffsetType end) {
          ElementType sum = 0;
          for (auto in_iter = first + start, out_iter = out + start;
               in_iter != (first + end);
               ++in_iter, ++out_iter) {
            *out_iter = sum += *in_iter;
          }
          partial_sum = sum;
        },
        std::ref(partial_sums[thread_idx]),
        start_off,
        end_off));
  }
  for (auto& child : counter_threads) {
    child.get();
  }

  ElementType sum = 0;
  for (auto& s : partial_sums) {
    s += sum;
    sum = s;
  }

  counter_threads.clear();
  start_off = std::min(step, elem_count);
  end_off = std::min(start_off + step, elem_count);
  for (size_t thread_idx = 0; thread_idx < thread_count - 1; ++thread_idx,
              start_off = std::min(start_off + step, elem_count),
              end_off = std::min(start_off + step, elem_count)) {
    counter_threads.push_back(std::async(
        std::launch::async,
        [out](const ElementType prev_sum, const OffsetType start, const OffsetType end) {
          for (auto iter = out + start; iter != (out + end); ++iter) {
            *iter += prev_sum;
          }
        },
        partial_sums[thread_idx],
        start_off,
        end_off));
  }
  for (auto& child : counter_threads) {
    child.get();
  }
}

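// Drives the three CPU passes that build a one-to-many table: count matches
// per slot, scan the counts into start positions (the copy is shifted by one
// slot, making the scan effectively exclusive), then scatter row ids.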
template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
void fill_one_to_many_hash_table_impl(int32_t* buff,
                                      const int32_t hash_entry_count,
                                      const int32_t invalid_slot_val,
                                      const JoinColumn& join_column,
                                      const JoinColumnTypeInfo& type_info,
                                      const void* sd_inner_proxy,
                                      const void* sd_outer_proxy,
                                      const unsigned cpu_thread_count,
                                      COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_func,
                                      FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_func) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (unsigned cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    counter_threads.push_back(std::async(
        std::launch::async, count_matches_func, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int32_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
#if HAVE_CUDA
  thrust::inclusive_scan(count_copy.begin(), count_copy.end(), count_copy.begin());
#else
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
#endif
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](size_t thread_idx) {
          for (size_t i = thread_idx; i < static_cast<size_t>(hash_entry_count);
               i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    rowid_threads.push_back(std::async(
        std::launch::async, fill_row_ids_func, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}

void fill_one_to_many_hash_table(int32_t* buff,
                                 const HashEntryInfo hash_entry_info,
                                 const int32_t invalid_slot_val,
                                 const JoinColumn& join_column,
                                 const JoinColumnTypeInfo& type_info,
                                 const void* sd_inner_proxy,
                                 const void* sd_outer_proxy,
                                 const unsigned cpu_thread_count) {
  auto launch_count_matches = [count_buff = buff + hash_entry_info.hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               sd_inner_proxy,
                               sd_outer_proxy](auto cpu_thread_idx,
                                               auto cpu_thread_count) {
    SUFFIX(count_matches)
    (count_buff,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count);
  };
  auto launch_fill_row_ids = [hash_entry_count = hash_entry_info.hash_entry_count,
                              buff,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              sd_inner_proxy,
                              sd_outer_proxy](auto cpu_thread_idx,
                                              auto cpu_thread_count) {
    SUFFIX(fill_row_ids)
    (buff,
     hash_entry_count,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count);
  };

  fill_one_to_many_hash_table_impl(buff,
                                   hash_entry_info.hash_entry_count,
                                   invalid_slot_val,
                                   join_column,
                                   type_info,
                                   sd_inner_proxy,
                                   sd_outer_proxy,
                                   cpu_thread_count,
                                   launch_count_matches,
                                   launch_fill_row_ids);
}

void fill_one_to_many_hash_table_bucketized(int32_t* buff,
                                            const HashEntryInfo hash_entry_info,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn& join_column,
                                            const JoinColumnTypeInfo& type_info,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const unsigned cpu_thread_count) {
  auto bucket_normalization = hash_entry_info.bucket_normalization;
  auto hash_entry_count = hash_entry_info.getNormalizedHashEntryCount();
  auto launch_count_matches = [bucket_normalization,
                               count_buff = buff + hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               sd_inner_proxy,
                               sd_outer_proxy](auto cpu_thread_idx,
                                               auto cpu_thread_count) {
    SUFFIX(count_matches_bucketized)
    (count_buff,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count,
     bucket_normalization);
  };
  auto launch_fill_row_ids = [bucket_normalization,
                              hash_entry_count,
                              buff,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              sd_inner_proxy,
                              sd_outer_proxy](auto cpu_thread_idx,
                                              auto cpu_thread_count) {
    SUFFIX(fill_row_ids_bucketized)
    (buff,
     hash_entry_count,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count,
     bucket_normalization);
  };

  fill_one_to_many_hash_table_impl(buff,
                                   hash_entry_count,
                                   invalid_slot_val,
                                   join_column,
                                   type_info,
                                   sd_inner_proxy,
                                   sd_outer_proxy,
                                   cpu_thread_count,
                                   launch_count_matches,
                                   launch_fill_row_ids);
}

template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
void fill_one_to_many_hash_table_sharded_impl(
    int32_t* buff,
    const int32_t hash_entry_count,
    const int32_t invalid_slot_val,
    const JoinColumn& join_column,
    const JoinColumnTypeInfo& type_info,
    const ShardInfo& shard_info,
    const void* sd_inner_proxy,
    const void* sd_outer_proxy,
    const unsigned cpu_thread_count,
    COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_launcher,
    FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_launcher) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    counter_threads.push_back(std::async(
        std::launch::async, count_matches_launcher, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int32_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](const unsigned thread_idx) {
          for (size_t i = thread_idx; i < static_cast<size_t>(hash_entry_count);
               i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    rowid_threads.push_back(std::async(
        std::launch::async, fill_row_ids_launcher, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}

void fill_one_to_many_hash_table_sharded(int32_t* buff,
                                         const int32_t hash_entry_count,
                                         const int32_t invalid_slot_val,
                                         const JoinColumn& join_column,
                                         const JoinColumnTypeInfo& type_info,
                                         const ShardInfo& shard_info,
                                         const void* sd_inner_proxy,
                                         const void* sd_outer_proxy,
                                         const unsigned cpu_thread_count) {
  auto launch_count_matches = [count_buff = buff + hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               &shard_info
#ifndef __CUDACC__
                               ,
                               sd_inner_proxy,
                               sd_outer_proxy
#endif
  ](auto cpu_thread_idx, auto cpu_thread_count) {
    return SUFFIX(count_matches_sharded)(count_buff,
                                         invalid_slot_val,
                                         join_column,
                                         type_info,
                                         shard_info
#ifndef __CUDACC__
                                         ,
                                         sd_inner_proxy,
                                         sd_outer_proxy,
                                         cpu_thread_idx,
                                         cpu_thread_count
#endif
    );
  };

  auto launch_fill_row_ids = [buff,
                              hash_entry_count,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              &shard_info
#ifndef __CUDACC__
                              ,
                              sd_inner_proxy,
                              sd_outer_proxy
#endif
  ](auto cpu_thread_idx, auto cpu_thread_count) {
    return SUFFIX(fill_row_ids_sharded)(buff,
                                        hash_entry_count,
                                        invalid_slot_val,
                                        join_column,
                                        type_info,
                                        shard_info
#ifndef __CUDACC__
                                        ,
                                        sd_inner_proxy,
                                        sd_outer_proxy,
                                        cpu_thread_idx,
                                        cpu_thread_count);
#endif
  };

  fill_one_to_many_hash_table_sharded_impl(buff,
                                           hash_entry_count,
                                           invalid_slot_val,
                                           join_column,
                                           type_info,
                                           shard_info
#ifndef __CUDACC__
                                           ,
                                           sd_inner_proxy,
                                           sd_outer_proxy,
                                           cpu_thread_count
#endif
                                           ,
                                           launch_count_matches,
                                           launch_fill_row_ids);
}

void init_baseline_hash_join_buff_32(int8_t* hash_join_buff,
                                     const int32_t entry_count,
                                     const size_t key_component_count,
                                     const bool with_val_slot,
                                     const int32_t invalid_slot_val,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count) {
  init_baseline_hash_join_buff<int32_t>(hash_join_buff,
                                        entry_count,
                                        key_component_count,
                                        with_val_slot,
                                        invalid_slot_val,
                                        cpu_thread_idx,
                                        cpu_thread_count);
}

void init_baseline_hash_join_buff_64(int8_t* hash_join_buff,
                                     const int32_t entry_count,
                                     const size_t key_component_count,
                                     const bool with_val_slot,
                                     const int32_t invalid_slot_val,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count) {
  init_baseline_hash_join_buff<int64_t>(hash_join_buff,
                                        entry_count,
                                        key_component_count,
                                        with_val_slot,
                                        invalid_slot_val,
                                        cpu_thread_idx,
                                        cpu_thread_count);
}

int fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                    const size_t entry_count,
                                    const int32_t invalid_slot_val,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const GenericKeyHandler* key_handler,
                                    const size_t num_elems,
                                    const int32_t cpu_thread_idx,
                                    const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int overlaps_fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                             const size_t entry_count,
                                             const int32_t invalid_slot_val,
                                             const size_t key_component_count,
                                             const bool with_val_slot,
                                             const OverlapsKeyHandler* key_handler,
                                             const size_t num_elems,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                    const size_t entry_count,
                                    const int32_t invalid_slot_val,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const GenericKeyHandler* key_handler,
                                    const size_t num_elems,
                                    const int32_t cpu_thread_idx,
                                    const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int overlaps_fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                             const size_t entry_count,
                                             const int32_t invalid_slot_val,
                                             const size_t key_component_count,
                                             const bool with_val_slot,
                                             const OverlapsKeyHandler* key_handler,
                                             const size_t num_elems,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

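// Baseline (composite-key) variant of the three-pass one-to-many build. Keys
// are resolved through the composite key dictionary; the overlaps key handler
// is used when bucket information is present, the generic handler otherwise.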
template <typename T>
void fill_one_to_many_baseline_hash_table(
    int32_t* buff,
    const T* composite_key_dict,
    const size_t hash_entry_count,
    const int32_t invalid_slot_val,
    const size_t key_component_count,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_buckets_per_key,
    const std::vector<const void*>& sd_inner_proxy_per_key,
    const std::vector<const void*>& sd_outer_proxy_per_key,
    const size_t cpu_thread_count) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    if (join_buckets_per_key.size() > 0) {
      counter_threads.push_back(
          std::async(std::launch::async,
                     [count_buff,
                      composite_key_dict,
                      &hash_entry_count,
                      &join_buckets_per_key,
                      &join_column_per_key,
                      cpu_thread_idx,
                      cpu_thread_count] {
                       const auto key_handler = OverlapsKeyHandler(
                           join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
                           &join_column_per_key[0],
                           join_buckets_per_key[0].bucket_sizes_for_dimension.data());
                       count_matches_baseline(count_buff,
                                              composite_key_dict,
                                              hash_entry_count,
                                              &key_handler,
                                              join_column_per_key[0].num_elems,
                                              cpu_thread_idx,
                                              cpu_thread_count);
                     }));
    } else {
      counter_threads.push_back(std::async(
          std::launch::async,
          [count_buff,
           composite_key_dict,
           &key_component_count,
           &hash_entry_count,
           &join_column_per_key,
           &type_info_per_key,
           &sd_inner_proxy_per_key,
           &sd_outer_proxy_per_key,
           cpu_thread_idx,
           cpu_thread_count] {
            const auto key_handler = GenericKeyHandler(key_component_count,
                                                       true,
                                                       &join_column_per_key[0],
                                                       &type_info_per_key[0],
                                                       &sd_inner_proxy_per_key[0],
                                                       &sd_outer_proxy_per_key[0]);
            count_matches_baseline(count_buff,
                                   composite_key_dict,
                                   hash_entry_count,
                                   &key_handler,
                                   join_column_per_key[0].num_elems,
                                   cpu_thread_idx,
                                   cpu_thread_count);
          }));
    }
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, 0u);
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](const int thread_idx) {
          for (size_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    if (join_buckets_per_key.size() > 0) {
      rowid_threads.push_back(
          std::async(std::launch::async,
                     [buff,
                      composite_key_dict,
                      hash_entry_count,
                      invalid_slot_val,
                      &join_column_per_key,
                      &join_buckets_per_key,
                      cpu_thread_idx,
                      cpu_thread_count] {
                       const auto key_handler = OverlapsKeyHandler(
                           join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
                           &join_column_per_key[0],
                           join_buckets_per_key[0].bucket_sizes_for_dimension.data());
                       SUFFIX(fill_row_ids_baseline)
                       (buff,
                        composite_key_dict,
                        hash_entry_count,
                        invalid_slot_val,
                        &key_handler,
                        join_column_per_key[0].num_elems,
                        cpu_thread_idx,
                        cpu_thread_count);
                     }));
    } else {
      rowid_threads.push_back(std::async(std::launch::async,
                                         [buff,
                                          composite_key_dict,
                                          hash_entry_count,
                                          invalid_slot_val,
                                          key_component_count,
                                          &join_column_per_key,
                                          &type_info_per_key,
                                          &sd_inner_proxy_per_key,
                                          &sd_outer_proxy_per_key,
                                          cpu_thread_idx,
                                          cpu_thread_count] {
                                           const auto key_handler = GenericKeyHandler(
                                               key_component_count,
                                               true,
                                               &join_column_per_key[0],
                                               &type_info_per_key[0],
                                               &sd_inner_proxy_per_key[0],
                                               &sd_outer_proxy_per_key[0]);
                                           SUFFIX(fill_row_ids_baseline)
                                           (buff,
                                            composite_key_dict,
                                            hash_entry_count,
                                            invalid_slot_val,
                                            &key_handler,
                                            join_column_per_key[0].num_elems,
                                            cpu_thread_idx,
                                            cpu_thread_count);
                                         }));
    }
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}

void fill_one_to_many_baseline_hash_table_32(
    int32_t* buff,
    const int32_t* composite_key_dict,
    const size_t hash_entry_count,
    const int32_t invalid_slot_val,
    const size_t key_component_count,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const std::vector<const void*>& sd_inner_proxy_per_key,
    const std::vector<const void*>& sd_outer_proxy_per_key,
    const int32_t cpu_thread_count) {
  fill_one_to_many_baseline_hash_table<int32_t>(buff,
                                                composite_key_dict,
                                                hash_entry_count,
                                                invalid_slot_val,
                                                key_component_count,
                                                join_column_per_key,
                                                type_info_per_key,
                                                join_bucket_info,
                                                sd_inner_proxy_per_key,
                                                sd_outer_proxy_per_key,
                                                cpu_thread_count);
}
1913 
 1914 void fill_one_to_many_baseline_hash_table_64(
 1915  int32_t* buff,
1916  const int64_t* composite_key_dict,
1917  const size_t hash_entry_count,
1918  const int32_t invalid_slot_val,
1919  const size_t key_component_count,
1920  const std::vector<JoinColumn>& join_column_per_key,
1921  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1922  const std::vector<JoinBucketInfo>& join_bucket_info,
1923  const std::vector<const void*>& sd_inner_proxy_per_key,
1924  const std::vector<const void*>& sd_outer_proxy_per_key,
1925  const int32_t cpu_thread_count) {
1926  fill_one_to_many_baseline_hash_table<int64_t>(buff,
1927  composite_key_dict,
1928  hash_entry_count,
1929  invalid_slot_val,
1930  key_component_count,
1931  join_column_per_key,
1932  type_info_per_key,
1933  join_bucket_info,
1934  sd_inner_proxy_per_key,
1935  sd_outer_proxy_per_key,
1936  cpu_thread_count);
1937 }
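// Editorial note (not part of the original listing): the _32/_64 entry points
// differ only in the component width of the composite key dictionary; a
// hypothetical caller dispatches on that width, e.g.:
//
//   key_component_width == 4
//       ? fill_one_to_many_baseline_hash_table_32(buff, dict32, /* ... */)
//       : fill_one_to_many_baseline_hash_table_64(buff, dict64, /* ... */);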
1938 
1939 void approximate_distinct_tuples(uint8_t* hll_buffer_all_cpus,
1940  const uint32_t b,
1941  const size_t padded_size_bytes,
1942  const std::vector<JoinColumn>& join_column_per_key,
1943  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1944  const int thread_count) {
1945  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
1946  CHECK(!join_column_per_key.empty());
1947 
1948  std::vector<std::future<void>> approx_distinct_threads;
1949  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
1950  approx_distinct_threads.push_back(std::async(
1951  std::launch::async,
1952  [&join_column_per_key,
1953  &type_info_per_key,
1954  b,
1955  hll_buffer_all_cpus,
1956  padded_size_bytes,
1957  thread_idx,
1958  thread_count] {
1959  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
1960 
1961  const auto key_handler = GenericKeyHandler(join_column_per_key.size(),
1962  false,
1963  &join_column_per_key[0],
1964  &type_info_per_key[0],
1965  nullptr,
1966  nullptr);
 1967  approximate_distinct_tuples_impl(hll_buffer,
 1968  nullptr,
1969  b,
1970  join_column_per_key[0].num_elems,
1971  &key_handler,
1972  thread_idx,
1973  thread_count);
1974  }));
1975  }
1976  for (auto& child : approx_distinct_threads) {
1977  child.get();
1978  }
1979 }
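// ---------------------------------------------------------------------------
// Editorial note (not part of the original listing): approximate_distinct_tuples
// estimates the distinct-key count with HyperLogLog. Each thread owns a private
// register file of 2^b one-byte buckets inside hll_buffer_all_cpus; the caller
// is expected to merge the per-thread buffers by element-wise max afterwards.
// A minimal sketch of the per-key register update, assuming a 64-bit hash and
// 0 < b < 64 (mirrors the get_rank logic in HyperLogLogRank.h):

#include <cstdint>
#include <vector>

void hll_insert_sketch(std::vector<uint8_t>& registers,  // size == 1u << b
                       const uint32_t b,
                       const uint64_t hash) {
  const uint64_t bucket = hash >> (64 - b);  // top b bits select the bucket
  uint64_t rest = hash << b;                 // remaining bits feed the rank
  // Rank = 1-based position of the first set bit from the MSB, capped at the
  // remaining width when the rest is all zeros.
  uint8_t rank = 1;
  while (rank <= 64 - b && !(rest & (1ULL << 63))) {
    ++rank;
    rest <<= 1;
  }
  if (rank > registers[bucket]) {
    registers[bucket] = rank;  // each bucket keeps the max rank observed
  }
}
// ---------------------------------------------------------------------------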
1980 
 1981 void approximate_distinct_tuples_overlaps(
 1982  uint8_t* hll_buffer_all_cpus,
1983  std::vector<int32_t>& row_counts,
1984  const uint32_t b,
1985  const size_t padded_size_bytes,
1986  const std::vector<JoinColumn>& join_column_per_key,
1987  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1988  const std::vector<JoinBucketInfo>& join_buckets_per_key,
1989  const int thread_count) {
1990  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
1991  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
1992  CHECK(!join_column_per_key.empty());
1993 
1994  std::vector<std::future<void>> approx_distinct_threads;
1995  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
1996  approx_distinct_threads.push_back(std::async(
1997  std::launch::async,
1998  [&join_column_per_key,
1999  &join_buckets_per_key,
2000  &row_counts,
2001  b,
2002  hll_buffer_all_cpus,
2003  padded_size_bytes,
2004  thread_idx,
2005  thread_count] {
2006  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2007 
2008  const auto key_handler = OverlapsKeyHandler(
2009  join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
2010  &join_column_per_key[0],
2011  join_buckets_per_key[0].bucket_sizes_for_dimension.data());
 2012  approximate_distinct_tuples_impl(hll_buffer,
 2013  row_counts.data(),
2014  b,
2015  join_column_per_key[0].num_elems,
2016  &key_handler,
2017  thread_idx,
2018  thread_count);
2019  }));
2020  }
2021  for (auto& child : approx_distinct_threads) {
2022  child.get();
2023  }
2024 
 2025  inclusive_scan(
 2026  row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2027 }
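// Editorial note (not part of the original listing): the overlaps variant also
// records, per input row, how many hash-table entries that row will emit; the
// final inclusive_scan turns those counts into a running total, so
// row_counts[i] becomes the end offset of row i's output. Sequentially this is
// just C++17's std::inclusive_scan:

#include <cstdint>
#include <numeric>
#include <vector>

void running_total_sketch(std::vector<int32_t>& row_counts) {
  // Single-threaded counterpart of the thread_count-way inclusive_scan above.
  std::inclusive_scan(row_counts.begin(), row_counts.end(), row_counts.begin());
}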
2028 
2029 void compute_bucket_sizes(std::vector<double>& bucket_sizes_for_dimension,
2030  const JoinColumn& join_column,
2031  const JoinColumnTypeInfo& type_info,
2032  const double bucket_size_threshold,
2033  const int thread_count) {
2034  std::vector<std::vector<double>> bucket_sizes_for_threads;
2035  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2036  bucket_sizes_for_threads.emplace_back(bucket_sizes_for_dimension.size(),
2037  std::numeric_limits<double>::max());
2038  }
2039  std::vector<std::future<void>> threads;
2040  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2041  threads.push_back(std::async(std::launch::async,
2042  compute_bucket_sizes_impl<2>,
2043  bucket_sizes_for_threads[thread_idx].data(),
2044  &join_column,
2045  &type_info,
2046  bucket_size_threshold,
2047  thread_idx,
2048  thread_count));
2049  }
2050  for (auto& child : threads) {
2051  child.get();
2052  }
2053 
2054  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2055  for (size_t i = 0; i < bucket_sizes_for_dimension.size(); i++) {
2056  if (bucket_sizes_for_threads[thread_idx][i] < bucket_sizes_for_dimension[i]) {
2057  bucket_sizes_for_dimension[i] = bucket_sizes_for_threads[thread_idx][i];
2058  }
2059  }
2060  }
2061 }
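// Editorial note (not part of the original listing): compute_bucket_sizes fans
// out one candidate vector per thread, each initialized to
// std::numeric_limits<double>::max(), and the loop above merges them with an
// element-wise min so the tightest bucket size found by any thread wins per
// dimension. The merge step, reduced to its essence:

#include <algorithm>
#include <cstddef>
#include <vector>

void merge_bucket_sizes_sketch(std::vector<double>& result,
                               const std::vector<std::vector<double>>& per_thread) {
  for (const auto& candidate : per_thread) {
    for (size_t i = 0; i < result.size(); ++i) {
      result[i] = std::min(result[i], candidate[i]);
    }
  }
}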
2062 
2063 #endif // ifndef __CUDACC__