OmniSciDB c07336695a - HashJoinRuntime.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "HashJoinRuntime.h"
18 #include "../Shared/shard_key.h"
19 #include "CompareKeysInl.h"
20 #include "HashJoinKeyHandlers.h"
21 #include "HyperLogLogRank.h"
22 #include "MurmurHash1Inl.h"
23 #ifdef __CUDACC__
24 #include "DecodersImpl.h"
25 #include "GpuRtConstants.h"
26 #include "JoinHashImpl.h"
27 #else
28 #include "Shared/Logger.h"
29 
30 #include "RuntimeFunctions.h"
31 #include "Shared/likely.h"
32 #include "StringDictionary/StringDictionary.h"
33 #include "StringDictionary/StringDictionaryProxy.h"
34 
35 #include <future>
36 #endif
37 
38 #if HAVE_CUDA
39 #include <thrust/scan.h>
40 #endif
41 #include "../Shared/funcannotations.h"
42 
43 #include <cmath>
44 #include <numeric>
45 
46 #ifndef __CUDACC__
47 namespace {
48 
69 inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
70  const int64_t min_elem,
71  const int64_t max_elem,
72  const void* sd_inner_proxy,
73  const void* sd_outer_proxy) {
74  CHECK(sd_outer_proxy);
75  const auto sd_inner_dict_proxy =
76  static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
77  const auto sd_outer_dict_proxy =
78  static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
79  const auto elem_str = sd_inner_dict_proxy->getString(elem);
80  const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str);
81  if (outer_id > max_elem || outer_id < min_elem) {
82  return StringDictionary::INVALID_STR_ID;
83  }
84  return outer_id;
85 }
86 
87 } // namespace
88 #endif
89 
90 DEVICE void SUFFIX(init_hash_join_buff)(int32_t* groups_buffer,
91  const int32_t hash_entry_count,
92  const int32_t invalid_slot_val,
93  const int32_t cpu_thread_idx,
94  const int32_t cpu_thread_count) {
95 #ifdef __CUDACC__
96  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
97  int32_t step = blockDim.x * gridDim.x;
98 #else
99  int32_t start = cpu_thread_idx;
100  int32_t step = cpu_thread_count;
101 #endif
102  for (int32_t i = start; i < hash_entry_count; i += step) {
103  groups_buffer[i] = invalid_slot_val;
104  }
105 }
106 
107 #ifdef __CUDACC__
108 #define mapd_cas(address, compare, val) atomicCAS(address, compare, val)
109 #else
110 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
111 #endif
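// mapd_cas maps to atomicCAS on the GPU and to GCC's __sync_val_compare_and_swap on
// the CPU. Each source row claims its slot by swapping invalid_slot_val for its row
// index; a non-invalid return value means the slot was already taken, i.e. a
// duplicate key in a one-to-one layout, and the fill reports failure with -1.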
112 
113 template <typename SLOT_SELECTOR>
114 DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
115  const int32_t invalid_slot_val,
116  const JoinColumn join_column,
117  const JoinColumnTypeInfo type_info,
118  const void* sd_inner_proxy,
119  const void* sd_outer_proxy,
120  const int32_t cpu_thread_idx,
121  const int32_t cpu_thread_count,
122  SLOT_SELECTOR slot_sel) {
123 #ifdef __CUDACC__
124  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
125  int32_t step = blockDim.x * gridDim.x;
126 #else
127  int32_t start = cpu_thread_idx;
128  int32_t step = cpu_thread_count;
129 #endif
130  for (size_t i = start; i < join_column.num_elems; i += step) {
131  int64_t elem = get_join_column_element_value(type_info, join_column, i);
132  if (elem == type_info.null_val) {
133  if (type_info.uses_bw_eq) {
134  elem = type_info.translated_null_val;
135  } else {
136  continue;
137  }
138  }
139 #ifndef __CUDACC__
140  if (sd_inner_proxy &&
141  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
142  const auto outer_id = translate_str_id_to_outer_dict(
143  elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
144  if (outer_id == StringDictionary::INVALID_STR_ID) {
145  continue;
146  }
147  elem = outer_id;
148  }
149  CHECK_GE(elem, type_info.min_val)
150  << "Element " << elem << " less than min val " << type_info.min_val;
151 #endif
152  int32_t* entry_ptr = slot_sel(elem);
153  if (mapd_cas(entry_ptr, invalid_slot_val, i) != invalid_slot_val) {
154  return -1;
155  }
156  }
157  return 0;
158 }
159 
160 DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(int32_t* buff,
161  const int32_t invalid_slot_val,
162  const JoinColumn join_column,
163  const JoinColumnTypeInfo type_info,
164  const void* sd_inner_proxy,
165  const void* sd_outer_proxy,
166  const int32_t cpu_thread_idx,
167  const int32_t cpu_thread_count,
168  const int64_t bucket_normalization) {
169  auto slot_selector = [&](auto elem) {
170  return SUFFIX(get_bucketized_hash_slot)(
171  buff, elem, type_info.min_val, bucket_normalization);
172  };
173  return fill_hash_join_buff_impl(buff,
174  invalid_slot_val,
175  join_column,
176  type_info,
177  sd_inner_proxy,
178  sd_outer_proxy,
179  cpu_thread_idx,
180  cpu_thread_count,
181  slot_selector);
182 }
183 
184 DEVICE int SUFFIX(fill_hash_join_buff)(int32_t* buff,
185  const int32_t invalid_slot_val,
186  const JoinColumn join_column,
187  const JoinColumnTypeInfo type_info,
188  const void* sd_inner_proxy,
189  const void* sd_outer_proxy,
190  const int32_t cpu_thread_idx,
191  const int32_t cpu_thread_count) {
192  auto slot_selector = [&](auto elem) {
193  return SUFFIX(get_hash_slot)(buff, elem, type_info.min_val);
194  };
195  return fill_hash_join_buff_impl(buff,
196  invalid_slot_val,
197  join_column,
198  type_info,
199  sd_inner_proxy,
200  sd_outer_proxy,
201  cpu_thread_idx,
202  cpu_thread_count,
203  slot_selector);
204 }
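// A minimal sketch of how a CPU caller might drive the fill above (hypothetical
// driver, not part of this file; the real launch logic lives with the hash table
// builders): each worker strides the input by cpu_thread_count starting at its own
// cpu_thread_idx, and any worker returning -1 invalidates the one-to-one layout.
//
//   std::vector<std::future<int>> workers;
//   for (int t = 0; t < thread_count; ++t) {
//     workers.push_back(std::async(std::launch::async, [&, t] {
//       return fill_hash_join_buff(buff, -1 /* invalid_slot_val */, join_column,
//                                  type_info, nullptr, nullptr, t, thread_count);
//     }));
//   }
//   int err = 0;
//   for (auto& w : workers) {
//     err |= w.get();
//   }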
205 
206 template <typename SLOT_SELECTOR>
207 DEVICE int fill_hash_join_buff_sharded_impl(int32_t* buff,
208  const int32_t invalid_slot_val,
209  const JoinColumn join_column,
210  const JoinColumnTypeInfo type_info,
211  const ShardInfo shard_info,
212  const void* sd_inner_proxy,
213  const void* sd_outer_proxy,
214  const int32_t cpu_thread_idx,
215  const int32_t cpu_thread_count,
216  SLOT_SELECTOR slot_sel) {
217 #ifdef __CUDACC__
218  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
219  int32_t step = blockDim.x * gridDim.x;
220 #else
221  int32_t start = cpu_thread_idx;
222  int32_t step = cpu_thread_count;
223 #endif
224  for (size_t i = start; i < join_column.num_elems; i += step) {
225  int64_t elem = get_join_column_element_value(type_info, join_column, i);
226  size_t shard = SHARD_FOR_KEY(elem, shard_info.num_shards);
227  if (shard != shard_info.shard) {
228  continue;
229  }
230  if (elem == type_info.null_val) {
231  if (type_info.uses_bw_eq) {
232  elem = type_info.translated_null_val;
233  } else {
234  continue;
235  }
236  }
237 #ifndef __CUDACC__
238  if (sd_inner_proxy &&
239  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
240  const auto outer_id = translate_str_id_to_outer_dict(
241  elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
242  if (outer_id == StringDictionary::INVALID_STR_ID) {
243  continue;
244  }
245  elem = outer_id;
246  }
247  CHECK_GE(elem, type_info.min_val)
248  << "Element " << elem << " less than min val " << type_info.min_val;
249 #endif
250  int32_t* entry_ptr = slot_sel(elem);
251  if (mapd_cas(entry_ptr, invalid_slot_val, i) != invalid_slot_val) {
252  return -1;
253  }
254  }
255  return 0;
256 }
257 
258 DEVICE int SUFFIX(fill_hash_join_buff_sharded_bucketized)(
259  int32_t* buff,
260  const int32_t invalid_slot_val,
261  const JoinColumn join_column,
262  const JoinColumnTypeInfo type_info,
263  const ShardInfo shard_info,
264  const void* sd_inner_proxy,
265  const void* sd_outer_proxy,
266  const int32_t cpu_thread_idx,
267  const int32_t cpu_thread_count,
268  const int64_t bucket_normalization) {
269  auto slot_selector = [&](auto elem) -> auto {
270  return SUFFIX(get_bucketized_hash_slot_sharded)(buff,
271  elem,
272  type_info.min_val,
273  shard_info.entry_count_per_shard,
274  shard_info.num_shards,
275  shard_info.device_count,
276  bucket_normalization);
277  };
278 
279  return fill_hash_join_buff_sharded_impl(buff,
280  invalid_slot_val,
281  join_column,
282  type_info,
283  shard_info,
284  sd_inner_proxy,
285  sd_outer_proxy,
286  cpu_thread_idx,
287  cpu_thread_count,
288  slot_selector);
289 }
290 
291 DEVICE int SUFFIX(fill_hash_join_buff_sharded)(int32_t* buff,
292  const int32_t invalid_slot_val,
293  const JoinColumn join_column,
294  const JoinColumnTypeInfo type_info,
295  const ShardInfo shard_info,
296  const void* sd_inner_proxy,
297  const void* sd_outer_proxy,
298  const int32_t cpu_thread_idx,
299  const int32_t cpu_thread_count) {
300  auto slot_selector = [&](auto elem) {
301  return SUFFIX(get_hash_slot_sharded)(buff,
302  elem,
303  type_info.min_val,
304  shard_info.entry_count_per_shard,
305  shard_info.num_shards,
306  shard_info.device_count);
307  };
308  return fill_hash_join_buff_sharded_impl(buff,
309  invalid_slot_val,
310  join_column,
311  type_info,
312  shard_info,
313  sd_inner_proxy,
314  sd_outer_proxy,
315  cpu_thread_idx,
316  cpu_thread_count,
317  slot_selector);
318 }
319 
320 template <typename T>
321 DEVICE void SUFFIX(init_baseline_hash_join_buff)(int8_t* hash_buff,
322  const size_t entry_count,
323  const size_t key_component_count,
324  const bool with_val_slot,
325  const int32_t invalid_slot_val,
326  const int32_t cpu_thread_idx,
327  const int32_t cpu_thread_count) {
328 #ifdef __CUDACC__
329  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
330  int32_t step = blockDim.x * gridDim.x;
331 #else
332  int32_t start = cpu_thread_idx;
333  int32_t step = cpu_thread_count;
334 #endif
335  const T empty_key = SUFFIX(get_invalid_key)<T>();
336  for (uint32_t h = start; h < entry_count; h += step) {
337  uint32_t off = h * (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
338  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
339  for (size_t i = 0; i < key_component_count; ++i) {
340  row_ptr[i] = empty_key;
341  }
342  if (with_val_slot) {
343  row_ptr[key_component_count] = invalid_slot_val;
344  }
345  }
346 }
347 
348 #ifdef __CUDACC__
349 template <typename T>
350 __device__ T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
351  const uint32_t h,
352  const T* key,
353  const size_t key_component_count,
354  const bool with_val_slot) {
355  uint32_t off = h * (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
356  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
357  const T empty_key = SUFFIX(get_invalid_key)<T>();
358  {
359  const T old = atomicCAS(row_ptr, empty_key, *key);
360  if (empty_key == old && key_component_count > 1) {
361  for (size_t i = 1; i <= key_component_count - 1; ++i) {
362  atomicExch(row_ptr + i, key[i]);
363  }
364  }
365  }
366  if (key_component_count > 1) {
367  while (atomicAdd(row_ptr + key_component_count - 1, 0) == empty_key) {
368  // spin until the winning thread has finished writing the entire key and the init
369  // value
370  }
371  }
372  bool match = true;
373  for (uint32_t i = 0; i < key_component_count; ++i) {
374  if (row_ptr[i] != key[i]) {
375  match = false;
376  break;
377  }
378  }
379 
380  if (match) {
381  return reinterpret_cast<T*>(row_ptr + key_component_count);
382  }
383  return nullptr;
384 }
385 #else
386 
387 #define cas_cst(ptr, expected, desired) \
388  __atomic_compare_exchange_n( \
389  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
390 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
391 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
392 
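// CPU slot-claiming protocol: the first key component doubles as the entry's lock.
// A writer CASes the empty sentinel to write_pending (the sentinel minus one),
// copies the remaining components with plain stores, then publishes the entry by
// storing the real first component with seq_cst ordering; concurrent readers spin
// while they observe write_pending.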
393 template <typename T>
394 T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
395  const uint32_t h,
396  const T* key,
397  const size_t key_component_count,
398  const bool with_val_slot) {
399  uint32_t off = h * (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
400  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
401  T empty_key = SUFFIX(get_invalid_key)<T>();
402  T write_pending = SUFFIX(get_invalid_key)<T>() - 1;
403  if (UNLIKELY(*key == write_pending)) {
404  // Handle the corner case where the first key component equals the pending-write
405  // sentinel. Should never happen, but guard against corrupting the table if it does.
406  return nullptr;
407  }
408  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
409  if (success) {
410  if (key_component_count > 1) {
411  memcpy(row_ptr + 1, key + 1, (key_component_count - 1) * sizeof(T));
412  }
413  store_cst(row_ptr, *key);
414  return reinterpret_cast<T*>(row_ptr + key_component_count);
415  }
416  while (load_cst(row_ptr) == write_pending) {
417  // spin until the winning thread has finished writing the entire key
418  }
419  for (size_t i = 0; i < key_component_count; ++i) {
420  if (load_cst(row_ptr + i) != key[i]) {
421  return nullptr;
422  }
423  }
424  return reinterpret_cast<T*>(row_ptr + key_component_count);
425 }
426 
427 #undef load_cst
428 #undef store_cst
429 #undef cas_cst
430 
431 #endif // __CUDACC__
432 
433 template <typename T>
434 DEVICE int write_baseline_hash_slot(const int32_t val,
435  int8_t* hash_buff,
436  const size_t entry_count,
437  const T* key,
438  const size_t key_component_count,
439  const bool with_val_slot,
440  const int32_t invalid_slot_val) {
441  const uint32_t h =
442  MurmurHash1Impl(key, key_component_count * sizeof(T), 0) % entry_count;
443  T* matching_group = get_matching_baseline_hash_slot_at(
444  hash_buff, h, key, key_component_count, with_val_slot);
445  if (!matching_group) {
446  uint32_t h_probe = (h + 1) % entry_count;
447  while (h_probe != h) {
448  matching_group = get_matching_baseline_hash_slot_at(
449  hash_buff, h_probe, key, key_component_count, with_val_slot);
450  if (matching_group) {
451  break;
452  }
453  h_probe = (h_probe + 1) % entry_count;
454  }
455  }
456  if (!matching_group) {
457  return -2;
458  }
459  if (!with_val_slot) {
460  return 0;
461  }
462  if (mapd_cas(matching_group, invalid_slot_val, val) != invalid_slot_val) {
463  return -1;
464  }
465  return 0;
466 }
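// Return-code protocol for write_baseline_hash_slot: 0 on success, -2 when every
// linear-probe position is occupied by a different key (the table is full), and -1
// when the value slot was already claimed, i.e. a duplicate key in a one-to-one
// layout.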
467 
468 template <typename T, typename FILL_HANDLER>
469 DEVICE int SUFFIX(fill_baseline_hash_join_buff)(int8_t* hash_buff,
470  const size_t entry_count,
471  const int32_t invalid_slot_val,
472  const size_t key_component_count,
473  const bool with_val_slot,
474  const FILL_HANDLER* f,
475  const size_t num_elems,
476  const int32_t cpu_thread_idx,
477  const int32_t cpu_thread_count) {
478 #ifdef __CUDACC__
479  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
480  int32_t step = blockDim.x * gridDim.x;
481 #else
482  int32_t start = cpu_thread_idx;
483  int32_t step = cpu_thread_count;
484 #endif
485 
486  T key_scratch_buff[g_maximum_conditions_to_coalesce];
487 
488  auto key_buff_handler = [hash_buff, entry_count, with_val_slot, invalid_slot_val](
489  const size_t entry_idx,
490  const T* key_scratch_buffer,
491  const size_t key_component_count) {
492  return write_baseline_hash_slot<T>(entry_idx,
493  hash_buff,
494  entry_count,
495  key_scratch_buffer,
496  key_component_count,
497  with_val_slot,
498  invalid_slot_val);
499  };
500 
501  for (size_t i = start; i < num_elems; i += step) {
502  const auto err = (*f)(i, key_scratch_buff, key_buff_handler);
503  if (err) {
504  return err;
505  }
506  }
507  return 0;
508 }
509 
510 #undef mapd_cas
511 
512 #ifdef __CUDACC__
513 #define mapd_add(address, val) atomicAdd(address, val)
514 #else
515 #define mapd_add(address, val) __sync_fetch_and_add(address, val)
516 #endif
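// mapd_add is an atomic fetch-and-add on both backends. The counting passes below
// use it to build per-slot match counts, which a prefix scan later turns into
// per-slot offsets for the one-to-many row-id lists.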
517 
518 template <typename SLOT_SELECTOR>
519 DEVICE void count_matches_impl(int32_t* count_buff,
520  const int32_t invalid_slot_val,
521  const JoinColumn join_column,
522  const JoinColumnTypeInfo type_info
523 #ifndef __CUDACC__
524  ,
525  const void* sd_inner_proxy,
526  const void* sd_outer_proxy,
527  const int32_t cpu_thread_idx,
528  const int32_t cpu_thread_count
529 #endif
530  ,
531  SLOT_SELECTOR slot_selector) {
532 #ifdef __CUDACC__
533  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
534  int32_t step = blockDim.x * gridDim.x;
535 #else
536  int32_t start = cpu_thread_idx;
537  int32_t step = cpu_thread_count;
538 #endif
539  for (size_t i = start; i < join_column.num_elems; i += step) {
540  int64_t elem = get_join_column_element_value(type_info, join_column, i);
541  if (elem == type_info.null_val) {
542  if (type_info.uses_bw_eq) {
543  elem = type_info.translated_null_val;
544  } else {
545  continue;
546  }
547  }
548 #ifndef __CUDACC__
549  if (sd_inner_proxy &&
550  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
551  const auto outer_id = translate_str_id_to_outer_dict(
552  elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
553  if (outer_id == StringDictionary::INVALID_STR_ID) {
554  continue;
555  }
556  elem = outer_id;
557  }
558  CHECK_GE(elem, type_info.min_val)
559  << "Element " << elem << " less than min val " << type_info.min_val;
560 #endif
561  auto* entry_ptr = slot_selector(count_buff, elem);
562  mapd_add(entry_ptr, int32_t(1));
563  }
564 }
565 
566 GLOBAL void SUFFIX(count_matches)(int32_t* count_buff,
567  const int32_t invalid_slot_val,
568  const JoinColumn join_column,
569  const JoinColumnTypeInfo type_info
570 #ifndef __CUDACC__
571  ,
572  const void* sd_inner_proxy,
573  const void* sd_outer_proxy,
574  const int32_t cpu_thread_idx,
575  const int32_t cpu_thread_count
576 #endif
577 ) {
578  auto slot_sel = [&type_info](auto count_buff, auto elem) {
579  return SUFFIX(get_hash_slot)(count_buff, elem, type_info.min_val);
580  };
581  count_matches_impl(count_buff,
582  invalid_slot_val,
583  join_column,
584  type_info
585 #ifndef __CUDACC__
586  ,
587  sd_inner_proxy,
588  sd_outer_proxy,
589  cpu_thread_idx,
590  cpu_thread_count
591 #endif
592  ,
593  slot_sel);
594 }
595 
596 GLOBAL void SUFFIX(count_matches_bucketized)(int32_t* count_buff,
597  const int32_t invalid_slot_val,
598  const JoinColumn join_column,
599  const JoinColumnTypeInfo type_info
600 #ifndef __CUDACC__
601  ,
602  const void* sd_inner_proxy,
603  const void* sd_outer_proxy,
604  const int32_t cpu_thread_idx,
605  const int32_t cpu_thread_count
606 #endif
607  ,
608  const int64_t bucket_normalization) {
609  auto slot_sel = [bucket_normalization, &type_info](auto count_buff, auto elem) {
610  return SUFFIX(get_bucketized_hash_slot)(
611  count_buff, elem, type_info.min_val, bucket_normalization);
612  };
613  count_matches_impl(count_buff,
614  invalid_slot_val,
615  join_column,
616  type_info
617 #ifndef __CUDACC__
618  ,
619  sd_inner_proxy,
620  sd_outer_proxy,
621  cpu_thread_idx,
622  cpu_thread_count
623 #endif
624  ,
625  slot_sel);
626 }
627 
628 GLOBAL void SUFFIX(count_matches_sharded)(int32_t* count_buff,
629  const int32_t invalid_slot_val,
630  const JoinColumn join_column,
631  const JoinColumnTypeInfo type_info,
632  const ShardInfo shard_info
633 #ifndef __CUDACC__
634  ,
635  const void* sd_inner_proxy,
636  const void* sd_outer_proxy,
637  const int32_t cpu_thread_idx,
638  const int32_t cpu_thread_count
639 #endif
640 ) {
641 #ifdef __CUDACC__
642  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
643  int32_t step = blockDim.x * gridDim.x;
644 #else
645  int32_t start = cpu_thread_idx;
646  int32_t step = cpu_thread_count;
647 #endif
648  for (size_t i = start; i < join_column.num_elems; i += step) {
649  int64_t elem = get_join_column_element_value(type_info, join_column, i);
650  if (elem == type_info.null_val) {
651  if (type_info.uses_bw_eq) {
652  elem = type_info.translated_null_val;
653  } else {
654  continue;
655  }
656  }
657 #ifndef __CUDACC__
658  if (sd_inner_proxy &&
659  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
660  const auto outer_id = translate_str_id_to_outer_dict(
661  elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
662  if (outer_id == StringDictionary::INVALID_STR_ID) {
663  continue;
664  }
665  elem = outer_id;
666  }
667  CHECK_GE(elem, type_info.min_val)
668  << "Element " << elem << " less than min val " << type_info.min_val;
669 #endif
670  int32_t* entry_ptr = SUFFIX(get_hash_slot_sharded)(count_buff,
671  elem,
672  type_info.min_val,
673  shard_info.entry_count_per_shard,
674  shard_info.num_shards,
675  shard_info.device_count);
676  mapd_add(entry_ptr, int32_t(1));
677  }
678 }
679 
680 template <typename T>
681 DEVICE const T* SUFFIX(get_matching_baseline_hash_slot_readonly)(
682  const T* key,
683  const size_t key_component_count,
684  const T* composite_key_dict,
685  const size_t entry_count) {
686  const uint32_t h =
687  MurmurHash1Impl(key, key_component_count * sizeof(T), 0) % entry_count;
688  uint32_t off = h * key_component_count;
689  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
690  return &composite_key_dict[off];
691  }
692  uint32_t h_probe = (h + 1) % entry_count;
693  while (h_probe != h) {
694  off = h_probe * key_component_count;
695  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
696  return &composite_key_dict[off];
697  }
698  h_probe = (h_probe + 1) % entry_count;
699  }
700 #ifndef __CUDACC__
701  CHECK(false);
702 #else
703  assert(false);
704 #endif
705  return nullptr;
706 }
707 
708 template <typename T, typename KEY_HANDLER>
709 GLOBAL void SUFFIX(count_matches_baseline)(int32_t* count_buff,
710  const T* composite_key_dict,
711  const size_t entry_count,
712  const KEY_HANDLER* f,
713  const size_t num_elems
714 #ifndef __CUDACC__
715  ,
716  const int32_t cpu_thread_idx,
717  const int32_t cpu_thread_count
718 #endif
719 ) {
720 #ifdef __CUDACC__
721  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
722  int32_t step = blockDim.x * gridDim.x;
723 #else
724  int32_t start = cpu_thread_idx;
725  int32_t step = cpu_thread_count;
726 #endif
727 #ifdef __CUDACC__
728  assert(composite_key_dict);
729 #endif
730  T key_scratch_buff[g_maximum_conditions_to_coalesce];
731 
732  auto key_buff_handler = [composite_key_dict, entry_count, count_buff](
733  const size_t row_entry_idx,
734  const T* key_scratch_buff,
735  const size_t key_component_count) {
736  const auto matching_group = SUFFIX(get_matching_baseline_hash_slot_readonly)(
737  key_scratch_buff, key_component_count, composite_key_dict, entry_count);
738  const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
739  mapd_add(&count_buff[entry_idx], int32_t(1));
740  return 0;
741  };
742 
743  for (size_t i = start; i < num_elems; i += step) {
744  (*f)(i, key_scratch_buff, key_buff_handler);
745  }
746 }
747 
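// One-to-many buffer layout used by the fill_row_ids variants: buff holds a
// position array and a running-count array, each hash_entry_count int32_t entries,
// followed by the row-id list. pos_buff[bin] is the start offset of bin's row-id
// run; count_buff tracks how many ids have been placed in each run so far.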
748 template <typename SLOT_SELECTOR>
749 DEVICE void fill_row_ids_impl(int32_t* buff,
750  const int32_t hash_entry_count,
751  const int32_t invalid_slot_val,
752  const JoinColumn join_column,
753  const JoinColumnTypeInfo type_info
754 #ifndef __CUDACC__
755  ,
756  const void* sd_inner_proxy,
757  const void* sd_outer_proxy,
758  const int32_t cpu_thread_idx,
759  const int32_t cpu_thread_count
760 #endif
761  ,
762  SLOT_SELECTOR slot_selector) {
763  int32_t* pos_buff = buff;
764  int32_t* count_buff = buff + hash_entry_count;
765  int32_t* id_buff = count_buff + hash_entry_count;
766 
767 #ifdef __CUDACC__
768  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
769  int32_t step = blockDim.x * gridDim.x;
770 #else
771  int32_t start = cpu_thread_idx;
772  int32_t step = cpu_thread_count;
773 #endif
774  for (size_t i = start; i < join_column.num_elems; i += step) {
775  int64_t elem = get_join_column_element_value(type_info, join_column, i);
776  if (elem == type_info.null_val) {
777  if (type_info.uses_bw_eq) {
778  elem = type_info.translated_null_val;
779  } else {
780  continue;
781  }
782  }
783 #ifndef __CUDACC__
784  if (sd_inner_proxy &&
785  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
786  const auto outer_id = translate_str_id_to_outer_dict(
787  elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
788  if (outer_id == StringDictionary::INVALID_STR_ID) {
789  continue;
790  }
791  elem = outer_id;
792  }
793  CHECK_GE(elem, type_info.min_val)
794  << "Element " << elem << " less than min val " << type_info.min_val;
795 #endif
796  auto pos_ptr = slot_selector(pos_buff, elem);
797 #ifndef __CUDACC__
798  CHECK_NE(*pos_ptr, invalid_slot_val);
799 #endif
800  const auto bin_idx = pos_ptr - pos_buff;
801  const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
802  id_buff[id_buff_idx] = static_cast<int32_t>(i);
803  }
804 }
805 
806 GLOBAL void SUFFIX(fill_row_ids)(int32_t* buff,
807  const int32_t hash_entry_count,
808  const int32_t invalid_slot_val,
809  const JoinColumn join_column,
810  const JoinColumnTypeInfo type_info
811 #ifndef __CUDACC__
812  ,
813  const void* sd_inner_proxy,
814  const void* sd_outer_proxy,
815  const int32_t cpu_thread_idx,
816  const int32_t cpu_thread_count
817 #endif
818 ) {
819  auto slot_sel = [&type_info](auto pos_buff, auto elem) {
820  return SUFFIX(get_hash_slot)(pos_buff, elem, type_info.min_val);
821  };
822 
823  fill_row_ids_impl(buff,
824  hash_entry_count,
825  invalid_slot_val,
826  join_column,
827  type_info
828 #ifndef __CUDACC__
829  ,
830  sd_inner_proxy,
831  sd_outer_proxy,
832  cpu_thread_idx,
833  cpu_thread_count
834 #endif
835  ,
836  slot_sel);
837 }
838 
839 GLOBAL void SUFFIX(fill_row_ids_bucketized)(int32_t* buff,
840  const int32_t hash_entry_count,
841  const int32_t invalid_slot_val,
842  const JoinColumn join_column,
843  const JoinColumnTypeInfo type_info
844 #ifndef __CUDACC__
845  ,
846  const void* sd_inner_proxy,
847  const void* sd_outer_proxy,
848  const int32_t cpu_thread_idx,
849  const int32_t cpu_thread_count
850 #endif
851  ,
852  const int64_t bucket_normalization) {
853  auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) {
854  return SUFFIX(get_bucketized_hash_slot)(
855  pos_buff, elem, type_info.min_val, bucket_normalization);
856  };
857  fill_row_ids_impl(buff,
858  hash_entry_count,
859  invalid_slot_val,
860  join_column,
861  type_info
862 #ifndef __CUDACC__
863  ,
864  sd_inner_proxy,
865  sd_outer_proxy,
866  cpu_thread_idx,
867  cpu_thread_count
868 #endif
869  ,
870  slot_sel);
871 }
872 
873 template <typename SLOT_SELECTOR>
874 DEVICE void fill_row_ids_sharded_impl(int32_t* buff,
875  const int32_t hash_entry_count,
876  const int32_t invalid_slot_val,
877  const JoinColumn join_column,
878  const JoinColumnTypeInfo type_info,
879  const ShardInfo shard_info
880 #ifndef __CUDACC__
881  ,
882  const void* sd_inner_proxy,
883  const void* sd_outer_proxy,
884  const int32_t cpu_thread_idx,
885  const int32_t cpu_thread_count
886 #endif
887  ,
888  SLOT_SELECTOR slot_selector) {
889 
890  int32_t* pos_buff = buff;
891  int32_t* count_buff = buff + hash_entry_count;
892  int32_t* id_buff = count_buff + hash_entry_count;
893 
894 #ifdef __CUDACC__
895  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
896  int32_t step = blockDim.x * gridDim.x;
897 #else
898  int32_t start = cpu_thread_idx;
899  int32_t step = cpu_thread_count;
900 #endif
901  for (size_t i = start; i < join_column.num_elems; i += step) {
902  int64_t elem = get_join_column_element_value(type_info, join_column, i);
903  if (elem == type_info.null_val) {
904  if (type_info.uses_bw_eq) {
905  elem = type_info.translated_null_val;
906  } else {
907  continue;
908  }
909  }
910 #ifndef __CUDACC__
911  if (sd_inner_proxy &&
912  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
913  const auto outer_id = translate_str_id_to_outer_dict(
914  elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
915  if (outer_id == StringDictionary::INVALID_STR_ID) {
916  continue;
917  }
918  elem = outer_id;
919  }
920  CHECK_GE(elem, type_info.min_val)
921  << "Element " << elem << " less than min val " << type_info.min_val;
922 #endif
923  auto* pos_ptr = slot_selector(pos_buff, elem);
924 #ifndef __CUDACC__
925  CHECK_NE(*pos_ptr, invalid_slot_val);
926 #endif
927  const auto bin_idx = pos_ptr - pos_buff;
928  const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
929  id_buff[id_buff_idx] = static_cast<int32_t>(i);
930  }
931 }
932 
933 GLOBAL void SUFFIX(fill_row_ids_sharded)(int32_t* buff,
934  const int32_t hash_entry_count,
935  const int32_t invalid_slot_val,
936  const JoinColumn join_column,
937  const JoinColumnTypeInfo type_info,
938  const ShardInfo shard_info
939 #ifndef __CUDACC__
940  ,
941  const void* sd_inner_proxy,
942  const void* sd_outer_proxy,
943  const int32_t cpu_thread_idx,
944  const int32_t cpu_thread_count
945 #endif
946 ) {
947  auto slot_sel = [&type_info, &shard_info](auto pos_buff, auto elem) {
948  return SUFFIX(get_hash_slot_sharded)(pos_buff,
949  elem,
950  type_info.min_val,
951  shard_info.entry_count_per_shard,
952  shard_info.num_shards,
953  shard_info.device_count);
954  };
955 
956  fill_row_ids_impl(buff,
957  hash_entry_count,
958  invalid_slot_val,
959  join_column,
960  type_info
961 #ifndef __CUDACC__
962  ,
963  sd_inner_proxy,
964  sd_outer_proxy,
965  cpu_thread_idx,
966  cpu_thread_count
967 #endif
968  ,
969  slot_sel);
970 }
971 
972 GLOBAL void SUFFIX(fill_row_ids_sharded_bucketized)(int32_t* buff,
973  const int32_t hash_entry_count,
974  const int32_t invalid_slot_val,
975  const JoinColumn join_column,
976  const JoinColumnTypeInfo type_info,
977  const ShardInfo shard_info
978 #ifndef __CUDACC__
979  ,
980  const void* sd_inner_proxy,
981  const void* sd_outer_proxy,
982  const int32_t cpu_thread_idx,
983  const int32_t cpu_thread_count
984 #endif
985  ,
986  const int64_t bucket_normalization) {
987  auto slot_sel = [&shard_info, &type_info, bucket_normalization](auto pos_buff,
988  auto elem) {
989  return SUFFIX(get_bucketized_hash_slot_sharded)(pos_buff,
990  elem,
991  type_info.min_val,
992  shard_info.entry_count_per_shard,
993  shard_info.num_shards,
994  shard_info.device_count,
995  bucket_normalization);
996  };
997 
998  fill_row_ids_impl(buff,
999  hash_entry_count,
1000  invalid_slot_val,
1001  join_column,
1002  type_info
1003 #ifndef __CUDACC__
1004  ,
1005  sd_inner_proxy,
1006  sd_outer_proxy,
1007  cpu_thread_idx,
1008  cpu_thread_count
1009 #endif
1010  ,
1011  slot_sel);
1012 }
1013 
1014 template <typename T, typename KEY_HANDLER>
1015 GLOBAL void SUFFIX(fill_row_ids_baseline)(int32_t* buff,
1016  const T* composite_key_dict,
1017  const size_t hash_entry_count,
1018  const int32_t invalid_slot_val,
1019  const KEY_HANDLER* f,
1020  const size_t num_elems
1021 #ifndef __CUDACC__
1022  ,
1023  const int32_t cpu_thread_idx,
1024  const int32_t cpu_thread_count
1025 #endif
1026 ) {
1027  int32_t* pos_buff = buff;
1028  int32_t* count_buff = buff + hash_entry_count;
1029  int32_t* id_buff = count_buff + hash_entry_count;
1030 #ifdef __CUDACC__
1031  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1032  int32_t step = blockDim.x * gridDim.x;
1033 #else
1034  int32_t start = cpu_thread_idx;
1035  int32_t step = cpu_thread_count;
1036 #endif
1037 
1038  T key_scratch_buff[g_maximum_conditions_to_coalesce];
1039 #ifdef __CUDACC__
1040  assert(composite_key_dict);
1041 #endif
1042 
1043  auto key_buff_handler = [composite_key_dict,
1044  hash_entry_count,
1045  pos_buff,
1046  invalid_slot_val,
1047  count_buff,
1048  id_buff](const size_t row_index,
1049  const T* key_scratch_buff,
1050  const size_t key_component_count) {
1051  const T* matching_group = SUFFIX(get_matching_baseline_hash_slot_readonly)(
1052  key_scratch_buff, key_component_count, composite_key_dict, hash_entry_count);
1053  const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
1054  int32_t* pos_ptr = pos_buff + entry_idx;
1055 #ifndef __CUDACC__
1056  CHECK_NE(*pos_ptr, invalid_slot_val);
1057 #endif
1058  const auto bin_idx = pos_ptr - pos_buff;
1059  const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1060  id_buff[id_buff_idx] = static_cast<int32_t>(row_index);
1061  return 0;
1062  };
1063 
1064  for (size_t i = start; i < num_elems; i += step) {
1065  (*f)(i, key_scratch_buff, key_buff_handler);
1066  }
1067  return;
1068 }
1069 
1070 #undef mapd_add
1071 
1072 template <typename KEY_HANDLER>
1073 GLOBAL void SUFFIX(approximate_distinct_tuples_impl)(uint8_t* hll_buffer,
1074  int32_t* row_count_buffer,
1075  const uint32_t b,
1076  const size_t num_elems,
1077  const KEY_HANDLER* f
1078 #ifndef __CUDACC__
1079  ,
1080  const int32_t cpu_thread_idx,
1081  const int32_t cpu_thread_count
1082 #endif
1083 ) {
1084 #ifdef __CUDACC__
1085  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1086  int32_t step = blockDim.x * gridDim.x;
1087 #else
1088  int32_t start = cpu_thread_idx;
1089  int32_t step = cpu_thread_count;
1090 #endif
1091 
1092  auto key_buff_handler = [b, hll_buffer, row_count_buffer](
1093  const size_t entry_idx,
1094  const int64_t* key_scratch_buff,
1095  const size_t key_component_count) {
1096  if (row_count_buffer) {
1097  row_count_buffer[entry_idx] += 1;
1098  }
1099 
1100  const uint64_t hash =
1101  MurmurHash64AImpl(key_scratch_buff, key_component_count * sizeof(int64_t), 0);
1102  const uint32_t index = hash >> (64 - b);
1103  const auto rank = get_rank(hash << b, 64 - b);
1104 #ifdef __CUDACC__
1105  atomicMax(reinterpret_cast<int32_t*>(hll_buffer) + index, rank);
1106 #else
1107  hll_buffer[index] = std::max(hll_buffer[index], rank);
1108 #endif
1109 
1110  return 0;
1111  };
1112 
1113  int64_t key_scratch_buff[g_maximum_conditions_to_coalesce];
1114  for (size_t i = start; i < num_elems; i += step) {
1115  (*f)(i, key_scratch_buff, key_buff_handler);
1116  }
1117 }
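// HyperLogLog bookkeeping above: the top b bits of the 64-bit Murmur hash select
// one of 2^b registers (e.g. b = 11 gives 2048 registers), and each register keeps
// the maximum leading-zero rank observed in the remaining 64 - b bits.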
1118 
1119 #ifdef __CUDACC__
1120 namespace {
1121 // TODO(adb): put these in a header file so they are not duplicated between here and
1122 // cuda_mapd_rt.cu
1123 __device__ double atomicMin(double* address, double val) {
1124  unsigned long long int* address_as_ull = (unsigned long long int*)address;
1125  unsigned long long int old = *address_as_ull, assumed;
1126 
1127  do {
1128  assumed = old;
1129  old = atomicCAS(address_as_ull,
1130  assumed,
1131  __double_as_longlong(min(val, __longlong_as_double(assumed))));
1132  } while (assumed != old);
1133 
1134  return __longlong_as_double(old);
1135 }
1136 } // namespace
1137 #endif
1138 
1139 template <size_t N>
1140 GLOBAL void SUFFIX(compute_bucket_sizes_impl)(double* bucket_sizes_for_thread,
1141  const JoinColumn* join_column,
1142  const double bucket_sz_threshold,
1143  const int32_t cpu_thread_idx,
1144  const int32_t cpu_thread_count) {
1145 #ifdef __CUDACC__
1146  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1147  int32_t step = blockDim.x * gridDim.x;
1148 #else
1149  int32_t start = cpu_thread_idx;
1150  int32_t step = cpu_thread_count;
1151 #endif
1152  for (size_t i = start; i < join_column->num_elems; i += step) {
1153  // We expect the bounds column to be (min, max), e.g. (x_min, y_min, x_max, y_max)
1154  double bounds[2 * N];
1155  for (size_t j = 0; j < 2 * N; j++) {
1156  bounds[j] = SUFFIX(fixed_width_double_decode_noinline)(join_column->col_buff,
1157  2 * N * i + j);
1158  }
1159 
1160  for (size_t j = 0; j < N; j++) {
1161  const auto diff = bounds[j + N] - bounds[j];
1162 #ifdef __CUDACC__
1163  if (diff > bucket_sz_threshold) {
1164  atomicMin(&bucket_sizes_for_thread[j], diff);
1165  }
1166 #else
1167  if (diff > bucket_sz_threshold && diff < bucket_sizes_for_thread[j]) {
1168  bucket_sizes_for_thread[j] = diff;
1169  }
1170 #endif
1171  }
1172  }
1173 }
1174 
1175 #ifndef __CUDACC__
1176 
1177 template <typename InputIterator, typename OutputIterator>
1178 void inclusive_scan(InputIterator first,
1179  InputIterator last,
1180  OutputIterator out,
1181  const size_t thread_count) {
1182  using ElementType = typename InputIterator::value_type;
1183  using OffsetType = typename InputIterator::difference_type;
1184  const OffsetType elem_count = last - first;
1185  if (elem_count < 10000 || thread_count <= 1) {
1186  ElementType sum = 0;
1187  for (auto iter = first; iter != last; ++iter, ++out) {
1188  *out = sum += *iter;
1189  }
1190  return;
1191  }
1192 
1193  const OffsetType step = (elem_count + thread_count - 1) / thread_count;
1194  OffsetType start_off = 0;
1195  OffsetType end_off = std::min(step, elem_count);
1196  std::vector<ElementType> partial_sums(thread_count);
1197  std::vector<std::future<void>> counter_threads;
1198  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx,
1199  start_off = std::min(start_off + step, elem_count),
1200  end_off = std::min(start_off + step, elem_count)) {
1201  counter_threads.push_back(std::async(
1202  std::launch::async,
1203  [first, out](
1204  ElementType& partial_sum, const OffsetType start, const OffsetType end) {
1205  ElementType sum = 0;
1206  for (auto in_iter = first + start, out_iter = out + start;
1207  in_iter != (first + end);
1208  ++in_iter, ++out_iter) {
1209  *out_iter = sum += *in_iter;
1210  }
1211  partial_sum = sum;
1212  },
1213  std::ref(partial_sums[thread_idx]),
1214  start_off,
1215  end_off));
1216  }
1217  for (auto& child : counter_threads) {
1218  child.get();
1219  }
1220 
1221  ElementType sum = 0;
1222  for (auto& s : partial_sums) {
1223  s += sum;
1224  sum = s;
1225  }
1226 
1227  counter_threads.clear();
1228  start_off = std::min(step, elem_count);
1229  end_off = std::min(start_off + step, elem_count);
1230  for (size_t thread_idx = 0; thread_idx < thread_count - 1; ++thread_idx,
1231  start_off = std::min(start_off + step, elem_count),
1232  end_off = std::min(start_off + step, elem_count)) {
1233  counter_threads.push_back(std::async(
1234  std::launch::async,
1235  [out](const ElementType prev_sum, const OffsetType start, const OffsetType end) {
1236  for (auto iter = out + start; iter != (out + end); ++iter) {
1237  *iter += prev_sum;
1238  }
1239  },
1240  partial_sums[thread_idx],
1241  start_off,
1242  end_off));
1243  }
1244  for (auto& child : counter_threads) {
1245  child.get();
1246  }
1247 }
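// The scan above runs in two parallel phases: each thread scans its own chunk and
// records a partial sum, the partial sums are combined serially, and then every
// chunk but the first is offset by the sum of the preceding chunks. Small inputs
// (under 10000 elements) take the serial path. A minimal usage sketch:
//
//   std::vector<int32_t> v{1, 1, 1, 1};
//   inclusive_scan(v.begin(), v.end(), v.begin(), /*thread_count=*/2);
//   // v is now {1, 2, 3, 4}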
1248 
1249 template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
1250 void fill_one_to_many_hash_table_impl(int32_t* buff,
1251  const int32_t hash_entry_count,
1252  const int32_t invalid_slot_val,
1253  const JoinColumn& join_column,
1254  const JoinColumnTypeInfo& type_info,
1255  const void* sd_inner_proxy,
1256  const void* sd_outer_proxy,
1257  const unsigned cpu_thread_count,
1258  COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_func,
1259  FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_func) {
1260  int32_t* pos_buff = buff;
1261  int32_t* count_buff = buff + hash_entry_count;
1262  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1263  std::vector<std::future<void>> counter_threads;
1264  for (unsigned cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1265  counter_threads.push_back(std::async(
1266  std::launch::async, count_matches_func, cpu_thread_idx, cpu_thread_count));
1267  }
1268 
1269  for (auto& child : counter_threads) {
1270  child.get();
1271  }
1272 
1273  std::vector<int32_t> count_copy(hash_entry_count, 0);
1274  CHECK_GT(hash_entry_count, int32_t(0));
1275  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1276 #if HAVE_CUDA
1277  thrust::inclusive_scan(count_copy.begin(), count_copy.end(), count_copy.begin());
1278 #else
1279  inclusive_scan(
1280  count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1281 #endif
1282  std::vector<std::future<void>> pos_threads;
1283  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1284  pos_threads.push_back(std::async(
1285  std::launch::async,
1286  [&](size_t thread_idx) {
1287  for (size_t i = thread_idx; i < static_cast<size_t>(hash_entry_count);
1288  i += cpu_thread_count) {
1289  if (count_buff[i]) {
1290  pos_buff[i] = count_copy[i];
1291  }
1292  }
1293  },
1294  cpu_thread_idx));
1295  }
1296  for (auto& child : pos_threads) {
1297  child.get();
1298  }
1299 
1300  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1301  std::vector<std::future<void>> rowid_threads;
1302  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1303  rowid_threads.push_back(std::async(
1304  std::launch::async, fill_row_ids_func, cpu_thread_idx, cpu_thread_count));
1305  }
1306 
1307  for (auto& child : rowid_threads) {
1308  child.get();
1309  }
1310 }
1311 
1312 void fill_one_to_many_hash_table(int32_t* buff,
1313  const HashEntryInfo hash_entry_info,
1314  const int32_t invalid_slot_val,
1315  const JoinColumn& join_column,
1316  const JoinColumnTypeInfo& type_info,
1317  const void* sd_inner_proxy,
1318  const void* sd_outer_proxy,
1319  const unsigned cpu_thread_count) {
1320  auto launch_count_matches = [count_buff = buff + hash_entry_info.hash_entry_count,
1321  invalid_slot_val,
1322  &join_column,
1323  &type_info,
1324  sd_inner_proxy,
1325  sd_outer_proxy](auto cpu_thread_idx,
1326  auto cpu_thread_count) {
1327  SUFFIX(count_matches)
1328  (count_buff,
1329  invalid_slot_val,
1330  join_column,
1331  type_info,
1332  sd_inner_proxy,
1333  sd_outer_proxy,
1334  cpu_thread_idx,
1335  cpu_thread_count);
1336  };
1337  auto launch_fill_row_ids = [hash_entry_count = hash_entry_info.hash_entry_count,
1338  buff,
1339  invalid_slot_val,
1340  &join_column,
1341  &type_info,
1342  sd_inner_proxy,
1343  sd_outer_proxy](auto cpu_thread_idx,
1344  auto cpu_thread_count) {
1345  SUFFIX(fill_row_ids)
1346  (buff,
1347  hash_entry_count,
1348  invalid_slot_val,
1349  join_column,
1350  type_info,
1351  sd_inner_proxy,
1352  sd_outer_proxy,
1353  cpu_thread_idx,
1354  cpu_thread_count);
1355  };
1356 
1357  fill_one_to_many_hash_table_impl(buff,
1358  hash_entry_info.hash_entry_count,
1359  invalid_slot_val,
1360  join_column,
1361  type_info,
1362  sd_inner_proxy,
1363  sd_outer_proxy,
1364  cpu_thread_count,
1365  launch_count_matches,
1366  launch_fill_row_ids);
1367 }
1368 
1369 void fill_one_to_many_hash_table_bucketized(int32_t* buff,
1370  const HashEntryInfo hash_entry_info,
1371  const int32_t invalid_slot_val,
1372  const JoinColumn& join_column,
1373  const JoinColumnTypeInfo& type_info,
1374  const void* sd_inner_proxy,
1375  const void* sd_outer_proxy,
1376  const unsigned cpu_thread_count) {
1377  auto bucket_normalization = hash_entry_info.bucket_normalization;
1378  auto hash_entry_count = hash_entry_info.getNormalizedHashEntryCount();
1379  auto launch_count_matches = [bucket_normalization,
1380  count_buff = buff + hash_entry_count,
1381  invalid_slot_val,
1382  &join_column,
1383  &type_info,
1384  sd_inner_proxy,
1385  sd_outer_proxy](auto cpu_thread_idx,
1386  auto cpu_thread_count) {
1387  SUFFIX(count_matches_bucketized)
1388  (count_buff,
1389  invalid_slot_val,
1390  join_column,
1391  type_info,
1392  sd_inner_proxy,
1393  sd_outer_proxy,
1394  cpu_thread_idx,
1395  cpu_thread_count,
1396  bucket_normalization);
1397  };
1398  auto launch_fill_row_ids = [bucket_normalization,
1399  hash_entry_count,
1400  buff,
1401  invalid_slot_val,
1402  &join_column,
1403  &type_info,
1404  sd_inner_proxy,
1405  sd_outer_proxy](auto cpu_thread_idx,
1406  auto cpu_thread_count) {
1407  SUFFIX(fill_row_ids_bucketized)
1408  (buff,
1409  hash_entry_count,
1410  invalid_slot_val,
1411  join_column,
1412  type_info,
1413  sd_inner_proxy,
1414  sd_outer_proxy,
1415  cpu_thread_idx,
1416  cpu_thread_count,
1417  bucket_normalization);
1418  };
1419 
1420  fill_one_to_many_hash_table_impl(buff,
1421  hash_entry_count,
1422  invalid_slot_val,
1423  join_column,
1424  type_info,
1425  sd_inner_proxy,
1426  sd_outer_proxy,
1427  cpu_thread_count,
1428  launch_count_matches,
1429  launch_fill_row_ids);
1430 }
1431 
1432 template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
1433 void fill_one_to_many_hash_table_sharded_impl(
1434  int32_t* buff,
1435  const int32_t hash_entry_count,
1436  const int32_t invalid_slot_val,
1437  const JoinColumn& join_column,
1438  const JoinColumnTypeInfo& type_info,
1439  const ShardInfo& shard_info,
1440  const void* sd_inner_proxy,
1441  const void* sd_outer_proxy,
1442  const unsigned cpu_thread_count,
1443  COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_launcher,
1444  FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_launcher) {
1445  int32_t* pos_buff = buff;
1446  int32_t* count_buff = buff + hash_entry_count;
1447  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1448  std::vector<std::future<void>> counter_threads;
1449  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1450  counter_threads.push_back(std::async(
1451  std::launch::async, count_matches_launcher, cpu_thread_idx, cpu_thread_count));
1452  }
1453 
1454  for (auto& child : counter_threads) {
1455  child.get();
1456  }
1457 
1458  std::vector<int32_t> count_copy(hash_entry_count, 0);
1459  CHECK_GT(hash_entry_count, int32_t(0));
1460  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1461  inclusive_scan(
1462  count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1463  std::vector<std::future<void>> pos_threads;
1464  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1465  pos_threads.push_back(std::async(
1466  std::launch::async,
1467  [&](const unsigned thread_idx) {
1468  for (size_t i = thread_idx; i < static_cast<size_t>(hash_entry_count);
1469  i += cpu_thread_count) {
1470  if (count_buff[i]) {
1471  pos_buff[i] = count_copy[i];
1472  }
1473  }
1474  },
1475  cpu_thread_idx));
1476  }
1477  for (auto& child : pos_threads) {
1478  child.get();
1479  }
1480 
1481  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1482  std::vector<std::future<void>> rowid_threads;
1483  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1484  rowid_threads.push_back(std::async(
1485  std::launch::async, fill_row_ids_launcher, cpu_thread_idx, cpu_thread_count));
1486  }
1487 
1488  for (auto& child : rowid_threads) {
1489  child.get();
1490  }
1491 }
1492 
1493 void fill_one_to_many_hash_table_sharded(int32_t* buff,
1494  const int32_t hash_entry_count,
1495 
1496  const int32_t invalid_slot_val,
1497  const JoinColumn& join_column,
1498  const JoinColumnTypeInfo& type_info,
1499  const ShardInfo& shard_info,
1500  const void* sd_inner_proxy,
1501  const void* sd_outer_proxy,
1502  const unsigned cpu_thread_count) {
1503  auto launch_count_matches = [count_buff = buff + hash_entry_count,
1504  invalid_slot_val,
1505  &join_column,
1506  &type_info,
1507  &shard_info
1508 #ifndef __CUDACC__
1509  ,
1510  sd_inner_proxy,
1511  sd_outer_proxy
1512 #endif
1513  ](auto cpu_thread_idx, auto cpu_thread_count) {
1514  return SUFFIX(count_matches_sharded)(count_buff,
1515  invalid_slot_val,
1516  join_column,
1517  type_info,
1518  shard_info
1519 #ifndef __CUDACC__
1520  ,
1521  sd_inner_proxy,
1522  sd_outer_proxy,
1523  cpu_thread_idx,
1524  cpu_thread_count
1525 #endif
1526  );
1527  };
1528 
1529  auto launch_fill_row_ids = [buff,
1530  hash_entry_count,
1531  invalid_slot_val,
1532  &join_column,
1533  &type_info,
1534  &shard_info
1535 #ifndef __CUDACC__
1536  ,
1537  sd_inner_proxy,
1538  sd_outer_proxy
1539 #endif
1540  ](auto cpu_thread_idx, auto cpu_thread_count) {
1541  return SUFFIX(fill_row_ids_sharded)(buff,
1542  hash_entry_count,
1543  invalid_slot_val,
1544  join_column,
1545  type_info,
1546  shard_info
1547 #ifndef __CUDACC__
1548  ,
1549  sd_inner_proxy,
1550  sd_outer_proxy,
1551  cpu_thread_idx,
1552  cpu_thread_count);
1553 #endif
1554  };
1555 
1556  fill_one_to_many_hash_table_sharded_impl(buff,
1557  hash_entry_count,
1558  invalid_slot_val,
1559  join_column,
1560  type_info,
1561  shard_info
1562 #ifndef __CUDACC__
1563  ,
1564  sd_inner_proxy,
1565  sd_outer_proxy,
1566  cpu_thread_count
1567 #endif
1568  ,
1569  launch_count_matches,
1570  launch_fill_row_ids);
1571 }
1572 
1573 void init_baseline_hash_join_buff_32(int8_t* hash_join_buff,
1574  const int32_t entry_count,
1575  const size_t key_component_count,
1576  const bool with_val_slot,
1577  const int32_t invalid_slot_val,
1578  const int32_t cpu_thread_idx,
1579  const int32_t cpu_thread_count) {
1580  init_baseline_hash_join_buff<int32_t>(hash_join_buff,
1581  entry_count,
1582  key_component_count,
1583  with_val_slot,
1584  invalid_slot_val,
1585  cpu_thread_idx,
1586  cpu_thread_count);
1587 }
1588 
1589 void init_baseline_hash_join_buff_64(int8_t* hash_join_buff,
1590  const int32_t entry_count,
1591  const size_t key_component_count,
1592  const bool with_val_slot,
1593  const int32_t invalid_slot_val,
1594  const int32_t cpu_thread_idx,
1595  const int32_t cpu_thread_count) {
1596  init_baseline_hash_join_buff<int64_t>(hash_join_buff,
1597  entry_count,
1598  key_component_count,
1599  with_val_slot,
1600  invalid_slot_val,
1601  cpu_thread_idx,
1602  cpu_thread_count);
1603 }
1604 
1605 int fill_baseline_hash_join_buff_32(int8_t* hash_buff,
1606  const size_t entry_count,
1607  const int32_t invalid_slot_val,
1608  const size_t key_component_count,
1609  const bool with_val_slot,
1610  const GenericKeyHandler* key_handler,
1611  const size_t num_elems,
1612  const int32_t cpu_thread_idx,
1613  const int32_t cpu_thread_count) {
1614  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1615  entry_count,
1616  invalid_slot_val,
1617  key_component_count,
1618  with_val_slot,
1619  key_handler,
1620  num_elems,
1621  cpu_thread_idx,
1622  cpu_thread_count);
1623 }
1624 
1625 int overlaps_fill_baseline_hash_join_buff_32(int8_t* hash_buff,
1626  const size_t entry_count,
1627  const int32_t invalid_slot_val,
1628  const size_t key_component_count,
1629  const bool with_val_slot,
1630  const OverlapsKeyHandler* key_handler,
1631  const size_t num_elems,
1632  const int32_t cpu_thread_idx,
1633  const int32_t cpu_thread_count) {
1634  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1635  entry_count,
1636  invalid_slot_val,
1637  key_component_count,
1638  with_val_slot,
1639  key_handler,
1640  num_elems,
1641  cpu_thread_idx,
1642  cpu_thread_count);
1643 }
1644 
1645 int fill_baseline_hash_join_buff_64(int8_t* hash_buff,
1646  const size_t entry_count,
1647  const int32_t invalid_slot_val,
1648  const size_t key_component_count,
1649  const bool with_val_slot,
1650  const GenericKeyHandler* key_handler,
1651  const size_t num_elems,
1652  const int32_t cpu_thread_idx,
1653  const int32_t cpu_thread_count) {
1654  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1655  entry_count,
1656  invalid_slot_val,
1657  key_component_count,
1658  with_val_slot,
1659  key_handler,
1660  num_elems,
1661  cpu_thread_idx,
1662  cpu_thread_count);
1663 }
1664 
1665 int overlaps_fill_baseline_hash_join_buff_64(int8_t* hash_buff,
1666  const size_t entry_count,
1667  const int32_t invalid_slot_val,
1668  const size_t key_component_count,
1669  const bool with_val_slot,
1670  const OverlapsKeyHandler* key_handler,
1671  const size_t num_elems,
1672  const int32_t cpu_thread_idx,
1673  const int32_t cpu_thread_count) {
1674  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1675  entry_count,
1676  invalid_slot_val,
1677  key_component_count,
1678  with_val_slot,
1679  key_handler,
1680  num_elems,
1681  cpu_thread_idx,
1682  cpu_thread_count);
1683 }
1684 
1685 template <typename T>
1686 void fill_one_to_many_baseline_hash_table(
1687  int32_t* buff,
1688  const T* composite_key_dict,
1689  const size_t hash_entry_count,
1690  const int32_t invalid_slot_val,
1691  const size_t key_component_count,
1692  const std::vector<JoinColumn>& join_column_per_key,
1693  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1694  const std::vector<JoinBucketInfo>& join_buckets_per_key,
1695  const std::vector<const void*>& sd_inner_proxy_per_key,
1696  const std::vector<const void*>& sd_outer_proxy_per_key,
1697  const size_t cpu_thread_count) {
1698  int32_t* pos_buff = buff;
1699  int32_t* count_buff = buff + hash_entry_count;
1700  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1701  std::vector<std::future<void>> counter_threads;
1702  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1703  if (join_buckets_per_key.size() > 0) {
1704  counter_threads.push_back(
1705  std::async(std::launch::async,
1706  [count_buff,
1707  composite_key_dict,
1708  &hash_entry_count,
1709  &join_buckets_per_key,
1710  &join_column_per_key,
1711  cpu_thread_idx,
1712  cpu_thread_count] {
1713  const auto key_handler = OverlapsKeyHandler(
1714  join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
1715  &join_column_per_key[0],
1716  join_buckets_per_key[0].bucket_sizes_for_dimension.data());
1717  count_matches_baseline(count_buff,
1718  composite_key_dict,
1719  hash_entry_count,
1720  &key_handler,
1721  join_column_per_key[0].num_elems,
1722  cpu_thread_idx,
1723  cpu_thread_count);
1724  }));
1725  } else {
1726  counter_threads.push_back(std::async(
1727  std::launch::async,
1728  [count_buff,
1729  composite_key_dict,
1730  &key_component_count,
1731  &hash_entry_count,
1732  &join_column_per_key,
1733  &type_info_per_key,
1734  &sd_inner_proxy_per_key,
1735  &sd_outer_proxy_per_key,
1736  cpu_thread_idx,
1737  cpu_thread_count] {
1738  const auto key_handler = GenericKeyHandler(key_component_count,
1739  true,
1740  &join_column_per_key[0],
1741  &type_info_per_key[0],
1742  &sd_inner_proxy_per_key[0],
1743  &sd_outer_proxy_per_key[0]);
1744  count_matches_baseline(count_buff,
1745  composite_key_dict,
1746  hash_entry_count,
1747  &key_handler,
1748  join_column_per_key[0].num_elems,
1749  cpu_thread_idx,
1750  cpu_thread_count);
1751  }));
1752  }
1753  }
1754 
1755  for (auto& child : counter_threads) {
1756  child.get();
1757  }
1758 
1759  std::vector<int32_t> count_copy(hash_entry_count, 0);
1760  CHECK_GT(hash_entry_count, 0u);
1761  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1762  inclusive_scan(
1763  count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1764  std::vector<std::future<void>> pos_threads;
1765  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1766  pos_threads.push_back(std::async(
1767  std::launch::async,
1768  [&](const int thread_idx) {
1769  for (size_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
1770  if (count_buff[i]) {
1771  pos_buff[i] = count_copy[i];
1772  }
1773  }
1774  },
1775  cpu_thread_idx));
1776  }
1777  for (auto& child : pos_threads) {
1778  child.get();
1779  }
1780 
1781  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1782  std::vector<std::future<void>> rowid_threads;
1783  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1784  if (join_buckets_per_key.size() > 0) {
1785  rowid_threads.push_back(
1786  std::async(std::launch::async,
1787  [buff,
1788  composite_key_dict,
1789  hash_entry_count,
1790  invalid_slot_val,
1791  &join_column_per_key,
1792  &join_buckets_per_key,
1793  cpu_thread_idx,
1794  cpu_thread_count] {
1795  const auto key_handler = OverlapsKeyHandler(
1796  join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
1797  &join_column_per_key[0],
1798  join_buckets_per_key[0].bucket_sizes_for_dimension.data());
1799  SUFFIX(fill_row_ids_baseline)
1800  (buff,
1801  composite_key_dict,
1802  hash_entry_count,
1803  invalid_slot_val,
1804  &key_handler,
1805  join_column_per_key[0].num_elems,
1806  cpu_thread_idx,
1807  cpu_thread_count);
1808  }));
1809  } else {
1810  rowid_threads.push_back(std::async(std::launch::async,
1811  [buff,
1812  composite_key_dict,
1813  hash_entry_count,
1814  invalid_slot_val,
1815  key_component_count,
1816  &join_column_per_key,
1817  &type_info_per_key,
1818  &sd_inner_proxy_per_key,
1819  &sd_outer_proxy_per_key,
1820  cpu_thread_idx,
1821  cpu_thread_count] {
1822  const auto key_handler = GenericKeyHandler(
1823  key_component_count,
1824  true,
1825  &join_column_per_key[0],
1826  &type_info_per_key[0],
1827  &sd_inner_proxy_per_key[0],
1828  &sd_outer_proxy_per_key[0]);
1829  SUFFIX(fill_row_ids_baseline)
1830  (buff,
1831  composite_key_dict,
1832  hash_entry_count,
1833  invalid_slot_val,
1834  &key_handler,
1835  join_column_per_key[0].num_elems,
1836  cpu_thread_idx,
1837  cpu_thread_count);
1838  }));
1839  }
1840  }
1841 
1842  for (auto& child : rowid_threads) {
1843  child.get();
1844  }
1845 }
1846 
1847 void fill_one_to_many_baseline_hash_table_32(
1848  int32_t* buff,
1849  const int32_t* composite_key_dict,
1850  const size_t hash_entry_count,
1851  const int32_t invalid_slot_val,
1852  const size_t key_component_count,
1853  const std::vector<JoinColumn>& join_column_per_key,
1854  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1855  const std::vector<JoinBucketInfo>& join_bucket_info,
1856  const std::vector<const void*>& sd_inner_proxy_per_key,
1857  const std::vector<const void*>& sd_outer_proxy_per_key,
1858  const int32_t cpu_thread_count) {
1859  fill_one_to_many_baseline_hash_table<int32_t>(buff,
1860  composite_key_dict,
1861  hash_entry_count,
1862  invalid_slot_val,
1863  key_component_count,
1864  join_column_per_key,
1865  type_info_per_key,
1866  join_bucket_info,
1867  sd_inner_proxy_per_key,
1868  sd_outer_proxy_per_key,
1869  cpu_thread_count);
1870 }
1871 
1872 void fill_one_to_many_baseline_hash_table_64(
1873  int32_t* buff,
1874  const int64_t* composite_key_dict,
1875  const size_t hash_entry_count,
1876  const int32_t invalid_slot_val,
1877  const size_t key_component_count,
1878  const std::vector<JoinColumn>& join_column_per_key,
1879  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1880  const std::vector<JoinBucketInfo>& join_bucket_info,
1881  const std::vector<const void*>& sd_inner_proxy_per_key,
1882  const std::vector<const void*>& sd_outer_proxy_per_key,
1883  const int32_t cpu_thread_count) {
1884  fill_one_to_many_baseline_hash_table<int64_t>(buff,
1885  composite_key_dict,
1886  hash_entry_count,
1887  invalid_slot_val,
1888  key_component_count,
1889  join_column_per_key,
1890  type_info_per_key,
1891  join_bucket_info,
1892  sd_inner_proxy_per_key,
1893  sd_outer_proxy_per_key,
1894  cpu_thread_count);
1895 }
1896 
1897 void approximate_distinct_tuples(uint8_t* hll_buffer_all_cpus,
1898  const uint32_t b,
1899  const size_t padded_size_bytes,
1900  const std::vector<JoinColumn>& join_column_per_key,
1901  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1902  const int thread_count) {
1903  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
1904  CHECK(!join_column_per_key.empty());
1905 
1906  std::vector<std::future<void>> approx_distinct_threads;
1907  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
1908  approx_distinct_threads.push_back(std::async(
1909  std::launch::async,
1910  [&join_column_per_key,
1911  &type_info_per_key,
1912  b,
1913  hll_buffer_all_cpus,
1914  padded_size_bytes,
1915  thread_idx,
1916  thread_count] {
1917  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
1918 
1919  const auto key_handler = GenericKeyHandler(join_column_per_key.size(),
1920  false,
1921  &join_column_per_key[0],
1922  &type_info_per_key[0],
1923  nullptr,
1924  nullptr);
1925  approximate_distinct_tuples_impl(hll_buffer,
1926  nullptr,
1927  b,
1928  join_column_per_key[0].num_elems,
1929  &key_handler,
1930  thread_idx,
1931  thread_count);
1932  }));
1933  }
1934  for (auto& child : approx_distinct_threads) {
1935  child.get();
1936  }
1937 }
1938 
1939 void approximate_distinct_tuples_overlaps(
1940  uint8_t* hll_buffer_all_cpus,
1941  std::vector<int32_t>& row_counts,
1942  const uint32_t b,
1943  const size_t padded_size_bytes,
1944  const std::vector<JoinColumn>& join_column_per_key,
1945  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1946  const std::vector<JoinBucketInfo>& join_buckets_per_key,
1947  const int thread_count) {
1948  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
1949  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
1950  CHECK(!join_column_per_key.empty());
1951 
1952  std::vector<std::future<void>> approx_distinct_threads;
1953  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
1954  approx_distinct_threads.push_back(std::async(
1955  std::launch::async,
1956  [&join_column_per_key,
1957  &join_buckets_per_key,
1958  &row_counts,
1959  b,
1960  hll_buffer_all_cpus,
1961  padded_size_bytes,
1962  thread_idx,
1963  thread_count] {
1964  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
1965 
1966  const auto key_handler = OverlapsKeyHandler(
1967  join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
1968  &join_column_per_key[0],
1969  join_buckets_per_key[0].bucket_sizes_for_dimension.data());
1970  approximate_distinct_tuples_impl(hll_buffer,
1971  row_counts.data(),
1972  b,
1973  join_column_per_key[0].num_elems,
1974  &key_handler,
1975  thread_idx,
1976  thread_count);
1977  }));
1978  }
1979  for (auto& child : approx_distinct_threads) {
1980  child.get();
1981  }
1982 
1983  inclusive_scan(
1984  row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
1985 }
1986 
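// Unlike the plain variant, the overlaps variant also records a per-row count
// of generated keys in row_counts; the trailing inclusive_scan converts those
// counts into running end offsets (semantically a parallel std::partial_sum).
// A tiny worked example:
//
//   row_counts = {2, 0, 3, 1}  ->  {2, 2, 5, 6}
//   row i's entries occupy [scanned[i - 1], scanned[i]) of the output.
//
// A sequential sketch of the same reduction (toy_scan is hypothetical):

#include <cstdint>
#include <numeric>
#include <vector>

inline std::vector<int32_t> toy_scan(const std::vector<int32_t>& counts) {
  std::vector<int32_t> out(counts.size());
  std::partial_sum(counts.begin(), counts.end(), out.begin());
  return out;
}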
1987 void compute_bucket_sizes(std::vector<double>& bucket_sizes_for_dimension,
1988  const JoinColumn& join_column,
1989  const double bucket_size_threshold,
1990  const int thread_count) {
1991  std::vector<std::vector<double>> bucket_sizes_for_threads;
1992  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
1993  bucket_sizes_for_threads.emplace_back(bucket_sizes_for_dimension.size(),
1994  std::numeric_limits<double>::max());
1995  }
1996  std::vector<std::future<void>> threads;
1997  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
1998  threads.push_back(std::async(std::launch::async,
1999  compute_bucket_sizes_impl<2>,
2000  bucket_sizes_for_threads[thread_idx].data(),
2001  &join_column,
2002  bucket_size_threshold,
2003  thread_idx,
2004  thread_count));
2005  }
2006  for (auto& child : threads) {
2007  child.get();
2008  }
2009 
2010  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2011  for (size_t i = 0; i < bucket_sizes_for_dimension.size(); i++) {
2012  if (bucket_sizes_for_threads[thread_idx][i] < bucket_sizes_for_dimension[i]) {
2013  bucket_sizes_for_dimension[i] = bucket_sizes_for_threads[thread_idx][i];
2014  }
2015  }
2016  }
2017 }
2018 
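// The nested loops above merge the per-thread candidates with an element-wise
// minimum, so each dimension ends up with the smallest bucket size any worker
// derived from its slice of the join column. The reduction is equivalent to
// this sketch (toy_min_reduce is hypothetical; acc and every per-thread
// vector are assumed to have the same length):

#include <algorithm>
#include <vector>

inline void toy_min_reduce(std::vector<double>& acc,
                           const std::vector<std::vector<double>>& per_thread) {
  for (const auto& partial : per_thread) {
    std::transform(acc.begin(), acc.end(), partial.begin(), acc.begin(),
                   [](const double a, const double b) { return std::min(a, b); });
  }
}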
2019 #endif