HashJoinRuntime.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "HashJoinRuntime.h"

#include "Shared/shard_key.h"
#ifdef __CUDACC__
#else
#include "Logger/Logger.h"

#include "Shared/likely.h"

#include <future>
#endif

#if HAVE_CUDA
#include <thrust/scan.h>
#endif
#include "Shared/funcannotations.h"

#include <cmath>
#include <numeric>
48 #ifndef __CUDACC__
49 namespace {
50 
71 inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
72  const int64_t min_elem,
73  const int64_t max_elem,
74  const void* sd_inner_proxy,
75  const void* sd_outer_proxy) {
76  CHECK(sd_outer_proxy);
77  const auto sd_inner_dict_proxy =
78  static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
79  const auto sd_outer_dict_proxy =
80  static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
81  const auto elem_str = sd_inner_dict_proxy->getString(elem);
82  const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str);
83  if (outer_id > max_elem || outer_id < min_elem) {
85  }
86  return outer_id;
87 }
88 
89 } // namespace
90 #endif

DEVICE void SUFFIX(init_hash_join_buff)(int32_t* groups_buffer,
                                        const int64_t hash_entry_count,
                                        const int32_t invalid_slot_val,
                                        const int32_t cpu_thread_idx,
                                        const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  for (int64_t i = start; i < hash_entry_count; i += step) {
    groups_buffer[i] = invalid_slot_val;
  }
}
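
// Partitioning sketch (not in the original source): start/step implement a
// strided split of the entry range, so on the CPU thread 1 of 4 initializes
// entries 1, 5, 9, ... while on the GPU the same loop becomes a standard
// grid-stride loop. A minimal host-side usage, assuming -1 as the invalid
// slot value:
//
//   std::vector<int32_t> buff(hash_entry_count);
//   for (int32_t t = 0; t < cpu_thread_count; ++t) {
//     init_hash_join_buff(buff.data(), hash_entry_count, -1, t, cpu_thread_count);
//   }
//
// Threads touch disjoint strides, so these calls can also run concurrently
// without synchronization.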

#ifdef __CUDACC__
#define mapd_cas(address, compare, val) atomicCAS(address, compare, val)
#elif defined(_MSC_VER)
#define mapd_cas(address, compare, val)                                 \
  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
                             static_cast<long>(val),                    \
                             static_cast<long>(compare))
#else
#define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
#endif
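
// Semantics of mapd_cas in all three variants (sketch): atomically compare
// *address with `compare`; if equal, store `val`. The value observed before
// the operation is returned either way, which is what the fill functions
// below key on:
//
//   int32_t old = mapd_cas(entry_ptr, invalid_slot_val, index);
//   // old == invalid_slot_val -> this thread claimed the empty slot
//   // old != invalid_slot_val -> slot already owned: duplicate join key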

template <typename SLOT_SELECTOR>
DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
                                     const int32_t invalid_slot_val,
                                     const JoinColumn join_column,
                                     const JoinColumnTypeInfo type_info,
                                     const void* sd_inner_proxy,
                                     const void* sd_outer_proxy,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count,
                                     SLOT_SELECTOR slot_sel) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = slot_sel(elem);
    if (mapd_cas(entry_ptr, invalid_slot_val, index) != invalid_slot_val) {
      return -1;
    }
  }
  return 0;
}
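
// Return-value note (inferred from the CAS above): this builds a strictly
// one-to-one layout, so a second row mapping to an occupied slot makes the
// CAS observe a non-invalid value and the whole fill returns -1. Callers can
// take that as "join keys are not unique" and build the one-to-many layout
// (see count_matches/fill_row_ids below) instead.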

DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(int32_t* buff,
                                                  const int32_t invalid_slot_val,
                                                  const JoinColumn join_column,
                                                  const JoinColumnTypeInfo type_info,
                                                  const void* sd_inner_proxy,
                                                  const void* sd_outer_proxy,
                                                  const int32_t cpu_thread_idx,
                                                  const int32_t cpu_thread_count,
                                                  const int64_t bucket_normalization) {
  auto slot_selector = [&](auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        buff, elem, type_info.min_val, bucket_normalization);
  };
  return fill_hash_join_buff_impl(buff,
                                  invalid_slot_val,
                                  join_column,
                                  type_info,
                                  sd_inner_proxy,
                                  sd_outer_proxy,
                                  cpu_thread_idx,
                                  cpu_thread_count,
                                  slot_selector);
}

DEVICE int SUFFIX(fill_hash_join_buff)(int32_t* buff,
                                       const int32_t invalid_slot_val,
                                       const JoinColumn join_column,
                                       const JoinColumnTypeInfo type_info,
                                       const void* sd_inner_proxy,
                                       const void* sd_outer_proxy,
                                       const int32_t cpu_thread_idx,
                                       const int32_t cpu_thread_count) {
  auto slot_selector = [&](auto elem) {
    return SUFFIX(get_hash_slot)(buff, elem, type_info.min_val);
  };
  return fill_hash_join_buff_impl(buff,
                                  invalid_slot_val,
                                  join_column,
                                  type_info,
                                  sd_inner_proxy,
                                  sd_outer_proxy,
                                  cpu_thread_idx,
                                  cpu_thread_count,
                                  slot_selector);
}

template <typename SLOT_SELECTOR>
DEVICE int fill_hash_join_buff_sharded_impl(int32_t* buff,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn join_column,
                                            const JoinColumnTypeInfo type_info,
                                            const ShardInfo shard_info,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const int32_t cpu_thread_idx,
                                            const int32_t cpu_thread_count,
                                            SLOT_SELECTOR slot_sel) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    size_t shard = SHARD_FOR_KEY(elem, shard_info.num_shards);
    if (shard != shard_info.shard) {
      continue;
    }
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = slot_sel(elem, shard);
    if (mapd_cas(entry_ptr, invalid_slot_val, index) != invalid_slot_val) {
      return -1;
    }
  }
  return 0;
}
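
// Shard routing sketch (an assumption about Shared/shard_key.h, which is not
// shown here): SHARD_FOR_KEY maps a key to a shard, essentially the key's
// absolute value modulo num_shards, so each device inserts only the keys
// owned by its shard and skips everything else in the column.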

DEVICE int SUFFIX(fill_hash_join_buff_sharded_bucketized)(
    int32_t* buff,
    const int32_t invalid_slot_val,
    const JoinColumn join_column,
    const JoinColumnTypeInfo type_info,
    const ShardInfo shard_info,
    const void* sd_inner_proxy,
    const void* sd_outer_proxy,
    const int32_t cpu_thread_idx,
    const int32_t cpu_thread_count,
    const int64_t bucket_normalization) {
  auto slot_selector = [&](auto elem, auto shard) -> auto {
    return SUFFIX(get_bucketized_hash_slot_sharded_opt)(buff,
                                                        elem,
                                                        type_info.min_val,
                                                        shard_info.entry_count_per_shard,
                                                        shard,
                                                        shard_info.num_shards,
                                                        shard_info.device_count,
                                                        bucket_normalization);
  };

  return fill_hash_join_buff_sharded_impl(buff,
                                          invalid_slot_val,
                                          join_column,
                                          type_info,
                                          shard_info,
                                          sd_inner_proxy,
                                          sd_outer_proxy,
                                          cpu_thread_idx,
                                          cpu_thread_count,
                                          slot_selector);
}

DEVICE int SUFFIX(fill_hash_join_buff_sharded)(int32_t* buff,
                                               const int32_t invalid_slot_val,
                                               const JoinColumn join_column,
                                               const JoinColumnTypeInfo type_info,
                                               const ShardInfo shard_info,
                                               const void* sd_inner_proxy,
                                               const void* sd_outer_proxy,
                                               const int32_t cpu_thread_idx,
                                               const int32_t cpu_thread_count) {
  auto slot_selector = [&](auto elem, auto shard) {
    return SUFFIX(get_hash_slot_sharded_opt)(buff,
                                             elem,
                                             type_info.min_val,
                                             shard_info.entry_count_per_shard,
                                             shard,
                                             shard_info.num_shards,
                                             shard_info.device_count);
  };
  return fill_hash_join_buff_sharded_impl(buff,
                                          invalid_slot_val,
                                          join_column,
                                          type_info,
                                          shard_info,
                                          sd_inner_proxy,
                                          sd_outer_proxy,
                                          cpu_thread_idx,
                                          cpu_thread_count,
                                          slot_selector);
}

template <typename T>
DEVICE void SUFFIX(init_baseline_hash_join_buff)(int8_t* hash_buff,
                                                 const int64_t entry_count,
                                                 const size_t key_component_count,
                                                 const bool with_val_slot,
                                                 const int32_t invalid_slot_val,
                                                 const int32_t cpu_thread_idx,
                                                 const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  auto hash_entry_size = (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
  const T empty_key = SUFFIX(get_invalid_key)<T>();
  for (int64_t h = start; h < entry_count; h += step) {
    int64_t off = h * hash_entry_size;
    auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
    for (size_t i = 0; i < key_component_count; ++i) {
      row_ptr[i] = empty_key;
    }
    if (with_val_slot) {
      row_ptr[key_component_count] = invalid_slot_val;
    }
  }
}

#ifdef __CUDACC__
template <typename T>
__device__ T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
                                                 const uint32_t h,
                                                 const T* key,
                                                 const size_t key_component_count,
                                                 const int64_t hash_entry_size) {
  uint32_t off = h * hash_entry_size;
  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
  const T empty_key = SUFFIX(get_invalid_key)<T>();
  {
    const T old = atomicCAS(row_ptr, empty_key, *key);
    if (empty_key == old && key_component_count > 1) {
      for (int64_t i = 1; i <= key_component_count - 1; ++i) {
        atomicExch(row_ptr + i, key[i]);
      }
    }
  }
  if (key_component_count > 1) {
    while (atomicAdd(row_ptr + key_component_count - 1, 0) == empty_key) {
      // spin until the winning thread has finished writing the entire key and
      // the init value
    }
  }
  bool match = true;
  for (uint32_t i = 0; i < key_component_count; ++i) {
    if (row_ptr[i] != key[i]) {
      match = false;
      break;
    }
  }

  if (match) {
    return reinterpret_cast<T*>(row_ptr + key_component_count);
  }
  return nullptr;
}
#else

#ifdef _MSC_VER
#define cas_cst(ptr, expected, desired)                                      \
  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
                                     reinterpret_cast<void*>(&desired),      \
                                     expected) == expected)
#define store_cst(ptr, val)                                          \
  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
                             reinterpret_cast<void*>(val))
#define load_cst(ptr) \
  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
#else
#define cas_cst(ptr, expected, desired) \
  __atomic_compare_exchange_n(          \
      ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
#define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
#endif

template <typename T>
T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
                                      const uint32_t h,
                                      const T* key,
                                      const size_t key_component_count,
                                      const int64_t hash_entry_size) {
  uint32_t off = h * hash_entry_size;
  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
  T empty_key = SUFFIX(get_invalid_key)<T>();
  T write_pending = SUFFIX(get_invalid_key)<T>() - 1;
  if (UNLIKELY(*key == write_pending)) {
    // Address the singularity case where the first column contains the pending
    // write special value. Should never happen, but avoid doing wrong things.
    return nullptr;
  }
  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
  if (success) {
    if (key_component_count > 1) {
      memcpy(row_ptr + 1, key + 1, (key_component_count - 1) * sizeof(T));
    }
    store_cst(row_ptr, *key);
    return reinterpret_cast<T*>(row_ptr + key_component_count);
  }
  while (load_cst(row_ptr) == write_pending) {
    // spin until the winning thread has finished writing the entire key
  }
  for (size_t i = 0; i < key_component_count; ++i) {
    if (load_cst(row_ptr + i) != key[i]) {
      return nullptr;
    }
  }
  return reinterpret_cast<T*>(row_ptr + key_component_count);
}

#undef load_cst
#undef store_cst
#undef cas_cst

#endif  // __CUDACC__
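
// State machine sketch for the CPU slot-claiming protocol above. A slot's
// first key component moves through three states:
//
//   empty_key --cas_cst--> write_pending --store_cst--> key[0]
//
// The winner writes key components 1..n-1 with a plain memcpy first and only
// then publishes key[0] with a seq-cst store, so any reader that spins past
// write_pending is guaranteed to observe the fully written key.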

template <typename T>
DEVICE int write_baseline_hash_slot(const int32_t val,
                                    int8_t* hash_buff,
                                    const int64_t entry_count,
                                    const T* key,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const int32_t invalid_slot_val,
                                    const size_t key_size_in_bytes,
                                    const size_t hash_entry_size) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  T* matching_group = get_matching_baseline_hash_slot_at(
      hash_buff, h, key, key_component_count, hash_entry_size);
  if (!matching_group) {
    uint32_t h_probe = (h + 1) % entry_count;
    while (h_probe != h) {
      matching_group = get_matching_baseline_hash_slot_at(
          hash_buff, h_probe, key, key_component_count, hash_entry_size);
      if (matching_group) {
        break;
      }
      h_probe = (h_probe + 1) % entry_count;
    }
  }
  if (!matching_group) {
    return -2;
  }
  if (!with_val_slot) {
    return 0;
  }
  if (mapd_cas(matching_group, invalid_slot_val, val) != invalid_slot_val) {
    return -1;
  }
  return 0;
}
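
// Probing sketch: write_baseline_hash_slot is open addressing with linear
// probing. Starting from h = MurmurHash1Impl(key) % entry_count it walks
// h, h+1, ... with wrap-around and gives up with -2 only after a full cycle,
// i.e. when the table has no free or matching slot. A return of -1 means the
// value payload was already claimed by a different row with an equal key.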

template <typename T, typename FILL_HANDLER>
DEVICE int SUFFIX(fill_baseline_hash_join_buff)(int8_t* hash_buff,
                                                const int64_t entry_count,
                                                const int32_t invalid_slot_val,
                                                const size_t key_component_count,
                                                const bool with_val_slot,
                                                const FILL_HANDLER* f,
                                                const int64_t num_elems,
                                                const int32_t cpu_thread_idx,
                                                const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  T key_scratch_buff[g_maximum_conditions_to_coalesce];
  const size_t key_size_in_bytes = key_component_count * sizeof(T);
  const size_t hash_entry_size =
      (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
  auto key_buff_handler = [hash_buff,
                           entry_count,
                           with_val_slot,
                           invalid_slot_val,
                           key_size_in_bytes,
                           hash_entry_size](const int64_t entry_idx,
                                            const T* key_scratch_buffer,
                                            const size_t key_component_count) {
    return write_baseline_hash_slot<T>(entry_idx,
                                       hash_buff,
                                       entry_count,
                                       key_scratch_buffer,
                                       key_component_count,
                                       with_val_slot,
                                       invalid_slot_val,
                                       key_size_in_bytes,
                                       hash_entry_size);
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    const auto err = (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
    if (err) {
      return err;
    }
  }
  return 0;
}

#undef mapd_cas

#ifdef __CUDACC__
#define mapd_add(address, val) atomicAdd(address, val)
#elif defined(_MSC_VER)
#define mapd_add(address, val)                                      \
  InterlockedExchangeAdd(reinterpret_cast<volatile long*>(address), \
                         static_cast<long>(val))
#else
#define mapd_add(address, val) __sync_fetch_and_add(address, val)
#endif

template <typename SLOT_SELECTOR>
DEVICE void count_matches_impl(int32_t* count_buff,
                               const int32_t invalid_slot_val,
                               const JoinColumn join_column,
                               const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                               ,
                               const void* sd_inner_proxy,
                               const void* sd_outer_proxy,
                               const int32_t cpu_thread_idx,
                               const int32_t cpu_thread_count
#endif
                               ,
                               SLOT_SELECTOR slot_selector) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto* entry_ptr = slot_selector(count_buff, elem);
    mapd_add(entry_ptr, int32_t(1));
  }
}

GLOBAL void SUFFIX(count_matches)(int32_t* count_buff,
                                  const int32_t invalid_slot_val,
                                  const JoinColumn join_column,
                                  const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                  ,
                                  const void* sd_inner_proxy,
                                  const void* sd_outer_proxy,
                                  const int32_t cpu_thread_idx,
                                  const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info](auto count_buff, auto elem) {
    return SUFFIX(get_hash_slot)(count_buff, elem, type_info.min_val);
  };
  count_matches_impl(count_buff,
                     invalid_slot_val,
                     join_column,
                     type_info
#ifndef __CUDACC__
                     ,
                     sd_inner_proxy,
                     sd_outer_proxy,
                     cpu_thread_idx,
                     cpu_thread_count
#endif
                     ,
                     slot_sel);
}

GLOBAL void SUFFIX(count_matches_bucketized)(int32_t* count_buff,
                                             const int32_t invalid_slot_val,
                                             const JoinColumn join_column,
                                             const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                             ,
                                             const void* sd_inner_proxy,
                                             const void* sd_outer_proxy,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count
#endif
                                             ,
                                             const int64_t bucket_normalization) {
  auto slot_sel = [bucket_normalization, &type_info](auto count_buff, auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        count_buff, elem, type_info.min_val, bucket_normalization);
  };
  count_matches_impl(count_buff,
                     invalid_slot_val,
                     join_column,
                     type_info
#ifndef __CUDACC__
                     ,
                     sd_inner_proxy,
                     sd_outer_proxy,
                     cpu_thread_idx,
                     cpu_thread_count
#endif
                     ,
                     slot_sel);
}

GLOBAL void SUFFIX(count_matches_sharded)(int32_t* count_buff,
                                          const int32_t invalid_slot_val,
                                          const JoinColumn join_column,
                                          const JoinColumnTypeInfo type_info,
                                          const ShardInfo shard_info
#ifndef __CUDACC__
                                          ,
                                          const void* sd_inner_proxy,
                                          const void* sd_outer_proxy,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = SUFFIX(get_hash_slot_sharded)(count_buff,
                                                       elem,
                                                       type_info.min_val,
                                                       shard_info.entry_count_per_shard,
                                                       shard_info.num_shards,
                                                       shard_info.device_count);
    mapd_add(entry_ptr, int32_t(1));
  }
}

template <typename T>
DEVICE const T* SUFFIX(get_matching_baseline_hash_slot_readonly)(
    const T* key,
    const size_t key_component_count,
    const T* composite_key_dict,
    const int64_t entry_count,
    const size_t key_size_in_bytes) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  uint32_t off = h * key_component_count;
  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
    return &composite_key_dict[off];
  }
  uint32_t h_probe = (h + 1) % entry_count;
  while (h_probe != h) {
    off = h_probe * key_component_count;
    if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
      return &composite_key_dict[off];
    }
    h_probe = (h_probe + 1) % entry_count;
  }
#ifndef __CUDACC__
  CHECK(false);
#else
  assert(false);
#endif
  return nullptr;
}

template <typename T, typename KEY_HANDLER>
GLOBAL void SUFFIX(count_matches_baseline)(int32_t* count_buff,
                                           const T* composite_key_dict,
                                           const int64_t entry_count,
                                           const KEY_HANDLER* f,
                                           const int64_t num_elems
#ifndef __CUDACC__
                                           ,
                                           const int32_t cpu_thread_idx,
                                           const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
#ifdef __CUDACC__
  assert(composite_key_dict);
#endif
  T key_scratch_buff[g_maximum_conditions_to_coalesce];
  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
  auto key_buff_handler = [composite_key_dict,
                           entry_count,
                           count_buff,
                           key_size_in_bytes](const int64_t row_entry_idx,
                                              const T* key_scratch_buff,
                                              const size_t key_component_count) {
    const auto matching_group =
        SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
                                                         key_component_count,
                                                         composite_key_dict,
                                                         entry_count,
                                                         key_size_in_bytes);
    const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
    mapd_add(&count_buff[entry_idx], int32_t(1));
    return 0;
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}

template <typename SLOT_SELECTOR>
DEVICE void fill_row_ids_impl(int32_t* buff,
                              const int64_t hash_entry_count,
                              const int32_t invalid_slot_val,
                              const JoinColumn join_column,
                              const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                              ,
                              const void* sd_inner_proxy,
                              const void* sd_outer_proxy,
                              const int32_t cpu_thread_idx,
                              const int32_t cpu_thread_count
#endif
                              ,
                              SLOT_SELECTOR slot_selector) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;

#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto pos_ptr = slot_selector(pos_buff, elem);
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(index);
  }
}
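
// Buffer layout used by fill_row_ids_impl (reconstructed from the pointer
// arithmetic above):
//
//   buff: [ pos_buff | count_buff | id_buff ]
//           pos_buff[i]   - offset of bin i's first row id within id_buff
//           count_buff[i] - per-bin insert cursor, bumped atomically
//           id_buff[k]    - row indices, grouped contiguously by bin
//
// pos_buff and count_buff each hold hash_entry_count entries; the atomic
// fetch-add on count_buff hands every matching row a distinct slot inside
// its bin.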

GLOBAL void SUFFIX(fill_row_ids)(int32_t* buff,
                                 const int64_t hash_entry_count,
                                 const int32_t invalid_slot_val,
                                 const JoinColumn join_column,
                                 const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                 ,
                                 const void* sd_inner_proxy,
                                 const void* sd_outer_proxy,
                                 const int32_t cpu_thread_idx,
                                 const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info](auto pos_buff, auto elem) {
    return SUFFIX(get_hash_slot)(pos_buff, elem, type_info.min_val);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

GLOBAL void SUFFIX(fill_row_ids_bucketized)(int32_t* buff,
                                            const int64_t hash_entry_count,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn join_column,
                                            const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                            ,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const int32_t cpu_thread_idx,
                                            const int32_t cpu_thread_count
#endif
                                            ,
                                            const int64_t bucket_normalization) {
  auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        pos_buff, elem, type_info.min_val, bucket_normalization);
  };
  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

template <typename SLOT_SELECTOR>
DEVICE void fill_row_ids_sharded_impl(int32_t* buff,
                                      const int64_t hash_entry_count,
                                      const int32_t invalid_slot_val,
                                      const JoinColumn join_column,
                                      const JoinColumnTypeInfo type_info,
                                      const ShardInfo shard_info
#ifndef __CUDACC__
                                      ,
                                      const void* sd_inner_proxy,
                                      const void* sd_outer_proxy,
                                      const int32_t cpu_thread_idx,
                                      const int32_t cpu_thread_count
#endif
                                      ,
                                      SLOT_SELECTOR slot_selector) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;

#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto* pos_ptr = slot_selector(pos_buff, elem);
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(index);
  }
}

GLOBAL void SUFFIX(fill_row_ids_sharded)(int32_t* buff,
                                         const int64_t hash_entry_count,
                                         const int32_t invalid_slot_val,
                                         const JoinColumn join_column,
                                         const JoinColumnTypeInfo type_info,
                                         const ShardInfo shard_info
#ifndef __CUDACC__
                                         ,
                                         const void* sd_inner_proxy,
                                         const void* sd_outer_proxy,
                                         const int32_t cpu_thread_idx,
                                         const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info, &shard_info](auto pos_buff, auto elem) {
    return SUFFIX(get_hash_slot_sharded)(pos_buff,
                                         elem,
                                         type_info.min_val,
                                         shard_info.entry_count_per_shard,
                                         shard_info.num_shards,
                                         shard_info.device_count);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

GLOBAL void SUFFIX(fill_row_ids_sharded_bucketized)(int32_t* buff,
                                                    const int64_t hash_entry_count,
                                                    const int32_t invalid_slot_val,
                                                    const JoinColumn join_column,
                                                    const JoinColumnTypeInfo type_info,
                                                    const ShardInfo shard_info
#ifndef __CUDACC__
                                                    ,
                                                    const void* sd_inner_proxy,
                                                    const void* sd_outer_proxy,
                                                    const int32_t cpu_thread_idx,
                                                    const int32_t cpu_thread_count
#endif
                                                    ,
                                                    const int64_t bucket_normalization) {
  auto slot_sel = [&shard_info, &type_info, bucket_normalization](auto pos_buff,
                                                                  auto elem) {
    return SUFFIX(get_bucketized_hash_slot_sharded)(pos_buff,
                                                    elem,
                                                    type_info.min_val,
                                                    shard_info.entry_count_per_shard,
                                                    shard_info.num_shards,
                                                    shard_info.device_count,
                                                    bucket_normalization);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

template <typename T, typename KEY_HANDLER>
GLOBAL void SUFFIX(fill_row_ids_baseline)(int32_t* buff,
                                          const T* composite_key_dict,
                                          const int64_t hash_entry_count,
                                          const int32_t invalid_slot_val,
                                          const KEY_HANDLER* f,
                                          const int64_t num_elems
#ifndef __CUDACC__
                                          ,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count
#endif
) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  T key_scratch_buff[g_maximum_conditions_to_coalesce];
#ifdef __CUDACC__
  assert(composite_key_dict);
#endif
  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
  auto key_buff_handler = [composite_key_dict,
                           hash_entry_count,
                           pos_buff,
                           invalid_slot_val,
                           count_buff,
                           id_buff,
                           key_size_in_bytes](const int64_t row_index,
                                              const T* key_scratch_buff,
                                              const size_t key_component_count) {
    const T* matching_group =
        SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
                                                         key_component_count,
                                                         composite_key_dict,
                                                         hash_entry_count,
                                                         key_size_in_bytes);
    const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
    int32_t* pos_ptr = pos_buff + entry_idx;
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(row_index);
    return 0;
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}

#undef mapd_add

template <typename KEY_HANDLER>
GLOBAL void SUFFIX(approximate_distinct_tuples_impl)(uint8_t* hll_buffer,
                                                     int32_t* row_count_buffer,
                                                     const uint32_t b,
                                                     const int64_t num_elems,
                                                     const KEY_HANDLER* f
#ifndef __CUDACC__
                                                     ,
                                                     const int32_t cpu_thread_idx,
                                                     const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  auto key_buff_handler = [b, hll_buffer, row_count_buffer](
                              const int64_t entry_idx,
                              const int64_t* key_scratch_buff,
                              const size_t key_component_count) {
    if (row_count_buffer) {
      row_count_buffer[entry_idx] += 1;
    }

    const uint64_t hash =
        MurmurHash64AImpl(key_scratch_buff, key_component_count * sizeof(int64_t), 0);
    const uint32_t index = hash >> (64 - b);
    const auto rank = get_rank(hash << b, 64 - b);
#ifdef __CUDACC__
    atomicMax(reinterpret_cast<int32_t*>(hll_buffer) + index, rank);
#else
    hll_buffer[index] = std::max(hll_buffer[index], rank);
#endif

    return 0;
  };

  int64_t key_scratch_buff[g_maximum_conditions_to_coalesce];

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}
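
// HyperLogLog register update, worked through (sketch): with b index bits,
// the 64-bit MurmurHash h is split as
//
//   index = h >> (64 - b);            // top b bits pick one of 2^b registers
//   rank  = get_rank(h << b, 64 - b); // first set bit position of the rest
//                                     // (leading-zero count + 1)
//
// and each register keeps the maximum rank seen. The host later combines the
// per-thread buffers register-by-register and derives the distinct-count
// estimate from them.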

#ifdef __CUDACC__
namespace {
// TODO(adb): put these in a header file so they are not duplicated between here and
// cuda_mapd_rt.cu
__device__ double atomicMin(double* address, double val) {
  unsigned long long int* address_as_ull = (unsigned long long int*)address;
  unsigned long long int old = *address_as_ull, assumed;

  do {
    assumed = old;
    old = atomicCAS(address_as_ull,
                    assumed,
                    __double_as_longlong(min(val, __longlong_as_double(assumed))));
  } while (assumed != old);

  return __longlong_as_double(old);
}
}  // namespace
#endif

template <size_t N>
GLOBAL void SUFFIX(compute_bucket_sizes_impl)(double* bucket_sizes_for_thread,
                                              const JoinColumn* join_column,
                                              const JoinColumnTypeInfo* type_info,
                                              const double bucket_sz_threshold
#ifndef __CUDACC__
                                              ,
                                              const int32_t cpu_thread_idx,
                                              const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnIterator it(join_column, type_info, start, step);
  for (; it; ++it) {
    // We expect the bounds column to be (min, max) e.g. (x_min, y_min, x_max, y_max)
    double bounds[2 * N];
    for (size_t j = 0; j < 2 * N; j++) {
      bounds[j] = SUFFIX(fixed_width_double_decode_noinline)(it.ptr(), j);
    }

    for (size_t j = 0; j < N; j++) {
      const auto diff = bounds[j + N] - bounds[j];
#ifdef __CUDACC__
      if (diff > bucket_sz_threshold) {
        atomicMin(&bucket_sizes_for_thread[j], diff);
      }
#else
      if (diff > bucket_sz_threshold && diff < bucket_sizes_for_thread[j]) {
        bucket_sizes_for_thread[j] = diff;
      }
#endif
    }
  }
}

#ifndef __CUDACC__

template <typename InputIterator, typename OutputIterator>
void inclusive_scan(InputIterator first,
                    InputIterator last,
                    OutputIterator out,
                    const size_t thread_count) {
  using ElementType = typename InputIterator::value_type;
  using OffsetType = typename InputIterator::difference_type;
  const OffsetType elem_count = last - first;
  if (elem_count < 10000 || thread_count <= 1) {
    ElementType sum = 0;
    for (auto iter = first; iter != last; ++iter, ++out) {
      *out = sum += *iter;
    }
    return;
  }

  const OffsetType step = (elem_count + thread_count - 1) / thread_count;
  OffsetType start_off = 0;
  OffsetType end_off = std::min(step, elem_count);
  std::vector<ElementType> partial_sums(thread_count);
  std::vector<std::future<void>> counter_threads;
  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx,
              start_off = std::min(start_off + step, elem_count),
              end_off = std::min(start_off + step, elem_count)) {
    counter_threads.push_back(std::async(
        std::launch::async,
        [first, out](
            ElementType& partial_sum, const OffsetType start, const OffsetType end) {
          ElementType sum = 0;
          for (auto in_iter = first + start, out_iter = out + start;
               in_iter != (first + end);
               ++in_iter, ++out_iter) {
            *out_iter = sum += *in_iter;
          }
          partial_sum = sum;
        },
        std::ref(partial_sums[thread_idx]),
        start_off,
        end_off));
  }
  for (auto& child : counter_threads) {
    child.get();
  }

  ElementType sum = 0;
  for (auto& s : partial_sums) {
    s += sum;
    sum = s;
  }

  counter_threads.clear();
  start_off = std::min(step, elem_count);
  end_off = std::min(start_off + step, elem_count);
  for (size_t thread_idx = 0; thread_idx < thread_count - 1; ++thread_idx,
              start_off = std::min(start_off + step, elem_count),
              end_off = std::min(start_off + step, elem_count)) {
    counter_threads.push_back(std::async(
        std::launch::async,
        [out](const ElementType prev_sum, const OffsetType start, const OffsetType end) {
          for (auto iter = out + start; iter != (out + end); ++iter) {
            *iter += prev_sum;
          }
        },
        partial_sums[thread_idx],
        start_off,
        end_off));
  }
  for (auto& child : counter_threads) {
    child.get();
  }
}
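
// Usage sketch for the parallel scan above (illustrative values):
//
//   std::vector<int32_t> v{2, 0, 3, 1};
//   inclusive_scan(v.begin(), v.end(), v.begin(), /*thread_count=*/2);
//   // v == {2, 2, 5, 6}
//
// Phase one scans each chunk independently and records per-chunk totals;
// phase two adds the accumulated totals of all preceding chunks to every
// element of chunks 1..n-1. Inputs shorter than 10000 elements (as here)
// simply take the serial path with the same result.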

template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
void fill_one_to_many_hash_table_impl(int32_t* buff,
                                      const int64_t hash_entry_count,
                                      const int32_t invalid_slot_val,
                                      const JoinColumn& join_column,
                                      const JoinColumnTypeInfo& type_info,
                                      const void* sd_inner_proxy,
                                      const void* sd_outer_proxy,
                                      const unsigned cpu_thread_count,
                                      COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_func,
                                      FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_func) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (unsigned cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    counter_threads.push_back(std::async(
        std::launch::async, count_matches_func, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int64_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
#if HAVE_CUDA
  thrust::inclusive_scan(count_copy.begin(), count_copy.end(), count_copy.begin());
#else
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
#endif
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](size_t thread_idx) {
          for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    rowid_threads.push_back(std::async(
        std::launch::async, fill_row_ids_func, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}
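
// Pipeline sketch for the one-to-many build above:
//   1. count_matches_func  - count_buff[i] = number of rows hashing to bin i
//   2. inclusive_scan      - count_copy, shifted by one entry via the memcpy,
//                            yields the exclusive prefix sum, i.e. each bin's
//                            start offset in id_buff
//   3. pos_buff[i]         - set to that offset for every non-empty bin
//   4. count_buff zeroed   - reused as per-bin insert cursors
//   5. fill_row_ids_func   - scatters row ids into id_buff via the cursors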

void fill_one_to_many_hash_table(int32_t* buff,
                                 const HashEntryInfo hash_entry_info,
                                 const int32_t invalid_slot_val,
                                 const JoinColumn& join_column,
                                 const JoinColumnTypeInfo& type_info,
                                 const void* sd_inner_proxy,
                                 const void* sd_outer_proxy,
                                 const unsigned cpu_thread_count) {
  auto launch_count_matches = [count_buff = buff + hash_entry_info.hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               sd_inner_proxy,
                               sd_outer_proxy](auto cpu_thread_idx,
                                               auto cpu_thread_count) {
    SUFFIX(count_matches)
    (count_buff,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count);
  };
  auto launch_fill_row_ids = [hash_entry_count = hash_entry_info.hash_entry_count,
                              buff,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              sd_inner_proxy,
                              sd_outer_proxy](auto cpu_thread_idx,
                                              auto cpu_thread_count) {
    SUFFIX(fill_row_ids)
    (buff,
     hash_entry_count,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count);
  };

  fill_one_to_many_hash_table_impl(buff,
                                   hash_entry_info.hash_entry_count,
                                   invalid_slot_val,
                                   join_column,
                                   type_info,
                                   sd_inner_proxy,
                                   sd_outer_proxy,
                                   cpu_thread_count,
                                   launch_count_matches,
                                   launch_fill_row_ids);
}

void fill_one_to_many_hash_table_bucketized(int32_t* buff,
                                            const HashEntryInfo hash_entry_info,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn& join_column,
                                            const JoinColumnTypeInfo& type_info,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const unsigned cpu_thread_count) {
  auto bucket_normalization = hash_entry_info.bucket_normalization;
  auto hash_entry_count = hash_entry_info.getNormalizedHashEntryCount();
  auto launch_count_matches = [bucket_normalization,
                               count_buff = buff + hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               sd_inner_proxy,
                               sd_outer_proxy](auto cpu_thread_idx,
                                               auto cpu_thread_count) {
    SUFFIX(count_matches_bucketized)
    (count_buff,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count,
     bucket_normalization);
  };
  auto launch_fill_row_ids = [bucket_normalization,
                              hash_entry_count,
                              buff,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              sd_inner_proxy,
                              sd_outer_proxy](auto cpu_thread_idx,
                                              auto cpu_thread_count) {
    SUFFIX(fill_row_ids_bucketized)
    (buff,
     hash_entry_count,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count,
     bucket_normalization);
  };

  fill_one_to_many_hash_table_impl(buff,
                                   hash_entry_count,
                                   invalid_slot_val,
                                   join_column,
                                   type_info,
                                   sd_inner_proxy,
                                   sd_outer_proxy,
                                   cpu_thread_count,
                                   launch_count_matches,
                                   launch_fill_row_ids);
}

template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
void fill_one_to_many_hash_table_sharded_impl(
    int32_t* buff,
    const int64_t hash_entry_count,
    const int32_t invalid_slot_val,
    const JoinColumn& join_column,
    const JoinColumnTypeInfo& type_info,
    const ShardInfo& shard_info,
    const void* sd_inner_proxy,
    const void* sd_outer_proxy,
    const unsigned cpu_thread_count,
    COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_launcher,
    FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_launcher) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    counter_threads.push_back(std::async(
        std::launch::async, count_matches_launcher, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int64_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](const unsigned thread_idx) {
          for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    rowid_threads.push_back(std::async(
        std::launch::async, fill_row_ids_launcher, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}

void fill_one_to_many_hash_table_sharded(int32_t* buff,
                                         const int64_t hash_entry_count,
                                         const int32_t invalid_slot_val,
                                         const JoinColumn& join_column,
                                         const JoinColumnTypeInfo& type_info,
                                         const ShardInfo& shard_info,
                                         const void* sd_inner_proxy,
                                         const void* sd_outer_proxy,
                                         const unsigned cpu_thread_count) {
  auto launch_count_matches = [count_buff = buff + hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               &shard_info
#ifndef __CUDACC__
                               ,
                               sd_inner_proxy,
                               sd_outer_proxy
#endif
  ](auto cpu_thread_idx, auto cpu_thread_count) {
    return SUFFIX(count_matches_sharded)(count_buff,
                                         invalid_slot_val,
                                         join_column,
                                         type_info,
                                         shard_info
#ifndef __CUDACC__
                                         ,
                                         sd_inner_proxy,
                                         sd_outer_proxy,
                                         cpu_thread_idx,
                                         cpu_thread_count
#endif
    );
  };

  auto launch_fill_row_ids = [buff,
                              hash_entry_count,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              &shard_info
#ifndef __CUDACC__
                              ,
                              sd_inner_proxy,
                              sd_outer_proxy
#endif
  ](auto cpu_thread_idx, auto cpu_thread_count) {
    return SUFFIX(fill_row_ids_sharded)(buff,
                                        hash_entry_count,
                                        invalid_slot_val,
                                        join_column,
                                        type_info,
                                        shard_info
#ifndef __CUDACC__
                                        ,
                                        sd_inner_proxy,
                                        sd_outer_proxy,
                                        cpu_thread_idx,
                                        cpu_thread_count);
#endif
  };

  fill_one_to_many_hash_table_sharded_impl(buff,
                                           hash_entry_count,
                                           invalid_slot_val,
                                           join_column,
                                           type_info,
                                           shard_info
#ifndef __CUDACC__
                                           ,
                                           sd_inner_proxy,
                                           sd_outer_proxy,
                                           cpu_thread_count
#endif
                                           ,
                                           launch_count_matches,
                                           launch_fill_row_ids);
}

void init_baseline_hash_join_buff_32(int8_t* hash_join_buff,
                                     const int64_t entry_count,
                                     const size_t key_component_count,
                                     const bool with_val_slot,
                                     const int32_t invalid_slot_val,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count) {
  init_baseline_hash_join_buff<int32_t>(hash_join_buff,
                                        entry_count,
                                        key_component_count,
                                        with_val_slot,
                                        invalid_slot_val,
                                        cpu_thread_idx,
                                        cpu_thread_count);
}

void init_baseline_hash_join_buff_64(int8_t* hash_join_buff,
                                     const int64_t entry_count,
                                     const size_t key_component_count,
                                     const bool with_val_slot,
                                     const int32_t invalid_slot_val,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count) {
  init_baseline_hash_join_buff<int64_t>(hash_join_buff,
                                        entry_count,
                                        key_component_count,
                                        with_val_slot,
                                        invalid_slot_val,
                                        cpu_thread_idx,
                                        cpu_thread_count);
}

int fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                    const int64_t entry_count,
                                    const int32_t invalid_slot_val,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const GenericKeyHandler* key_handler,
                                    const int64_t num_elems,
                                    const int32_t cpu_thread_idx,
                                    const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int overlaps_fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                             const int64_t entry_count,
                                             const int32_t invalid_slot_val,
                                             const size_t key_component_count,
                                             const bool with_val_slot,
                                             const OverlapsKeyHandler* key_handler,
                                             const int64_t num_elems,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                    const int64_t entry_count,
                                    const int32_t invalid_slot_val,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const GenericKeyHandler* key_handler,
                                    const int64_t num_elems,
                                    const int32_t cpu_thread_idx,
                                    const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int overlaps_fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                             const int64_t entry_count,
                                             const int32_t invalid_slot_val,
                                             const size_t key_component_count,
                                             const bool with_val_slot,
                                             const OverlapsKeyHandler* key_handler,
                                             const int64_t num_elems,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

template <typename T>
void fill_one_to_many_baseline_hash_table(
    int32_t* buff,
    const T* composite_key_dict,
    const int64_t hash_entry_count,
    const int32_t invalid_slot_val,
    const size_t key_component_count,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_buckets_per_key,
    const std::vector<const void*>& sd_inner_proxy_per_key,
    const std::vector<const void*>& sd_outer_proxy_per_key,
    const size_t cpu_thread_count) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    if (join_buckets_per_key.size() > 0) {
      counter_threads.push_back(
          std::async(std::launch::async,
                     [count_buff,
                      composite_key_dict,
                      &hash_entry_count,
                      &join_buckets_per_key,
                      &join_column_per_key,
                      cpu_thread_idx,
                      cpu_thread_count] {
                       const auto key_handler = OverlapsKeyHandler(
                           join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
                           &join_column_per_key[0],
                           join_buckets_per_key[0].bucket_sizes_for_dimension.data());
                       count_matches_baseline(count_buff,
                                              composite_key_dict,
                                              hash_entry_count,
                                              &key_handler,
                                              join_column_per_key[0].num_elems,
                                              cpu_thread_idx,
                                              cpu_thread_count);
                     }));
    } else {
      counter_threads.push_back(std::async(
          std::launch::async,
          [count_buff,
           composite_key_dict,
           &key_component_count,
           &hash_entry_count,
           &join_column_per_key,
           &type_info_per_key,
           &sd_inner_proxy_per_key,
           &sd_outer_proxy_per_key,
           cpu_thread_idx,
           cpu_thread_count] {
            const auto key_handler = GenericKeyHandler(key_component_count,
                                                       true,
                                                       &join_column_per_key[0],
                                                       &type_info_per_key[0],
                                                       &sd_inner_proxy_per_key[0],
                                                       &sd_outer_proxy_per_key[0]);
            count_matches_baseline(count_buff,
                                   composite_key_dict,
                                   hash_entry_count,
                                   &key_handler,
                                   join_column_per_key[0].num_elems,
                                   cpu_thread_idx,
                                   cpu_thread_count);
          }));
    }
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int64_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](const int thread_idx) {
          for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    if (join_buckets_per_key.size() > 0) {
      rowid_threads.push_back(
          std::async(std::launch::async,
                     [buff,
                      composite_key_dict,
                      hash_entry_count,
                      invalid_slot_val,
                      &join_column_per_key,
                      &join_buckets_per_key,
                      cpu_thread_idx,
                      cpu_thread_count] {
                       const auto key_handler = OverlapsKeyHandler(
                           join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
                           &join_column_per_key[0],
                           join_buckets_per_key[0].bucket_sizes_for_dimension.data());
                       SUFFIX(fill_row_ids_baseline)
                       (buff,
                        composite_key_dict,
                        hash_entry_count,
                        invalid_slot_val,
                        &key_handler,
                        join_column_per_key[0].num_elems,
                        cpu_thread_idx,
                        cpu_thread_count);
                     }));
    } else {
      rowid_threads.push_back(std::async(std::launch::async,
                                         [buff,
                                          composite_key_dict,
                                          hash_entry_count,
                                          invalid_slot_val,
                                          key_component_count,
                                          &join_column_per_key,
                                          &type_info_per_key,
                                          &sd_inner_proxy_per_key,
                                          &sd_outer_proxy_per_key,
                                          cpu_thread_idx,
                                          cpu_thread_count] {
                                           const auto key_handler = GenericKeyHandler(
                                               key_component_count,
                                               true,
                                               &join_column_per_key[0],
                                               &type_info_per_key[0],
                                               &sd_inner_proxy_per_key[0],
                                               &sd_outer_proxy_per_key[0]);
                                           SUFFIX(fill_row_ids_baseline)
                                           (buff,
                                            composite_key_dict,
                                            hash_entry_count,
                                            invalid_slot_val,
                                            &key_handler,
                                            join_column_per_key[0].num_elems,
                                            cpu_thread_idx,
                                            cpu_thread_count);
                                         }));
    }
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}

void fill_one_to_many_baseline_hash_table_32(
    int32_t* buff,
    const int32_t* composite_key_dict,
    const int64_t hash_entry_count,
    const int32_t invalid_slot_val,
    const size_t key_component_count,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const std::vector<const void*>& sd_inner_proxy_per_key,
    const std::vector<const void*>& sd_outer_proxy_per_key,
    const int32_t cpu_thread_count) {
  fill_one_to_many_baseline_hash_table<int32_t>(buff,
                                                composite_key_dict,
                                                hash_entry_count,
                                                invalid_slot_val,
                                                key_component_count,
                                                join_column_per_key,
                                                type_info_per_key,
                                                join_bucket_info,
                                                sd_inner_proxy_per_key,
                                                sd_outer_proxy_per_key,
                                                cpu_thread_count);
}
1938 
1939 void fill_one_to_many_baseline_hash_table_64(
1940  int32_t* buff,
1941  const int64_t* composite_key_dict,
1942  const int64_t hash_entry_count,
1943  const int32_t invalid_slot_val,
1944  const size_t key_component_count,
1945  const std::vector<JoinColumn>& join_column_per_key,
1946  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1947  const std::vector<JoinBucketInfo>& join_bucket_info,
1948  const std::vector<const void*>& sd_inner_proxy_per_key,
1949  const std::vector<const void*>& sd_outer_proxy_per_key,
1950  const int32_t cpu_thread_count) {
1951  fill_one_to_many_baseline_hash_table<int64_t>(buff,
1952  composite_key_dict,
1953  hash_entry_count,
1954  invalid_slot_val,
1955  key_component_count,
1956  join_column_per_key,
1957  type_info_per_key,
1958  join_bucket_info,
1959  sd_inner_proxy_per_key,
1960  sd_outer_proxy_per_key,
1961  cpu_thread_count);
1962 }
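// --- Annotation (not part of the original file) -----------------------------
// The _32/_64 wrappers exist so non-template callers (e.g. code reached from
// the JIT) can pick the composite-key width at runtime. A hedged sketch of
// that dispatch pattern; dispatch_on_key_width and fill_table_sketch are
// illustrative stand-ins, not part of this file:
#include <cstddef>
#include <cstdint>

template <typename T>
void fill_table_sketch(int32_t* /*buff*/, const T* /*composite_key_dict*/) {
  // stand-in for fill_one_to_many_baseline_hash_table<T> above
}

inline void dispatch_on_key_width(int32_t* buff,
                                  const int8_t* composite_key_dict,
                                  const std::size_t key_component_width) {
  if (key_component_width == 4) {
    fill_table_sketch(buff, reinterpret_cast<const int32_t*>(composite_key_dict));
  } else {
    fill_table_sketch(buff, reinterpret_cast<const int64_t*>(composite_key_dict));
  }
}
// -----------------------------------------------------------------------------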
1963 
1964 void approximate_distinct_tuples(uint8_t* hll_buffer_all_cpus,
1965  const uint32_t b,
1966  const size_t padded_size_bytes,
1967  const std::vector<JoinColumn>& join_column_per_key,
1968  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1969  const int thread_count) {
1970  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
1971  CHECK(!join_column_per_key.empty());
1972 
1973  std::vector<std::future<void>> approx_distinct_threads;
1974  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
1975  approx_distinct_threads.push_back(std::async(
1976  std::launch::async,
1977  [&join_column_per_key,
1978  &type_info_per_key,
1979  b,
1980  hll_buffer_all_cpus,
1981  padded_size_bytes,
1982  thread_idx,
1983  thread_count] {
1984  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
1985 
1986  const auto key_handler = GenericKeyHandler(join_column_per_key.size(),
1987  false,
1988  &join_column_per_key[0],
1989  &type_info_per_key[0],
1990  nullptr,
1991  nullptr);
1992  approximate_distinct_tuples_impl(hll_buffer,
1993  nullptr,
1994  b,
1995  join_column_per_key[0].num_elems,
1996  &key_handler,
1997  thread_idx,
1998  thread_count);
1999  }));
2000  }
2001  for (auto& child : approx_distinct_threads) {
2002  child.get();
2003  }
2004 }
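// --- Annotation (not part of the original file) -----------------------------
// approximate_distinct_tuples gives each thread a private, padded HLL register
// block, so updates need no atomics; the caller merges the blocks afterwards
// by element-wise max. A hedged sketch of that register update and merge, with
// the 64-bit hash assumed to come from something like MurmurHash64AImpl
// (names below are illustrative; assumes 0 < b < 64):
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

void hll_update_sketch(std::vector<uint8_t>& regs,
                       const uint64_t hash,
                       const uint32_t b) {
  const uint64_t bucket = hash >> (64 - b);  // top b bits select a register
  const uint64_t rest = hash << b;           // remaining bits feed the rank
  uint8_t rank = 1;  // rank = 1 + number of leading zero bits in 'rest'
  for (uint64_t m = uint64_t(1) << 63; m != 0 && (rest & m) == 0; m >>= 1) {
    ++rank;
  }
  if (rank > regs[bucket]) {
    regs[bucket] = rank;
  }
}

void hll_merge_sketch(std::vector<uint8_t>& into,
                      const std::vector<uint8_t>& from) {
  for (std::size_t i = 0; i < into.size(); ++i) {
    into[i] = std::max(into[i], from[i]);  // union of the two sketches
  }
}
// -----------------------------------------------------------------------------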
2005 
2006 void approximate_distinct_tuples_overlaps(
2007  uint8_t* hll_buffer_all_cpus,
2008  std::vector<int32_t>& row_counts,
2009  const uint32_t b,
2010  const size_t padded_size_bytes,
2011  const std::vector<JoinColumn>& join_column_per_key,
2012  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2013  const std::vector<JoinBucketInfo>& join_buckets_per_key,
2014  const int thread_count) {
2015  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
2016  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2017  CHECK(!join_column_per_key.empty());
2018 
2019  std::vector<std::future<void>> approx_distinct_threads;
2020  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2021  approx_distinct_threads.push_back(std::async(
2022  std::launch::async,
2023  [&join_column_per_key,
2024  &join_buckets_per_key,
2025  &row_counts,
2026  b,
2027  hll_buffer_all_cpus,
2028  padded_size_bytes,
2029  thread_idx,
2030  thread_count] {
2031  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2032 
2033  const auto key_handler = OverlapsKeyHandler(
2034  join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
2035  &join_column_per_key[0],
2036  join_buckets_per_key[0].bucket_sizes_for_dimension.data());
2037  approximate_distinct_tuples_impl(hll_buffer,
2038  row_counts.data(),
2039  b,
2040  join_column_per_key[0].num_elems,
2041  &key_handler,
2042  thread_idx,
2043  thread_count);
2044  }));
2045  }
2046  for (auto& child : approx_distinct_threads) {
2047  child.get();
2048  }
2049 
2050  inclusive_scan(
2051  row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2052 }
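// --- Annotation (not part of the original file) -----------------------------
// The trailing inclusive_scan converts per-row emitted-key counts into running
// end offsets, so a later pass knows row i's keys end at row_counts[i] and
// start at row_counts[i - 1] (0 for the first row). A tiny illustration with
// toy data:
#include <cstdint>
#include <numeric>
#include <vector>

void row_counts_scan_sketch() {
  std::vector<int32_t> counts{2, 1, 3};  // keys emitted per row
  std::inclusive_scan(counts.begin(), counts.end(), counts.begin());
  // counts is now {2, 3, 6}: row 2's keys occupy slots [3, 6)
}
// -----------------------------------------------------------------------------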
2053 
2054 void compute_bucket_sizes_on_cpu(std::vector<double>& bucket_sizes_for_dimension,
2055  const JoinColumn& join_column,
2056  const JoinColumnTypeInfo& type_info,
2057  const double bucket_size_threshold,
2058  const int thread_count) {
2059  std::vector<std::vector<double>> bucket_sizes_for_threads;
2060  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2061  bucket_sizes_for_threads.emplace_back(bucket_sizes_for_dimension.size(),
2062  std::numeric_limits<double>::max());
2063  }
2064  std::vector<std::future<void>> threads;
2065  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2066  threads.push_back(std::async(std::launch::async,
2067  compute_bucket_sizes_impl<2>,
2068  bucket_sizes_for_threads[thread_idx].data(),
2069  &join_column,
2070  &type_info,
2071  bucket_size_threshold,
2072  thread_idx,
2073  thread_count));
2074  }
2075  for (auto& child : threads) {
2076  child.get();
2077  }
2078 
2079  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2080  for (size_t i = 0; i < bucket_sizes_for_dimension.size(); i++) {
2081  if (bucket_sizes_for_threads[thread_idx][i] < bucket_sizes_for_dimension[i]) {
2082  bucket_sizes_for_dimension[i] = bucket_sizes_for_threads[thread_idx][i];
2083  }
2084  }
2085  }
2086 }
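// --- Annotation (not part of the original file) -----------------------------
// compute_bucket_sizes_on_cpu uses the same fan-out/fold shape as the builders
// above: per-thread scratch vectors initialized to the max double, one async
// pass per thread, then a serial element-wise min fold on the main thread. A
// hedged, self-contained sketch of the fold step (min_fold_sketch is an
// illustrative name, not part of this file):
#include <algorithm>
#include <cstddef>
#include <limits>
#include <vector>

std::vector<double> min_fold_sketch(
    const std::vector<std::vector<double>>& per_thread) {
  std::vector<double> out(per_thread.front().size(),
                          std::numeric_limits<double>::max());
  for (const auto& thread_result : per_thread) {
    for (std::size_t i = 0; i < out.size(); ++i) {
      out[i] = std::min(out[i], thread_result[i]);
    }
  }
  return out;
}
// -----------------------------------------------------------------------------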
2087 
2088 #endif // ifndef __CUDACC__