OmniSciDB  467d548b97
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JoinHashTableQueryRuntime.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <math.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 
23 #include "QueryEngine/MurmurHash.h"
24 
25 DEVICE bool compare_to_key(const int8_t* entry,
26  const int8_t* key,
27  const size_t key_bytes) {
28  for (size_t i = 0; i < key_bytes; ++i) {
29  if (entry[i] != key[i]) {
30  return false;
31  }
32  }
33  return true;
34 }
35 
namespace {

// Sentinel results used by get_matching_slot()/baseline_hash_join_idx_impl():
// kNoMatch    — the probed slot holds a different key; caller keeps probing.
// kNotPresent — the probed slot is empty (invalid key), so the key cannot be
//               anywhere in the table; probing can stop.
const int kNoMatch = -1;
const int kNotPresent = -2;

} // namespace
42 
43 template <class T>
44 DEVICE int64_t get_matching_slot(const int8_t* hash_buff,
45  const uint32_t h,
46  const int8_t* key,
47  const size_t key_bytes) {
48  const auto lookup_result_ptr = hash_buff + h * (key_bytes + sizeof(T));
49  if (compare_to_key(lookup_result_ptr, key, key_bytes)) {
50  return *reinterpret_cast<const T*>(lookup_result_ptr + key_bytes);
51  }
52  if (*reinterpret_cast<const T*>(lookup_result_ptr) ==
53  SUFFIX(get_invalid_key) < T > ()) {
54  return kNotPresent;
55  }
56  return kNoMatch;
57 }
58 
59 template <class T>
60 FORCE_INLINE DEVICE int64_t baseline_hash_join_idx_impl(const int8_t* hash_buff,
61  const int8_t* key,
62  const size_t key_bytes,
63  const size_t entry_count) {
64  if (!entry_count) {
65  return kNoMatch;
66  }
67  const uint32_t h = MurmurHash1(key, key_bytes, 0) % entry_count;
68  int64_t matching_slot = get_matching_slot<T>(hash_buff, h, key, key_bytes);
69  if (matching_slot != kNoMatch) {
70  return matching_slot;
71  }
72  uint32_t h_probe = (h + 1) % entry_count;
73  while (h_probe != h) {
74  matching_slot = get_matching_slot<T>(hash_buff, h_probe, key, key_bytes);
75  if (matching_slot != kNoMatch) {
76  return matching_slot;
77  }
78  h_probe = (h_probe + 1) % entry_count;
79  }
80  return kNoMatch;
81 }
82 
83 extern "C" NEVER_INLINE DEVICE int64_t
84 baseline_hash_join_idx_32(const int8_t* hash_buff,
85  const int8_t* key,
86  const size_t key_bytes,
87  const size_t entry_count) {
88  return baseline_hash_join_idx_impl<int32_t>(hash_buff, key, key_bytes, entry_count);
89 }
90 
91 extern "C" NEVER_INLINE DEVICE int64_t
92 baseline_hash_join_idx_64(const int8_t* hash_buff,
93  const int8_t* key,
94  const size_t key_bytes,
95  const size_t entry_count) {
96  return baseline_hash_join_idx_impl<int64_t>(hash_buff, key, key_bytes, entry_count);
97 }
98 
99 template <typename T>
101  const double bucket_size) {
102  return static_cast<int64_t>(floor(static_cast<double>(value) * bucket_size));
103 }
104 
105 extern "C" NEVER_INLINE DEVICE int64_t
106 get_bucket_key_for_range_double(const int8_t* range_bytes,
107  const size_t range_component_index,
108  const double bucket_size) {
109  const auto range = reinterpret_cast<const double*>(range_bytes);
110  return get_bucket_key_for_value_impl(range[range_component_index], bucket_size);
111 }
112 
113 FORCE_INLINE DEVICE int64_t
115  const size_t range_component_index,
116  const double bucket_size) {
117  const auto range_ptr = reinterpret_cast<const int32_t*>(range);
118  if (range_component_index % 2 == 0) {
120  Geospatial::decompress_longitude_coord_geoint32(range_ptr[range_component_index]),
121  bucket_size);
122  } else {
124  Geospatial::decompress_lattitude_coord_geoint32(range_ptr[range_component_index]),
125  bucket_size);
126  }
127 }
128 
129 extern "C" NEVER_INLINE DEVICE int64_t
131  const size_t range_component_index,
132  const double bucket_size) {
134  range, range_component_index, bucket_size);
135 }
136 
137 template <typename T>
139  const size_t key_component_count,
140  const T* composite_key_dict,
141  const size_t entry_count) {
142  const uint32_t h = MurmurHash1(key, key_component_count * sizeof(T), 0) % entry_count;
143  uint32_t off = h * key_component_count;
144  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
145  return h;
146  }
147  uint32_t h_probe = (h + 1) % entry_count;
148  while (h_probe != h) {
149  off = h_probe * key_component_count;
150  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
151  return h_probe;
152  }
153  if (composite_key_dict[off] == SUFFIX(get_invalid_key) < T > ()) {
154  return -1;
155  }
156  h_probe = (h_probe + 1) % entry_count;
157  }
158  return -1;
159 }
160 
161 extern "C" NEVER_INLINE DEVICE int64_t
162 get_composite_key_index_32(const int32_t* key,
163  const size_t key_component_count,
164  const int32_t* composite_key_dict,
165  const size_t entry_count) {
167  key, key_component_count, composite_key_dict, entry_count);
168 }
169 
170 extern "C" NEVER_INLINE DEVICE int64_t
171 get_composite_key_index_64(const int64_t* key,
172  const size_t key_component_count,
173  const int64_t* composite_key_dict,
174  const size_t entry_count) {
176  key, key_component_count, composite_key_dict, entry_count);
177 }
178 
179 extern "C" NEVER_INLINE DEVICE int32_t insert_sorted(int32_t* arr,
180  size_t elem_count,
181  int32_t elem) {
182  for (size_t i = 0; i < elem_count; i++) {
183  if (elem == arr[i])
184  return 0;
185 
186  if (elem > arr[i])
187  continue;
188 
189  for (size_t j = elem_count; i < j; j--) {
190  arr[j] = arr[j - 1];
191  }
192  arr[i] = elem;
193  return 1;
194  }
195 
196  arr[elem_count] = elem;
197  return 1;
198 }
199 
200 extern "C" ALWAYS_INLINE DEVICE int64_t overlaps_hash_join_idx(int64_t hash_buff,
201  const int64_t key,
202  const int64_t min_key,
203  const int64_t max_key) {
204  if (key >= min_key && key <= max_key) {
205  return *(reinterpret_cast<int32_t*>(hash_buff) + (key - min_key));
206  }
207  return -1;
208 }
209 
// A read-only view over a contiguous run of row ids inside the one-to-many
// hash table payload buffer. A null `buffer` with element_count 0 means
// "no matching rows".
struct BufferRange {
  const int32_t* buffer = nullptr;
  const int64_t element_count = 0;
};
214 
216 get_row_id_buffer_ptr(int64_t* hash_table_ptr,
217  const int64_t* key,
218  const int64_t key_component_count,
219  const int64_t entry_count,
220  const int64_t offset_buffer_ptr_offset,
221  const int64_t sub_buff_size) {
222  const int64_t min_key = 0;
223  const int64_t max_key = entry_count - 1;
224 
225  auto key_idx =
226  get_composite_key_index_64(key, key_component_count, hash_table_ptr, entry_count);
227 
228  if (key_idx < -1) {
229  return BufferRange{.buffer = nullptr, .element_count = 0};
230  }
231 
232  int8_t* one_to_many_ptr = reinterpret_cast<int8_t*>(hash_table_ptr);
233  one_to_many_ptr += offset_buffer_ptr_offset;
234 
235  // Returns an index used to fetch row count and row ids.
236  const auto slot = overlaps_hash_join_idx(
237  reinterpret_cast<int64_t>(one_to_many_ptr), key_idx, min_key, max_key);
238  if (slot < 0) {
239  return BufferRange{.buffer = nullptr, .element_count = 0};
240  }
241 
242  // Offset into the row count section of buffer
243  int8_t* count_ptr = one_to_many_ptr + sub_buff_size;
244 
245  const int64_t matched_row_count = overlaps_hash_join_idx(
246  reinterpret_cast<int64_t>(count_ptr), key_idx, min_key, max_key);
247 
248  // Offset into payload section, containing an array of row ids
249  int32_t* rowid_buffer = (int32_t*)(one_to_many_ptr + 2 * sub_buff_size);
250  const auto rowidoff_ptr = &rowid_buffer[slot];
251 
252  return BufferRange{.buffer = rowidoff_ptr, .element_count = matched_row_count};
253 }
254 
// Axis-aligned bounding box of a geometry, in the same coordinate space as
// the bucket sizes used by get_candidate_rows().
struct Bounds {
  const double min_X;
  const double min_Y;
  const double max_X;
  const double max_Y;
};
261 
// Collect into `out_arr` the sorted, de-duplicated row ids of every hash
// table entry whose 2-D bucket falls inside the bounding box described by
// `range_bytes` (four doubles: min_X, min_Y, max_X, max_Y). Returns the
// number of candidate rows written. The caller supplies `out_arr` with
// capacity `max_arr_size`; overflow is only guarded by the assert below.
// NOTE(review): `range_component_index` and `keys_count` are unused in this
// body — presumably retained for ABI compatibility with the code generator;
// confirm against call sites.
extern "C" NEVER_INLINE DEVICE int64_t
get_candidate_rows(int32_t* out_arr,
                   const uint32_t max_arr_size,
                   const int8_t* range_bytes,
                   const int32_t range_component_index,
                   const double bucket_size_x,
                   const double bucket_size_y,
                   const int32_t keys_count,
                   const int64_t key_component_count,
                   int64_t* hash_table_ptr,
                   const int64_t entry_count,
                   const int64_t offset_buffer_ptr_offset,
                   const int64_t sub_buff_size) {
  const auto range = reinterpret_cast<const double*>(range_bytes);

  size_t elem_count = 0;

  const auto bounds =
      Bounds{.min_X = range[0], .min_Y = range[1], .max_X = range[2], .max_Y = range[3]};

  // Walk every (x, y) bucket the bounding box overlaps, inclusive of both
  // extreme buckets along each axis.
  for (int64_t x = floor(bounds.min_X * bucket_size_x);
       x <= floor(bounds.max_X * bucket_size_x);
       x++) {
    for (int64_t y = floor(bounds.min_Y * bucket_size_y);
         y <= floor(bounds.max_Y * bucket_size_y);
         y++) {
      // The composite hash key for this bucket is the (x, y) pair.
      int64_t cur_key[2];
      cur_key[0] = static_cast<int64_t>(x);
      cur_key[1] = static_cast<int64_t>(y);

      const auto buffer_range = get_row_id_buffer_ptr(hash_table_ptr,
                                                      cur_key,
                                                      key_component_count,
                                                      entry_count,
                                                      offset_buffer_ptr_offset,
                                                      sub_buff_size);

      // Merge this bucket's row ids into the output, skipping duplicates
      // (a row can appear in several buckets).
      for (int64_t j = 0; j < buffer_range.element_count; j++) {
        const auto rowid = buffer_range.buffer[j];
        elem_count += insert_sorted(out_arr, elem_count, rowid);
        assert(max_arr_size >= elem_count);
      }
    }
  }

  return elem_count;
}
333 
334 // /// Given the bounding box and the bucket size,
335 // /// return the number of buckets the bounding box
336 // /// will be split into.
337 extern "C" NEVER_INLINE DEVICE int32_t
338 get_num_buckets_for_bounds(const int8_t* range_bytes,
339  const int32_t range_component_index,
340  const double bucket_size_x,
341  const double bucket_size_y) {
342  const auto range = reinterpret_cast<const double*>(range_bytes);
343 
344  const auto bounds_min_x = range[0];
345  const auto bounds_min_y = range[1];
346  const auto bounds_max_x = range[2];
347  const auto bounds_max_y = range[3];
348 
349  const auto num_x =
350  floor(bounds_max_x * bucket_size_x) - floor(bounds_min_x * bucket_size_x);
351  const auto num_y =
352  floor(bounds_max_y * bucket_size_y) - floor(bounds_min_y * bucket_size_y);
353  const auto num_buckets = (num_x + 1) * (num_y + 1);
354 
355  return num_buckets;
356 }
NEVER_INLINE DEVICE uint32_t MurmurHash1(const void *key, int len, const uint32_t seed)
Definition: MurmurHash.cpp:20
DEVICE bool compare_to_key(const int8_t *entry, const int8_t *key, const size_t key_bytes)
ALWAYS_INLINE DEVICE BufferRange get_row_id_buffer_ptr(int64_t *hash_table_ptr, const int64_t *key, const int64_t key_component_count, const int64_t entry_count, const int64_t offset_buffer_ptr_offset, const int64_t sub_buff_size)
bool keys_are_equal(const T *key1, const T *key2, const size_t key_component_count)
DEVICE int64_t get_matching_slot(const int8_t *hash_buff, const uint32_t h, const int8_t *key, const size_t key_bytes)
DEVICE double decompress_lattitude_coord_geoint32(const int32_t compressed)
#define SUFFIX(name)
NEVER_INLINE DEVICE int64_t baseline_hash_join_idx_64(const int8_t *hash_buff, const int8_t *key, const size_t key_bytes, const size_t entry_count)
FORCE_INLINE DEVICE int64_t get_bucket_key_for_value_impl(const T value, const double bucket_size)
NEVER_INLINE DEVICE int64_t get_bucket_key_for_range_compressed(const int8_t *range, const size_t range_component_index, const double bucket_size)
NEVER_INLINE DEVICE int64_t get_bucket_key_for_range_double(const int8_t *range_bytes, const size_t range_component_index, const double bucket_size)
#define DEVICE
DEVICE T SUFFIX() get_invalid_key()
NEVER_INLINE DEVICE int32_t insert_sorted(int32_t *arr, size_t elem_count, int32_t elem)
ALWAYS_INLINE DEVICE int64_t overlaps_hash_join_idx(int64_t hash_buff, const int64_t key, const int64_t min_key, const int64_t max_key)
int64_t const int32_t sz assert(dest)
DEVICE double decompress_longitude_coord_geoint32(const int32_t compressed)
#define FORCE_INLINE
FORCE_INLINE DEVICE int64_t get_bucket_key_for_range_compressed_impl(const int8_t *range, const size_t range_component_index, const double bucket_size)
NEVER_INLINE DEVICE int64_t get_candidate_rows(int32_t *out_arr, const uint32_t max_arr_size, const int8_t *range_bytes, const int32_t range_component_index, const double bucket_size_x, const double bucket_size_y, const int32_t keys_count, const int64_t key_component_count, int64_t *hash_table_ptr, const int64_t entry_count, const int64_t offset_buffer_ptr_offset, const int64_t sub_buff_size)
FORCE_INLINE DEVICE int64_t baseline_hash_join_idx_impl(const int8_t *hash_buff, const int8_t *key, const size_t key_bytes, const size_t entry_count)
#define NEVER_INLINE
NEVER_INLINE DEVICE int64_t baseline_hash_join_idx_32(const int8_t *hash_buff, const int8_t *key, const size_t key_bytes, const size_t entry_count)
NEVER_INLINE DEVICE int64_t get_composite_key_index_32(const int32_t *key, const size_t key_component_count, const int32_t *composite_key_dict, const size_t entry_count)
#define ALWAYS_INLINE
NEVER_INLINE DEVICE int32_t get_num_buckets_for_bounds(const int8_t *range_bytes, const int32_t range_component_index, const double bucket_size_x, const double bucket_size_y)
NEVER_INLINE DEVICE int64_t get_composite_key_index_64(const int64_t *key, const size_t key_component_count, const int64_t *composite_key_dict, const size_t entry_count)
FORCE_INLINE DEVICE int64_t get_composite_key_index_impl(const T *key, const size_t key_component_count, const T *composite_key_dict, const size_t entry_count)