OmniSciDB  085a039ca4
HashJoinRuntime.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "HashJoinRuntime.h"
18 
24 #include "Shared/shard_key.h"
25 #ifdef __CUDACC__
29 #else
30 #include "Logger/Logger.h"
31 
33 #include "Shared/likely.h"
36 
37 #ifdef HAVE_TBB
38 #include <tbb/parallel_for.h>
39 #endif
40 
41 #include <future>
42 #endif
43 
44 #if HAVE_CUDA
45 #include <thrust/scan.h>
46 #endif
47 #include "Shared/funcannotations.h"
48 
49 #include <cmath>
50 #include <numeric>
51 
52 #ifndef __CUDACC__
53 namespace {
54 
55 inline int64_t map_str_id_to_outer_dict(const int64_t inner_elem,
56  const int64_t min_inner_elem,
57  const int64_t min_outer_elem,
58  const int64_t max_outer_elem,
59  const int32_t* inner_to_outer_translation_map) {
60  const auto outer_id = inner_to_outer_translation_map[inner_elem - min_inner_elem];
61  if (outer_id > max_outer_elem || outer_id < min_outer_elem) {
62  return StringDictionary::INVALID_STR_ID;
63  }
64  return outer_id;
65 }
66 
67 } // namespace
68 #endif
69 
70 DEVICE void SUFFIX(init_hash_join_buff)(int32_t* groups_buffer,
71  const int64_t hash_entry_count,
72  const int32_t invalid_slot_val,
73  const int32_t cpu_thread_idx,
74  const int32_t cpu_thread_count) {
75 #ifdef __CUDACC__
76  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
77  int32_t step = blockDim.x * gridDim.x;
78 #else
79  int32_t start = cpu_thread_idx;
80  int32_t step = cpu_thread_count;
81 #endif
82  for (int64_t i = start; i < hash_entry_count; i += step) {
83  groups_buffer[i] = invalid_slot_val;
84  }
85 }
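// Editor's note: every kernel in this file walks its input with the same start/step
// pattern -- on CUDA each thread starts at its global thread id and strides by the
// total thread count, while on CPU the caller passes an explicit cpu_thread_idx /
// cpu_thread_count pair. init_hash_join_buff uses it to mark every slot of a
// perfect (one-to-one) hash table as empty (invalid_slot_val).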
86 
87 #ifndef __CUDACC__
88 #ifdef HAVE_TBB
89 
90 void SUFFIX(init_hash_join_buff_tbb)(int32_t* groups_buffer,
91  const int64_t hash_entry_count,
92  const int32_t invalid_slot_val) {
93  tbb::parallel_for(tbb::blocked_range<int64_t>(0, hash_entry_count),
94  [=](const tbb::blocked_range<int64_t>& r) {
95  const auto start_idx = r.begin();
96  const auto end_idx = r.end();
97  for (auto entry_idx = start_idx; entry_idx != end_idx;
98  ++entry_idx) {
99  groups_buffer[entry_idx] = invalid_slot_val;
100  }
101  });
102 }
103 
104 #endif // #ifdef HAVE_TBB
105 #endif // #ifndef __CUDACC__
106 
107 #ifdef __CUDACC__
108 #define mapd_cas(address, compare, val) atomicCAS(address, compare, val)
109 #elif defined(_MSC_VER)
110 #define mapd_cas(address, compare, val) \
111  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
112  static_cast<long>(val), \
113  static_cast<long>(compare))
114 #else
115 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
116 #endif
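// Editor's note: mapd_cas abstracts a 32-bit compare-and-swap across the three build
// targets: CUDA atomicCAS, MSVC InterlockedCompareExchange, and the GCC/Clang
// __sync builtin. The fill functions below use it to claim a hash slot only if it
// still holds invalid_slot_val, which is how concurrent one-to-one builds detect
// duplicate keys. A minimal illustrative sketch (not part of this file):
//
//   const int32_t expected = invalid_slot_val;
//   if (mapd_cas(entry_ptr, expected, static_cast<int32_t>(row_index)) != expected) {
//     // slot already claimed by another row -> key is not unique
//   }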
117 
118 template <typename HASHTABLE_FILLING_FUNC>
119 DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
120  const JoinColumn join_column,
121  const JoinColumnTypeInfo type_info,
122  const int32_t* sd_inner_to_outer_translation_map,
123  const int32_t min_inner_elem,
124  const int32_t cpu_thread_idx,
125  const int32_t cpu_thread_count,
126  HASHTABLE_FILLING_FUNC filling_func) {
127 #ifdef __CUDACC__
128  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
129  int32_t step = blockDim.x * gridDim.x;
130 #else
131  int32_t start = cpu_thread_idx;
132  int32_t step = cpu_thread_count;
133 #endif
134  JoinColumnTyped col{&join_column, &type_info};
135  for (auto item : col.slice(start, step)) {
136  const size_t index = item.index;
137  int64_t elem = item.element;
138  if (elem == type_info.null_val) {
139  if (type_info.uses_bw_eq) {
140  elem = type_info.translated_null_val;
141  } else {
142  continue;
143  }
144  }
145 #ifndef __CUDACC__
146  if (sd_inner_to_outer_translation_map &&
147  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
148  const auto outer_id = map_str_id_to_outer_dict(elem,
149  min_inner_elem,
150  type_info.min_val,
151  type_info.max_val,
152  sd_inner_to_outer_translation_map);
153  if (outer_id == StringDictionary::INVALID_STR_ID) {
154  continue;
155  }
156  elem = outer_id;
157  }
158 #endif
159  if (filling_func(elem, index)) {
160  return -1;
161  }
162  }
163  return 0;
164 };
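// Editor's note: fill_hash_join_buff_impl is the shared driver for the one-to-one
// (perfect hash) builds. For each non-null element it optionally translates the
// inner dictionary id to the outer dictionary, then asks the supplied filling
// functor to claim the slot; a non-zero return from the functor (key collision)
// aborts the build with -1 so the caller can fall back to a one-to-many layout.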
165 
166 DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(
167  int32_t* buff,
168  const int32_t invalid_slot_val,
169  const bool for_semi_join,
170  const JoinColumn join_column,
171  const JoinColumnTypeInfo type_info,
172  const int32_t* sd_inner_to_outer_translation_map,
173  const int32_t min_inner_elem,
174  const int32_t cpu_thread_idx,
175  const int32_t cpu_thread_count,
176  const int64_t bucket_normalization) {
177  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
178  : SUFFIX(fill_one_to_one_hashtable);
179  auto hashtable_filling_func = [&](auto elem, size_t index) {
180  auto entry_ptr = SUFFIX(get_bucketized_hash_slot)(
181  buff, elem, type_info.min_val, bucket_normalization);
182  return filling_func(index, entry_ptr, invalid_slot_val);
183  };
184 
185  return fill_hash_join_buff_impl(buff,
186  join_column,
187  type_info,
188  sd_inner_to_outer_translation_map,
189  min_inner_elem,
190  cpu_thread_idx,
191  cpu_thread_count,
192  hashtable_filling_func);
193 }
194 
195 DEVICE int SUFFIX(fill_hash_join_buff)(int32_t* buff,
196  const int32_t invalid_slot_val,
197  const bool for_semi_join,
198  const JoinColumn join_column,
199  const JoinColumnTypeInfo type_info,
200  const int32_t* sd_inner_to_outer_translation_map,
201  const int32_t min_inner_elem,
202  const int32_t cpu_thread_idx,
203  const int32_t cpu_thread_count) {
204  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
205  : SUFFIX(fill_one_to_one_hashtable);
206  auto hashtable_filling_func = [&](auto elem, size_t index) {
207  auto entry_ptr = SUFFIX(get_hash_slot)(buff, elem, type_info.min_val);
208  return filling_func(index, entry_ptr, invalid_slot_val);
209  };
210 
211  return fill_hash_join_buff_impl(buff,
212  join_column,
213  type_info,
214  sd_inner_to_outer_translation_map,
215  min_inner_elem,
216  cpu_thread_idx,
217  cpu_thread_count,
218  hashtable_filling_func);
219 }
220 
221 template <typename HASHTABLE_FILLING_FUNC>
222 DEVICE int fill_hash_join_buff_sharded_impl(
223  int32_t* buff,
224  const JoinColumn join_column,
225  const JoinColumnTypeInfo type_info,
226  const ShardInfo shard_info,
227  const int32_t* sd_inner_to_outer_translation_map,
228  const int32_t min_inner_elem,
229  const int32_t cpu_thread_idx,
230  const int32_t cpu_thread_count,
231  HASHTABLE_FILLING_FUNC filling_func) {
232 #ifdef __CUDACC__
233  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
234  int32_t step = blockDim.x * gridDim.x;
235 #else
236  int32_t start = cpu_thread_idx;
237  int32_t step = cpu_thread_count;
238 #endif
239  JoinColumnTyped col{&join_column, &type_info};
240  for (auto item : col.slice(start, step)) {
241  const size_t index = item.index;
242  int64_t elem = item.element;
243  size_t shard = SHARD_FOR_KEY(elem, shard_info.num_shards);
244  if (shard != shard_info.shard) {
245  continue;
246  }
247  if (elem == type_info.null_val) {
248  if (type_info.uses_bw_eq) {
249  elem = type_info.translated_null_val;
250  } else {
251  continue;
252  }
253  }
254 #ifndef __CUDACC__
255  if (sd_inner_to_outer_translation_map &&
256  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
257  const auto outer_id = map_str_id_to_outer_dict(elem,
258  min_inner_elem,
259  type_info.min_val,
260  type_info.max_val,
261  sd_inner_to_outer_translation_map);
262  if (outer_id == StringDictionary::INVALID_STR_ID) {
263  continue;
264  }
265  elem = outer_id;
266  }
267 #endif
268  if (filling_func(elem, shard, index)) {
269  return -1;
270  }
271  }
272  return 0;
273 }
274 
275 DEVICE int SUFFIX(fill_hash_join_buff_sharded_bucketized)(
276  int32_t* buff,
277  const int32_t invalid_slot_val,
278  const bool for_semi_join,
279  const JoinColumn join_column,
280  const JoinColumnTypeInfo type_info,
281  const ShardInfo shard_info,
282  const int32_t* sd_inner_to_outer_translation_map,
283  const int32_t min_inner_elem,
284  const int32_t cpu_thread_idx,
285  const int32_t cpu_thread_count,
286  const int64_t bucket_normalization) {
287  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
288  : SUFFIX(fill_one_to_one_hashtable);
289  auto hashtable_filling_func = [&](auto elem, auto shard, size_t index) {
290  auto entry_ptr =
291  SUFFIX(get_bucketized_hash_slot_sharded_opt)(buff,
292  elem,
293  type_info.min_val,
294  shard_info.entry_count_per_shard,
295  shard,
296  shard_info.num_shards,
297  shard_info.device_count,
298  bucket_normalization);
299  return filling_func(index, entry_ptr, invalid_slot_val);
300  };
301 
302  return fill_hash_join_buff_sharded_impl(buff,
303  join_column,
304  type_info,
305  shard_info,
306  sd_inner_to_outer_translation_map,
307  min_inner_elem,
308  cpu_thread_idx,
309  cpu_thread_count,
310  hashtable_filling_func);
311 }
312 
313 DEVICE int SUFFIX(fill_hash_join_buff_sharded)(
314  int32_t* buff,
315  const int32_t invalid_slot_val,
316  const bool for_semi_join,
317  const JoinColumn join_column,
318  const JoinColumnTypeInfo type_info,
319  const ShardInfo shard_info,
320  const int32_t* sd_inner_to_outer_translation_map,
321  const int32_t min_inner_elem,
322  const int32_t cpu_thread_idx,
323  const int32_t cpu_thread_count) {
324  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
325  : SUFFIX(fill_one_to_one_hashtable);
326  auto hashtable_filling_func = [&](auto elem, auto shard, size_t index) {
327  auto entry_ptr = SUFFIX(get_hash_slot_sharded_opt)(buff,
328  elem,
329  type_info.min_val,
330  shard_info.entry_count_per_shard,
331  shard,
332  shard_info.num_shards,
333  shard_info.device_count);
334  return filling_func(index, entry_ptr, invalid_slot_val);
335  };
336 
337  return fill_hash_join_buff_sharded_impl(buff,
338  join_column,
339  type_info,
340  shard_info,
341  sd_inner_to_outer_translation_map,
342  min_inner_elem,
343  cpu_thread_idx,
344  cpu_thread_count,
345  hashtable_filling_func);
346 }
347 
348 template <typename T>
349 DEVICE void SUFFIX(init_baseline_hash_join_buff)(int8_t* hash_buff,
350  const int64_t entry_count,
351  const size_t key_component_count,
352  const bool with_val_slot,
353  const int32_t invalid_slot_val,
354  const int32_t cpu_thread_idx,
355  const int32_t cpu_thread_count) {
356 #ifdef __CUDACC__
357  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
358  int32_t step = blockDim.x * gridDim.x;
359 #else
360  int32_t start = cpu_thread_idx;
361  int32_t step = cpu_thread_count;
362 #endif
363  auto hash_entry_size = (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
364  const T empty_key = SUFFIX(get_invalid_key)<T>();
365  for (int64_t h = start; h < entry_count; h += step) {
366  int64_t off = h * hash_entry_size;
367  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
368  for (size_t i = 0; i < key_component_count; ++i) {
369  row_ptr[i] = empty_key;
370  }
371  if (with_val_slot) {
372  row_ptr[key_component_count] = invalid_slot_val;
373  }
374  }
375 }
376 
377 #ifndef __CUDACC__
378 #ifdef HAVE_TBB
379 
380 template <typename T>
381 DEVICE void SUFFIX(init_baseline_hash_join_buff_tbb)(int8_t* hash_buff,
382  const int64_t entry_count,
383  const size_t key_component_count,
384  const bool with_val_slot,
385  const int32_t invalid_slot_val) {
386  const auto hash_entry_size =
387  (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
388  const T empty_key = SUFFIX(get_invalid_key)<T>();
389  tbb::parallel_for(tbb::blocked_range<int64_t>(0, entry_count),
390  [=](const tbb::blocked_range<int64_t>& r) {
391  const auto start_idx = r.begin();
392  const auto end_idx = r.end();
393  for (int64_t entry_idx = start_idx; entry_idx < end_idx;
394  ++entry_idx) {
395  const int64_t offset = entry_idx * hash_entry_size;
396  auto row_ptr = reinterpret_cast<T*>(hash_buff + offset);
397  for (size_t k = 0; k < key_component_count; ++k) {
398  row_ptr[k] = empty_key;
399  }
400  if (with_val_slot) {
401  row_ptr[key_component_count] = invalid_slot_val;
402  }
403  }
404  });
405 }
406 
407 #endif // #ifdef HAVE_TBB
408 #endif // #ifndef __CUDACC__
409 
410 #ifdef __CUDACC__
411 template <typename T>
412 __device__ T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
413  const uint32_t h,
414  const T* key,
415  const size_t key_component_count,
416  const int64_t hash_entry_size) {
417  uint32_t off = h * hash_entry_size;
418  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
419  const T empty_key = SUFFIX(get_invalid_key)<T>();
420  {
421  const T old = atomicCAS(row_ptr, empty_key, *key);
422  if (empty_key == old && key_component_count > 1) {
423  for (int64_t i = 1; i <= key_component_count - 1; ++i) {
424  atomicExch(row_ptr + i, key[i]);
425  }
426  }
427  }
428  if (key_component_count > 1) {
429  while (atomicAdd(row_ptr + key_component_count - 1, 0) == empty_key) {
430  // spin until the winning thread has finished writing the entire key and the init
431  // value
432  }
433  }
434  bool match = true;
435  for (uint32_t i = 0; i < key_component_count; ++i) {
436  if (row_ptr[i] != key[i]) {
437  match = false;
438  break;
439  }
440  }
441 
442  if (match) {
443  return reinterpret_cast<T*>(row_ptr + key_component_count);
444  }
445  return nullptr;
446 }
447 #else
448 
449 #ifdef _MSC_VER
450 #define cas_cst(ptr, expected, desired) \
451  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
452  reinterpret_cast<void*>(&desired), \
453  expected) == expected)
454 #define store_cst(ptr, val) \
455  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
456  reinterpret_cast<void*>(val))
457 #define load_cst(ptr) \
458  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
459 #else
460 #define cas_cst(ptr, expected, desired) \
461  __atomic_compare_exchange_n( \
462  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
463 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
464 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
465 #endif
466 
467 template <typename T>
468 DEVICE T* get_matching_baseline_hash_slot_at(int8_t* hash_buff,
469  const uint32_t h,
470  const T* key,
471  const size_t key_component_count,
472  const int64_t hash_entry_size) {
473  uint32_t off = h * hash_entry_size;
474  auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
475  T empty_key = SUFFIX(get_invalid_key)<T>();
476  T write_pending = SUFFIX(get_invalid_key)<T>() - 1;
477  if (UNLIKELY(*key == write_pending)) {
478  // Address the singularity case where the first column contains the pending
479  // write special value. Should never happen, but avoid doing wrong things.
480  return nullptr;
481  }
482  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
483  if (success) {
484  if (key_component_count > 1) {
485  memcpy(row_ptr + 1, key + 1, (key_component_count - 1) * sizeof(T));
486  }
487  store_cst(row_ptr, *key);
488  return reinterpret_cast<T*>(row_ptr + key_component_count);
489  }
490  while (load_cst(row_ptr) == write_pending) {
491  // spin until the winning thread has finished writing the entire key
492  }
493  for (size_t i = 0; i < key_component_count; ++i) {
494  if (load_cst(row_ptr + i) != key[i]) {
495  return nullptr;
496  }
497  }
498  return reinterpret_cast<T*>(row_ptr + key_component_count);
499 }
500 
501 #undef load_cst
502 #undef store_cst
503 #undef cas_cst
504 
505 #endif // __CUDACC__
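// Editor's note: the two get_matching_baseline_hash_slot_at implementations above
// solve the same problem -- atomically publish a multi-component key into a
// baseline hash table entry -- with different primitives. The CUDA version CASes
// the first key component and spins until the last component becomes visible; the
// CPU version claims the entry with a sequentially consistent CAS to a "write
// pending" sentinel (get_invalid_key<T>() - 1), memcpys the remaining components,
// then releases the entry by storing the real first component. Readers that lose
// the race spin only while the sentinel is present.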
506 
507 template <typename T>
508 DEVICE int write_baseline_hash_slot(const int32_t val,
509  int8_t* hash_buff,
510  const int64_t entry_count,
511  const T* key,
512  const size_t key_component_count,
513  const bool with_val_slot,
514  const int32_t invalid_slot_val,
515  const size_t key_size_in_bytes,
516  const size_t hash_entry_size) {
517  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
518  T* matching_group = get_matching_baseline_hash_slot_at(
519  hash_buff, h, key, key_component_count, hash_entry_size);
520  if (!matching_group) {
521  uint32_t h_probe = (h + 1) % entry_count;
522  while (h_probe != h) {
523  matching_group = get_matching_baseline_hash_slot_at(
524  hash_buff, h_probe, key, key_component_count, hash_entry_size);
525  if (matching_group) {
526  break;
527  }
528  h_probe = (h_probe + 1) % entry_count;
529  }
530  }
531  if (!matching_group) {
532  return -2;
533  }
534  if (!with_val_slot) {
535  return 0;
536  }
537  if (mapd_cas(matching_group, invalid_slot_val, val) != invalid_slot_val) {
538  return -1;
539  }
540  return 0;
541 }
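// Editor's note: write_baseline_hash_slot resolves collisions with linear probing:
// start at MurmurHash1Impl(key) % entry_count and scan forward (wrapping) until
// get_matching_baseline_hash_slot_at returns a slot whose key is empty or equal to
// ours. Return codes: 0 on success, -2 if the table is full (the probe wrapped
// around), and -1 if the value slot was already occupied -- i.e. the key is not
// unique, which the one-to-one baseline build treats as a failure. The
// *_for_semi_join variant below differs only in that it ignores an already
// populated value slot.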
542 
543 template <typename T>
544 DEVICE int write_baseline_hash_slot_for_semi_join(const int32_t val,
545  int8_t* hash_buff,
546  const int64_t entry_count,
547  const T* key,
548  const size_t key_component_count,
549  const bool with_val_slot,
550  const int32_t invalid_slot_val,
551  const size_t key_size_in_bytes,
552  const size_t hash_entry_size) {
553  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
554  T* matching_group = get_matching_baseline_hash_slot_at(
555  hash_buff, h, key, key_component_count, hash_entry_size);
556  if (!matching_group) {
557  uint32_t h_probe = (h + 1) % entry_count;
558  while (h_probe != h) {
559  matching_group = get_matching_baseline_hash_slot_at(
560  hash_buff, h_probe, key, key_component_count, hash_entry_size);
561  if (matching_group) {
562  break;
563  }
564  h_probe = (h_probe + 1) % entry_count;
565  }
566  }
567  if (!matching_group) {
568  return -2;
569  }
570  if (!with_val_slot) {
571  return 0;
572  }
573  mapd_cas(matching_group, invalid_slot_val, val);
574  return 0;
575 }
576 
577 template <typename T, typename FILL_HANDLER>
578 DEVICE int fill_baseline_hash_join_buff(int8_t* hash_buff,
579  const int64_t entry_count,
580  const int32_t invalid_slot_val,
581  const bool for_semi_join,
582  const size_t key_component_count,
583  const bool with_val_slot,
584  const FILL_HANDLER* f,
585  const int64_t num_elems,
586  const int32_t cpu_thread_idx,
587  const int32_t cpu_thread_count) {
588 #ifdef __CUDACC__
589  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
590  int32_t step = blockDim.x * gridDim.x;
591 #else
592  int32_t start = cpu_thread_idx;
593  int32_t step = cpu_thread_count;
594 #endif
595 
596  T key_scratch_buff[g_maximum_conditions_to_coalesce];
597  const size_t key_size_in_bytes = key_component_count * sizeof(T);
598  const size_t hash_entry_size =
599  (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
600  auto key_buff_handler = [hash_buff,
601  entry_count,
602  with_val_slot,
603  invalid_slot_val,
604  key_size_in_bytes,
605  hash_entry_size,
606  &for_semi_join](const int64_t entry_idx,
607  const T* key_scratch_buffer,
608  const size_t key_component_count) {
609  if (for_semi_join) {
610  return write_baseline_hash_slot_for_semi_join<T>(entry_idx,
611  hash_buff,
612  entry_count,
613  key_scratch_buffer,
614  key_component_count,
615  with_val_slot,
616  invalid_slot_val,
617  key_size_in_bytes,
618  hash_entry_size);
619  } else {
620  return write_baseline_hash_slot<T>(entry_idx,
621  hash_buff,
622  entry_count,
623  key_scratch_buffer,
624  key_component_count,
625  with_val_slot,
626  invalid_slot_val,
627  key_size_in_bytes,
628  hash_entry_size);
629  }
630  };
631 
632  JoinColumnTuple cols(
633  f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
634  for (auto& it : cols.slice(start, step)) {
635  const auto err = (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
636  if (err) {
637  return err;
638  }
639  }
640  return 0;
641 }
642 
643 #undef mapd_cas
644 
645 #ifdef __CUDACC__
646 #define mapd_add(address, val) atomicAdd(address, val)
647 #elif defined(_MSC_VER)
648 #define mapd_add(address, val) \
649  InterlockedExchangeAdd(reinterpret_cast<volatile long*>(address), \
650  static_cast<long>(val))
651 #else
652 #define mapd_add(address, val) __sync_fetch_and_add(address, val)
653 #endif
654 
655 template <typename SLOT_SELECTOR>
656 DEVICE void count_matches_impl(int32_t* count_buff,
657  const JoinColumn join_column,
658  const JoinColumnTypeInfo type_info
659 #ifndef __CUDACC__
660  ,
661  const int32_t* sd_inner_to_outer_translation_map,
662  const int32_t min_inner_elem,
663  const int32_t cpu_thread_idx,
664  const int32_t cpu_thread_count
665 #endif
666  ,
667  SLOT_SELECTOR slot_selector) {
668 #ifdef __CUDACC__
669  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
670  int32_t step = blockDim.x * gridDim.x;
671 #else
672  int32_t start = cpu_thread_idx;
673  int32_t step = cpu_thread_count;
674 #endif
675  JoinColumnTyped col{&join_column, &type_info};
676  for (auto item : col.slice(start, step)) {
677  int64_t elem = item.element;
678  if (elem == type_info.null_val) {
679  if (type_info.uses_bw_eq) {
680  elem = type_info.translated_null_val;
681  } else {
682  continue;
683  }
684  }
685 #ifndef __CUDACC__
686  if (sd_inner_to_outer_translation_map &&
687  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
688  const auto outer_id = map_str_id_to_outer_dict(elem,
689  min_inner_elem,
690  type_info.min_val,
691  type_info.max_val,
692  sd_inner_to_outer_translation_map);
693  if (outer_id == StringDictionary::INVALID_STR_ID) {
694  continue;
695  }
696  elem = outer_id;
697  }
698 #endif
699  auto* entry_ptr = slot_selector(count_buff, elem);
700  mapd_add(entry_ptr, int32_t(1));
701  }
702 }
703 
704 GLOBAL void SUFFIX(count_matches)(int32_t* count_buff,
705  const JoinColumn join_column,
706  const JoinColumnTypeInfo type_info
707 #ifndef __CUDACC__
708  ,
709  const int32_t* sd_inner_to_outer_translation_map,
710  const int32_t min_inner_elem,
711  const int32_t cpu_thread_idx,
712  const int32_t cpu_thread_count
713 #endif
714 ) {
715  auto slot_sel = [&type_info](auto count_buff, auto elem) {
716  return SUFFIX(get_hash_slot)(count_buff, elem, type_info.min_val);
717  };
718  count_matches_impl(count_buff,
719  join_column,
720  type_info
721 #ifndef __CUDACC__
722  ,
723  sd_inner_to_outer_translation_map,
724  min_inner_elem,
725  cpu_thread_idx,
726  cpu_thread_count
727 #endif
728  ,
729  slot_sel);
730 }
731 
732 GLOBAL void SUFFIX(count_matches_bucketized)(
733  int32_t* count_buff,
734  const JoinColumn join_column,
735  const JoinColumnTypeInfo type_info
736 #ifndef __CUDACC__
737  ,
738  const int32_t* sd_inner_to_outer_translation_map,
739  const int32_t min_inner_elem,
740  const int32_t cpu_thread_idx,
741  const int32_t cpu_thread_count
742 #endif
743  ,
744  const int64_t bucket_normalization) {
745  auto slot_sel = [bucket_normalization, &type_info](auto count_buff, auto elem) {
746  return SUFFIX(get_bucketized_hash_slot)(
747  count_buff, elem, type_info.min_val, bucket_normalization);
748  };
749  count_matches_impl(count_buff,
750  join_column,
751  type_info
752 #ifndef __CUDACC__
753  ,
754  sd_inner_to_outer_translation_map,
755  min_inner_elem,
756  cpu_thread_idx,
757  cpu_thread_count
758 #endif
759  ,
760  slot_sel);
761 }
762 
763 GLOBAL void SUFFIX(count_matches_sharded)(
764  int32_t* count_buff,
765  const JoinColumn join_column,
766  const JoinColumnTypeInfo type_info,
767  const ShardInfo shard_info
768 #ifndef __CUDACC__
769  ,
770  const int32_t* sd_inner_to_outer_translation_map,
771  const int32_t min_inner_elem,
772  const int32_t cpu_thread_idx,
773  const int32_t cpu_thread_count
774 #endif
775 ) {
776 #ifdef __CUDACC__
777  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
778  int32_t step = blockDim.x * gridDim.x;
779 #else
780  int32_t start = cpu_thread_idx;
781  int32_t step = cpu_thread_count;
782 #endif
783  JoinColumnTyped col{&join_column, &type_info};
784  for (auto item : col.slice(start, step)) {
785  int64_t elem = item.element;
786  if (elem == type_info.null_val) {
787  if (type_info.uses_bw_eq) {
788  elem = type_info.translated_null_val;
789  } else {
790  continue;
791  }
792  }
793 #ifndef __CUDACC__
794  if (sd_inner_to_outer_translation_map &&
795  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
796  const auto outer_id = map_str_id_to_outer_dict(elem,
797  min_inner_elem,
798  type_info.min_val,
799  type_info.max_val,
800  sd_inner_to_outer_translation_map);
801  if (outer_id == StringDictionary::INVALID_STR_ID) {
802  continue;
803  }
804  elem = outer_id;
805  }
806 #endif
807  int32_t* entry_ptr = SUFFIX(get_hash_slot_sharded)(count_buff,
808  elem,
809  type_info.min_val,
810  shard_info.entry_count_per_shard,
811  shard_info.num_shards,
812  shard_info.device_count);
813  mapd_add(entry_ptr, int32_t(1));
814  }
815 }
816 
817 template <typename T>
818 DEVICE NEVER_INLINE const T* SUFFIX(get_matching_baseline_hash_slot_readonly)(
819  const T* key,
820  const size_t key_component_count,
821  const T* composite_key_dict,
822  const int64_t entry_count,
823  const size_t key_size_in_bytes) {
824  const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
825  uint32_t off = h * key_component_count;
826  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
827  return &composite_key_dict[off];
828  }
829  uint32_t h_probe = (h + 1) % entry_count;
830  while (h_probe != h) {
831  off = h_probe * key_component_count;
832  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
833  return &composite_key_dict[off];
834  }
835  h_probe = (h_probe + 1) % entry_count;
836  }
837 #ifndef __CUDACC__
838  CHECK(false);
839 #else
840  assert(false);
841 #endif
842  return nullptr;
843 }
844 
845 template <typename T, typename KEY_HANDLER>
846 GLOBAL void SUFFIX(count_matches_baseline)(int32_t* count_buff,
847  const T* composite_key_dict,
848  const int64_t entry_count,
849  const KEY_HANDLER* f,
850  const int64_t num_elems
851 #ifndef __CUDACC__
852  ,
853  const int32_t cpu_thread_idx,
854  const int32_t cpu_thread_count
855 #endif
856 ) {
857 #ifdef __CUDACC__
858  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
859  int32_t step = blockDim.x * gridDim.x;
860 #else
861  int32_t start = cpu_thread_idx;
862  int32_t step = cpu_thread_count;
863 #endif
864 #ifdef __CUDACC__
865  assert(composite_key_dict);
866 #endif
867  T key_scratch_buff[g_maximum_conditions_to_coalesce];
868  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
869  auto key_buff_handler = [composite_key_dict,
870  entry_count,
871  count_buff,
872  key_size_in_bytes](const int64_t row_entry_idx,
873  const T* key_scratch_buff,
874  const size_t key_component_count) {
875  const auto matching_group =
876  SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
877  key_component_count,
878  composite_key_dict,
879  entry_count,
880  key_size_in_bytes);
881  const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
882  mapd_add(&count_buff[entry_idx], int32_t(1));
883  return 0;
884  };
885 
886  JoinColumnTuple cols(
887  f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
888  for (auto& it : cols.slice(start, step)) {
889  (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
890  }
891 }
892 
893 template <typename SLOT_SELECTOR>
894 DEVICE void fill_row_ids_impl(int32_t* buff,
895  const int64_t hash_entry_count,
896  const JoinColumn join_column,
897  const JoinColumnTypeInfo type_info
898 #ifndef __CUDACC__
899  ,
900  const int32_t* sd_inner_to_outer_translation_map,
901  const int32_t min_inner_elem,
902  const int32_t cpu_thread_idx,
903  const int32_t cpu_thread_count
904 #endif
905  ,
906  SLOT_SELECTOR slot_selector) {
907  int32_t* pos_buff = buff;
908  int32_t* count_buff = buff + hash_entry_count;
909  int32_t* id_buff = count_buff + hash_entry_count;
910 
911 #ifdef __CUDACC__
912  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
913  int32_t step = blockDim.x * gridDim.x;
914 #else
915  int32_t start = cpu_thread_idx;
916  int32_t step = cpu_thread_count;
917 #endif
918  JoinColumnTyped col{&join_column, &type_info};
919  for (auto item : col.slice(start, step)) {
920  const size_t index = item.index;
921  int64_t elem = item.element;
922  if (elem == type_info.null_val) {
923  if (type_info.uses_bw_eq) {
924  elem = type_info.translated_null_val;
925  } else {
926  continue;
927  }
928  }
929 #ifndef __CUDACC__
930  if (sd_inner_to_outer_translation_map &&
931  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
932  const auto outer_id = map_str_id_to_outer_dict(elem,
933  min_inner_elem,
934  type_info.min_val,
935  type_info.max_val,
936  sd_inner_to_outer_translation_map);
937  if (outer_id == StringDictionary::INVALID_STR_ID) {
938  continue;
939  }
940  elem = outer_id;
941  }
942 #endif
943  auto pos_ptr = slot_selector(pos_buff, elem);
944  const auto bin_idx = pos_ptr - pos_buff;
945  const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
946  id_buff[id_buff_idx] = static_cast<int32_t>(index);
947  }
948 }
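// Editor's note: the one-to-many layout filled by fill_row_ids_impl is three
// consecutive int32_t arrays of hash_entry_count elements each: pos (offset of a
// bucket's first row id), count (per-bucket fill cursor), and the flattened row-id
// list. A bucket's rows therefore live at
// id_buff[pos_buff[bucket] .. pos_buff[bucket] + count_buff[bucket]).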
949 
950 GLOBAL void SUFFIX(fill_row_ids)(int32_t* buff,
951  const int64_t hash_entry_count,
952  const JoinColumn join_column,
953  const JoinColumnTypeInfo type_info
954 #ifndef __CUDACC__
955  ,
956  const int32_t* sd_inner_to_outer_translation_map,
957  const int32_t min_inner_elem,
958  const int32_t cpu_thread_idx,
959  const int32_t cpu_thread_count
960 #endif
961 ) {
962  auto slot_sel = [&type_info](auto pos_buff, auto elem) {
963  return SUFFIX(get_hash_slot)(pos_buff, elem, type_info.min_val);
964  };
965 
966  fill_row_ids_impl(buff,
967  hash_entry_count,
968  join_column,
969  type_info
970 #ifndef __CUDACC__
971  ,
972  sd_inner_to_outer_translation_map,
973  min_inner_elem,
974  cpu_thread_idx,
975  cpu_thread_count
976 #endif
977  ,
978  slot_sel);
979 }
980 
981 GLOBAL void SUFFIX(fill_row_ids_bucketized)(
982  int32_t* buff,
983  const int64_t hash_entry_count,
984  const JoinColumn join_column,
985  const JoinColumnTypeInfo type_info
986 #ifndef __CUDACC__
987  ,
988  const int32_t* sd_inner_to_outer_translation_map,
989  const int32_t min_inner_elem,
990  const int32_t cpu_thread_idx,
991  const int32_t cpu_thread_count
992 #endif
993  ,
994  const int64_t bucket_normalization) {
995  auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) {
996  return SUFFIX(get_bucketized_hash_slot)(
997  pos_buff, elem, type_info.min_val, bucket_normalization);
998  };
999  fill_row_ids_impl(buff,
1000  hash_entry_count,
1001  join_column,
1002  type_info
1003 #ifndef __CUDACC__
1004  ,
1005  sd_inner_to_outer_translation_map,
1006  min_inner_elem,
1007  cpu_thread_idx,
1008  cpu_thread_count
1009 #endif
1010  ,
1011  slot_sel);
1012 }
1013 
1014 template <typename SLOT_SELECTOR>
1015 DEVICE void fill_row_ids_sharded_impl(int32_t* buff,
1016  const int64_t hash_entry_count,
1017  const JoinColumn join_column,
1018  const JoinColumnTypeInfo type_info,
1019  const ShardInfo shard_info
1020 #ifndef __CUDACC__
1021  ,
1022  const int32_t* sd_inner_to_outer_translation_map,
1023  const int32_t min_inner_elem,
1024  const int32_t cpu_thread_idx,
1025  const int32_t cpu_thread_count
1026 #endif
1027  ,
1028  SLOT_SELECTOR slot_selector) {
1029 
1030  int32_t* pos_buff = buff;
1031  int32_t* count_buff = buff + hash_entry_count;
1032  int32_t* id_buff = count_buff + hash_entry_count;
1033 
1034 #ifdef __CUDACC__
1035  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1036  int32_t step = blockDim.x * gridDim.x;
1037 #else
1038  int32_t start = cpu_thread_idx;
1039  int32_t step = cpu_thread_count;
1040 #endif
1041  JoinColumnTyped col{&join_column, &type_info};
1042  for (auto item : col.slice(start, step)) {
1043  const size_t index = item.index;
1044  int64_t elem = item.element;
1045  if (elem == type_info.null_val) {
1046  if (type_info.uses_bw_eq) {
1047  elem = type_info.translated_null_val;
1048  } else {
1049  continue;
1050  }
1051  }
1052 #ifndef __CUDACC__
1053  if (sd_inner_to_outer_translation_map &&
1054  (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
1055  const auto outer_id = map_str_id_to_outer_dict(elem,
1056  min_inner_elem,
1057  type_info.min_val,
1058  type_info.max_val,
1059  sd_inner_to_outer_translation_map);
1060  if (outer_id == StringDictionary::INVALID_STR_ID) {
1061  continue;
1062  }
1063  elem = outer_id;
1064  }
1065 #endif
1066  auto* pos_ptr = slot_selector(pos_buff, elem);
1067  const auto bin_idx = pos_ptr - pos_buff;
1068  const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1069  id_buff[id_buff_idx] = static_cast<int32_t>(index);
1070  }
1071 }
1072 
1073 GLOBAL void SUFFIX(fill_row_ids_sharded)(int32_t* buff,
1074  const int64_t hash_entry_count,
1075  const JoinColumn join_column,
1076  const JoinColumnTypeInfo type_info,
1077  const ShardInfo shard_info
1078 #ifndef __CUDACC__
1079  ,
1080  const int32_t* sd_inner_to_outer_translation_map,
1081  const int32_t min_inner_elem,
1082  const int32_t cpu_thread_idx,
1083  const int32_t cpu_thread_count
1084 #endif
1085 ) {
1086  auto slot_sel = [&type_info, &shard_info](auto pos_buff, auto elem) {
1087  return SUFFIX(get_hash_slot_sharded)(pos_buff,
1088  elem,
1089  type_info.min_val,
1090  shard_info.entry_count_per_shard,
1091  shard_info.num_shards,
1092  shard_info.device_count);
1093  };
1094 
1095  fill_row_ids_impl(buff,
1096  hash_entry_count,
1097  join_column,
1098  type_info
1099 #ifndef __CUDACC__
1100  ,
1101  sd_inner_to_outer_translation_map,
1102  min_inner_elem,
1103  cpu_thread_idx,
1104  cpu_thread_count
1105 #endif
1106  ,
1107  slot_sel);
1108 }
1109 
1110 GLOBAL void SUFFIX(fill_row_ids_sharded_bucketized)(
1111  int32_t* buff,
1112  const int64_t hash_entry_count,
1113  const JoinColumn join_column,
1114  const JoinColumnTypeInfo type_info,
1115  const ShardInfo shard_info
1116 #ifndef __CUDACC__
1117  ,
1118  const int32_t* sd_inner_to_outer_translation_map,
1119  const int32_t min_inner_elem,
1120  const int32_t cpu_thread_idx,
1121  const int32_t cpu_thread_count
1122 #endif
1123  ,
1124  const int64_t bucket_normalization) {
1125  auto slot_sel = [&shard_info, &type_info, bucket_normalization](auto pos_buff,
1126  auto elem) {
1127  return SUFFIX(get_bucketized_hash_slot_sharded)(pos_buff,
1128  elem,
1129  type_info.min_val,
1130  shard_info.entry_count_per_shard,
1131  shard_info.num_shards,
1132  shard_info.device_count,
1133  bucket_normalization);
1134  };
1135 
1136  fill_row_ids_impl(buff,
1137  hash_entry_count,
1138  join_column,
1139  type_info
1140 #ifndef __CUDACC__
1141  ,
1142  sd_inner_to_outer_translation_map,
1143  min_inner_elem,
1144  cpu_thread_idx,
1145  cpu_thread_count
1146 #endif
1147  ,
1148  slot_sel);
1149 }
1150 
1151 template <typename T, typename KEY_HANDLER>
1152 GLOBAL void SUFFIX(fill_row_ids_baseline)(int32_t* buff,
1153  const T* composite_key_dict,
1154  const int64_t hash_entry_count,
1155  const KEY_HANDLER* f,
1156  const int64_t num_elems
1157 #ifndef __CUDACC__
1158  ,
1159  const int32_t cpu_thread_idx,
1160  const int32_t cpu_thread_count
1161 #endif
1162 ) {
1163  int32_t* pos_buff = buff;
1164  int32_t* count_buff = buff + hash_entry_count;
1165  int32_t* id_buff = count_buff + hash_entry_count;
1166 #ifdef __CUDACC__
1167  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1168  int32_t step = blockDim.x * gridDim.x;
1169 #else
1170  int32_t start = cpu_thread_idx;
1171  int32_t step = cpu_thread_count;
1172 #endif
1173 
1174  T key_scratch_buff[g_maximum_conditions_to_coalesce];
1175 #ifdef __CUDACC__
1176  assert(composite_key_dict);
1177 #endif
1178  const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
1179  auto key_buff_handler = [composite_key_dict,
1180  hash_entry_count,
1181  pos_buff,
1182  count_buff,
1183  id_buff,
1184  key_size_in_bytes](const int64_t row_index,
1185  const T* key_scratch_buff,
1186  const size_t key_component_count) {
1187  const T* matching_group =
1188  SUFFIX(get_matching_baseline_hash_slot_readonly)(key_scratch_buff,
1189  key_component_count,
1190  composite_key_dict,
1191  hash_entry_count,
1192  key_size_in_bytes);
1193  const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
1194  int32_t* pos_ptr = pos_buff + entry_idx;
1195  const auto bin_idx = pos_ptr - pos_buff;
1196  const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1197  id_buff[id_buff_idx] = static_cast<int32_t>(row_index);
1198  return 0;
1199  };
1200 
1201  JoinColumnTuple cols(
1202  f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
1203  for (auto& it : cols.slice(start, step)) {
1204  (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
1205  }
1206  return;
1207 }
1208 
1209 #undef mapd_add
1210 
1211 template <typename KEY_HANDLER>
1212 GLOBAL void SUFFIX(approximate_distinct_tuples_impl)(uint8_t* hll_buffer,
1213  int32_t* row_count_buffer,
1214  const uint32_t b,
1215  const int64_t num_elems,
1216  const KEY_HANDLER* f
1217 #ifndef __CUDACC__
1218  ,
1219  const int32_t cpu_thread_idx,
1220  const int32_t cpu_thread_count
1221 #endif
1222 ) {
1223 #ifdef __CUDACC__
1224  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1225  int32_t step = blockDim.x * gridDim.x;
1226 #else
1227  int32_t start = cpu_thread_idx;
1228  int32_t step = cpu_thread_count;
1229 #endif
1230 
1231  auto key_buff_handler = [b, hll_buffer, row_count_buffer](
1232  const int64_t entry_idx,
1233  const int64_t* key_scratch_buff,
1234  const size_t key_component_count) {
1235  if (row_count_buffer) {
1236  row_count_buffer[entry_idx] += 1;
1237  }
1238 
1239  const uint64_t hash =
1240  MurmurHash64AImpl(key_scratch_buff, key_component_count * sizeof(int64_t), 0);
1241  const uint32_t index = hash >> (64 - b);
1242  const auto rank = get_rank(hash << b, 64 - b);
1243 #ifdef __CUDACC__
1244  atomicMax(reinterpret_cast<int32_t*>(hll_buffer) + index, rank);
1245 #else
1246  hll_buffer[index] = std::max(hll_buffer[index], rank);
1247 #endif
1248 
1249  return 0;
1250  };
1251 
1252  int64_t key_scratch_buff[g_maximum_conditions_to_coalesce];
1253 
1254  JoinColumnTuple cols(
1255  f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
1256  for (auto& it : cols.slice(start, step)) {
1257  (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
1258  }
1259 }
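// Editor's note: approximate_distinct_tuples_impl feeds a HyperLogLog estimator.
// Each composite key is hashed with MurmurHash64AImpl; the top b bits pick one of
// 2^b registers and the register keeps the maximum "rank" observed in the
// remaining 64 - b bits (get_rank of the shifted hash). On CPU the max is taken
// directly, on GPU via atomicMax. The optional row_count_buffer additionally
// counts rows per entry for one-to-many sizing.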
1260 
1261 #ifdef __CUDACC__
1262 namespace {
1263 // TODO(adb): put these in a header file so they are not duplicated between here and
1264 // cuda_mapd_rt.cu
1265 __device__ double atomicMin(double* address, double val) {
1266  unsigned long long int* address_as_ull = (unsigned long long int*)address;
1267  unsigned long long int old = *address_as_ull, assumed;
1268 
1269  do {
1270  assumed = old;
1271  old = atomicCAS(address_as_ull,
1272  assumed,
1273  __double_as_longlong(min(val, __longlong_as_double(assumed))));
1274  } while (assumed != old);
1275 
1276  return __longlong_as_double(old);
1277 }
1278 } // namespace
1279 #endif
1280 
1281 template <size_t N>
1282 GLOBAL void SUFFIX(compute_bucket_sizes_impl)(double* bucket_sizes_for_thread,
1283  const JoinColumn* join_column,
1284  const JoinColumnTypeInfo* type_info,
1285  const double* bucket_size_thresholds
1286 #ifndef __CUDACC__
1287  ,
1288  const int32_t cpu_thread_idx,
1289  const int32_t cpu_thread_count
1290 #endif
1291 ) {
1292 #ifdef __CUDACC__
1293  int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1294  int32_t step = blockDim.x * gridDim.x;
1295 #else
1296  int32_t start = cpu_thread_idx;
1297  int32_t step = cpu_thread_count;
1298 #endif
1299  JoinColumnIterator it(join_column, type_info, start, step);
1300  for (; it; ++it) {
1301  // We expect the bounds column to be (min, max) e.g. (x_min, y_min, x_max, y_max)
1302  double bounds[2 * N];
1303  for (size_t j = 0; j < 2 * N; j++) {
1304  bounds[j] = SUFFIX(fixed_width_double_decode_noinline)(it.ptr(), j);
1305  }
1306 
1307  for (size_t j = 0; j < N; j++) {
1308  const auto diff = bounds[j + N] - bounds[j];
1309 #ifdef __CUDACC__
1310  if (diff > bucket_size_thresholds[j]) {
1311  atomicMin(&bucket_sizes_for_thread[j], diff);
1312  }
1313 #else
1314  if (diff < bucket_size_thresholds[j] && diff > bucket_sizes_for_thread[j]) {
1315  bucket_sizes_for_thread[j] = diff;
1316  }
1317 #endif
1318  }
1319  }
1320 }
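// Editor's note: compute_bucket_sizes_impl scans bounding boxes stored as
// (min..., max...) tuples and derives a per-dimension bucket width from the box
// extents, limited by bucket_size_thresholds; the resulting widths are later
// inverted (see inverse_bucket_sizes_for_dimension below) to map coordinates to
// overlaps-join bucket indices.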
1321 
1322 #ifndef __CUDACC__
1323 
1324 template <typename InputIterator, typename OutputIterator>
1325 void inclusive_scan(InputIterator first,
1326  InputIterator last,
1327  OutputIterator out,
1328  const size_t thread_count) {
1329  using ElementType = typename InputIterator::value_type;
1330  using OffsetType = typename InputIterator::difference_type;
1331  const OffsetType elem_count = last - first;
1332  if (elem_count < 10000 || thread_count <= 1) {
1333  ElementType sum = 0;
1334  for (auto iter = first; iter != last; ++iter, ++out) {
1335  *out = sum += *iter;
1336  }
1337  return;
1338  }
1339 
1340  const OffsetType step = (elem_count + thread_count - 1) / thread_count;
1341  OffsetType start_off = 0;
1342  OffsetType end_off = std::min(step, elem_count);
1343  std::vector<ElementType> partial_sums(thread_count);
1344  std::vector<std::future<void>> counter_threads;
1345  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx,
1346  start_off = std::min(start_off + step, elem_count),
1347  end_off = std::min(start_off + step, elem_count)) {
1348  counter_threads.push_back(std::async(
1349  std::launch::async,
1350  [first, out](
1351  ElementType& partial_sum, const OffsetType start, const OffsetType end) {
1352  ElementType sum = 0;
1353  for (auto in_iter = first + start, out_iter = out + start;
1354  in_iter != (first + end);
1355  ++in_iter, ++out_iter) {
1356  *out_iter = sum += *in_iter;
1357  }
1358  partial_sum = sum;
1359  },
1360  std::ref(partial_sums[thread_idx]),
1361  start_off,
1362  end_off));
1363  }
1364  for (auto& child : counter_threads) {
1365  child.get();
1366  }
1367 
1368  ElementType sum = 0;
1369  for (auto& s : partial_sums) {
1370  s += sum;
1371  sum = s;
1372  }
1373 
1374  counter_threads.clear();
1375  start_off = std::min(step, elem_count);
1376  end_off = std::min(start_off + step, elem_count);
1377  for (size_t thread_idx = 0; thread_idx < thread_count - 1; ++thread_idx,
1378  start_off = std::min(start_off + step, elem_count),
1379  end_off = std::min(start_off + step, elem_count)) {
1380  counter_threads.push_back(std::async(
1381  std::launch::async,
1382  [out](const ElementType prev_sum, const OffsetType start, const OffsetType end) {
1383  for (auto iter = out + start; iter != (out + end); ++iter) {
1384  *iter += prev_sum;
1385  }
1386  },
1387  partial_sums[thread_idx],
1388  start_off,
1389  end_off));
1390  }
1391  for (auto& child : counter_threads) {
1392  child.get();
1393  }
1394 }
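// Editor's note: this inclusive_scan is a two-pass parallel prefix sum: pass one
// lets each thread scan its contiguous chunk and record the chunk's total, the
// totals are then accumulated serially, and pass two adds each chunk's preceding
// total to every element of the following chunk. Small inputs (< 10000 elements)
// or a single thread fall back to a serial scan. A hedged usage sketch (names are
// illustrative, not from this file):
//
//   std::vector<int32_t> counts = {3, 0, 2, 5};
//   std::vector<int32_t> offsets(counts.size());
//   inclusive_scan(counts.begin(), counts.end(), offsets.begin(), /*thread_count=*/4);
//   // offsets == {3, 3, 5, 10}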
1395 
1396 template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
1397 void fill_one_to_many_hash_table_impl(int32_t* buff,
1398  const int64_t hash_entry_count,
1399  const JoinColumn& join_column,
1400  const JoinColumnTypeInfo& type_info,
1401  const int32_t* sd_inner_to_outer_translation_map,
1402  const int32_t min_inner_elem,
1403  const unsigned cpu_thread_count,
1404  COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_func,
1405  FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_func) {
1406  auto timer = DEBUG_TIMER(__func__);
1407  int32_t* pos_buff = buff;
1408  int32_t* count_buff = buff + hash_entry_count;
1409  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1410  std::vector<std::future<void>> counter_threads;
1411  for (unsigned cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1412  counter_threads.push_back(std::async(
1413  std::launch::async, count_matches_func, cpu_thread_idx, cpu_thread_count));
1414  }
1415 
1416  for (auto& child : counter_threads) {
1417  child.get();
1418  }
1419 
1420  std::vector<int32_t> count_copy(hash_entry_count, 0);
1421  CHECK_GT(hash_entry_count, int64_t(0));
1422  memcpy(count_copy.data() + 1, count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1423 #if HAVE_CUDA
1424  thrust::inclusive_scan(count_copy.begin(), count_copy.end(), count_copy.begin());
1425 #else
1426  ::inclusive_scan(
1427  count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1428 #endif
1429  std::vector<std::future<void>> pos_threads;
1430  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1431  pos_threads.push_back(std::async(
1432  std::launch::async,
1433  [&](size_t thread_idx) {
1434  for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
1435  if (count_buff[i]) {
1436  pos_buff[i] = count_copy[i];
1437  }
1438  }
1439  },
1440  cpu_thread_idx));
1441  }
1442  for (auto& child : pos_threads) {
1443  child.get();
1444  }
1445 
1446  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1447  std::vector<std::future<void>> rowid_threads;
1448  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1449  rowid_threads.push_back(std::async(
1450  std::launch::async, fill_row_ids_func, cpu_thread_idx, cpu_thread_count));
1451  }
1452 
1453  for (auto& child : rowid_threads) {
1454  child.get();
1455  }
1456 }
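// Editor's note: fill_one_to_many_hash_table_impl builds the pos/count/id layout
// in three phases: (1) count_matches_func counts rows per bucket into count_buff,
// (2) an inclusive scan over a copy shifted by one slot turns those counts into
// exclusive starting offsets written to pos_buff, and (3) after zeroing count_buff
// again, fill_row_ids_func re-walks the column and appends each row id at
// pos_buff[bucket] + count_buff[bucket]++ (via mapd_add).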
1457 
1458 void fill_one_to_many_hash_table(int32_t* buff,
1459  const HashEntryInfo hash_entry_info,
1460  const JoinColumn& join_column,
1461  const JoinColumnTypeInfo& type_info,
1462  const int32_t* sd_inner_to_outer_translation_map,
1463  const int32_t min_inner_elem,
1464  const unsigned cpu_thread_count) {
1465  auto timer = DEBUG_TIMER(__func__);
1466  auto launch_count_matches = [count_buff = buff + hash_entry_info.hash_entry_count,
1467  &join_column,
1468  &type_info,
1469  sd_inner_to_outer_translation_map,
1470  min_inner_elem](auto cpu_thread_idx,
1471  auto cpu_thread_count) {
1472  SUFFIX(count_matches)
1473  (count_buff,
1474  join_column,
1475  type_info,
1476  sd_inner_to_outer_translation_map,
1477  min_inner_elem,
1478  cpu_thread_idx,
1479  cpu_thread_count);
1480  };
1481  auto launch_fill_row_ids = [hash_entry_count = hash_entry_info.hash_entry_count,
1482  buff,
1483  &join_column,
1484  &type_info,
1485  sd_inner_to_outer_translation_map,
1486  min_inner_elem](auto cpu_thread_idx,
1487  auto cpu_thread_count) {
1488  SUFFIX(fill_row_ids)
1489  (buff,
1490  hash_entry_count,
1491  join_column,
1492  type_info,
1493  sd_inner_to_outer_translation_map,
1494  min_inner_elem,
1495  cpu_thread_idx,
1496  cpu_thread_count);
1497  };
1498 
1499  fill_one_to_many_hash_table_impl(buff,
1500  hash_entry_info.hash_entry_count,
1501  join_column,
1502  type_info,
1503  sd_inner_to_outer_translation_map,
1504  min_inner_elem,
1505  cpu_thread_count,
1506  launch_count_matches,
1507  launch_fill_row_ids);
1508 }
1509 
1510 void fill_one_to_many_hash_table_bucketized(
1511  int32_t* buff,
1512  const HashEntryInfo hash_entry_info,
1513  const JoinColumn& join_column,
1514  const JoinColumnTypeInfo& type_info,
1515  const int32_t* sd_inner_to_outer_translation_map,
1516  const int32_t min_inner_elem,
1517  const unsigned cpu_thread_count) {
1518  auto timer = DEBUG_TIMER(__func__);
1519  auto bucket_normalization = hash_entry_info.bucket_normalization;
1520  auto hash_entry_count = hash_entry_info.getNormalizedHashEntryCount();
1521  auto launch_count_matches = [bucket_normalization,
1522  count_buff = buff + hash_entry_count,
1523  &join_column,
1524  &type_info,
1525  sd_inner_to_outer_translation_map,
1526  min_inner_elem](auto cpu_thread_idx,
1527  auto cpu_thread_count) {
1528  SUFFIX(count_matches_bucketized)
1529  (count_buff,
1530  join_column,
1531  type_info,
1532  sd_inner_to_outer_translation_map,
1533  min_inner_elem,
1534  cpu_thread_idx,
1535  cpu_thread_count,
1536  bucket_normalization);
1537  };
1538  auto launch_fill_row_ids = [bucket_normalization,
1539  hash_entry_count,
1540  buff,
1541  &join_column,
1542  &type_info,
1543  sd_inner_to_outer_translation_map,
1544  min_inner_elem](auto cpu_thread_idx,
1545  auto cpu_thread_count) {
1546  SUFFIX(fill_row_ids_bucketized)
1547  (buff,
1548  hash_entry_count,
1549  join_column,
1550  type_info,
1551  sd_inner_to_outer_translation_map,
1552  min_inner_elem,
1553  cpu_thread_idx,
1554  cpu_thread_count,
1555  bucket_normalization);
1556  };
1557 
1558  fill_one_to_many_hash_table_impl(buff,
1559  hash_entry_count,
1560  join_column,
1561  type_info,
1562  sd_inner_to_outer_translation_map,
1563  min_inner_elem,
1564  cpu_thread_count,
1565  launch_count_matches,
1566  launch_fill_row_ids);
1567 }
1568 
1569 template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
1570 void fill_one_to_many_hash_table_sharded_impl(
1571  int32_t* buff,
1572  const int64_t hash_entry_count,
1573  const JoinColumn& join_column,
1574  const JoinColumnTypeInfo& type_info,
1575  const ShardInfo& shard_info,
1576  const int32_t* sd_inner_to_outer_translation_map,
1577  const int32_t min_inner_elem,
1578  const unsigned cpu_thread_count,
1579  COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_launcher,
1580  FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_launcher) {
1581  auto timer = DEBUG_TIMER(__func__);
1582  int32_t* pos_buff = buff;
1583  int32_t* count_buff = buff + hash_entry_count;
1584  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1585  std::vector<std::future<void>> counter_threads;
1586  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1587  counter_threads.push_back(std::async(
1588  std::launch::async, count_matches_launcher, cpu_thread_idx, cpu_thread_count));
1589  }
1590 
1591  for (auto& child : counter_threads) {
1592  child.get();
1593  }
1594 
1595  std::vector<int32_t> count_copy(hash_entry_count, 0);
1596  CHECK_GT(hash_entry_count, int64_t(0));
1597  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1598  ::inclusive_scan(
1599  count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1600  std::vector<std::future<void>> pos_threads;
1601  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1602  pos_threads.push_back(std::async(
1603  std::launch::async,
1604  [&](const unsigned thread_idx) {
1605  for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
1606  if (count_buff[i]) {
1607  pos_buff[i] = count_copy[i];
1608  }
1609  }
1610  },
1611  cpu_thread_idx));
1612  }
1613  for (auto& child : pos_threads) {
1614  child.get();
1615  }
1616 
1617  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1618  std::vector<std::future<void>> rowid_threads;
1619  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1620  rowid_threads.push_back(std::async(
1621  std::launch::async, fill_row_ids_launcher, cpu_thread_idx, cpu_thread_count));
1622  }
1623 
1624  for (auto& child : rowid_threads) {
1625  child.get();
1626  }
1627 }
1628 
1629 void fill_one_to_many_hash_table_sharded(int32_t* buff,
1630  const int64_t hash_entry_count,
1631  const JoinColumn& join_column,
1632  const JoinColumnTypeInfo& type_info,
1633  const ShardInfo& shard_info,
1634  const int32_t* sd_inner_to_outer_translation_map,
1635  const int32_t min_inner_elem,
1636  const unsigned cpu_thread_count) {
1637  auto launch_count_matches = [count_buff = buff + hash_entry_count,
1638  &join_column,
1639  &type_info,
1640  &shard_info
1641 #ifndef __CUDACC__
1642  ,
1643  sd_inner_to_outer_translation_map,
1644  min_inner_elem
1645 #endif
1646  ](auto cpu_thread_idx, auto cpu_thread_count) {
1647  return SUFFIX(count_matches_sharded)(count_buff,
1648  join_column,
1649  type_info,
1650  shard_info
1651 #ifndef __CUDACC__
1652  ,
1653  sd_inner_to_outer_translation_map,
1654  min_inner_elem,
1655  cpu_thread_idx,
1656  cpu_thread_count
1657 #endif
1658  );
1659  };
1660 
1661  auto launch_fill_row_ids = [buff,
1662  hash_entry_count,
1663  &join_column,
1664  &type_info,
1665  &shard_info
1666 #ifndef __CUDACC__
1667  ,
1668  sd_inner_to_outer_translation_map,
1669  min_inner_elem
1670 #endif
1671  ](auto cpu_thread_idx, auto cpu_thread_count) {
1672  return SUFFIX(fill_row_ids_sharded)(buff,
1673  hash_entry_count,
1674  join_column,
1675  type_info,
1676  shard_info
1677 #ifndef __CUDACC__
1678  ,
1679  sd_inner_to_outer_translation_map,
1680  min_inner_elem,
1681  cpu_thread_idx,
1682  cpu_thread_count);
1683 #endif
1684  };
1685 
1686  fill_one_to_many_hash_table_sharded_impl(buff,
1687  hash_entry_count,
1688  join_column,
1689  type_info,
1690  shard_info
1691 #ifndef __CUDACC__
1692  ,
1693  sd_inner_to_outer_translation_map,
1694  min_inner_elem,
1695  cpu_thread_count
1696 #endif
1697  ,
1698  launch_count_matches,
1699  launch_fill_row_ids);
1700 }
1701 
1702 void init_baseline_hash_join_buff_32(int8_t* hash_join_buff,
1703  const int64_t entry_count,
1704  const size_t key_component_count,
1705  const bool with_val_slot,
1706  const int32_t invalid_slot_val,
1707  const int32_t cpu_thread_idx,
1708  const int32_t cpu_thread_count) {
1709  init_baseline_hash_join_buff<int32_t>(hash_join_buff,
1710  entry_count,
1711  key_component_count,
1712  with_val_slot,
1713  invalid_slot_val,
1714  cpu_thread_idx,
1715  cpu_thread_count);
1716 }
1717 
1718 void init_baseline_hash_join_buff_64(int8_t* hash_join_buff,
1719  const int64_t entry_count,
1720  const size_t key_component_count,
1721  const bool with_val_slot,
1722  const int32_t invalid_slot_val,
1723  const int32_t cpu_thread_idx,
1724  const int32_t cpu_thread_count) {
1725  init_baseline_hash_join_buff<int64_t>(hash_join_buff,
1726  entry_count,
1727  key_component_count,
1728  with_val_slot,
1729  invalid_slot_val,
1730  cpu_thread_idx,
1731  cpu_thread_count);
1732 }
1733 
1734 #ifndef __CUDACC__
1735 #ifdef HAVE_TBB
1736 
1737 void init_baseline_hash_join_buff_tbb_32(int8_t* hash_join_buff,
1738  const int64_t entry_count,
1739  const size_t key_component_count,
1740  const bool with_val_slot,
1741  const int32_t invalid_slot_val) {
1742  init_baseline_hash_join_buff_tbb<int32_t>(
1743  hash_join_buff, entry_count, key_component_count, with_val_slot, invalid_slot_val);
1744 }
1745 
1746 void init_baseline_hash_join_buff_tbb_64(int8_t* hash_join_buff,
1747  const int64_t entry_count,
1748  const size_t key_component_count,
1749  const bool with_val_slot,
1750  const int32_t invalid_slot_val) {
1751  init_baseline_hash_join_buff_tbb<int64_t>(
1752  hash_join_buff, entry_count, key_component_count, with_val_slot, invalid_slot_val);
1753 }
1754 
1755 #endif // #ifdef HAVE_TBB
1756 #endif // #ifndef __CUDACC__
1757 
1758 int fill_baseline_hash_join_buff_32(int8_t* hash_buff,
1759  const int64_t entry_count,
1760  const int32_t invalid_slot_val,
1761  const bool for_semi_join,
1762  const size_t key_component_count,
1763  const bool with_val_slot,
1764  const GenericKeyHandler* key_handler,
1765  const int64_t num_elems,
1766  const int32_t cpu_thread_idx,
1767  const int32_t cpu_thread_count) {
1768  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1769  entry_count,
1770  invalid_slot_val,
1771  for_semi_join,
1772  key_component_count,
1773  with_val_slot,
1774  key_handler,
1775  num_elems,
1776  cpu_thread_idx,
1777  cpu_thread_count);
1778 }
1779 
1780 int overlaps_fill_baseline_hash_join_buff_32(int8_t* hash_buff,
1781  const int64_t entry_count,
1782  const int32_t invalid_slot_val,
1783  const size_t key_component_count,
1784  const bool with_val_slot,
1785  const OverlapsKeyHandler* key_handler,
1786  const int64_t num_elems,
1787  const int32_t cpu_thread_idx,
1788  const int32_t cpu_thread_count) {
1789  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1790  entry_count,
1791  invalid_slot_val,
1792  false,
1793  key_component_count,
1794  with_val_slot,
1795  key_handler,
1796  num_elems,
1797  cpu_thread_idx,
1798  cpu_thread_count);
1799 }
1800 
1801 int range_fill_baseline_hash_join_buff_32(int8_t* hash_buff,
1802  const size_t entry_count,
1803  const int32_t invalid_slot_val,
1804  const size_t key_component_count,
1805  const bool with_val_slot,
1806  const RangeKeyHandler* key_handler,
1807  const size_t num_elems,
1808  const int32_t cpu_thread_idx,
1809  const int32_t cpu_thread_count) {
1810  return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1811  entry_count,
1812  invalid_slot_val,
1813  false,
1814  key_component_count,
1815  with_val_slot,
1816  key_handler,
1817  num_elems,
1818  cpu_thread_idx,
1819  cpu_thread_count);
1820 }
1821 
1822 int fill_baseline_hash_join_buff_64(int8_t* hash_buff,
1823  const int64_t entry_count,
1824  const int32_t invalid_slot_val,
1825  const bool for_semi_join,
1826  const size_t key_component_count,
1827  const bool with_val_slot,
1828  const GenericKeyHandler* key_handler,
1829  const int64_t num_elems,
1830  const int32_t cpu_thread_idx,
1831  const int32_t cpu_thread_count) {
1832  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1833  entry_count,
1834  invalid_slot_val,
1835  for_semi_join,
1836  key_component_count,
1837  with_val_slot,
1838  key_handler,
1839  num_elems,
1840  cpu_thread_idx,
1841  cpu_thread_count);
1842 }
1843 
1844 int overlaps_fill_baseline_hash_join_buff_64(int8_t* hash_buff,
1845  const int64_t entry_count,
1846  const int32_t invalid_slot_val,
1847  const size_t key_component_count,
1848  const bool with_val_slot,
1849  const OverlapsKeyHandler* key_handler,
1850  const int64_t num_elems,
1851  const int32_t cpu_thread_idx,
1852  const int32_t cpu_thread_count) {
1853  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1854  entry_count,
1855  invalid_slot_val,
1856  false,
1857  key_component_count,
1858  with_val_slot,
1859  key_handler,
1860  num_elems,
1861  cpu_thread_idx,
1862  cpu_thread_count);
1863 }
1864 
1865 int range_fill_baseline_hash_join_buff_64(int8_t* hash_buff,
1866  const size_t entry_count,
1867  const int32_t invalid_slot_val,
1868  const size_t key_component_count,
1869  const bool with_val_slot,
1870  const RangeKeyHandler* key_handler,
1871  const size_t num_elems,
1872  const int32_t cpu_thread_idx,
1873  const int32_t cpu_thread_count) {
1874  return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1875  entry_count,
1876  invalid_slot_val,
1877  false,
1878  key_component_count,
1879  with_val_slot,
1880  key_handler,
1881  num_elems,
1882  cpu_thread_idx,
1883  cpu_thread_count);
1884 }
1885 
1886 template <typename T>
1887 void fill_one_to_many_baseline_hash_table(
1888  int32_t* buff,
1889  const T* composite_key_dict,
1890  const int64_t hash_entry_count,
1891  const size_t key_component_count,
1892  const std::vector<JoinColumn>& join_column_per_key,
1893  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1894  const std::vector<JoinBucketInfo>& join_buckets_per_key,
1895  const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
1896  const std::vector<int32_t>& sd_min_inner_elems,
1897  const size_t cpu_thread_count,
1898  const bool is_range_join,
1899  const bool is_geo_compressed) {
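// Layout of buff for the one-to-many baseline table:
//   [0, hash_entry_count)                    -> per-entry start positions (pos_buff)
//   [hash_entry_count, 2 * hash_entry_count) -> per-entry match counts (count_buff)
// followed by the row-id payload written by fill_row_ids_baseline (assumed to
// begin right after the counts). Filling proceeds in three parallel phases:
// count matches per entry, prefix-sum the counts into start positions, then
// scatter the matching row ids.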
1900  int32_t* pos_buff = buff;
1901  int32_t* count_buff = buff + hash_entry_count;
1902  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1903  std::vector<std::future<void>> counter_threads;
1904  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1905  if (is_range_join) {
1906  counter_threads.push_back(std::async(
1907  std::launch::async,
1908  [count_buff,
1909  composite_key_dict,
1910  &hash_entry_count,
1911  &join_buckets_per_key,
1912  &join_column_per_key,
1913  &is_geo_compressed,
1914  cpu_thread_idx,
1915  cpu_thread_count] {
1916  const auto key_handler = RangeKeyHandler(
1917  is_geo_compressed,
1918  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
1919  &join_column_per_key[0],
1920  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
1921  count_matches_baseline(count_buff,
1922  composite_key_dict,
1923  hash_entry_count,
1924  &key_handler,
1925  join_column_per_key[0].num_elems,
1926  cpu_thread_idx,
1927  cpu_thread_count);
1928  }));
1929  } else if (join_buckets_per_key.size() > 0) {
1930  counter_threads.push_back(std::async(
1931  std::launch::async,
1932  [count_buff,
1933  composite_key_dict,
1934  &hash_entry_count,
1935  &join_buckets_per_key,
1936  &join_column_per_key,
1937  cpu_thread_idx,
1938  cpu_thread_count] {
1939  const auto key_handler = OverlapsKeyHandler(
1940  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
1941  &join_column_per_key[0],
1942  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
1943  count_matches_baseline(count_buff,
1944  composite_key_dict,
1945  hash_entry_count,
1946  &key_handler,
1947  join_column_per_key[0].num_elems,
1948  cpu_thread_idx,
1949  cpu_thread_count);
1950  }));
1951  } else {
1952  counter_threads.push_back(
1953  std::async(std::launch::async,
1954  [count_buff,
1955  composite_key_dict,
1956  &key_component_count,
1957  &hash_entry_count,
1958  &join_column_per_key,
1959  &type_info_per_key,
1960  &sd_inner_to_outer_translation_maps,
1961  &sd_min_inner_elems,
1962  cpu_thread_idx,
1963  cpu_thread_count] {
1964  const auto key_handler =
1965  GenericKeyHandler(key_component_count,
1966  true,
1967  &join_column_per_key[0],
1968  &type_info_per_key[0],
1969  &sd_inner_to_outer_translation_maps[0],
1970  &sd_min_inner_elems[0]);
1971  count_matches_baseline(count_buff,
1972  composite_key_dict,
1973  hash_entry_count,
1974  &key_handler,
1975  join_column_per_key[0].num_elems,
1976  cpu_thread_idx,
1977  cpu_thread_count);
1978  }));
1979  }
1980  }
1981 
1982  for (auto& child : counter_threads) {
1983  child.get();
1984  }
1985 
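// Phase 2: derive start positions. count_copy holds the counts shifted by one
// slot (count_copy[0] stays 0), so the parallel inclusive_scan below produces an
// exclusive prefix sum that is then written into pos_buff.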
1986  std::vector<int32_t> count_copy(hash_entry_count, 0);
1987  CHECK_GT(hash_entry_count, int64_t(0));
1988  memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1989  ::inclusive_scan(
1990  count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1991  std::vector<std::future<void>> pos_threads;
1992  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1993  pos_threads.push_back(std::async(
1994  std::launch::async,
1995  [&](const int thread_idx) {
1996  for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
1997  if (count_buff[i]) {
1998  pos_buff[i] = count_copy[i];
1999  }
2000  }
2001  },
2002  cpu_thread_idx));
2003  }
2004  for (auto& child : pos_threads) {
2005  child.get();
2006  }
2007 
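// Phase 3: reset the counts and re-run the key handlers to emit row ids;
// count_buff is presumably reused by fill_row_ids_baseline as the per-entry
// write cursor while the matching row ids are scattered into the payload.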
2008  memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
2009  std::vector<std::future<void>> rowid_threads;
2010  for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
2011  if (is_range_join) {
2012  rowid_threads.push_back(std::async(
2013  std::launch::async,
2014  [buff,
2015  composite_key_dict,
2016  hash_entry_count,
2017  &join_column_per_key,
2018  &join_buckets_per_key,
2019  &is_geo_compressed,
2020  cpu_thread_idx,
2021  cpu_thread_count] {
2022  const auto key_handler = RangeKeyHandler(
2023  is_geo_compressed,
2024  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2025  &join_column_per_key[0],
2026  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2027  SUFFIX(fill_row_ids_baseline)
2028  (buff,
2029  composite_key_dict,
2030  hash_entry_count,
2031  &key_handler,
2032  join_column_per_key[0].num_elems,
2033  cpu_thread_idx,
2034  cpu_thread_count);
2035  }));
2036  } else if (join_buckets_per_key.size() > 0) {
2037  rowid_threads.push_back(std::async(
2038  std::launch::async,
2039  [buff,
2040  composite_key_dict,
2041  hash_entry_count,
2042  &join_column_per_key,
2043  &join_buckets_per_key,
2044  cpu_thread_idx,
2045  cpu_thread_count] {
2046  const auto key_handler = OverlapsKeyHandler(
2047  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2048  &join_column_per_key[0],
2049  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2050  SUFFIX(fill_row_ids_baseline)
2051  (buff,
2052  composite_key_dict,
2053  hash_entry_count,
2054  &key_handler,
2055  join_column_per_key[0].num_elems,
2056  cpu_thread_idx,
2057  cpu_thread_count);
2058  }));
2059  } else {
2060  rowid_threads.push_back(std::async(std::launch::async,
2061  [buff,
2062  composite_key_dict,
2063  hash_entry_count,
2064  key_component_count,
2065  &join_column_per_key,
2066  &type_info_per_key,
2067  &sd_inner_to_outer_translation_maps,
2068  &sd_min_inner_elems,
2069  cpu_thread_idx,
2070  cpu_thread_count] {
2071  const auto key_handler = GenericKeyHandler(
2072  key_component_count,
2073  true,
2074  &join_column_per_key[0],
2075  &type_info_per_key[0],
2076  &sd_inner_to_outer_translation_maps[0],
2077  &sd_min_inner_elems[0]);
2078  SUFFIX(fill_row_ids_baseline)
2079  (buff,
2080  composite_key_dict,
2081  hash_entry_count,
2082  &key_handler,
2083  join_column_per_key[0].num_elems,
2084  cpu_thread_idx,
2085  cpu_thread_count);
2086  }));
2087  }
2088  }
2089 
2090  for (auto& child : rowid_threads) {
2091  child.get();
2092  }
2093 }
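// Probe-side sketch (illustrative only; assumes the row-id payload starts at
// buff + 2 * hash_entry_count, matching the pos/count layout built above):
//   int64_t h = /* slot of the matching composite key in composite_key_dict */;
//   const int32_t pos = buff[h];                     // offset of first match
//   const int32_t cnt = buff[hash_entry_count + h];  // number of matches
//   const int32_t* row_ids = buff + 2 * hash_entry_count + pos;
//   for (int32_t i = 0; i < cnt; ++i) { /* join against row_ids[i] */ }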
2094 
2095 void fill_one_to_many_baseline_hash_table_32(
2096  int32_t* buff,
2097  const int32_t* composite_key_dict,
2098  const int64_t hash_entry_count,
2099  const size_t key_component_count,
2100  const std::vector<JoinColumn>& join_column_per_key,
2101  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2102  const std::vector<JoinBucketInfo>& join_bucket_info,
2103  const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
2104  const std::vector<int32_t>& sd_min_inner_elems,
2105  const int32_t cpu_thread_count,
2106  const bool is_range_join,
2107  const bool is_geo_compressed) {
2108  fill_one_to_many_baseline_hash_table<int32_t>(buff,
2109  composite_key_dict,
2110  hash_entry_count,
2111  key_component_count,
2112  join_column_per_key,
2113  type_info_per_key,
2114  join_bucket_info,
2115  sd_inner_to_outer_translation_maps,
2116  sd_min_inner_elems,
2117  cpu_thread_count,
2118  is_range_join,
2119  is_geo_compressed);
2120 }
2121 
2123  int32_t* buff,
2124  const int64_t* composite_key_dict,
2125  const int64_t hash_entry_count,
2126  const size_t key_component_count,
2127  const std::vector<JoinColumn>& join_column_per_key,
2128  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2129  const std::vector<JoinBucketInfo>& join_bucket_info,
2130  const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
2131  const std::vector<int32_t>& sd_min_inner_elems,
2132  const int32_t cpu_thread_count,
2133  const bool is_range_join,
2134  const bool is_geo_compressed) {
2135  fill_one_to_many_baseline_hash_table<int64_t>(buff,
2136  composite_key_dict,
2137  hash_entry_count,
2138  key_component_count,
2139  join_column_per_key,
2140  type_info_per_key,
2141  join_bucket_info,
2142  sd_inner_to_outer_translation_maps,
2143  sd_min_inner_elems,
2144  cpu_thread_count,
2145  is_range_join,
2146  is_geo_compressed);
2147 }
2148 
2149 void approximate_distinct_tuples(uint8_t* hll_buffer_all_cpus,
2150  const uint32_t b,
2151  const size_t padded_size_bytes,
2152  const std::vector<JoinColumn>& join_column_per_key,
2153  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2154  const int thread_count) {
2155  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2156  CHECK(!join_column_per_key.empty());
2157 
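// Each worker updates a private HyperLogLog register block (2^b registers) at
// hll_buffer_all_cpus + thread_idx * padded_size_bytes; the per-thread blocks
// are presumably merged by the caller once all futures complete.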
2158  std::vector<std::future<void>> approx_distinct_threads;
2159  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2160  approx_distinct_threads.push_back(std::async(
2161  std::launch::async,
2162  [&join_column_per_key,
2163  &type_info_per_key,
2164  b,
2165  hll_buffer_all_cpus,
2166  padded_size_bytes,
2167  thread_idx,
2168  thread_count] {
2169  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2170 
2171  const auto key_handler = GenericKeyHandler(join_column_per_key.size(),
2172  false,
2173  &join_column_per_key[0],
2174  &type_info_per_key[0],
2175  nullptr,
2176  nullptr);
2177  approximate_distinct_tuples_impl(hll_buffer,
2178  nullptr,
2179  b,
2180  join_column_per_key[0].num_elems,
2181  &key_handler,
2182  thread_idx,
2183  thread_count);
2184  }));
2185  }
2186  for (auto& child : approx_distinct_threads) {
2187  child.get();
2188  }
2189 }
2190 
2191 void approximate_distinct_tuples_overlaps(
2192  uint8_t* hll_buffer_all_cpus,
2193  std::vector<int32_t>& row_counts,
2194  const uint32_t b,
2195  const size_t padded_size_bytes,
2196  const std::vector<JoinColumn>& join_column_per_key,
2197  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2198  const std::vector<JoinBucketInfo>& join_buckets_per_key,
2199  const int thread_count) {
2200  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
2201  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2202  CHECK(!join_column_per_key.empty());
2203 
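// Same per-thread HLL scheme as above, but keyed by an OverlapsKeyHandler;
// row_counts (passed as the row_count_buffer argument) additionally collects
// what appear to be per-row counts, which the inclusive_scan at the end turns
// into a running total.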
2204  std::vector<std::future<void>> approx_distinct_threads;
2205  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2206  approx_distinct_threads.push_back(std::async(
2207  std::launch::async,
2208  [&join_column_per_key,
2209  &join_buckets_per_key,
2210  &row_counts,
2211  b,
2212  hll_buffer_all_cpus,
2213  padded_size_bytes,
2214  thread_idx,
2215  thread_count] {
2216  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2217 
2218  const auto key_handler = OverlapsKeyHandler(
2219  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2220  &join_column_per_key[0],
2221  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2222  approximate_distinct_tuples_impl(hll_buffer,
2223  row_counts.data(),
2224  b,
2225  join_column_per_key[0].num_elems,
2226  &key_handler,
2227  thread_idx,
2228  thread_count);
2229  }));
2230  }
2231  for (auto& child : approx_distinct_threads) {
2232  child.get();
2233  }
2234 
2235  ::inclusive_scan(
2236  row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2237 }
2238 
2239 void approximate_distinct_tuples_range(
2240  uint8_t* hll_buffer_all_cpus,
2241  std::vector<int32_t>& row_counts,
2242  const uint32_t b,
2243  const size_t padded_size_bytes,
2244  const std::vector<JoinColumn>& join_column_per_key,
2245  const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2246  const std::vector<JoinBucketInfo>& join_buckets_per_key,
2247  const bool is_compressed,
2248  const int thread_count) {
2249  CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
2250  CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2251  CHECK(!join_column_per_key.empty());
2252 
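// The range variant differs from the overlaps one only in its key handler:
// RangeKeyHandler additionally takes is_compressed to deal with geo-compressed
// coordinates.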
2253  std::vector<std::future<void>> approx_distinct_threads;
2254  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2255  approx_distinct_threads.push_back(std::async(
2256  std::launch::async,
2257  [&join_column_per_key,
2258  &join_buckets_per_key,
2259  &row_counts,
2260  b,
2261  hll_buffer_all_cpus,
2262  padded_size_bytes,
2263  thread_idx,
2264  is_compressed,
2265  thread_count] {
2266  auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2267 
2268  const auto key_handler = RangeKeyHandler(
2269  is_compressed,
2270  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2271  &join_column_per_key[0],
2272  join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2273  approximate_distinct_tuples_impl(hll_buffer,
2274  row_counts.data(),
2275  b,
2276  join_column_per_key[0].num_elems,
2277  &key_handler,
2278  thread_idx,
2279  thread_count);
2280  }));
2281  }
2282  for (auto& child : approx_distinct_threads) {
2283  child.get();
2284  }
2285 
2286  ::inclusive_scan(
2287  row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2288 }
2289 
2290 void compute_bucket_sizes_on_cpu(std::vector<double>& bucket_sizes_for_dimension,
2291  const JoinColumn& join_column,
2292  const JoinColumnTypeInfo& type_info,
2293  const std::vector<double>& bucket_size_thresholds,
2294  const int thread_count) {
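// Each thread computes candidate bucket sizes over its stripe of rows into a
// private vector; the loop below keeps the per-dimension maximum across threads
// as the final bucket size.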
2295  std::vector<std::vector<double>> bucket_sizes_for_threads;
2296  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2297  bucket_sizes_for_threads.emplace_back(bucket_sizes_for_dimension.size(), 0.0);
2298  }
2299  std::vector<std::future<void>> threads;
2300  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2301  threads.push_back(std::async(std::launch::async,
2302  compute_bucket_sizes_impl<2>,
2303  bucket_sizes_for_threads[thread_idx].data(),
2304  &join_column,
2305  &type_info,
2306  bucket_size_thresholds.data(),
2307  thread_idx,
2308  thread_count));
2309  }
2310  for (auto& child : threads) {
2311  child.get();
2312  }
2313 
2314  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2315  for (size_t i = 0; i < bucket_sizes_for_dimension.size(); i++) {
2316  if (bucket_sizes_for_threads[thread_idx][i] > bucket_sizes_for_dimension[i]) {
2317  bucket_sizes_for_dimension[i] = bucket_sizes_for_threads[thread_idx][i];
2318  }
2319  }
2320  }
2321 }
2322 
2323 #endif // ifndef __CUDACC__