OmniSciDB  06b3bd477c
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
JoinHashTableQueryRuntime.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <math.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include "../Shared/geo_compression_runtime.h"
21 #include "CompareKeysInl.h"
22 #include "MurmurHash.h"
23 
24 DEVICE bool compare_to_key(const int8_t* entry,
25  const int8_t* key,
26  const size_t key_bytes) {
27  for (size_t i = 0; i < key_bytes; ++i) {
28  if (entry[i] != key[i]) {
29  return false;
30  }
31  }
32  return true;
33 }
34 
namespace {

// Sentinel: the probed slot holds a different key; the caller must keep probing.
const int kNoMatch = -1;
// Sentinel: the probed slot is empty (invalid key), so the key is not in the table.
const int kNotPresent = -2;

} // namespace
41 
42 template <class T>
43 DEVICE int64_t get_matching_slot(const int8_t* hash_buff,
44  const uint32_t h,
45  const int8_t* key,
46  const size_t key_bytes) {
47  const auto lookup_result_ptr = hash_buff + h * (key_bytes + sizeof(T));
48  if (compare_to_key(lookup_result_ptr, key, key_bytes)) {
49  return *reinterpret_cast<const T*>(lookup_result_ptr + key_bytes);
50  }
51  if (*reinterpret_cast<const T*>(lookup_result_ptr) ==
52  SUFFIX(get_invalid_key) < T > ()) {
53  return kNotPresent;
54  }
55  return kNoMatch;
56 }
57 
58 template <class T>
59 FORCE_INLINE DEVICE int64_t baseline_hash_join_idx_impl(const int8_t* hash_buff,
60  const int8_t* key,
61  const size_t key_bytes,
62  const size_t entry_count) {
63  if (!entry_count) {
64  return kNoMatch;
65  }
66  const uint32_t h = MurmurHash1(key, key_bytes, 0) % entry_count;
67  int64_t matching_slot = get_matching_slot<T>(hash_buff, h, key, key_bytes);
68  if (matching_slot != kNoMatch) {
69  return matching_slot;
70  }
71  uint32_t h_probe = (h + 1) % entry_count;
72  while (h_probe != h) {
73  matching_slot = get_matching_slot<T>(hash_buff, h_probe, key, key_bytes);
74  if (matching_slot != kNoMatch) {
75  return matching_slot;
76  }
77  h_probe = (h_probe + 1) % entry_count;
78  }
79  return kNoMatch;
80 }
81 
// C entry point: baseline hash join probe with 32-bit payload slots.
// Forwards to baseline_hash_join_idx_impl<int32_t>; returns the payload,
// kNotPresent (-2), or kNoMatch (-1).
extern "C" NEVER_INLINE DEVICE int64_t
baseline_hash_join_idx_32(const int8_t* hash_buff,
                          const int8_t* key,
                          const size_t key_bytes,
                          const size_t entry_count) {
  return baseline_hash_join_idx_impl<int32_t>(hash_buff, key, key_bytes, entry_count);
}
89 
// C entry point: baseline hash join probe with 64-bit payload slots.
// Forwards to baseline_hash_join_idx_impl<int64_t>; returns the payload,
// kNotPresent (-2), or kNoMatch (-1).
extern "C" NEVER_INLINE DEVICE int64_t
baseline_hash_join_idx_64(const int8_t* hash_buff,
                          const int8_t* key,
                          const size_t key_bytes,
                          const size_t entry_count) {
  return baseline_hash_join_idx_impl<int64_t>(hash_buff, key, key_bytes, entry_count);
}
97 
98 template <typename T>
100  const double bucket_size) {
101  return static_cast<int64_t>(floor(static_cast<double>(value) * bucket_size));
102 }
103 
104 extern "C" NEVER_INLINE DEVICE int64_t
105 get_bucket_key_for_range_double(const int8_t* range_bytes,
106  const size_t range_component_index,
107  const double bucket_size) {
108  const auto range = reinterpret_cast<const double*>(range_bytes);
109  return get_bucket_key_for_value_impl(range[range_component_index], bucket_size);
110 }
111 
112 FORCE_INLINE DEVICE int64_t
114  const size_t range_component_index,
115  const double bucket_size) {
116  const auto range_ptr = reinterpret_cast<const int32_t*>(range);
117  if (range_component_index % 2 == 0) {
120  range_ptr[range_component_index]),
121  bucket_size);
122  } else {
125  range_ptr[range_component_index]),
126  bucket_size);
127  }
128 }
129 
130 extern "C" NEVER_INLINE DEVICE int64_t
132  const size_t range_component_index,
133  const double bucket_size) {
135  range, range_component_index, bucket_size);
136 }
137 
138 template <typename T>
140  const size_t key_component_count,
141  const T* composite_key_dict,
142  const size_t entry_count) {
143  const uint32_t h = MurmurHash1(key, key_component_count * sizeof(T), 0) % entry_count;
144  uint32_t off = h * key_component_count;
145  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
146  return h;
147  }
148  uint32_t h_probe = (h + 1) % entry_count;
149  while (h_probe != h) {
150  off = h_probe * key_component_count;
151  if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
152  return h_probe;
153  }
154  if (composite_key_dict[off] == SUFFIX(get_invalid_key) < T > ()) {
155  return -1;
156  }
157  h_probe = (h_probe + 1) % entry_count;
158  }
159  return -1;
160 }
161 
162 extern "C" NEVER_INLINE DEVICE int64_t
163 get_composite_key_index_32(const int32_t* key,
164  const size_t key_component_count,
165  const int32_t* composite_key_dict,
166  const size_t entry_count) {
168  key, key_component_count, composite_key_dict, entry_count);
169 }
170 
171 extern "C" NEVER_INLINE DEVICE int64_t
172 get_composite_key_index_64(const int64_t* key,
173  const size_t key_component_count,
174  const int64_t* composite_key_dict,
175  const size_t entry_count) {
177  key, key_component_count, composite_key_dict, entry_count);
178 }
179 
180 extern "C" NEVER_INLINE DEVICE int32_t insert_sorted(int32_t* arr,
181  size_t elem_count,
182  int32_t elem) {
183  for (size_t i = 0; i < elem_count; i++) {
184  if (elem == arr[i])
185  return 0;
186 
187  if (elem > arr[i])
188  continue;
189 
190  for (size_t j = elem_count; i < j; j--) {
191  arr[j] = arr[j - 1];
192  }
193  arr[i] = elem;
194  return 1;
195  }
196 
197  arr[elem_count] = elem;
198  return 1;
199 }
200 
201 extern "C" ALWAYS_INLINE DEVICE int64_t overlaps_hash_join_idx(int64_t hash_buff,
202  const int64_t key,
203  const int64_t min_key,
204  const int64_t max_key) {
205  if (key >= min_key && key <= max_key) {
206  return *(reinterpret_cast<int32_t*>(hash_buff) + (key - min_key));
207  }
208  return -1;
209 }
210 
// A run of row ids inside the one-to-many hash table payload buffer.
// An empty/miss result is represented as {nullptr, 0}.
struct BufferRange {
  // First matching row id, or nullptr when there is no match.
  const int32_t* buffer = nullptr;
  // Number of valid entries starting at 'buffer'.
  const int64_t element_count = 0;
};
215 
217 get_row_id_buffer_ptr(int64_t* hash_table_ptr,
218  const int64_t* key,
219  const int64_t key_component_count,
220  const int64_t entry_count,
221  const int64_t offset_buffer_ptr_offset,
222  const int64_t sub_buff_size) {
223  const int64_t min_key = 0;
224  const int64_t max_key = entry_count - 1;
225 
226  auto key_idx =
227  get_composite_key_index_64(key, key_component_count, hash_table_ptr, entry_count);
228 
229  if (key_idx < -1) {
230  return BufferRange{.buffer = nullptr, .element_count = 0};
231  }
232 
233  int8_t* one_to_many_ptr = reinterpret_cast<int8_t*>(hash_table_ptr);
234  one_to_many_ptr += offset_buffer_ptr_offset;
235 
236  // Returns an index used to fetch row count and row ids.
237  const auto slot = overlaps_hash_join_idx(
238  reinterpret_cast<int64_t>(one_to_many_ptr), key_idx, min_key, max_key);
239  if (slot < 0) {
240  return BufferRange{.buffer = nullptr, .element_count = 0};
241  }
242 
243  // Offset into the row count section of buffer
244  int8_t* count_ptr = one_to_many_ptr + sub_buff_size;
245 
246  const int64_t matched_row_count = overlaps_hash_join_idx(
247  reinterpret_cast<int64_t>(count_ptr), key_idx, min_key, max_key);
248 
249  // Offset into payload section, containing an array of row ids
250  int32_t* rowid_buffer = (int32_t*)(one_to_many_ptr + 2 * sub_buff_size);
251  const auto rowidoff_ptr = &rowid_buffer[slot];
252 
253  return BufferRange{.buffer = rowidoff_ptr, .element_count = matched_row_count};
254 }
255 
// Axis-aligned bounding box, as unpacked from a 4-double range buffer
// (min_X, min_Y, max_X, max_Y) — see get_candidate_rows.
struct Bounds {
  const double min_X;
  const double min_Y;
  const double max_X;
  const double max_Y;
};
262 
287 extern "C" NEVER_INLINE DEVICE int64_t
288 get_candidate_rows(int32_t* out_arr,
289  const uint32_t max_arr_size,
290  const int8_t* range_bytes,
291  const int32_t range_component_index,
292  const double bucket_size_x,
293  const double bucket_size_y,
294  const int32_t keys_count,
295  const int64_t key_component_count,
296  int64_t* hash_table_ptr,
297  const int64_t entry_count,
298  const int64_t offset_buffer_ptr_offset,
299  const int64_t sub_buff_size) {
300  const auto range = reinterpret_cast<const double*>(range_bytes);
301 
302  size_t elem_count = 0;
303 
304  const auto bounds =
305  Bounds{.min_X = range[0], .min_Y = range[1], .max_X = range[2], .max_Y = range[3]};
306 
307  for (int64_t x = floor(bounds.min_X * bucket_size_x);
308  x <= floor(bounds.max_X * bucket_size_x);
309  x++) {
310  for (int64_t y = floor(bounds.min_Y * bucket_size_y);
311  y <= floor(bounds.max_Y * bucket_size_y);
312  y++) {
313  int64_t cur_key[2];
314  cur_key[0] = static_cast<int64_t>(x);
315  cur_key[1] = static_cast<int64_t>(y);
316 
317  const auto buffer_range = get_row_id_buffer_ptr(hash_table_ptr,
318  cur_key,
319  key_component_count,
320  entry_count,
321  offset_buffer_ptr_offset,
322  sub_buff_size);
323 
324  for (int64_t j = 0; j < buffer_range.element_count; j++) {
325  const auto rowid = buffer_range.buffer[j];
326  elem_count += insert_sorted(out_arr, elem_count, rowid);
327  assert(max_arr_size >= elem_count);
328  }
329  }
330  }
331 
332  return elem_count;
333 }
334 
335 // /// Given the bounding box and the bucket size,
336 // /// return the number of buckets the bounding box
337 // /// will be split into.
338 extern "C" NEVER_INLINE DEVICE int32_t
339 get_num_buckets_for_bounds(const int8_t* range_bytes,
340  const int32_t range_component_index,
341  const double bucket_size_x,
342  const double bucket_size_y) {
343  const auto range = reinterpret_cast<const double*>(range_bytes);
344 
345  const auto bounds_min_x = range[0];
346  const auto bounds_min_y = range[1];
347  const auto bounds_max_x = range[2];
348  const auto bounds_max_y = range[3];
349 
350  const auto num_x =
351  floor(bounds_max_x * bucket_size_x) - floor(bounds_min_x * bucket_size_x);
352  const auto num_y =
353  floor(bounds_max_y * bucket_size_y) - floor(bounds_min_y * bucket_size_y);
354  const auto num_buckets = (num_x + 1) * (num_y + 1);
355 
356  return num_buckets;
357 }
NEVER_INLINE DEVICE uint32_t MurmurHash1(const void *key, int len, const uint32_t seed)
Definition: MurmurHash.cpp:20
DEVICE bool compare_to_key(const int8_t *entry, const int8_t *key, const size_t key_bytes)
ALWAYS_INLINE DEVICE BufferRange get_row_id_buffer_ptr(int64_t *hash_table_ptr, const int64_t *key, const int64_t key_component_count, const int64_t entry_count, const int64_t offset_buffer_ptr_offset, const int64_t sub_buff_size)
bool keys_are_equal(const T *key1, const T *key2, const size_t key_component_count)
DEVICE int64_t get_matching_slot(const int8_t *hash_buff, const uint32_t h, const int8_t *key, const size_t key_bytes)
#define SUFFIX(name)
NEVER_INLINE DEVICE int64_t baseline_hash_join_idx_64(const int8_t *hash_buff, const int8_t *key, const size_t key_bytes, const size_t entry_count)
FORCE_INLINE DEVICE int64_t get_bucket_key_for_value_impl(const T value, const double bucket_size)
NEVER_INLINE DEVICE int64_t get_bucket_key_for_range_compressed(const int8_t *range, const size_t range_component_index, const double bucket_size)
NEVER_INLINE DEVICE int64_t get_bucket_key_for_range_double(const int8_t *range_bytes, const size_t range_component_index, const double bucket_size)
#define DEVICE
DEVICE T SUFFIX() get_invalid_key()
NEVER_INLINE DEVICE int32_t insert_sorted(int32_t *arr, size_t elem_count, int32_t elem)
ALWAYS_INLINE DEVICE int64_t overlaps_hash_join_idx(int64_t hash_buff, const int64_t key, const int64_t min_key, const int64_t max_key)
DEVICE double decompress_lattitude_coord_geoint32(const int32_t compressed)
int64_t const int32_t sz assert(dest)
#define FORCE_INLINE
FORCE_INLINE DEVICE int64_t get_bucket_key_for_range_compressed_impl(const int8_t *range, const size_t range_component_index, const double bucket_size)
NEVER_INLINE DEVICE int64_t get_candidate_rows(int32_t *out_arr, const uint32_t max_arr_size, const int8_t *range_bytes, const int32_t range_component_index, const double bucket_size_x, const double bucket_size_y, const int32_t keys_count, const int64_t key_component_count, int64_t *hash_table_ptr, const int64_t entry_count, const int64_t offset_buffer_ptr_offset, const int64_t sub_buff_size)
FORCE_INLINE DEVICE int64_t baseline_hash_join_idx_impl(const int8_t *hash_buff, const int8_t *key, const size_t key_bytes, const size_t entry_count)
#define NEVER_INLINE
NEVER_INLINE DEVICE int64_t baseline_hash_join_idx_32(const int8_t *hash_buff, const int8_t *key, const size_t key_bytes, const size_t entry_count)
DEVICE double decompress_longitude_coord_geoint32(const int32_t compressed)
NEVER_INLINE DEVICE int64_t get_composite_key_index_32(const int32_t *key, const size_t key_component_count, const int32_t *composite_key_dict, const size_t entry_count)
#define ALWAYS_INLINE
NEVER_INLINE DEVICE int32_t get_num_buckets_for_bounds(const int8_t *range_bytes, const int32_t range_component_index, const double bucket_size_x, const double bucket_size_y)
NEVER_INLINE DEVICE int64_t get_composite_key_index_64(const int64_t *key, const size_t key_component_count, const int64_t *composite_key_dict, const size_t entry_count)
FORCE_INLINE DEVICE int64_t get_composite_key_index_impl(const T *key, const size_t key_component_count, const T *composite_key_dict, const size_t entry_count)