HashJoinRuntime.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "HashJoinRuntime.h"

#include "CompareKeysInl.h"
#include "HashJoinKeyHandlers.h"
#include "HyperLogLogRank.h"
#include "JoinColumnIterator.h"
#include "MurmurHash1Inl.h"
#include "Shared/shard_key.h"
#ifdef __CUDACC__
#include "DecodersImpl.h"
#include "GpuRtConstants.h"
#include "JoinHashImpl.h"
#else
#include "Logger/Logger.h"

#include "RuntimeFunctions.h"
#include "Shared/likely.h"
#include "StringDictionary/StringDictionary.h"
#include "StringDictionary/StringDictionaryProxy.h"

#include <future>
#endif

#if HAVE_CUDA
#include <thrust/scan.h>
#endif
#include "Shared/funcannotations.h"

#include <cmath>
#include <numeric>

#ifndef __CUDACC__
namespace {

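// Maps a string id from the inner (build-side) dictionary to the id of the
// same string in the outer (probe-side) dictionary, so that ids from both
// sides can be compared directly while the hash table is built. Returns
// StringDictionary::INVALID_STR_ID when the string has no counterpart in the
// outer dictionary, or when the translated id falls outside
// [min_elem, max_elem].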
inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
                                              const int64_t min_elem,
                                              const int64_t max_elem,
                                              const void* sd_inner_proxy,
                                              const void* sd_outer_proxy) {
  CHECK(sd_outer_proxy);
  const auto sd_inner_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
  const auto sd_outer_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
  const auto elem_str = sd_inner_dict_proxy->getString(elem);
  const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str);
  if (outer_id > max_elem || outer_id < min_elem) {
    return StringDictionary::INVALID_STR_ID;
  }
  return outer_id;
}

}  // namespace
#endif

DEVICE void SUFFIX(init_hash_join_buff)(int32_t* groups_buffer,
                                        const int64_t hash_entry_count,
                                        const int32_t invalid_slot_val,
                                        const int32_t cpu_thread_idx,
                                        const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  for (int64_t i = start; i < hash_entry_count; i += step) {
    groups_buffer[i] = invalid_slot_val;
  }
}

#ifdef __CUDACC__
#define mapd_cas(address, compare, val) atomicCAS(address, compare, val)
#elif defined(_MSC_VER)
#define mapd_cas(address, compare, val)                                  \
  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
                             static_cast<long>(val),                    \
                             static_cast<long>(compare))
#else
#define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
#endif
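// mapd_cas(address, compare, val) is a portable atomic compare-and-swap: it
// replaces *address with val only if *address currently equals compare, and
// in every case yields the value *address held before the operation. Note
// that the MSVC intrinsic takes its exchange and comparand arguments in the
// opposite order from GCC's __sync_val_compare_and_swap, which the macro
// bodies account for.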

template <typename HASHTABLE_FILLING_FUNC>
DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
                                     const int32_t invalid_slot_val,
                                     const JoinColumn join_column,
                                     const JoinColumnTypeInfo type_info,
                                     const void* sd_inner_proxy,
                                     const void* sd_outer_proxy,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count,
                                     HASHTABLE_FILLING_FUNC filling_func) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    if (filling_func(elem, index)) {
      return -1;
    }
  }
  return 0;
};
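// The filling_func passed in claims the hash slot for the current row and
// returns a nonzero value when the slot is already owned by a different row.
// Returning -1 from here therefore signals the caller that the build column
// contains duplicate keys, i.e. a one-to-one layout is not possible for this
// input.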

DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(int32_t* buff,
                                                  const int32_t invalid_slot_val,
                                                  const bool for_semi_join,
                                                  const JoinColumn join_column,
                                                  const JoinColumnTypeInfo type_info,
                                                  const void* sd_inner_proxy,
                                                  const void* sd_outer_proxy,
                                                  const int32_t cpu_thread_idx,
                                                  const int32_t cpu_thread_count,
                                                  const int64_t bucket_normalization) {
  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
                                    : SUFFIX(fill_one_to_one_hashtable);
  auto hashtable_filling_func = [&](auto elem, size_t index) {
    auto entry_ptr = SUFFIX(get_bucketized_hash_slot)(
        buff, elem, type_info.min_val, bucket_normalization);
    return filling_func(index, entry_ptr, invalid_slot_val);
  };

  return fill_hash_join_buff_impl(buff,
                                  invalid_slot_val,
                                  join_column,
                                  type_info,
                                  sd_inner_proxy,
                                  sd_outer_proxy,
                                  cpu_thread_idx,
                                  cpu_thread_count,
                                  hashtable_filling_func);
}

DEVICE int SUFFIX(fill_hash_join_buff)(int32_t* buff,
                                       const int32_t invalid_slot_val,
                                       const bool for_semi_join,
                                       const JoinColumn join_column,
                                       const JoinColumnTypeInfo type_info,
                                       const void* sd_inner_proxy,
                                       const void* sd_outer_proxy,
                                       const int32_t cpu_thread_idx,
                                       const int32_t cpu_thread_count) {
  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
                                    : SUFFIX(fill_one_to_one_hashtable);
  auto hashtable_filling_func = [&](auto elem, size_t index) {
    auto entry_ptr = SUFFIX(get_hash_slot)(buff, elem, type_info.min_val);
    return filling_func(index, entry_ptr, invalid_slot_val);
  };

  return fill_hash_join_buff_impl(buff,
                                  invalid_slot_val,
                                  join_column,
                                  type_info,
                                  sd_inner_proxy,
                                  sd_outer_proxy,
                                  cpu_thread_idx,
                                  cpu_thread_count,
                                  hashtable_filling_func);
}

template <typename HASHTABLE_FILLING_FUNC>
DEVICE int fill_hash_join_buff_sharded_impl(int32_t* buff,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn join_column,
                                            const JoinColumnTypeInfo type_info,
                                            const ShardInfo shard_info,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const int32_t cpu_thread_idx,
                                            const int32_t cpu_thread_count,
                                            HASHTABLE_FILLING_FUNC filling_func) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    size_t shard = SHARD_FOR_KEY(elem, shard_info.num_shards);
    if (shard != shard_info.shard) {
      continue;
    }
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    if (filling_func(elem, shard, index)) {
      return -1;
    }
  }
  return 0;
}
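// Sharded variant: each row is hashed to a shard via SHARD_FOR_KEY and is
// skipped unless it belongs to the shard this call is responsible for, so
// each device fills only the slots of its own shard.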

DEVICE int SUFFIX(fill_hash_join_buff_sharded_bucketized)(
    int32_t* buff,
    const int32_t invalid_slot_val,
    const bool for_semi_join,
    const JoinColumn join_column,
    const JoinColumnTypeInfo type_info,
    const ShardInfo shard_info,
    const void* sd_inner_proxy,
    const void* sd_outer_proxy,
    const int32_t cpu_thread_idx,
    const int32_t cpu_thread_count,
    const int64_t bucket_normalization) {
  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
                                    : SUFFIX(fill_one_to_one_hashtable);
  auto hashtable_filling_func = [&](auto elem, auto shard, size_t index) {
    auto entry_ptr =
        SUFFIX(get_bucketized_hash_slot_sharded_opt)(buff,
                                                     elem,
                                                     type_info.min_val,
                                                     shard_info.entry_count_per_shard,
                                                     shard,
                                                     shard_info.num_shards,
                                                     shard_info.device_count,
                                                     bucket_normalization);
    return filling_func(index, entry_ptr, invalid_slot_val);
  };

  return fill_hash_join_buff_sharded_impl(buff,
                                          invalid_slot_val,
                                          join_column,
                                          type_info,
                                          shard_info,
                                          sd_inner_proxy,
                                          sd_outer_proxy,
                                          cpu_thread_idx,
                                          cpu_thread_count,
                                          hashtable_filling_func);
}

DEVICE int SUFFIX(fill_hash_join_buff_sharded)(int32_t* buff,
                                               const int32_t invalid_slot_val,
                                               const bool for_semi_join,
                                               const JoinColumn join_column,
                                               const JoinColumnTypeInfo type_info,
                                               const ShardInfo shard_info,
                                               const void* sd_inner_proxy,
                                               const void* sd_outer_proxy,
                                               const int32_t cpu_thread_idx,
                                               const int32_t cpu_thread_count) {
  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
                                    : SUFFIX(fill_one_to_one_hashtable);
  auto hashtable_filling_func = [&](auto elem, auto shard, size_t index) {
    auto entry_ptr = SUFFIX(get_hash_slot_sharded_opt)(buff,
                                                       elem,
                                                       type_info.min_val,
                                                       shard_info.entry_count_per_shard,
                                                       shard,
                                                       shard_info.num_shards,
                                                       shard_info.device_count);
    return filling_func(index, entry_ptr, invalid_slot_val);
  };

  return fill_hash_join_buff_sharded_impl(buff,
                                          invalid_slot_val,
                                          join_column,
                                          type_info,
                                          shard_info,
                                          sd_inner_proxy,
                                          sd_outer_proxy,
                                          cpu_thread_idx,
                                          cpu_thread_count,
                                          hashtable_filling_func);
}

template <typename T>
DEVICE void SUFFIX(init_baseline_hash_join_buff)(int8_t* hash_buff,
                                                 const int64_t entry_count,
                                                 const size_t key_component_count,
                                                 const bool with_val_slot,
                                                 const int32_t invalid_slot_val,
                                                 const int32_t cpu_thread_idx,
                                                 const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  auto hash_entry_size = (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
  const T empty_key = SUFFIX(get_invalid_key)<T>();
  for (int64_t h = start; h < entry_count; h += step) {
    int64_t off = h * hash_entry_size;
    auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
    for (size_t i = 0; i < key_component_count; ++i) {
      row_ptr[i] = empty_key;
    }
    if (with_val_slot) {
      row_ptr[key_component_count] = invalid_slot_val;
    }
  }
}
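// A baseline (composite-key) hash table stores one row per entry:
// key_component_count key components of type T, optionally followed by a
// single value slot. Initialization writes the "invalid key" sentinel into
// every key component and invalid_slot_val into the value slot, matching the
// emptiness checks performed by the slot-claiming code below.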

#ifdef __CUDACC__
template <typename T>
__device__ T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
                                                 const uint32_t h,
                                                 const T* key,
                                                 const size_t key_component_count,
                                                 const int64_t hash_entry_size) {
  uint32_t off = h * hash_entry_size;
  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
  const T empty_key = SUFFIX(get_invalid_key)<T>();
  {
    const T old = atomicCAS(row_ptr, empty_key, *key);
    if (empty_key == old && key_component_count > 1) {
      for (int64_t i = 1; i <= key_component_count - 1; ++i) {
        atomicExch(row_ptr + i, key[i]);
      }
    }
  }
  if (key_component_count > 1) {
    while (atomicAdd(row_ptr + key_component_count - 1, 0) == empty_key) {
      // spin until the winning thread has finished writing the entire key and the init
      // value
    }
  }
  bool match = true;
  for (uint32_t i = 0; i < key_component_count; ++i) {
    if (row_ptr[i] != key[i]) {
      match = false;
      break;
    }
  }

  if (match) {
    return reinterpret_cast<T*>(row_ptr + key_component_count);
  }
  return nullptr;
}
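// GPU slot-claiming protocol: the first key component is claimed with an
// atomicCAS against the empty-key sentinel; the winner then publishes the
// remaining components with atomicExch. Losers spin on the last component
// until the winner's writes become visible, then compare the full key to
// decide whether this entry matches or probing must continue elsewhere.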
#else

#ifdef _MSC_VER
#define cas_cst(ptr, expected, desired)                                      \
  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
                                     reinterpret_cast<void*>(&desired),      \
                                     expected) == expected)
#define store_cst(ptr, val)                                          \
  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
                             reinterpret_cast<void*>(val))
#define load_cst(ptr) \
  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
#else
#define cas_cst(ptr, expected, desired) \
  __atomic_compare_exchange_n(          \
      ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
#define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
#endif

template <typename T>
DEVICE T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
                                             const uint32_t h,
                                             const T* key,
                                             const size_t key_component_count,
                                             const int64_t hash_entry_size) {
  uint32_t off = h * hash_entry_size;
  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
  T empty_key = SUFFIX(get_invalid_key)<T>();
  T write_pending = SUFFIX(get_invalid_key)<T>() - 1;
  if (UNLIKELY(*key == write_pending)) {
    // Address the singularity case where the first column contains the pending
    // write special value. Should never happen, but avoid doing wrong things.
    return nullptr;
  }
  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
  if (success) {
    if (key_component_count > 1) {
      memcpy(row_ptr + 1, key + 1, (key_component_count - 1) * sizeof(T));
    }
    store_cst(row_ptr, *key);
    return reinterpret_cast<T*>(row_ptr + key_component_count);
  }
  while (load_cst(row_ptr) == write_pending) {
    // spin until the winning thread has finished writing the entire key
  }
  for (size_t i = 0; i < key_component_count; ++i) {
    if (load_cst(row_ptr + i) != key[i]) {
      return nullptr;
    }
  }
  return reinterpret_cast<T*>(row_ptr + key_component_count);
}
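// CPU slot-claiming protocol: a slot is claimed by CAS-ing the first key
// component from the empty-key sentinel to a "write pending" sentinel (the
// empty key minus one). The winner copies the rest of the key and then
// publishes the first component with a sequentially consistent store; other
// threads spin while the pending sentinel is visible and afterwards compare
// the full key. A nullptr return means the entry holds a different key.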

#undef load_cst
#undef store_cst
#undef cas_cst

#endif  // __CUDACC__

template <typename T>
DEVICE int write_baseline_hash_slot(const int32_t val,
                                    int8_t* hash_buff,
                                    const int64_t entry_count,
                                    const T* key,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const int32_t invalid_slot_val,
                                    const size_t key_size_in_bytes,
                                    const size_t hash_entry_size) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  T* matching_group = get_matching_baseline_hash_slot_at(
      hash_buff, h, key, key_component_count, hash_entry_size);
  if (!matching_group) {
    uint32_t h_probe = (h + 1) % entry_count;
    while (h_probe != h) {
      matching_group = get_matching_baseline_hash_slot_at(
          hash_buff, h_probe, key, key_component_count, hash_entry_size);
      if (matching_group) {
        break;
      }
      h_probe = (h_probe + 1) % entry_count;
    }
  }
  if (!matching_group) {
    return -2;
  }
  if (!with_val_slot) {
    return 0;
  }
  if (mapd_cas(matching_group, invalid_slot_val, val) != invalid_slot_val) {
    return -1;
  }
  return 0;
}
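// Return codes: 0 means the key was inserted (or found) and the value slot,
// if any, was claimed; -1 means the value slot was already taken by a
// different row, so the table cannot be one-to-one; -2 means linear probing
// wrapped all the way around without finding a usable entry, i.e. the table
// is full. The semi-join variant below differs only in tolerating an
// already-claimed value slot.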

template <typename T>
DEVICE int write_baseline_hash_slot_for_semi_join(const int32_t val,
                                                  int8_t* hash_buff,
                                                  const int64_t entry_count,
                                                  const T* key,
                                                  const size_t key_component_count,
                                                  const bool with_val_slot,
                                                  const int32_t invalid_slot_val,
                                                  const size_t key_size_in_bytes,
                                                  const size_t hash_entry_size) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  T* matching_group = get_matching_baseline_hash_slot_at(
      hash_buff, h, key, key_component_count, hash_entry_size);
  if (!matching_group) {
    uint32_t h_probe = (h + 1) % entry_count;
    while (h_probe != h) {
      matching_group = get_matching_baseline_hash_slot_at(
          hash_buff, h_probe, key, key_component_count, hash_entry_size);
      if (matching_group) {
        break;
      }
      h_probe = (h_probe + 1) % entry_count;
    }
  }
  if (!matching_group) {
    return -2;
  }
  if (!with_val_slot) {
    return 0;
  }
  mapd_cas(matching_group, invalid_slot_val, val);
  return 0;
}

template <typename T, typename FILL_HANDLER>
DEVICE int SUFFIX(fill_baseline_hash_join_buff)(int8_t* hash_buff,
                                                const int64_t entry_count,
                                                const int32_t invalid_slot_val,
                                                const bool for_semi_join,
                                                const size_t key_component_count,
                                                const bool with_val_slot,
                                                const FILL_HANDLER* f,
                                                const int64_t num_elems,
                                                const int32_t cpu_thread_idx,
                                                const int32_t cpu_thread_count) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  T key_scratch_buff[g_maximum_conditions_to_coalesce];
  const size_t key_size_in_bytes = key_component_count * sizeof(T);
  const size_t hash_entry_size =
      (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
  auto key_buff_handler = [hash_buff,
                           entry_count,
                           with_val_slot,
                           invalid_slot_val,
                           key_size_in_bytes,
                           hash_entry_size,
                           &for_semi_join](const int64_t entry_idx,
                                           const T* key_scratch_buffer,
                                           const size_t key_component_count) {
    if (for_semi_join) {
      return write_baseline_hash_slot_for_semi_join<T>(entry_idx,
                                                       hash_buff,
                                                       entry_count,
                                                       key_scratch_buffer,
                                                       key_component_count,
                                                       with_val_slot,
                                                       invalid_slot_val,
                                                       key_size_in_bytes,
                                                       hash_entry_size);
    } else {
      return write_baseline_hash_slot<T>(entry_idx,
                                         hash_buff,
                                         entry_count,
                                         key_scratch_buffer,
                                         key_component_count,
                                         with_val_slot,
                                         invalid_slot_val,
                                         key_size_in_bytes,
                                         hash_entry_size);
    }
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    const auto err = (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
    if (err) {
      return err;
    }
  }
  return 0;
}

#undef mapd_cas

#ifdef __CUDACC__
#define mapd_add(address, val) atomicAdd(address, val)
#elif defined(_MSC_VER)
#define mapd_add(address, val)                                        \
  InterlockedExchangeAdd(reinterpret_cast<volatile long*>(address), \
                         static_cast<long>(val))
#else
#define mapd_add(address, val) __sync_fetch_and_add(address, val)
#endif

template <typename SLOT_SELECTOR>
DEVICE void count_matches_impl(int32_t* count_buff,
                               const int32_t invalid_slot_val,
                               const JoinColumn join_column,
                               const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                               ,
                               const void* sd_inner_proxy,
                               const void* sd_outer_proxy,
                               const int32_t cpu_thread_idx,
                               const int32_t cpu_thread_count
#endif
                               ,
                               SLOT_SELECTOR slot_selector) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto* entry_ptr = slot_selector(count_buff, elem);
    mapd_add(entry_ptr, int32_t(1));
  }
}
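// First pass of the one-to-many build: walk the join column and bump a
// per-slot counter for every matching row, using the same slot selection and
// element normalization (null translation, dictionary translation) that the
// fill pass will use, so both passes agree on which bucket a row lands in.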

GLOBAL void SUFFIX(count_matches)(int32_t* count_buff,
                                  const int32_t invalid_slot_val,
                                  const JoinColumn join_column,
                                  const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                  ,
                                  const void* sd_inner_proxy,
                                  const void* sd_outer_proxy,
                                  const int32_t cpu_thread_idx,
                                  const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info](auto count_buff, auto elem) {
    return SUFFIX(get_hash_slot)(count_buff, elem, type_info.min_val);
  };
  count_matches_impl(count_buff,
                     invalid_slot_val,
                     join_column,
                     type_info
#ifndef __CUDACC__
                     ,
                     sd_inner_proxy,
                     sd_outer_proxy,
                     cpu_thread_idx,
                     cpu_thread_count
#endif
                     ,
                     slot_sel);
}

GLOBAL void SUFFIX(count_matches_bucketized)(int32_t* count_buff,
                                             const int32_t invalid_slot_val,
                                             const JoinColumn join_column,
                                             const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                             ,
                                             const void* sd_inner_proxy,
                                             const void* sd_outer_proxy,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count
#endif
                                             ,
                                             const int64_t bucket_normalization) {
  auto slot_sel = [bucket_normalization, &type_info](auto count_buff, auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        count_buff, elem, type_info.min_val, bucket_normalization);
  };
  count_matches_impl(count_buff,
                     invalid_slot_val,
                     join_column,
                     type_info
#ifndef __CUDACC__
                     ,
                     sd_inner_proxy,
                     sd_outer_proxy,
                     cpu_thread_idx,
                     cpu_thread_count
#endif
                     ,
                     slot_sel);
}

GLOBAL void SUFFIX(count_matches_sharded)(int32_t* count_buff,
                                          const int32_t invalid_slot_val,
                                          const JoinColumn join_column,
                                          const JoinColumnTypeInfo type_info,
                                          const ShardInfo shard_info
#ifndef __CUDACC__
                                          ,
                                          const void* sd_inner_proxy,
                                          const void* sd_outer_proxy,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    int32_t* entry_ptr = SUFFIX(get_hash_slot_sharded)(count_buff,
                                                       elem,
                                                       type_info.min_val,
                                                       shard_info.entry_count_per_shard,
                                                       shard_info.num_shards,
                                                       shard_info.device_count);
    mapd_add(entry_ptr, int32_t(1));
  }
}

template <typename T>
DEVICE NEVER_INLINE const T* SUFFIX(get_matching_baseline_hash_slot_readonly)(
    const T* key,
    const size_t key_component_count,
    const T* composite_key_dict,
    const int64_t entry_count,
    const size_t key_size_in_bytes) {
  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
  uint32_t off = h * key_component_count;
  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
    return &composite_key_dict[off];
  }
  uint32_t h_probe = (h + 1) % entry_count;
  while (h_probe != h) {
    off = h_probe * key_component_count;
    if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
      return &composite_key_dict[off];
    }
    h_probe = (h_probe + 1) % entry_count;
  }
#ifndef __CUDACC__
  CHECK(false);
#else
  assert(false);
#endif
  return nullptr;
}

template <typename T, typename KEY_HANDLER>
GLOBAL void SUFFIX(count_matches_baseline)(int32_t* count_buff,
                                           const T* composite_key_dict,
                                           const int64_t entry_count,
                                           const KEY_HANDLER* f,
                                           const int64_t num_elems
#ifndef __CUDACC__
                                           ,
                                           const int32_t cpu_thread_idx,
                                           const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
#ifdef __CUDACC__
  assert(composite_key_dict);
#endif
  T key_scratch_buff[g_maximum_conditions_to_coalesce];
  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
  auto key_buff_handler = [composite_key_dict,
                           entry_count,
                           count_buff,
                           key_size_in_bytes](const int64_t row_entry_idx,
                                              const T* key_scratch_buff,
                                              const size_t key_component_count) {
    const auto matching_group =
        SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
                                                         key_component_count,
                                                         composite_key_dict,
                                                         entry_count,
                                                         key_size_in_bytes);
    const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
    mapd_add(&count_buff[entry_idx], int32_t(1));
    return 0;
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}

template <typename SLOT_SELECTOR>
DEVICE void fill_row_ids_impl(int32_t* buff,
                              const int64_t hash_entry_count,
                              const int32_t invalid_slot_val,
                              const JoinColumn join_column,
                              const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                              ,
                              const void* sd_inner_proxy,
                              const void* sd_outer_proxy,
                              const int32_t cpu_thread_idx,
                              const int32_t cpu_thread_count
#endif
                              ,
                              SLOT_SELECTOR slot_selector) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;

#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto pos_ptr = slot_selector(pos_buff, elem);
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(index);
  }
}
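// One-to-many layout: buff holds three consecutive regions -- pos_buff and
// count_buff of hash_entry_count int32_t entries each (the offset of each
// bucket's first row id, and a running cursor within the bucket that is
// reset to zero before this pass), followed by id_buff holding the row ids
// grouped by bucket. mapd_add on the cursor hands each matching row a
// distinct position within its bucket.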

GLOBAL void SUFFIX(fill_row_ids)(int32_t* buff,
                                 const int64_t hash_entry_count,
                                 const int32_t invalid_slot_val,
                                 const JoinColumn join_column,
                                 const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                 ,
                                 const void* sd_inner_proxy,
                                 const void* sd_outer_proxy,
                                 const int32_t cpu_thread_idx,
                                 const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info](auto pos_buff, auto elem) {
    return SUFFIX(get_hash_slot)(pos_buff, elem, type_info.min_val);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

GLOBAL void SUFFIX(fill_row_ids_bucketized)(int32_t* buff,
                                            const int64_t hash_entry_count,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn join_column,
                                            const JoinColumnTypeInfo type_info
#ifndef __CUDACC__
                                            ,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const int32_t cpu_thread_idx,
                                            const int32_t cpu_thread_count
#endif
                                            ,
                                            const int64_t bucket_normalization) {
  auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) {
    return SUFFIX(get_bucketized_hash_slot)(
        pos_buff, elem, type_info.min_val, bucket_normalization);
  };
  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

template <typename SLOT_SELECTOR>
DEVICE void fill_row_ids_sharded_impl(int32_t* buff,
                                      const int64_t hash_entry_count,
                                      const int32_t invalid_slot_val,
                                      const JoinColumn join_column,
                                      const JoinColumnTypeInfo type_info,
                                      const ShardInfo shard_info
#ifndef __CUDACC__
                                      ,
                                      const void* sd_inner_proxy,
                                      const void* sd_outer_proxy,
                                      const int32_t cpu_thread_idx,
                                      const int32_t cpu_thread_count
#endif
                                      ,
                                      SLOT_SELECTOR slot_selector) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;

#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnTyped col{&join_column, &type_info};
  for (auto item : col.slice(start, step)) {
    const size_t index = item.index;
    int64_t elem = item.element;
    if (elem == type_info.null_val) {
      if (type_info.uses_bw_eq) {
        elem = type_info.translated_null_val;
      } else {
        continue;
      }
    }
#ifndef __CUDACC__
    if (sd_inner_proxy &&
        (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
      const auto outer_id = translate_str_id_to_outer_dict(
          elem, type_info.min_val, type_info.max_val, sd_inner_proxy, sd_outer_proxy);
      if (outer_id == StringDictionary::INVALID_STR_ID) {
        continue;
      }
      elem = outer_id;
    }
    CHECK_GE(elem, type_info.min_val)
        << "Element " << elem << " less than min val " << type_info.min_val;
#endif
    auto* pos_ptr = slot_selector(pos_buff, elem);
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(index);
  }
}

GLOBAL void SUFFIX(fill_row_ids_sharded)(int32_t* buff,
                                         const int64_t hash_entry_count,
                                         const int32_t invalid_slot_val,
                                         const JoinColumn join_column,
                                         const JoinColumnTypeInfo type_info,
                                         const ShardInfo shard_info
#ifndef __CUDACC__
                                         ,
                                         const void* sd_inner_proxy,
                                         const void* sd_outer_proxy,
                                         const int32_t cpu_thread_idx,
                                         const int32_t cpu_thread_count
#endif
) {
  auto slot_sel = [&type_info, &shard_info](auto pos_buff, auto elem) {
    return SUFFIX(get_hash_slot_sharded)(pos_buff,
                                         elem,
                                         type_info.min_val,
                                         shard_info.entry_count_per_shard,
                                         shard_info.num_shards,
                                         shard_info.device_count);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

GLOBAL void SUFFIX(fill_row_ids_sharded_bucketized)(int32_t* buff,
                                                    const int64_t hash_entry_count,
                                                    const int32_t invalid_slot_val,
                                                    const JoinColumn join_column,
                                                    const JoinColumnTypeInfo type_info,
                                                    const ShardInfo shard_info
#ifndef __CUDACC__
                                                    ,
                                                    const void* sd_inner_proxy,
                                                    const void* sd_outer_proxy,
                                                    const int32_t cpu_thread_idx,
                                                    const int32_t cpu_thread_count
#endif
                                                    ,
                                                    const int64_t bucket_normalization) {
  auto slot_sel = [&shard_info, &type_info, bucket_normalization](auto pos_buff,
                                                                  auto elem) {
    return SUFFIX(get_bucketized_hash_slot_sharded)(pos_buff,
                                                    elem,
                                                    type_info.min_val,
                                                    shard_info.entry_count_per_shard,
                                                    shard_info.num_shards,
                                                    shard_info.device_count,
                                                    bucket_normalization);
  };

  fill_row_ids_impl(buff,
                    hash_entry_count,
                    invalid_slot_val,
                    join_column,
                    type_info
#ifndef __CUDACC__
                    ,
                    sd_inner_proxy,
                    sd_outer_proxy,
                    cpu_thread_idx,
                    cpu_thread_count
#endif
                    ,
                    slot_sel);
}

template <typename T, typename KEY_HANDLER>
GLOBAL void SUFFIX(fill_row_ids_baseline)(int32_t* buff,
                                          const T* composite_key_dict,
                                          const int64_t hash_entry_count,
                                          const int32_t invalid_slot_val,
                                          const KEY_HANDLER* f,
                                          const int64_t num_elems
#ifndef __CUDACC__
                                          ,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count
#endif
) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  int32_t* id_buff = count_buff + hash_entry_count;
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  T key_scratch_buff[g_maximum_conditions_to_coalesce];
#ifdef __CUDACC__
  assert(composite_key_dict);
#endif
  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
  auto key_buff_handler = [composite_key_dict,
                           hash_entry_count,
                           pos_buff,
                           invalid_slot_val,
                           count_buff,
                           id_buff,
                           key_size_in_bytes](const int64_t row_index,
                                              const T* key_scratch_buff,
                                              const size_t key_component_count) {
    const T* matching_group =
        SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
                                                         key_component_count,
                                                         composite_key_dict,
                                                         hash_entry_count,
                                                         key_size_in_bytes);
    const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
    int32_t* pos_ptr = pos_buff + entry_idx;
#ifndef __CUDACC__
    CHECK_NE(*pos_ptr, invalid_slot_val);
#endif
    const auto bin_idx = pos_ptr - pos_buff;
    const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
    id_buff[id_buff_idx] = static_cast<int32_t>(row_index);
    return 0;
  };

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
  return;
}

#undef mapd_add

template <typename KEY_HANDLER>
GLOBAL void SUFFIX(approximate_distinct_tuples_impl)(uint8_t* hll_buffer,
                                                     int32_t* row_count_buffer,
                                                     const uint32_t b,
                                                     const int64_t num_elems,
                                                     const KEY_HANDLER* f
#ifndef __CUDACC__
                                                     ,
                                                     const int32_t cpu_thread_idx,
                                                     const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif

  auto key_buff_handler = [b, hll_buffer, row_count_buffer](
                              const int64_t entry_idx,
                              const int64_t* key_scratch_buff,
                              const size_t key_component_count) {
    if (row_count_buffer) {
      row_count_buffer[entry_idx] += 1;
    }

    const uint64_t hash =
        MurmurHash64AImpl(key_scratch_buff, key_component_count * sizeof(int64_t), 0);
    const uint32_t index = hash >> (64 - b);
    const auto rank = get_rank(hash << b, 64 - b);
#ifdef __CUDACC__
    atomicMax(reinterpret_cast<int32_t*>(hll_buffer) + index, rank);
#else
    hll_buffer[index] = std::max(hll_buffer[index], rank);
#endif

    return 0;
  };

  int64_t key_scratch_buff[g_maximum_conditions_to_coalesce];

  JoinColumnTuple cols(
      f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
  for (auto& it : cols.slice(start, step)) {
    (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
  }
}
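// HyperLogLog update: the 64-bit Murmur hash of the composite key is split
// into a register index (the top b bits) and a rank (the position of the
// first set bit among the remaining bits, via get_rank). Each register keeps
// the maximum rank observed; row_count_buffer, when provided, additionally
// counts how many keys each input entry emits.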

#ifdef __CUDACC__
namespace {
// TODO(adb): put these in a header file so they are not duplicated between here and
// cuda_mapd_rt.cu
__device__ double atomicMin(double* address, double val) {
  unsigned long long int* address_as_ull = (unsigned long long int*)address;
  unsigned long long int old = *address_as_ull, assumed;

  do {
    assumed = old;
    old = atomicCAS(address_as_ull,
                    assumed,
                    __double_as_longlong(min(val, __longlong_as_double(assumed))));
  } while (assumed != old);

  return __longlong_as_double(old);
}
}  // namespace
#endif

template <size_t N>
GLOBAL void SUFFIX(compute_bucket_sizes_impl)(double* bucket_sizes_for_thread,
                                              const JoinColumn* join_column,
                                              const JoinColumnTypeInfo* type_info,
                                              const double* bucket_size_thresholds
#ifndef __CUDACC__
                                              ,
                                              const int32_t cpu_thread_idx,
                                              const int32_t cpu_thread_count
#endif
) {
#ifdef __CUDACC__
  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
  int32_t step = blockDim.x * gridDim.x;
#else
  int32_t start = cpu_thread_idx;
  int32_t step = cpu_thread_count;
#endif
  JoinColumnIterator it(join_column, type_info, start, step);
  for (; it; ++it) {
    // We expect the bounds column to be (min, max) e.g. (x_min, y_min, x_max, y_max)
    double bounds[2 * N];
    for (size_t j = 0; j < 2 * N; j++) {
      bounds[j] = SUFFIX(fixed_width_double_decode_noinline)(it.ptr(), j);
    }

    for (size_t j = 0; j < N; j++) {
      const auto diff = bounds[j + N] - bounds[j];
#ifdef __CUDACC__
      if (diff > bucket_size_thresholds[j]) {
        atomicMin(&bucket_sizes_for_thread[j], diff);
      }
#else
      if (diff < bucket_size_thresholds[j] && diff > bucket_sizes_for_thread[j]) {
        bucket_sizes_for_thread[j] = diff;
      }
#endif
    }
  }
}

#ifndef __CUDACC__

template <typename InputIterator, typename OutputIterator>
void inclusive_scan(InputIterator first,
                    InputIterator last,
                    OutputIterator out,
                    const size_t thread_count) {
  using ElementType = typename InputIterator::value_type;
  using OffsetType = typename InputIterator::difference_type;
  const OffsetType elem_count = last - first;
  if (elem_count < 10000 || thread_count <= 1) {
    ElementType sum = 0;
    for (auto iter = first; iter != last; ++iter, ++out) {
      *out = sum += *iter;
    }
    return;
  }

  const OffsetType step = (elem_count + thread_count - 1) / thread_count;
  OffsetType start_off = 0;
  OffsetType end_off = std::min(step, elem_count);
  std::vector<ElementType> partial_sums(thread_count);
  std::vector<std::future<void>> counter_threads;
  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx,
              start_off = std::min(start_off + step, elem_count),
              end_off = std::min(start_off + step, elem_count)) {
    counter_threads.push_back(std::async(
        std::launch::async,
        [first, out](
            ElementType& partial_sum, const OffsetType start, const OffsetType end) {
          ElementType sum = 0;
          for (auto in_iter = first + start, out_iter = out + start;
               in_iter != (first + end);
               ++in_iter, ++out_iter) {
            *out_iter = sum += *in_iter;
          }
          partial_sum = sum;
        },
        std::ref(partial_sums[thread_idx]),
        start_off,
        end_off));
  }
  for (auto& child : counter_threads) {
    child.get();
  }

  ElementType sum = 0;
  for (auto& s : partial_sums) {
    s += sum;
    sum = s;
  }

  counter_threads.clear();
  start_off = std::min(step, elem_count);
  end_off = std::min(start_off + step, elem_count);
  for (size_t thread_idx = 0; thread_idx < thread_count - 1; ++thread_idx,
              start_off = std::min(start_off + step, elem_count),
              end_off = std::min(start_off + step, elem_count)) {
    counter_threads.push_back(std::async(
        std::launch::async,
        [out](const ElementType prev_sum, const OffsetType start, const OffsetType end) {
          for (auto iter = out + start; iter != (out + end); ++iter) {
            *iter += prev_sum;
          }
        },
        partial_sums[thread_idx],
        start_off,
        end_off));
  }
  for (auto& child : counter_threads) {
    child.get();
  }
}
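// Multithreaded inclusive prefix sum over [first, last): the range is split
// into thread_count chunks, each chunk is scanned independently, the chunk
// totals are themselves scanned, and the resulting offsets are then added
// back to every chunk but the first. For example, {1, 2, 3, 4} scans to
// {1, 3, 6, 10} regardless of thread_count; inputs below 10000 elements fall
// back to the serial loop.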

template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
void fill_one_to_many_hash_table_impl(int32_t* buff,
                                      const int64_t hash_entry_count,
                                      const int32_t invalid_slot_val,
                                      const JoinColumn& join_column,
                                      const JoinColumnTypeInfo& type_info,
                                      const void* sd_inner_proxy,
                                      const void* sd_outer_proxy,
                                      const unsigned cpu_thread_count,
                                      COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_func,
                                      FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_func) {
  auto timer = DEBUG_TIMER(__func__);
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (unsigned cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    counter_threads.push_back(std::async(
        std::launch::async, count_matches_func, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int64_t(0));
  memcpy(count_copy.data() + 1, count_buff, (hash_entry_count - 1) * sizeof(int32_t));
#if HAVE_CUDA
  thrust::inclusive_scan(count_copy.begin(), count_copy.end(), count_copy.begin());
#else
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
#endif
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](size_t thread_idx) {
          for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    rowid_threads.push_back(std::async(
        std::launch::async, fill_row_ids_func, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}
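// Three-phase one-to-many build: (1) count matches per slot into count_buff,
// (2) turn the counts into starting offsets with an exclusive scan (achieved
// here by an inclusive scan over the counts shifted right by one entry), and
// (3) re-zero count_buff so fill_row_ids can reuse it as a per-bucket cursor
// while scattering row ids into id_buff.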

void fill_one_to_many_hash_table(int32_t* buff,
                                 const HashEntryInfo hash_entry_info,
                                 const int32_t invalid_slot_val,
                                 const JoinColumn& join_column,
                                 const JoinColumnTypeInfo& type_info,
                                 const void* sd_inner_proxy,
                                 const void* sd_outer_proxy,
                                 const unsigned cpu_thread_count) {
  auto timer = DEBUG_TIMER(__func__);
  auto launch_count_matches = [count_buff = buff + hash_entry_info.hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               sd_inner_proxy,
                               sd_outer_proxy](auto cpu_thread_idx,
                                               auto cpu_thread_count) {
    SUFFIX(count_matches)
    (count_buff,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count);
  };
  auto launch_fill_row_ids = [hash_entry_count = hash_entry_info.hash_entry_count,
                              buff,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              sd_inner_proxy,
                              sd_outer_proxy](auto cpu_thread_idx,
                                              auto cpu_thread_count) {
    SUFFIX(fill_row_ids)
    (buff,
     hash_entry_count,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count);
  };

  fill_one_to_many_hash_table_impl(buff,
                                   hash_entry_info.hash_entry_count,
                                   invalid_slot_val,
                                   join_column,
                                   type_info,
                                   sd_inner_proxy,
                                   sd_outer_proxy,
                                   cpu_thread_count,
                                   launch_count_matches,
                                   launch_fill_row_ids);
}

void fill_one_to_many_hash_table_bucketized(int32_t* buff,
                                            const HashEntryInfo hash_entry_info,
                                            const int32_t invalid_slot_val,
                                            const JoinColumn& join_column,
                                            const JoinColumnTypeInfo& type_info,
                                            const void* sd_inner_proxy,
                                            const void* sd_outer_proxy,
                                            const unsigned cpu_thread_count) {
  auto timer = DEBUG_TIMER(__func__);
  auto bucket_normalization = hash_entry_info.bucket_normalization;
  auto hash_entry_count = hash_entry_info.getNormalizedHashEntryCount();
  auto launch_count_matches = [bucket_normalization,
                               count_buff = buff + hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               sd_inner_proxy,
                               sd_outer_proxy](auto cpu_thread_idx,
                                               auto cpu_thread_count) {
    SUFFIX(count_matches_bucketized)
    (count_buff,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count,
     bucket_normalization);
  };
  auto launch_fill_row_ids = [bucket_normalization,
                              hash_entry_count,
                              buff,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              sd_inner_proxy,
                              sd_outer_proxy](auto cpu_thread_idx,
                                              auto cpu_thread_count) {
    SUFFIX(fill_row_ids_bucketized)
    (buff,
     hash_entry_count,
     invalid_slot_val,
     join_column,
     type_info,
     sd_inner_proxy,
     sd_outer_proxy,
     cpu_thread_idx,
     cpu_thread_count,
     bucket_normalization);
  };

  fill_one_to_many_hash_table_impl(buff,
                                   hash_entry_count,
                                   invalid_slot_val,
                                   join_column,
                                   type_info,
                                   sd_inner_proxy,
                                   sd_outer_proxy,
                                   cpu_thread_count,
                                   launch_count_matches,
                                   launch_fill_row_ids);
}

template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
void fill_one_to_many_hash_table_sharded_impl(
    int32_t* buff,
    const int64_t hash_entry_count,
    const int32_t invalid_slot_val,
    const JoinColumn& join_column,
    const JoinColumnTypeInfo& type_info,
    const ShardInfo& shard_info,
    const void* sd_inner_proxy,
    const void* sd_outer_proxy,
    const unsigned cpu_thread_count,
    COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_launcher,
    FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_launcher) {
  auto timer = DEBUG_TIMER(__func__);
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    counter_threads.push_back(std::async(
        std::launch::async, count_matches_launcher, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int64_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
  inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](const unsigned thread_idx) {
          for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    rowid_threads.push_back(std::async(
        std::launch::async, fill_row_ids_launcher, cpu_thread_idx, cpu_thread_count));
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}

void fill_one_to_many_hash_table_sharded(int32_t* buff,
                                         const int64_t hash_entry_count,
                                         const int32_t invalid_slot_val,
                                         const JoinColumn& join_column,
                                         const JoinColumnTypeInfo& type_info,
                                         const ShardInfo& shard_info,
                                         const void* sd_inner_proxy,
                                         const void* sd_outer_proxy,
                                         const unsigned cpu_thread_count) {
  auto launch_count_matches = [count_buff = buff + hash_entry_count,
                               invalid_slot_val,
                               &join_column,
                               &type_info,
                               &shard_info
#ifndef __CUDACC__
                               ,
                               sd_inner_proxy,
                               sd_outer_proxy
#endif
  ](auto cpu_thread_idx, auto cpu_thread_count) {
    return SUFFIX(count_matches_sharded)(count_buff,
                                         invalid_slot_val,
                                         join_column,
                                         type_info,
                                         shard_info
#ifndef __CUDACC__
                                         ,
                                         sd_inner_proxy,
                                         sd_outer_proxy,
                                         cpu_thread_idx,
                                         cpu_thread_count
#endif
    );
  };

  auto launch_fill_row_ids = [buff,
                              hash_entry_count,
                              invalid_slot_val,
                              &join_column,
                              &type_info,
                              &shard_info
#ifndef __CUDACC__
                              ,
                              sd_inner_proxy,
                              sd_outer_proxy
#endif
  ](auto cpu_thread_idx, auto cpu_thread_count) {
    return SUFFIX(fill_row_ids_sharded)(buff,
                                        hash_entry_count,
                                        invalid_slot_val,
                                        join_column,
                                        type_info,
                                        shard_info
#ifndef __CUDACC__
                                        ,
                                        sd_inner_proxy,
                                        sd_outer_proxy,
                                        cpu_thread_idx,
                                        cpu_thread_count);
#endif
  };

  fill_one_to_many_hash_table_sharded_impl(buff,
                                           hash_entry_count,
                                           invalid_slot_val,
                                           join_column,
                                           type_info,
                                           shard_info
#ifndef __CUDACC__
                                           ,
                                           sd_inner_proxy,
                                           sd_outer_proxy,
                                           cpu_thread_count
#endif
                                           ,
                                           launch_count_matches,
                                           launch_fill_row_ids);
}

void init_baseline_hash_join_buff_32(int8_t* hash_join_buff,
                                     const int64_t entry_count,
                                     const size_t key_component_count,
                                     const bool with_val_slot,
                                     const int32_t invalid_slot_val,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count) {
  init_baseline_hash_join_buff<int32_t>(hash_join_buff,
                                        entry_count,
                                        key_component_count,
                                        with_val_slot,
                                        invalid_slot_val,
                                        cpu_thread_idx,
                                        cpu_thread_count);
}

void init_baseline_hash_join_buff_64(int8_t* hash_join_buff,
                                     const int64_t entry_count,
                                     const size_t key_component_count,
                                     const bool with_val_slot,
                                     const int32_t invalid_slot_val,
                                     const int32_t cpu_thread_idx,
                                     const int32_t cpu_thread_count) {
  init_baseline_hash_join_buff<int64_t>(hash_join_buff,
                                        entry_count,
                                        key_component_count,
                                        with_val_slot,
                                        invalid_slot_val,
                                        cpu_thread_idx,
                                        cpu_thread_count);
}

int fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                    const int64_t entry_count,
                                    const int32_t invalid_slot_val,
                                    const bool for_semi_join,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const GenericKeyHandler* key_handler,
                                    const int64_t num_elems,
                                    const int32_t cpu_thread_idx,
                                    const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               for_semi_join,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int overlaps_fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                             const int64_t entry_count,
                                             const int32_t invalid_slot_val,
                                             const size_t key_component_count,
                                             const bool with_val_slot,
                                             const OverlapsKeyHandler* key_handler,
                                             const int64_t num_elems,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               false,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int range_fill_baseline_hash_join_buff_32(int8_t* hash_buff,
                                          const size_t entry_count,
                                          const int32_t invalid_slot_val,
                                          const size_t key_component_count,
                                          const bool with_val_slot,
                                          const RangeKeyHandler* key_handler,
                                          const size_t num_elems,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               false,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                    const int64_t entry_count,
                                    const int32_t invalid_slot_val,
                                    const bool for_semi_join,
                                    const size_t key_component_count,
                                    const bool with_val_slot,
                                    const GenericKeyHandler* key_handler,
                                    const int64_t num_elems,
                                    const int32_t cpu_thread_idx,
                                    const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               for_semi_join,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int overlaps_fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                             const int64_t entry_count,
                                             const int32_t invalid_slot_val,
                                             const size_t key_component_count,
                                             const bool with_val_slot,
                                             const OverlapsKeyHandler* key_handler,
                                             const int64_t num_elems,
                                             const int32_t cpu_thread_idx,
                                             const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               false,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

int range_fill_baseline_hash_join_buff_64(int8_t* hash_buff,
                                          const size_t entry_count,
                                          const int32_t invalid_slot_val,
                                          const size_t key_component_count,
                                          const bool with_val_slot,
                                          const RangeKeyHandler* key_handler,
                                          const size_t num_elems,
                                          const int32_t cpu_thread_idx,
                                          const int32_t cpu_thread_count) {
  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
                                               entry_count,
                                               invalid_slot_val,
                                               false,
                                               key_component_count,
                                               with_val_slot,
                                               key_handler,
                                               num_elems,
                                               cpu_thread_idx,
                                               cpu_thread_count);
}

template <typename T>
void fill_one_to_many_baseline_hash_table(
    int32_t* buff,
    const T* composite_key_dict,
    const int64_t hash_entry_count,
    const int32_t invalid_slot_val,
    const size_t key_component_count,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_buckets_per_key,
    const std::vector<const void*>& sd_inner_proxy_per_key,
    const std::vector<const void*>& sd_outer_proxy_per_key,
    const size_t cpu_thread_count,
    const bool is_range_join,
    const bool is_geo_compressed) {
  int32_t* pos_buff = buff;
  int32_t* count_buff = buff + hash_entry_count;
  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> counter_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    if (is_range_join) {
      counter_threads.push_back(std::async(
          std::launch::async,
          [count_buff,
           composite_key_dict,
           &hash_entry_count,
           &join_buckets_per_key,
           &join_column_per_key,
           &is_geo_compressed,
           cpu_thread_idx,
           cpu_thread_count] {
            const auto key_handler = RangeKeyHandler(
                is_geo_compressed,
                join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
                &join_column_per_key[0],
                join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
            count_matches_baseline(count_buff,
                                   composite_key_dict,
                                   hash_entry_count,
                                   &key_handler,
                                   join_column_per_key[0].num_elems,
                                   cpu_thread_idx,
                                   cpu_thread_count);
          }));
    } else if (join_buckets_per_key.size() > 0) {
      counter_threads.push_back(std::async(
          std::launch::async,
          [count_buff,
           composite_key_dict,
           &hash_entry_count,
           &join_buckets_per_key,
           &join_column_per_key,
           cpu_thread_idx,
           cpu_thread_count] {
            const auto key_handler = OverlapsKeyHandler(
                join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
                &join_column_per_key[0],
                join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
            count_matches_baseline(count_buff,
                                   composite_key_dict,
                                   hash_entry_count,
                                   &key_handler,
                                   join_column_per_key[0].num_elems,
                                   cpu_thread_idx,
                                   cpu_thread_count);
          }));
    } else {
      counter_threads.push_back(std::async(
          std::launch::async,
          [count_buff,
           composite_key_dict,
           &key_component_count,
           &hash_entry_count,
           &join_column_per_key,
           &type_info_per_key,
           &sd_inner_proxy_per_key,
           &sd_outer_proxy_per_key,
           cpu_thread_idx,
           cpu_thread_count] {
            const auto key_handler = GenericKeyHandler(key_component_count,
                                                       true,
                                                       &join_column_per_key[0],
                                                       &type_info_per_key[0],
                                                       &sd_inner_proxy_per_key[0],
                                                       &sd_outer_proxy_per_key[0]);
            count_matches_baseline(count_buff,
                                   composite_key_dict,
                                   hash_entry_count,
                                   &key_handler,
                                   join_column_per_key[0].num_elems,
                                   cpu_thread_idx,
                                   cpu_thread_count);
          }));
    }
  }

  for (auto& child : counter_threads) {
    child.get();
  }

  std::vector<int32_t> count_copy(hash_entry_count, 0);
  CHECK_GT(hash_entry_count, int64_t(0));
  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
  ::inclusive_scan(
      count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
  std::vector<std::future<void>> pos_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    pos_threads.push_back(std::async(
        std::launch::async,
        [&](const int thread_idx) {
          for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
            if (count_buff[i]) {
              pos_buff[i] = count_copy[i];
            }
          }
        },
        cpu_thread_idx));
  }
  for (auto& child : pos_threads) {
    child.get();
  }

  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
  std::vector<std::future<void>> rowid_threads;
  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
    if (is_range_join) {
      rowid_threads.push_back(std::async(
          std::launch::async,
          [buff,
           composite_key_dict,
           hash_entry_count,
           invalid_slot_val,
           &join_column_per_key,
           &join_buckets_per_key,
           &is_geo_compressed,
           cpu_thread_idx,
           cpu_thread_count] {
            const auto key_handler = RangeKeyHandler(
                is_geo_compressed,
                join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
                &join_column_per_key[0],
                join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
            SUFFIX(fill_row_ids_baseline)
            (buff,
             composite_key_dict,
             hash_entry_count,
             invalid_slot_val,
             &key_handler,
             join_column_per_key[0].num_elems,
             cpu_thread_idx,
             cpu_thread_count);
          }));
    } else if (join_buckets_per_key.size() > 0) {
      rowid_threads.push_back(std::async(
          std::launch::async,
          [buff,
           composite_key_dict,
           hash_entry_count,
           invalid_slot_val,
           &join_column_per_key,
           &join_buckets_per_key,
           cpu_thread_idx,
           cpu_thread_count] {
            const auto key_handler = OverlapsKeyHandler(
                join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
                &join_column_per_key[0],
                join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
            SUFFIX(fill_row_ids_baseline)
            (buff,
             composite_key_dict,
             hash_entry_count,
             invalid_slot_val,
             &key_handler,
             join_column_per_key[0].num_elems,
             cpu_thread_idx,
             cpu_thread_count);
          }));
    } else {
      rowid_threads.push_back(std::async(std::launch::async,
                                         [buff,
                                          composite_key_dict,
                                          hash_entry_count,
                                          invalid_slot_val,
                                          key_component_count,
                                          &join_column_per_key,
                                          &type_info_per_key,
                                          &sd_inner_proxy_per_key,
                                          &sd_outer_proxy_per_key,
                                          cpu_thread_idx,
                                          cpu_thread_count] {
                                           const auto key_handler = GenericKeyHandler(
                                               key_component_count,
                                               true,
                                               &join_column_per_key[0],
                                               &type_info_per_key[0],
                                               &sd_inner_proxy_per_key[0],
                                               &sd_outer_proxy_per_key[0]);
                                           SUFFIX(fill_row_ids_baseline)
                                           (buff,
                                            composite_key_dict,
                                            hash_entry_count,
                                            invalid_slot_val,
                                            &key_handler,
                                            join_column_per_key[0].num_elems,
                                            cpu_thread_idx,
                                            cpu_thread_count);
                                         }));
    }
  }

  for (auto& child : rowid_threads) {
    child.get();
  }
}
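
// fill_one_to_many_baseline_hash_table<T> builds the one-to-many layout in
// three passes over the same buffer: (1) count_buff[i] accumulates how many
// input rows hash to entry i; (2) a prefix sum over the shifted counts (the
// memcpy into count_copy[1] plus inclusive_scan amounts to an exclusive scan)
// yields each entry's starting offset, written to pos_buff; (3) count_buff is
// zeroed and reused as a per-entry write cursor while row ids are scattered
// into the id section. A minimal single-threaded sketch of the same scheme
// (hypothetical, standalone; `rows` maps each row to its hash slot in [0, n)):
//
//   std::vector<int32_t> pos(n), cnt(n, 0), ids(rows.size());
//   for (auto h : rows) ++cnt[h];                       // pass 1: count
//   for (int32_t i = 0, off = 0; i < n; ++i) {          // pass 2: offsets
//     pos[i] = off; off += cnt[i]; cnt[i] = 0;
//   }
//   for (int32_t r = 0; r < (int32_t)rows.size(); ++r)  // pass 3: scatter
//     ids[pos[rows[r]] + cnt[rows[r]]++] = r;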

void fill_one_to_many_baseline_hash_table_32(
    int32_t* buff,
    const int32_t* composite_key_dict,
    const int64_t hash_entry_count,
    const int32_t invalid_slot_val,
    const size_t key_component_count,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const std::vector<const void*>& sd_inner_proxy_per_key,
    const std::vector<const void*>& sd_outer_proxy_per_key,
    const int32_t cpu_thread_count,
    const bool is_range_join,
    const bool is_geo_compressed) {
  fill_one_to_many_baseline_hash_table<int32_t>(buff,
                                                composite_key_dict,
                                                hash_entry_count,
                                                invalid_slot_val,
                                                key_component_count,
                                                join_column_per_key,
                                                type_info_per_key,
                                                join_bucket_info,
                                                sd_inner_proxy_per_key,
                                                sd_outer_proxy_per_key,
                                                cpu_thread_count,
                                                is_range_join,
                                                is_geo_compressed);
}

void fill_one_to_many_baseline_hash_table_64(
    int32_t* buff,
    const int64_t* composite_key_dict,
    const int64_t hash_entry_count,
    const int32_t invalid_slot_val,
    const size_t key_component_count,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_bucket_info,
    const std::vector<const void*>& sd_inner_proxy_per_key,
    const std::vector<const void*>& sd_outer_proxy_per_key,
    const int32_t cpu_thread_count,
    const bool is_range_join,
    const bool is_geo_compressed) {
  fill_one_to_many_baseline_hash_table<int64_t>(buff,
                                                composite_key_dict,
                                                hash_entry_count,
                                                invalid_slot_val,
                                                key_component_count,
                                                join_column_per_key,
                                                type_info_per_key,
                                                join_bucket_info,
                                                sd_inner_proxy_per_key,
                                                sd_outer_proxy_per_key,
                                                cpu_thread_count,
                                                is_range_join,
                                                is_geo_compressed);
}

void approximate_distinct_tuples(uint8_t* hll_buffer_all_cpus,
                                 const uint32_t b,
                                 const size_t padded_size_bytes,
                                 const std::vector<JoinColumn>& join_column_per_key,
                                 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
                                 const int thread_count) {
  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
  CHECK(!join_column_per_key.empty());

  std::vector<std::future<void>> approx_distinct_threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    approx_distinct_threads.push_back(std::async(
        std::launch::async,
        [&join_column_per_key,
         &type_info_per_key,
         b,
         hll_buffer_all_cpus,
         padded_size_bytes,
         thread_idx,
         thread_count] {
          auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;

          const auto key_handler = GenericKeyHandler(join_column_per_key.size(),
                                                     false,
                                                     &join_column_per_key[0],
                                                     &type_info_per_key[0],
                                                     nullptr,
                                                     nullptr);
          approximate_distinct_tuples_impl(hll_buffer,
                                           nullptr,
                                           b,
                                           join_column_per_key[0].num_elems,
                                           &key_handler,
                                           thread_idx,
                                           thread_count);
        }));
  }
  for (auto& child : approx_distinct_threads) {
    child.get();
  }
}
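
// approximate_distinct_tuples estimates the number of distinct composite keys
// with HyperLogLog: each thread owns a private slice of hll_buffer_all_cpus
// holding 2^b one-byte registers; a key's hash picks a register with its top
// b bits, and the register keeps the maximum rank (leading-zero count + 1) of
// the remaining bits. The per-thread sketches are merged register-wise
// afterwards. A rough sketch of one register update, assuming a 64-bit hash
// (hypothetical, standalone; hash_of is a placeholder):
//
//   const uint64_t h = hash_of(key);       // any 64-bit hash of the key
//   const uint32_t reg = h >> (64 - b);    // top b bits select the register
//   const uint8_t rank = get_rank(h << b, 64 - b);
//   hll_buffer[reg] = std::max(hll_buffer[reg], rank);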

void approximate_distinct_tuples_overlaps(
    uint8_t* hll_buffer_all_cpus,
    std::vector<int32_t>& row_counts,
    const uint32_t b,
    const size_t padded_size_bytes,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_buckets_per_key,
    const int thread_count) {
  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
  CHECK(!join_column_per_key.empty());

  std::vector<std::future<void>> approx_distinct_threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    approx_distinct_threads.push_back(std::async(
        std::launch::async,
        [&join_column_per_key,
         &join_buckets_per_key,
         &row_counts,
         b,
         hll_buffer_all_cpus,
         padded_size_bytes,
         thread_idx,
         thread_count] {
          auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;

          const auto key_handler = OverlapsKeyHandler(
              join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
              &join_column_per_key[0],
              join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
          approximate_distinct_tuples_impl(hll_buffer,
                                           row_counts.data(),
                                           b,
                                           join_column_per_key[0].num_elems,
                                           &key_handler,
                                           thread_idx,
                                           thread_count);
        }));
  }
  for (auto& child : approx_distinct_threads) {
    child.get();
  }

  ::inclusive_scan(
      row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
}

void approximate_distinct_tuples_range(
    uint8_t* hll_buffer_all_cpus,
    std::vector<int32_t>& row_counts,
    const uint32_t b,
    const size_t padded_size_bytes,
    const std::vector<JoinColumn>& join_column_per_key,
    const std::vector<JoinColumnTypeInfo>& type_info_per_key,
    const std::vector<JoinBucketInfo>& join_buckets_per_key,
    const bool is_compressed,
    const int thread_count) {
  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
  CHECK(!join_column_per_key.empty());

  std::vector<std::future<void>> approx_distinct_threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    approx_distinct_threads.push_back(std::async(
        std::launch::async,
        [&join_column_per_key,
         &join_buckets_per_key,
         &row_counts,
         b,
         hll_buffer_all_cpus,
         padded_size_bytes,
         thread_idx,
         is_compressed,
         thread_count] {
          auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;

          const auto key_handler = RangeKeyHandler(
              is_compressed,
              join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
              &join_column_per_key[0],
              join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
          approximate_distinct_tuples_impl(hll_buffer,
                                           row_counts.data(),
                                           b,
                                           join_column_per_key[0].num_elems,
                                           &key_handler,
                                           thread_idx,
                                           thread_count);
        }));
  }
  for (auto& child : approx_distinct_threads) {
    child.get();
  }

  ::inclusive_scan(
      row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
}

void compute_bucket_sizes_on_cpu(std::vector<double>& bucket_sizes_for_dimension,
                                 const JoinColumn& join_column,
                                 const JoinColumnTypeInfo& type_info,
                                 const std::vector<double>& bucket_size_thresholds,
                                 const int thread_count) {
  std::vector<std::vector<double>> bucket_sizes_for_threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    bucket_sizes_for_threads.emplace_back(bucket_sizes_for_dimension.size(), 0.0);
  }
  std::vector<std::future<void>> threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    threads.push_back(std::async(std::launch::async,
                                 compute_bucket_sizes_impl<2>,
                                 bucket_sizes_for_threads[thread_idx].data(),
                                 &join_column,
                                 &type_info,
                                 bucket_size_thresholds.data(),
                                 thread_idx,
                                 thread_count));
  }
  for (auto& child : threads) {
    child.get();
  }

  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    for (size_t i = 0; i < bucket_sizes_for_dimension.size(); i++) {
      if (bucket_sizes_for_threads[thread_idx][i] > bucket_sizes_for_dimension[i]) {
        bucket_sizes_for_dimension[i] = bucket_sizes_for_threads[thread_idx][i];
      }
    }
  }
}
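
// compute_bucket_sizes_on_cpu avoids cross-thread synchronization by giving
// each worker its own bucket_sizes_for_threads[thread_idx] accumulator and
// reducing with an element-wise max once every future has completed; the
// final loop in the function above is exactly that reduction. A condensed
// sketch of the merge step (hypothetical, standalone):
//
//   for (const auto& per_thread : bucket_sizes_for_threads)
//     for (size_t i = 0; i < bucket_sizes_for_dimension.size(); ++i)
//       bucket_sizes_for_dimension[i] =
//           std::max(bucket_sizes_for_dimension[i], per_thread[i]);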

#endif  // ifndef __CUDACC__