OmniSciDB  340b00dbf6
HashJoinRuntime.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "HashJoinRuntime.h"

#include "../../Shared/shard_key.h"
#include "../CompareKeysInl.h"
#include "../HyperLogLogRank.h"
#include "../JoinHashTable/HashJoinKeyHandlers.h"
#include "../JoinHashTable/JoinColumnIterator.h"
#include "../MurmurHash1Inl.h"
#ifdef __CUDACC__
#include "../DecodersImpl.h"
#include "../GpuRtConstants.h"
#include "../JoinHashTable/JoinHashImpl.h"
#else
#include "Logger/Logger.h"

#include "Shared/likely.h"
#include "StringDictionary/StringDictionary.h"
#include "StringDictionary/StringDictionaryProxy.h"

#include <future>
#endif

#if HAVE_CUDA
#include <thrust/scan.h>
#endif
#include "../../Shared/funcannotations.h"

#include <cmath>
#include <numeric>

#ifndef __CUDACC__
namespace {

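// Translates an inner-dictionary string id to the corresponding id in the
// outer dictionary, so probes can compare dictionary ids directly. Ids whose
// translation falls outside [min_elem, max_elem] are reported as invalid.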
inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
                                              const int64_t min_elem,
                                              const int64_t max_elem,
                                              const void* sd_inner_proxy,
                                              const void* sd_outer_proxy) {
  CHECK(sd_outer_proxy);
  const auto sd_inner_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
  const auto sd_outer_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
  const auto elem_str = sd_inner_dict_proxy->getString(elem);
  const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str);
  if (outer_id > max_elem || outer_id < min_elem) {
    return StringDictionary::INVALID_STR_ID;
  }
  return outer_id;
}

}  // namespace
#endif

DEVICE void SUFFIX(init_hash_join_buff)(int32_t* groups_buffer,
                                        const int64_t hash_entry_count,
                                        const int32_t invalid_slot_val,
                                        const int32_t cpu_thread_idx,
                                        const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  for (int64_t i = start; i < hash_entry_count; i += step) {
    groups_buffer[i] = invalid_slot_val;
  }
}

#ifdef __CUDACC__
#define mapd_cas(address, compare, val) atomicCAS(address, compare, val)
#elif defined(_MSC_VER)
#define mapd_cas(address, compare, val)                                 \
  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
                             static_cast<long>(val),                    \
                             static_cast<long>(compare))
#else
#define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
#endif

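// fill_hash_join_buff_impl builds a one-to-one hash table. Each key claims
// exactly one slot via mapd_cas, which returns the slot's previous contents:
// a claim succeeds only when that value equals invalid_slot_val. Any other
// value means a different row already owns the slot, and the fill aborts
// with -1 so the caller can fall back to a one-to-many layout.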
template <typename SLOT_SELECTOR>
DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
                                     const int32_t invalid_slot_val,
                                     const JoinColumn join_column,
                                     const JoinColumnTypeInfo type_info,
                                     const void* sd_inner_proxy,
                                     const void* sd_outer_proxy,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count,
                                     SLOT_SELECTOR slot_sel) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = slot_sel(elem);
    if (mapd_cas(entry_ptr, invalid_slot_val, index) != invalid_slot_val) {
      return -1;
    }
  }
  return 0;
}

DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(int32_t* buff,
                                                  const int32_t invalid_slot_val,
                                                  const JoinColumn join_column,
                                                  const JoinColumnTypeInfo type_info,
                                                  const void* sd_inner_proxy,
                                                  const void* sd_outer_proxy,
                                                  const int32_t cpu_thread_idx,
                                                  const int32_t cpu_thread_count,
                                                  const int64_t bucket_normalization) {
  auto slot_selector = [&](auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        buff, elem, type_info.min_val, bucket_normalization);
  };
  return fill_hash_join_buff_impl(buff,
                                  invalid_slot_val,
                                  join_column,
                                  type_info,
                                  sd_inner_proxy,
                                  sd_outer_proxy,
                                  cpu_thread_idx,
                                  cpu_thread_count,
                                  slot_selector);
}

DEVICE int SUFFIX(fill_hash_join_buff)(int32_t* buff,
                                       const int32_t invalid_slot_val,
                                       const JoinColumn join_column,
                                       const JoinColumnTypeInfo type_info,
                                       const void* sd_inner_proxy,
                                       const void* sd_outer_proxy,
                                       const int32_t cpu_thread_idx,
                                       const int32_t cpu_thread_count) {
  auto slot_selector = [&](auto elem) {
    return SUFFIX(get_hash_slot)(buff, elem, type_info.min_val);
  };
  return fill_hash_join_buff_impl(buff,
                                  invalid_slot_val,
                                  join_column,
                                  type_info,
                                  sd_inner_proxy,
                                  sd_outer_proxy,
                                  cpu_thread_idx,
                                  cpu_thread_count,
                                  slot_selector);
}

template <typename SLOT_SELECTOR>
DEVICE int fill_hash_join_buff_sharded_impl(int32_t* buff,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn join_column,
                                            const JoinColumnTypeInfo type_info,
                                            const ShardInfo shard_info,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const int32_t cpu_thread_idx,
                                            const int32_t cpu_thread_count,
                                            SLOT_SELECTOR slot_sel) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    size_t shard = SHARD_FOR_KEY(elem, shard_info.num_shards);
    if (shard != shard_info.shard) {
      continue;
    }
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = slot_sel(elem, shard);
    if (mapd_cas(entry_ptr, invalid_slot_val, index) != invalid_slot_val) {
      return -1;
    }
  }
  return 0;
}

DEVICE int SUFFIX(fill_hash_join_buff_sharded_bucketized)(
    int32_t* buff,
    const int32_t invalid_slot_val,
    const JoinColumn join_column,
    const JoinColumnTypeInfo type_info,
    const ShardInfo shard_info,
    const void* sd_inner_proxy,
    const void* sd_outer_proxy,
    const int32_t cpu_thread_idx,
    const int32_t cpu_thread_count,
    const int64_t bucket_normalization) {
  auto slot_selector = [&](auto elem, auto shard) -> auto {
    return SUFFIX(get_bucketized_hash_slot_sharded_opt)(buff,
                                                        elem,
                                                        type_info.min_val,
                                                        shard_info.entry_count_per_shard,
                                                        shard,
                                                        shard_info.num_shards,
                                                        shard_info.device_count,
                                                        bucket_normalization);
  };

  return fill_hash_join_buff_sharded_impl(buff,
                                          invalid_slot_val,
                                          join_column,
                                          type_info,
                                          shard_info,
                                          sd_inner_proxy,
                                          sd_outer_proxy,
                                          cpu_thread_idx,
                                          cpu_thread_count,
                                          slot_selector);
}

DEVICE int SUFFIX(fill_hash_join_buff_sharded)(int32_t* buff,
                                               const int32_t invalid_slot_val,
                                               const JoinColumn join_column,
                                               const JoinColumnTypeInfo type_info,
                                               const ShardInfo shard_info,
                                               const void* sd_inner_proxy,
                                               const void* sd_outer_proxy,
                                               const int32_t cpu_thread_idx,
                                               const int32_t cpu_thread_count) {
  auto slot_selector = [&](auto elem, auto shard) {
    return SUFFIX(get_hash_slot_sharded_opt)(buff,
                                             elem,
                                             type_info.min_val,
                                             shard_info.entry_count_per_shard,
                                             shard,
                                             shard_info.num_shards,
                                             shard_info.device_count);
  };
  return fill_hash_join_buff_sharded_impl(buff,
                                          invalid_slot_val,
                                          join_column,
                                          type_info,
                                          shard_info,
                                          sd_inner_proxy,
                                          sd_outer_proxy,
                                          cpu_thread_idx,
                                          cpu_thread_count,
                                          slot_selector);
}

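// Baseline (composite-key) tables store each entry as key_component_count key
// slots, optionally followed by one value slot. Hypothetical example: with
// T = int64_t, key_component_count = 2 and with_val_slot = true, each entry
// occupies 3 * 8 = 24 bytes laid out as [key0][key1][payload].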
template <typename T>
DEVICE void SUFFIX(init_baseline_hash_join_buff)(int8_t* hash_buff,
                                                 const int64_t entry_count,
                                                 const size_t key_component_count,
                                                 const bool with_val_slot,
                                                 const int32_t invalid_slot_val,
                                                 const int32_t cpu_thread_idx,
                                                 const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  auto hash_entry_size = (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
  const T empty_key = SUFFIX(get_invalid_key)<T>();
  for (int64_t h = start; h < entry_count; h += step) {
    int64_t off = h * hash_entry_size;
    auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
    for (size_t i = 0; i < key_component_count; ++i) {
      row_ptr[i] = empty_key;
    }
    if (with_val_slot) {
      row_ptr[key_component_count] = invalid_slot_val;
    }
  }
}

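// get_matching_baseline_hash_slot_at claims or matches a composite-key entry.
// The first key component doubles as the entry's "occupied" flag: the winner
// CAS-es it, then writes the remaining components, while losers spin until
// the full key is visible before comparing. Both variants return a pointer
// just past the key (the value slot) on a match, or nullptr otherwise.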
#ifdef __CUDACC__
template <typename T>
__device__ T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
                                                 const uint32_t h,
                                                 const T* key,
                                                 const size_t key_component_count,
                                                 const int64_t hash_entry_size) {
  uint32_t off = h * hash_entry_size;
  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
  const T empty_key = SUFFIX(get_invalid_key)<T>();
  {
    const T old = atomicCAS(row_ptr, empty_key, *key);
    if (empty_key == old && key_component_count > 1) {
      for (int64_t i = 1; i <= key_component_count - 1; ++i) {
        atomicExch(row_ptr + i, key[i]);
      }
    }
  }
  if (key_component_count > 1) {
    while (atomicAdd(row_ptr + key_component_count - 1, 0) == empty_key) {
      // spin until the winning thread has finished writing the entire key and the init
      // value
    }
  }
  bool match = true;
  for (uint32_t i = 0; i < key_component_count; ++i) {
    if (row_ptr[i] != key[i]) {
      match = false;
      break;
    }
  }

  if (match) {
    return reinterpret_cast<T*>(row_ptr + key_component_count);
  }
  return nullptr;
}
#else

#ifdef _MSC_VER
#define cas_cst(ptr, expected, desired)                                      \
  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
                                     reinterpret_cast<void*>(&desired),      \
                                     expected) == expected)
#define store_cst(ptr, val)                                          \
  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
                             reinterpret_cast<void*>(val))
#define load_cst(ptr) \
  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
#else
#define cas_cst(ptr, expected, desired) \
  __atomic_compare_exchange_n(          \
      ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
#define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
#endif

template <typename T>
DEVICE T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
                                             const uint32_t h,
                                             const T* key,
                                             const size_t key_component_count,
                                             const int64_t hash_entry_size) {
  uint32_t off = h * hash_entry_size;
  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
  T empty_key = SUFFIX(get_invalid_key)<T>();
  T write_pending = SUFFIX(get_invalid_key)<T>() - 1;
  if (UNLIKELY(*key == write_pending)) {
    // Address the singularity case where the first column contains the pending
    // write special value. Should never happen, but avoid doing wrong things.
    return nullptr;
  }
  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
  if (success) {
    if (key_component_count > 1) {
      memcpy(row_ptr + 1, key + 1, (key_component_count - 1) * sizeof(T));
    }
    store_cst(row_ptr, *key);
    return reinterpret_cast<T*>(row_ptr + key_component_count);
  }
  while (load_cst(row_ptr) == write_pending) {
    // spin until the winning thread has finished writing the entire key
  }
  for (size_t i = 0; i < key_component_count; ++i) {
    if (load_cst(row_ptr + i) != key[i]) {
      return nullptr;
    }
  }
  return reinterpret_cast<T*>(row_ptr + key_component_count);
}

#undef load_cst
#undef store_cst
#undef cas_cst

#endif  // __CUDACC__

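// write_baseline_hash_slot probes linearly from the MurmurHash'ed home slot,
// wrapping at entry_count. Return codes: 0 on success, -1 when the value slot
// is already taken (duplicate key in a one-to-one table), -2 when the table
// is full and no matching or empty slot exists.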
template <typename T>
DEVICE int write_baseline_hash_slot(const int32_t val,
                                    int8_t* hash_buff,
                                    const int64_t entry_count,
                                    const T* key,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const int32_t invalid_slot_val,
                                    const size_t key_size_in_bytes,
                                    const size_t hash_entry_size) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  T* matching_group = get_matching_baseline_hash_slot_at(
      hash_buff, h, key, key_component_count, hash_entry_size);
  if (!matching_group) {
    uint32_t h_probe = (h + 1) % entry_count;
    while (h_probe != h) {
      matching_group = get_matching_baseline_hash_slot_at(
          hash_buff, h_probe, key, key_component_count, hash_entry_size);
      if (matching_group) {
        break;
      }
      h_probe = (h_probe + 1) % entry_count;
    }
  }
  if (!matching_group) {
    return -2;
  }
  if (!with_val_slot) {
    return 0;
  }
  if (mapd_cas(matching_group, invalid_slot_val, val) != invalid_slot_val) {
    return -1;
  }
  return 0;
}

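// fill_baseline_hash_join_buff drives the key handler over every input row,
// materializing each composite key into key_scratch_buff and handing it to
// write_baseline_hash_slot via key_buff_handler; the first nonzero handler
// result aborts the fill.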
template <typename T, typename FILL_HANDLER>
DEVICE int SUFFIX(fill_baseline_hash_join_buff)(int8_t* hash_buff,
                                                const int64_t entry_count,
                                                const int32_t invalid_slot_val,
                                                const size_t key_component_count,
                                                const bool with_val_slot,
                                                const FILL_HANDLER* f,
                                                const int64_t num_elems,
                                                const int32_t cpu_thread_idx,
                                                const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  T key_scratch_buff[g_maximum_conditions_to_coalesce];
  const size_t key_size_in_bytes = key_component_count * sizeof(T);
  const size_t hash_entry_size =
      (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
  auto key_buff_handler = [hash_buff,
                           entry_count,
                           with_val_slot,
                           invalid_slot_val,
                           key_size_in_bytes,
                           hash_entry_size](const int64_t entry_idx,
                                            const T* key_scratch_buffer,
                                            const size_t key_component_count) {
    return write_baseline_hash_slot<T>(entry_idx,
                                       hash_buff,
                                       entry_count,
                                       key_scratch_buffer,
                                       key_component_count,
                                       with_val_slot,
                                       invalid_slot_val,
                                       key_size_in_bytes,
                                       hash_entry_size);
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    const auto err = (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
    if (err) {
      return err;
    }
  }
  return 0;
}

#undef mapd_cas

#ifdef __CUDACC__
#define mapd_add(address, val) atomicAdd(address, val)
#elif defined(_MSC_VER)
#define mapd_add(address, val)                                      \
  InterlockedExchangeAdd(reinterpret_cast<volatile long*>(address), \
                         static_cast<long>(val))
#else
#define mapd_add(address, val) __sync_fetch_and_add(address, val)
#endif

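// count_matches_impl tallies, per hash slot, how many input rows land there.
// mapd_add returns the previous count, though only the side effect matters
// here; the counts later become start offsets via an inclusive scan.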
template <typename SLOT_SELECTOR>
DEVICE void count_matches_impl(int32_t* count_buff,
                               const int32_t invalid_slot_val,
                               const JoinColumn join_column,
                               const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                               ,
                               const void* sd_inner_proxy,
                               const void* sd_outer_proxy,
                               const int32_t cpu_thread_idx,
                               const int32_t cpu_thread_count
#endif
                               ,
                               SLOT_SELECTOR slot_selector) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto* entry_ptr = slot_selector(count_buff, elem);
    mapd_add(entry_ptr, int32_t(1));
  }
}

GLOBAL void SUFFIX(count_matches)(int32_t* count_buff,
                                  const int32_t invalid_slot_val,
                                  const JoinColumn join_column,
                                  const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                  ,
                                  const void* sd_inner_proxy,
                                  const void* sd_outer_proxy,
                                  const int32_t cpu_thread_idx,
                                  const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info](auto count_buff, auto elem) {
    return SUFFIX(get_hash_slot)(count_buff, elem, type_info.min_val);
  };
  count_matches_impl(count_buff,
                     invalid_slot_val,
                     join_column,
                     type_info
#ifndef __CUDACC__
                     ,
                     sd_inner_proxy,
                     sd_outer_proxy,
                     cpu_thread_idx,
                     cpu_thread_count
#endif
                     ,
                     slot_sel);
}

GLOBAL void SUFFIX(count_matches_bucketized)(int32_t* count_buff,
                                             const int32_t invalid_slot_val,
                                             const JoinColumn join_column,
                                             const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                             ,
                                             const void* sd_inner_proxy,
                                             const void* sd_outer_proxy,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count
#endif
                                             ,
                                             const int64_t bucket_normalization) {
  auto slot_sel = [bucket_normalization, &type_info](auto count_buff, auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        count_buff, elem, type_info.min_val, bucket_normalization);
  };
  count_matches_impl(count_buff,
                     invalid_slot_val,
                     join_column,
                     type_info
#ifndef __CUDACC__
                     ,
                     sd_inner_proxy,
                     sd_outer_proxy,
                     cpu_thread_idx,
                     cpu_thread_count
#endif
                     ,
                     slot_sel);
}

GLOBAL void SUFFIX(count_matches_sharded)(int32_t* count_buff,
                                          const int32_t invalid_slot_val,
                                          const JoinColumn join_column,
                                          const JoinColumnTypeInfo type_info,
                                          const ShardInfo shard_info
#ifndef __CUDACC__
                                          ,
                                          const void* sd_inner_proxy,
                                          const void* sd_outer_proxy,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = SUFFIX(get_hash_slot_sharded)(count_buff,
                                                       elem,
                                                       type_info.min_val,
                                                       shard_info.entry_count_per_shard,
                                                       shard_info.num_shards,
                                                       shard_info.device_count);
    mapd_add(entry_ptr, int32_t(1));
  }
}

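// get_matching_baseline_hash_slot_readonly probes a fully built composite-key
// dictionary. Unlike the writable variant, it must find the key, so a failed
// full-table probe is a logic error (CHECK/assert below).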
template <typename T>
DEVICE const T* SUFFIX(get_matching_baseline_hash_slot_readonly)(
    const T* key,
    const size_t key_component_count,
    const T* composite_key_dict,
    const int64_t entry_count,
    const size_t key_size_in_bytes) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  uint32_t off = h * key_component_count;
  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
    return &composite_key_dict[off];
  }
  uint32_t h_probe = (h + 1) % entry_count;
  while (h_probe != h) {
    off = h_probe * key_component_count;
    if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
      return &composite_key_dict[off];
    }
    h_probe = (h_probe + 1) % entry_count;
  }
#ifndef __CUDACC__
  CHECK(false);
#else
  assert(false);
#endif
  return nullptr;
}

template <typename T, typename KEY_HANDLER>
GLOBAL void SUFFIX(count_matches_baseline)(int32_t* count_buff,
                                           const T* composite_key_dict,
                                           const int64_t entry_count,
                                           const KEY_HANDLER* f,
                                           const int64_t num_elems
#ifndef __CUDACC__
                                           ,
                                           const int32_t cpu_thread_idx,
                                           const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
#ifdef __CUDACC__
  assert(composite_key_dict);
#endif
  T key_scratch_buff[g_maximum_conditions_to_coalesce];
  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
  auto key_buff_handler = [composite_key_dict,
                           entry_count,
                           count_buff,
                           key_size_in_bytes](const int64_t row_entry_idx,
                                              const T* key_scratch_buff,
                                              const size_t key_component_count) {
    const auto matching_group =
        SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
                                                         key_component_count,
                                                         composite_key_dict,
                                                         entry_count,
                                                         key_size_in_bytes);
    const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
    mapd_add(&count_buff[entry_idx], int32_t(1));
    return 0;
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}

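// One-to-many tables lay out three consecutive int32_t sections in buff:
// [pos | count | id]; pos and count are hash_entry_count wide, and id is
// sized to hold one entry per matched row. Hypothetical example: with
// hash_entry_count = 4 and per-slot match counts {2, 0, 1, 3}, the shifted
// scan yields pos = {0, 2, 2, 3}; fill_row_ids then re-counts matches to
// place each row id at pos[slot] + running count for that slot.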
template <typename SLOT_SELECTOR>
DEVICE void fill_row_ids_impl(int32_t* buff,
                              const int64_t hash_entry_count,
                              const int32_t invalid_slot_val,
                              const JoinColumn join_column,
                              const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                              ,
                              const void* sd_inner_proxy,
                              const void* sd_outer_proxy,
                              const int32_t cpu_thread_idx,
                              const int32_t cpu_thread_count
#endif
                              ,
                              SLOT_SELECTOR slot_selector) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;

#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto pos_ptr = slot_selector(pos_buff, elem);
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(index);
  }
}

GLOBAL void SUFFIX(fill_row_ids)(int32_t* buff,
                                 const int64_t hash_entry_count,
                                 const int32_t invalid_slot_val,
                                 const JoinColumn join_column,
                                 const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                 ,
                                 const void* sd_inner_proxy,
                                 const void* sd_outer_proxy,
                                 const int32_t cpu_thread_idx,
                                 const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info](auto pos_buff, auto elem) {
    return SUFFIX(get_hash_slot)(pos_buff, elem, type_info.min_val);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

GLOBAL void SUFFIX(fill_row_ids_bucketized)(int32_t* buff,
                                            const int64_t hash_entry_count,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn join_column,
                                            const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                            ,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const int32_t cpu_thread_idx,
                                            const int32_t cpu_thread_count
#endif
                                            ,
                                            const int64_t bucket_normalization) {
  auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        pos_buff, elem, type_info.min_val, bucket_normalization);
  };
  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

template <typename SLOT_SELECTOR>
DEVICE void fill_row_ids_sharded_impl(int32_t* buff,
                                      const int64_t hash_entry_count,
                                      const int32_t invalid_slot_val,
                                      const JoinColumn join_column,
                                      const JoinColumnTypeInfo type_info,
                                      const ShardInfo shard_info
#ifndef __CUDACC__
                                      ,
                                      const void* sd_inner_proxy,
                                      const void* sd_outer_proxy,
                                      const int32_t cpu_thread_idx,
                                      const int32_t cpu_thread_count
#endif
                                      ,
                                      SLOT_SELECTOR slot_selector) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;

#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto* pos_ptr = slot_selector(pos_buff, elem);
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(index);
  }
}

GLOBAL void SUFFIX(fill_row_ids_sharded)(int32_t* buff,
                                         const int64_t hash_entry_count,
                                         const int32_t invalid_slot_val,
                                         const JoinColumn join_column,
                                         const JoinColumnTypeInfo type_info,
                                         const ShardInfo shard_info
#ifndef __CUDACC__
                                         ,
                                         const void* sd_inner_proxy,
                                         const void* sd_outer_proxy,
                                         const int32_t cpu_thread_idx,
                                         const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info, &shard_info](auto pos_buff, auto elem) {
    return SUFFIX(get_hash_slot_sharded)(pos_buff,
                                         elem,
                                         type_info.min_val,
                                         shard_info.entry_count_per_shard,
                                         shard_info.num_shards,
                                         shard_info.device_count);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

GLOBAL void SUFFIX(fill_row_ids_sharded_bucketized)(int32_t* buff,
                                                    const int64_t hash_entry_count,
                                                    const int32_t invalid_slot_val,
                                                    const JoinColumn join_column,
                                                    const JoinColumnTypeInfo type_info,
                                                    const ShardInfo shard_info
#ifndef __CUDACC__
                                                    ,
                                                    const void* sd_inner_proxy,
                                                    const void* sd_outer_proxy,
                                                    const int32_t cpu_thread_idx,
                                                    const int32_t cpu_thread_count
#endif
                                                    ,
                                                    const int64_t bucket_normalization) {
  auto slot_sel = [&shard_info, &type_info, bucket_normalization](auto pos_buff,
                                                                  auto elem) {
    return SUFFIX(get_bucketized_hash_slot_sharded)(pos_buff,
                                                    elem,
                                                    type_info.min_val,
                                                    shard_info.entry_count_per_shard,
                                                    shard_info.num_shards,
                                                    shard_info.device_count,
                                                    bucket_normalization);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

template <typename T, typename KEY_HANDLER>
GLOBAL void SUFFIX(fill_row_ids_baseline)(int32_t* buff,
                                          const T* composite_key_dict,
                                          const int64_t hash_entry_count,
                                          const int32_t invalid_slot_val,
                                          const KEY_HANDLER* f,
                                          const int64_t num_elems
#ifndef __CUDACC__
                                          ,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count
#endif
) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  T key_scratch_buff[g_maximum_conditions_to_coalesce];
#ifdef __CUDACC__
  assert(composite_key_dict);
#endif
  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
  auto key_buff_handler = [composite_key_dict,
                           hash_entry_count,
                           pos_buff,
                           invalid_slot_val,
                           count_buff,
                           id_buff,
                           key_size_in_bytes](const int64_t row_index,
                                              const T* key_scratch_buff,
                                              const size_t key_component_count) {
    const T* matching_group =
        SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
                                                         key_component_count,
                                                         composite_key_dict,
                                                         hash_entry_count,
                                                         key_size_in_bytes);
    const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
    int32_t* pos_ptr = pos_buff + entry_idx;
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(row_index);
    return 0;
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
  return;
}

#undef mapd_add

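// approximate_distinct_tuples_impl feeds each composite key's 64-bit Murmur
// hash into an HLL sketch: the top b bits pick a register, and the rank is
// the 1-based position of the leading one in the remaining 64 - b bits.
// Hypothetical example: with b = 11, hash = 0xFFF0000000000000 selects
// register 0x7FF, and the bits left after shifting out the top 11 give
// rank 1.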
template <typename KEY_HANDLER>
GLOBAL void SUFFIX(approximate_distinct_tuples_impl)(uint8_t* hll_buffer,
                                                     int32_t* row_count_buffer,
                                                     const uint32_t b,
                                                     const int64_t num_elems,
                                                     const KEY_HANDLER* f
#ifndef __CUDACC__
                                                     ,
                                                     const int32_t cpu_thread_idx,
                                                     const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  auto key_buff_handler = [b, hll_buffer, row_count_buffer](
                              const int64_t entry_idx,
                              const int64_t* key_scratch_buff,
                              const size_t key_component_count) {
    if (row_count_buffer) {
      row_count_buffer[entry_idx] += 1;
    }

    const uint64_t hash =
        MurmurHash64AImpl(key_scratch_buff, key_component_count * sizeof(int64_t), 0);
    const uint32_t index = hash >> (64 - b);
    const auto rank = get_rank(hash << b, 64 - b);
#ifdef __CUDACC__
    atomicMax(reinterpret_cast<int32_t*>(hll_buffer) + index, rank);
#else
    hll_buffer[index] = std::max(hll_buffer[index], rank);
#endif

    return 0;
  };

  int64_t key_scratch_buff[g_maximum_conditions_to_coalesce];

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}

#ifdef __CUDACC__
namespace {
// TODO(adb): put these in a header file so they are not duplicated between here and
// cuda_mapd_rt.cu
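// atomicMin for doubles is emulated with a CAS loop: the double is
// reinterpreted as a 64-bit integer for atomicCAS, and the loop retries until
// no other thread changed the value between the read and the swap.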
__device__ double atomicMin(double* address, double val) {
  unsigned long long int* address_as_ull = (unsigned long long int*)address;
  unsigned long long int old = *address_as_ull, assumed;

  do {
    assumed = old;
    old = atomicCAS(address_as_ull,
                    assumed,
                    __double_as_longlong(min(val, __longlong_as_double(assumed))));
  } while (assumed != old);

  return __longlong_as_double(old);
}
}  // namespace
#endif

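// compute_bucket_sizes_impl scans a bounds column laid out as
// (x_min, y_min, ..., x_max, y_max, ...) and, per dimension, keeps the
// smallest extent above bucket_sz_threshold as the candidate bucket size.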
template <size_t N>
GLOBAL void SUFFIX(compute_bucket_sizes_impl)(double* bucket_sizes_for_thread,
                                              const JoinColumn* join_column,
                                              const JoinColumnTypeInfo* type_info,
                                              const double bucket_sz_threshold,
                                              const int32_t cpu_thread_idx,
                                              const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnIterator it(join_column, type_info, start, step);
  for (; it; ++it) {
    // We expect the bounds column to be (min, max) e.g. (x_min, y_min, x_max, y_max)
    double bounds[2 * N];
    for (size_t j = 0; j < 2 * N; j++) {
      bounds[j] = SUFFIX(fixed_width_double_decode_noinline)(it.ptr(), j);
    }

    for (size_t j = 0; j < N; j++) {
      const auto diff = bounds[j + N] - bounds[j];
#ifdef __CUDACC__
      if (diff > bucket_sz_threshold) {
        atomicMin(&bucket_sizes_for_thread[j], diff);
      }
#else
      if (diff > bucket_sz_threshold && diff < bucket_sizes_for_thread[j]) {
        bucket_sizes_for_thread[j] = diff;
      }
#endif
    }
  }
}

#ifndef __CUDACC__

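// inclusive_scan is a three-phase parallel prefix sum: (1) each thread scans
// its own chunk, (2) the per-chunk totals are scanned serially, (3) every
// chunk but the first adds its predecessor's running total. Hypothetical
// example: input {1, 2, 3, 4} split across two threads scans to {1, 3} and
// {3, 7}; adding the first chunk's total 3 to the second chunk yields
// {1, 3, 6, 10}. Small inputs fall back to a single-threaded scan.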
template <typename InputIterator, typename OutputIterator>
void inclusive_scan(InputIterator first,
                    InputIterator last,
                    OutputIterator out,
                    const size_t thread_count) {
  using ElementType = typename InputIterator::value_type;
  using OffsetType = typename InputIterator::difference_type;
  const OffsetType elem_count = last - first;
  if (elem_count < 10000 || thread_count <= 1) {
    ElementType sum = 0;
    for (auto iter = first; iter != last; ++iter, ++out) {
      *out = sum += *iter;
    }
    return;
  }

  const OffsetType step = (elem_count + thread_count - 1) / thread_count;
  OffsetType start_off = 0;
  OffsetType end_off = std::min(step, elem_count);
  std::vector<ElementType> partial_sums(thread_count);
  std::vector<std::future<void>> counter_threads;
  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx,
              start_off = std::min(start_off + step, elem_count),
              end_off = std::min(start_off + step, elem_count)) {
    counter_threads.push_back(std::async(
        std::launch::async,
        [first, out](
            ElementType& partial_sum, const OffsetType start, const OffsetType end) {
          ElementType sum = 0;
          for (auto in_iter = first + start, out_iter = out + start;
               in_iter != (first + end);
               ++in_iter, ++out_iter) {
            *out_iter = sum += *in_iter;
          }
          partial_sum = sum;
        },
        std::ref(partial_sums[thread_idx]),
        start_off,
        end_off));
  }
  for (auto& child : counter_threads) {
    child.get();
  }

  ElementType sum = 0;
  for (auto& s : partial_sums) {
    s += sum;
    sum = s;
  }

  counter_threads.clear();
  start_off = std::min(step, elem_count);
  end_off = std::min(start_off + step, elem_count);
  for (size_t thread_idx = 0; thread_idx < thread_count - 1; ++thread_idx,
              start_off = std::min(start_off + step, elem_count),
              end_off = std::min(start_off + step, elem_count)) {
    counter_threads.push_back(std::async(
        std::launch::async,
        [out](const ElementType prev_sum, const OffsetType start, const OffsetType end) {
          for (auto iter = out + start; iter != (out + end); ++iter) {
            *iter += prev_sum;
          }
        },
        partial_sums[thread_idx],
        start_off,
        end_off));
  }
  for (auto& child : counter_threads) {
    child.get();
  }
}

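// fill_one_to_many_hash_table_impl runs three passes over the input: count
// matches per slot, turn counts into start offsets (the scan input is shifted
// by one entry so each slot sees the sum of its predecessors), and finally
// write row ids at those offsets.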
template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
void fill_one_to_many_hash_table_impl(int32_t* buff,
                                      const int64_t hash_entry_count,
                                      const int32_t invalid_slot_val,
                                      const JoinColumn& join_column,
                                      const JoinColumnTypeInfo& type_info,
                                      const void* sd_inner_proxy,
                                      const void* sd_outer_proxy,
                                      const unsigned cpu_thread_count,
                                      COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_func,
                                      FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_func) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (unsigned cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    counter_threads.push_back(std::async(
        std::launch::async, count_matches_func, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int64_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
#if HAVE_CUDA
  thrust::inclusive_scan(count_copy.begin(), count_copy.end(), count_copy.begin());
#else
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
#endif
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](size_t thread_idx) {
          for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    rowid_threads.push_back(std::async(
        std::launch::async, fill_row_ids_func, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}

void fill_one_to_many_hash_table(int32_t* buff,
                                 const HashEntryInfo hash_entry_info,
                                 const int32_t invalid_slot_val,
                                 const JoinColumn& join_column,
                                 const JoinColumnTypeInfo& type_info,
                                 const void* sd_inner_proxy,
                                 const void* sd_outer_proxy,
                                 const unsigned cpu_thread_count) {
  auto launch_count_matches = [count_buff = buff + hash_entry_info.hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               sd_inner_proxy,
                               sd_outer_proxy](auto cpu_thread_idx,
                                               auto cpu_thread_count) {
    SUFFIX(count_matches)
    (count_buff,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count);
  };
  auto launch_fill_row_ids = [hash_entry_count = hash_entry_info.hash_entry_count,
                              buff,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              sd_inner_proxy,
                              sd_outer_proxy](auto cpu_thread_idx,
                                              auto cpu_thread_count) {
    SUFFIX(fill_row_ids)
    (buff,
     hash_entry_count,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count);
  };

  fill_one_to_many_hash_table_impl(buff,
                                   hash_entry_info.hash_entry_count,
                                   invalid_slot_val,
                                   join_column,
                                   type_info,
                                   sd_inner_proxy,
                                   sd_outer_proxy,
                                   cpu_thread_count,
                                   launch_count_matches,
                                   launch_fill_row_ids);
}

void fill_one_to_many_hash_table_bucketized(int32_t* buff,
                                            const HashEntryInfo hash_entry_info,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn& join_column,
                                            const JoinColumnTypeInfo& type_info,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const unsigned cpu_thread_count) {
  auto bucket_normalization = hash_entry_info.bucket_normalization;
  auto hash_entry_count = hash_entry_info.getNormalizedHashEntryCount();
  auto launch_count_matches = [bucket_normalization,
                               count_buff = buff + hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               sd_inner_proxy,
                               sd_outer_proxy](auto cpu_thread_idx,
                                               auto cpu_thread_count) {
    SUFFIX(count_matches_bucketized)
    (count_buff,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count,
     bucket_normalization);
  };
  auto launch_fill_row_ids = [bucket_normalization,
                              hash_entry_count,
                              buff,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              sd_inner_proxy,
                              sd_outer_proxy](auto cpu_thread_idx,
                                              auto cpu_thread_count) {
    SUFFIX(fill_row_ids_bucketized)
    (buff,
     hash_entry_count,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count,
     bucket_normalization);
  };

  fill_one_to_many_hash_table_impl(buff,
                                   hash_entry_count,
                                   invalid_slot_val,
                                   join_column,
                                   type_info,
                                   sd_inner_proxy,
                                   sd_outer_proxy,
                                   cpu_thread_count,
                                   launch_count_matches,
                                   launch_fill_row_ids);
}

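// The sharded variants below reuse the same count/scan/fill pipeline;
// shard placement is handled by the *_sharded slot selectors, which map a
// key to the sub-table of the shard that owns it.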
template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
void fill_one_to_many_hash_table_sharded_impl(
    int32_t* buff,
    const int64_t hash_entry_count,
    const int32_t invalid_slot_val,
    const JoinColumn& join_column,
    const JoinColumnTypeInfo& type_info,
    const ShardInfo& shard_info,
    const void* sd_inner_proxy,
    const void* sd_outer_proxy,
    const unsigned cpu_thread_count,
    COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_launcher,
    FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_launcher) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    counter_threads.push_back(std::async(
        std::launch::async, count_matches_launcher, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int64_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](const unsigned thread_idx) {
          for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    rowid_threads.push_back(std::async(
        std::launch::async, fill_row_ids_launcher, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}

void fill_one_to_many_hash_table_sharded(int32_t* buff,
                                         const int64_t hash_entry_count,
                                         const int32_t invalid_slot_val,
                                         const JoinColumn& join_column,
                                         const JoinColumnTypeInfo& type_info,
                                         const ShardInfo& shard_info,
                                         const void* sd_inner_proxy,
                                         const void* sd_outer_proxy,
                                         const unsigned cpu_thread_count) {
  auto launch_count_matches = [count_buff = buff + hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               &shard_info
#ifndef __CUDACC__
                               ,
                               sd_inner_proxy,
                               sd_outer_proxy
#endif
  ](auto cpu_thread_idx, auto cpu_thread_count) {
    return SUFFIX(count_matches_sharded)(count_buff,
                                         invalid_slot_val,
                                         join_column,
                                         type_info,
                                         shard_info
#ifndef __CUDACC__
                                         ,
                                         sd_inner_proxy,
                                         sd_outer_proxy,
                                         cpu_thread_idx,
                                         cpu_thread_count
#endif
    );
  };

  auto launch_fill_row_ids = [buff,
                              hash_entry_count,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              &shard_info
#ifndef __CUDACC__
                              ,
                              sd_inner_proxy,
                              sd_outer_proxy
#endif
  ](auto cpu_thread_idx, auto cpu_thread_count) {
    return SUFFIX(fill_row_ids_sharded)(buff,
                                        hash_entry_count,
                                        invalid_slot_val,
                                        join_column,
                                        type_info,
                                        shard_info
#ifndef __CUDACC__
                                        ,
                                        sd_inner_proxy,
                                        sd_outer_proxy,
                                        cpu_thread_idx,
                                        cpu_thread_count);
#endif
  };

  fill_one_to_many_hash_table_sharded_impl(buff,
                                           hash_entry_count,
                                           invalid_slot_val,
                                           join_column,
                                           type_info,
                                           shard_info
#ifndef __CUDACC__
                                           ,
                                           sd_inner_proxy,
                                           sd_outer_proxy,
                                           cpu_thread_count
#endif
                                           ,
                                           launch_count_matches,
                                           launch_fill_row_ids);
}

void init_baseline_hash_join_buff_32(int8_t* hash_join_buff,
                                     const int64_t entry_count,
                                     const size_t key_component_count,
                                     const bool with_val_slot,
                                     const int32_t invalid_slot_val,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count) {
  init_baseline_hash_join_buff<int32_t>(hash_join_buff,
                                        entry_count,
                                        key_component_count,
                                        with_val_slot,
                                        invalid_slot_val,
                                        cpu_thread_idx,
                                        cpu_thread_count);
}

void init_baseline_hash_join_buff_64(int8_t* hash_join_buff,
                                     const int64_t entry_count,
                                     const size_t key_component_count,
                                     const bool with_val_slot,
                                     const int32_t invalid_slot_val,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count) {
  init_baseline_hash_join_buff<int64_t>(hash_join_buff,
                                        entry_count,
                                        key_component_count,
                                        with_val_slot,
                                        invalid_slot_val,
                                        cpu_thread_idx,
                                        cpu_thread_count);
}

int fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                    const int64_t entry_count,
                                    const int32_t invalid_slot_val,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const GenericKeyHandler* key_handler,
                                    const int64_t num_elems,
                                    const int32_t cpu_thread_idx,
                                    const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int overlaps_fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                             const int64_t entry_count,
                                             const int32_t invalid_slot_val,
                                             const size_t key_component_count,
                                             const bool with_val_slot,
                                             const OverlapsKeyHandler* key_handler,
                                             const int64_t num_elems,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                    const int64_t entry_count,
                                    const int32_t invalid_slot_val,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const GenericKeyHandler* key_handler,
                                    const int64_t num_elems,
                                    const int32_t cpu_thread_idx,
                                    const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int overlaps_fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                             const int64_t entry_count,
                                             const int32_t invalid_slot_val,
                                             const size_t key_component_count,
                                             const bool with_val_slot,
                                             const OverlapsKeyHandler* key_handler,
                                             const int64_t num_elems,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

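// fill_one_to_many_baseline_hash_table dispatches per key type: overlaps
// (geo bucket) keys use OverlapsKeyHandler, everything else the generic
// handler; both share the count/scan/fill pipeline used above.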
template <typename T>
void fill_one_to_many_baseline_hash_table(
    int32_t* buff,
    const T* composite_key_dict,
    const int64_t hash_entry_count,
    const int32_t invalid_slot_val,
    const size_t key_component_count,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_buckets_per_key,
    const std::vector<const void*>& sd_inner_proxy_per_key,
    const std::vector<const void*>& sd_outer_proxy_per_key,
    const size_t cpu_thread_count) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    if (join_buckets_per_key.size() > 0) {
      counter_threads.push_back(
          std::async(std::launch::async,
                     [count_buff,
                      composite_key_dict,
                      &hash_entry_count,
                      &join_buckets_per_key,
                      &join_column_per_key,
                      cpu_thread_idx,
                      cpu_thread_count] {
                       const auto key_handler = OverlapsKeyHandler(
                           join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
                           &join_column_per_key[0],
                           join_buckets_per_key[0].bucket_sizes_for_dimension.data());
                       count_matches_baseline(count_buff,
                                              composite_key_dict,
                                              hash_entry_count,
                                              &key_handler,
                                              join_column_per_key[0].num_elems,
                                              cpu_thread_idx,
                                              cpu_thread_count);
                     }));
    } else {
      counter_threads.push_back(std::async(
          std::launch::async,
          [count_buff,
           composite_key_dict,
           &key_component_count,
           &hash_entry_count,
           &join_column_per_key,
           &type_info_per_key,
           &sd_inner_proxy_per_key,
           &sd_outer_proxy_per_key,
           cpu_thread_idx,
           cpu_thread_count] {
            const auto key_handler = GenericKeyHandler(key_component_count,
                                                       true,
                                                       &join_column_per_key[0],
                                                       &type_info_per_key[0],
                                                       &sd_inner_proxy_per_key[0],
                                                       &sd_outer_proxy_per_key[0]);
            count_matches_baseline(count_buff,
                                   composite_key_dict,
                                   hash_entry_count,
                                   &key_handler,
                                   join_column_per_key[0].num_elems,
                                   cpu_thread_idx,
                                   cpu_thread_count);
          }));
    }
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int64_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](const int thread_idx) {
          for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    if (join_buckets_per_key.size() > 0) {
      rowid_threads.push_back(
          std::async(std::launch::async,
                     [buff,
                      composite_key_dict,
                      hash_entry_count,
                      invalid_slot_val,
                      &join_column_per_key,
                      &join_buckets_per_key,
                      cpu_thread_idx,
                      cpu_thread_count] {
                       const auto key_handler = OverlapsKeyHandler(
                           join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
                           &join_column_per_key[0],
                           join_buckets_per_key[0].bucket_sizes_for_dimension.data());
                       SUFFIX(fill_row_ids_baseline)
                       (buff,
                        composite_key_dict,
                        hash_entry_count,
                        invalid_slot_val,
                        &key_handler,
                        join_column_per_key[0].num_elems,
                        cpu_thread_idx,
                        cpu_thread_count);
                     }));
    } else {
      rowid_threads.push_back(std::async(std::launch::async,
                                         [buff,
                                          composite_key_dict,
                                          hash_entry_count,
                                          invalid_slot_val,
                                          key_component_count,
                                          &join_column_per_key,
                                          &type_info_per_key,
                                          &sd_inner_proxy_per_key,
                                          &sd_outer_proxy_per_key,
                                          cpu_thread_idx,
                                          cpu_thread_count] {
                                           const auto key_handler = GenericKeyHandler(
                                               key_component_count,
                                               true,
                                               &join_column_per_key[0],
                                               &type_info_per_key[0],
                                               &sd_inner_proxy_per_key[0],
                                               &sd_outer_proxy_per_key[0]);
                                           SUFFIX(fill_row_ids_baseline)
                                           (buff,
                                            composite_key_dict,
                                            hash_entry_count,
                                            invalid_slot_val,
                                            &key_handler,
                                            join_column_per_key[0].num_elems,
                                            cpu_thread_idx,
                                            cpu_thread_count);
                                         }));
    }
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}
1909 
1910 void fill_one_to_many_baseline_hash_table_32(
1911  int32_t* buff,
1912  const int32_t* composite_key_dict,
1913  const int64_t hash_entry_count,
1914  const int32_t invalid_slot_val,
1915  const size_t key_component_count,
1916  const std::vector<JoinColumn>& join_column_per_key,
1917  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1918  const std::vector<JoinBucketInfo>& join_bucket_info,
1919  const std::vector<const void*>& sd_inner_proxy_per_key,
1920  const std::vector<const void*>& sd_outer_proxy_per_key,
1921  const int32_t cpu_thread_count) {
1922  fill_one_to_many_baseline_hash_table<int32_t>(buff,
1923  composite_key_dict,
1924  hash_entry_count,
1925  invalid_slot_val,
1926  key_component_count,
1927  join_column_per_key,
1928  type_info_per_key,
1929  join_bucket_info,
1930  sd_inner_proxy_per_key,
1931  sd_outer_proxy_per_key,
1932  cpu_thread_count);
1933 }
1934 
1935 void fill_one_to_many_baseline_hash_table_64(
1936  int32_t* buff,
1937  const int64_t* composite_key_dict,
1938  const int64_t hash_entry_count,
1939  const int32_t invalid_slot_val,
1940  const size_t key_component_count,
1941  const std::vector<JoinColumn>& join_column_per_key,
1942  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1943  const std::vector<JoinBucketInfo>& join_bucket_info,
1944  const std::vector<const void*>& sd_inner_proxy_per_key,
1945  const std::vector<const void*>& sd_outer_proxy_per_key,
1946  const int32_t cpu_thread_count) {
1947  fill_one_to_many_baseline_hash_table<int64_t>(buff,
1948  composite_key_dict,
1949  hash_entry_count,
1950  invalid_slot_val,
1951  key_component_count,
1952  join_column_per_key,
1953  type_info_per_key,
1954  join_bucket_info,
1955  sd_inner_proxy_per_key,
1956  sd_outer_proxy_per_key,
1957  cpu_thread_count);
1958 }
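// The _32 and _64 entry points differ only in the width of the composite key
// dictionary entries (int32_t vs. int64_t); both forward to the templated
// fill_one_to_many_baseline_hash_table above.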
1959 
1960 void approximate_distinct_tuples(uint8_t* hll_buffer_all_cpus,
1961  const uint32_t b,
1962  const size_t padded_size_bytes,
1963  const std::vector<JoinColumn>& join_column_per_key,
1964  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1965  const int thread_count) {
1966  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
1967  CHECK(!join_column_per_key.empty());
1968 
1969  std::vector<std::future<void>> approx_distinct_threads;
1970  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
1971  approx_distinct_threads.push_back(std::async(
1972  std::launch::async,
1973  [&join_column_per_key,
1974  &type_info_per_key,
1975  b,
1976  hll_buffer_all_cpus,
1977  padded_size_bytes,
1978  thread_idx,
1979  thread_count] {
1980  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
1981 
1982  const auto key_handler = GenericKeyHandler(join_column_per_key.size(),
1983  false,
1984  &join_column_per_key[0],
1985  &type_info_per_key[0],
1986  nullptr,
1987  nullptr);
1988  approximate_distinct_tuples_impl(hll_buffer,
1989  nullptr,
1990  b,
1991  join_column_per_key[0].num_elems,
1992  &key_handler,
1993  thread_idx,
1994  thread_count);
1995  }));
1996  }
1997  for (auto& child : approx_distinct_threads) {
1998  child.get();
1999  }
2000 }
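// Each worker sketches its share of the key stream into a private
// HyperLogLog buffer of 2^b registers (hll_buffer_all_cpus sliced by
// padded_size_bytes); the caller is expected to merge the per-thread buffers
// register-wise (HLL sketches merge by taking the maximum rank per register)
// before deriving the distinct-tuple estimate.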
2001 
2002 void approximate_distinct_tuples_overlaps(
2003  uint8_t* hll_buffer_all_cpus,
2004  std::vector<int32_t>& row_counts,
2005  const uint32_t b,
2006  const size_t padded_size_bytes,
2007  const std::vector<JoinColumn>& join_column_per_key,
2008  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2009  const std::vector<JoinBucketInfo>& join_buckets_per_key,
2010  const int thread_count) {
2011  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
2012  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2013  CHECK(!join_column_per_key.empty());
2014 
2015  std::vector<std::future<void>> approx_distinct_threads;
2016  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2017  approx_distinct_threads.push_back(std::async(
2018  std::launch::async,
2019  [&join_column_per_key,
2020  &join_buckets_per_key,
2021  &row_counts,
2022  b,
2023  hll_buffer_all_cpus,
2024  padded_size_bytes,
2025  thread_idx,
2026  thread_count] {
2027  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2028 
2029  const auto key_handler = OverlapsKeyHandler(
2030  join_buckets_per_key[0].bucket_sizes_for_dimension.size(),
2031  &join_column_per_key[0],
2032  join_buckets_per_key[0].bucket_sizes_for_dimension.data());
2033  approximate_distinct_tuples_impl(hll_buffer,
2034  row_counts.data(),
2035  b,
2036  join_column_per_key[0].num_elems,
2037  &key_handler,
2038  thread_idx,
2039  thread_count);
2040  }));
2041  }
2042  for (auto& child : approx_distinct_threads) {
2043  child.get();
2044  }
2045 
2046  inclusive_scan(
2047  row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2048 }
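// In the overlaps variant each input row can emit multiple bucket keys, so
// row_counts is populated per row as well and prefix-summed above; the
// cumulative counts let the caller size the one-to-many buffers.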
2049 
2050 void compute_bucket_sizes(std::vector<double>& bucket_sizes_for_dimension,
2051  const JoinColumn& join_column,
2052  const JoinColumnTypeInfo& type_info,
2053  const double bucket_size_threshold,
2054  const int thread_count) {
2055  std::vector<std::vector<double>> bucket_sizes_for_threads;
2056  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2057  bucket_sizes_for_threads.emplace_back(bucket_sizes_for_dimension.size(),
2058  std::numeric_limits<double>::max());
2059  }
2060  std::vector<std::future<void>> threads;
2061  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2062  threads.push_back(std::async(std::launch::async,
2063  compute_bucket_sizes_impl<2>,
2064  bucket_sizes_for_threads[thread_idx].data(),
2065  &join_column,
2066  &type_info,
2067  bucket_size_threshold,
2068  thread_idx,
2069  thread_count));
2070  }
2071  for (auto& child : threads) {
2072  child.get();
2073  }
2074 
2075  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2076  for (size_t i = 0; i < bucket_sizes_for_dimension.size(); i++) {
2077  if (bucket_sizes_for_threads[thread_idx][i] < bucket_sizes_for_dimension[i]) {
2078  bucket_sizes_for_dimension[i] = bucket_sizes_for_threads[thread_idx][i];
2079  }
2080  }
2081  }
2082 }
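// Each worker computes candidate bucket sizes for its slice into a private
// vector initialized to numeric_limits<double>::max(); the final nested loop
// reduces the per-thread results with an element-wise minimum. The <2>
// template argument fixes the dimension count at two, presumably the x/y
// bucket sizes used by the overlaps join.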
2083 
2084 #endif // ifndef __CUDACC__