OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionsCommon.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef __CUDACC__
18 
19 #include "TableFunctionsCommon.hpp"
20 
21 #include <cstring> // std::memcpy
22 #include <filesystem>
23 #include <memory>
24 #include <mutex>
25 #include <regex>
26 #include <shared_mutex>
27 #include <string>
28 #include <unordered_map>
29 
30 #include <tbb/parallel_for.h>
31 #include <tbb/task_arena.h>
32 
33 #define NANOSECONDS_PER_SECOND 1000000000
34 
35 template <typename T>
36 NEVER_INLINE HOST std::pair<T, T> get_column_min_max(const Column<T>& col) {
37  T col_min = std::numeric_limits<T>::max();
38  T col_max = std::numeric_limits<T>::lowest();
39  const int64_t num_rows = col.size();
40  const size_t max_thread_count = std::thread::hardware_concurrency();
41  const size_t max_inputs_per_thread = 20000;
42  const size_t num_threads = std::min(
43  max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
44 
45  std::vector<T> local_col_mins(num_threads, std::numeric_limits<T>::max());
46  std::vector<T> local_col_maxes(num_threads, std::numeric_limits<T>::lowest());
47  tbb::task_arena limited_arena(num_threads);
48 
49  limited_arena.execute([&] {
50  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
51  [&](const tbb::blocked_range<int64_t>& r) {
52  const int64_t start_idx = r.begin();
53  const int64_t end_idx = r.end();
54  T local_col_min = std::numeric_limits<T>::max();
55  T local_col_max = std::numeric_limits<T>::lowest();
56  for (int64_t r = start_idx; r < end_idx; ++r) {
57  const T val = col[r];
58  if (val == inline_null_value<T>()) {
59  continue;
60  }
61  if (val < local_col_min) {
62  local_col_min = val;
63  }
64  if (val > local_col_max) {
65  local_col_max = val;
66  }
67  }
68  size_t thread_idx = tbb::this_task_arena::current_thread_index();
69  if (local_col_min < local_col_mins[thread_idx]) {
70  local_col_mins[thread_idx] = local_col_min;
71  }
72  if (local_col_max > local_col_maxes[thread_idx]) {
73  local_col_maxes[thread_idx] = local_col_max;
74  }
75  });
76  });
77 
78  for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
79  if (local_col_mins[thread_idx] < col_min) {
80  col_min = local_col_mins[thread_idx];
81  }
82  if (local_col_maxes[thread_idx] > col_max) {
83  col_max = local_col_maxes[thread_idx];
84  }
85  }
86  return std::make_pair(col_min, col_max);
87 }
88 
// Explicit instantiations of get_column_min_max for all supported numeric
// element types.
template NEVER_INLINE HOST std::pair<int8_t, int8_t> get_column_min_max(
    const Column<int8_t>& col);
template NEVER_INLINE HOST std::pair<int16_t, int16_t> get_column_min_max(
    const Column<int16_t>& col);
template NEVER_INLINE HOST std::pair<int32_t, int32_t> get_column_min_max(
    const Column<int32_t>& col);
template NEVER_INLINE HOST std::pair<int64_t, int64_t> get_column_min_max(
    const Column<int64_t>& col);
template NEVER_INLINE HOST std::pair<float, float> get_column_min_max(
    const Column<float>& col);
template NEVER_INLINE HOST std::pair<double, double> get_column_min_max(
    const Column<double>& col);
101 
102 std::pair<int32_t, int32_t> get_column_min_max(const Column<TextEncodingDict>& col) {
103  Column<int32_t> int_alias_col(reinterpret_cast<int32_t*>(col.getPtr()), col.size());
104  return get_column_min_max(int_alias_col);
105 }
106 
// Todo(todd): we should use a functor approach for gathering whatever stats
// a table function needs so we're not repeating boilerplate code (although
// we should confirm it doesn't have an adverse effect on performance).
// Leaving as a follow-up though until we have more examples of real-world
// usage patterns.
112 
113 template <typename T>
114 NEVER_INLINE HOST double get_column_mean(const T* data, const int64_t num_rows) {
115  // const int64_t num_rows = col.size();
116  const size_t max_thread_count = std::thread::hardware_concurrency();
117  const size_t max_inputs_per_thread = 20000;
118  const size_t num_threads = std::min(
119  max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
120 
121  std::vector<double> local_col_sums(num_threads, 0.);
122  std::vector<int64_t> local_col_non_null_counts(num_threads, 0L);
123  tbb::task_arena limited_arena(num_threads);
124  limited_arena.execute([&] {
125  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
126  [&](const tbb::blocked_range<int64_t>& r) {
127  const int64_t start_idx = r.begin();
128  const int64_t end_idx = r.end();
129  double local_col_sum = 0.;
130  int64_t local_col_non_null_count = 0;
131  for (int64_t r = start_idx; r < end_idx; ++r) {
132  const T val = data[r];
133  if (val == inline_null_value<T>()) {
134  continue;
135  }
136  local_col_sum += data[r];
137  local_col_non_null_count++;
138  }
139  size_t thread_idx = tbb::this_task_arena::current_thread_index();
140  local_col_sums[thread_idx] += local_col_sum;
141  local_col_non_null_counts[thread_idx] += local_col_non_null_count;
142  });
143  });
144 
145  double col_sum = 0.0;
146  int64_t col_non_null_count = 0L;
147 
148  for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
149  col_sum += local_col_sums[thread_idx];
150  col_non_null_count += local_col_non_null_counts[thread_idx];
151  }
152 
153  return col_non_null_count == 0 ? 0 : col_sum / col_non_null_count;
154 }
155 
// Explicit instantiations of the raw-pointer get_column_mean for all
// supported numeric element types.
template NEVER_INLINE HOST double get_column_mean(const int8_t* data,
                                                  const int64_t num_rows);

template NEVER_INLINE HOST double get_column_mean(const int16_t* data,
                                                  const int64_t num_rows);

template NEVER_INLINE HOST double get_column_mean(const int32_t* data,
                                                  const int64_t num_rows);

template NEVER_INLINE HOST double get_column_mean(const int64_t* data,
                                                  const int64_t num_rows);

template NEVER_INLINE HOST double get_column_mean(const float* data,
                                                  const int64_t num_rows);

template NEVER_INLINE HOST double get_column_mean(const double* data,
                                                  const int64_t num_rows);
173 
174 template <typename T>
176  return get_column_mean(col.getPtr(), col.size());
177 }
178 
// Explicit instantiations of the Column<T> get_column_mean overload.
template NEVER_INLINE HOST double get_column_mean(const Column<int8_t>& col);
template NEVER_INLINE HOST double get_column_mean(const Column<int16_t>& col);
template NEVER_INLINE HOST double get_column_mean(const Column<int32_t>& col);
template NEVER_INLINE HOST double get_column_mean(const Column<int64_t>& col);
template NEVER_INLINE HOST double get_column_mean(const Column<float>& col);
template NEVER_INLINE HOST double get_column_mean(const Column<double>& col);
185 
// Convenience overload: computes the (population) standard deviation of the
// non-null values of `col` around `mean` via the raw-pointer implementation.
template <typename T>
NEVER_INLINE HOST double get_column_std_dev(const Column<T>& col, const double mean) {
  return get_column_std_dev(col.getPtr(), col.size(), mean);
}
190 
// Explicit instantiations of the Column<T> get_column_std_dev overload.
template NEVER_INLINE HOST double get_column_std_dev(const Column<int32_t>& col,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const Column<int64_t>& col,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const Column<float>& col,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const Column<double>& col,
                                                     const double mean);
199 
200 template <typename T>
202  const int64_t num_rows,
203  const double mean) {
204  // const int64_t num_rows = col.size();
205  const size_t max_thread_count = std::thread::hardware_concurrency();
206  const size_t max_inputs_per_thread = 200000;
207  const size_t num_threads = std::min(
208  max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
209 
210  std::vector<double> local_col_squared_residuals(num_threads, 0.);
211  std::vector<int64_t> local_col_non_null_counts(num_threads, 0L);
212  tbb::task_arena limited_arena(num_threads);
213 
214  limited_arena.execute([&] {
215  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
216  [&](const tbb::blocked_range<int64_t>& r) {
217  const int64_t start_idx = r.begin();
218  const int64_t end_idx = r.end();
219  double local_col_squared_residual = 0.;
220  int64_t local_col_non_null_count = 0;
221  for (int64_t r = start_idx; r < end_idx; ++r) {
222  const T val = data[r];
223  if (val == inline_null_value<T>()) {
224  continue;
225  }
226  const double residual = val - mean;
227  local_col_squared_residual += (residual * residual);
228  local_col_non_null_count++;
229  }
230  size_t thread_idx = tbb::this_task_arena::current_thread_index();
231  local_col_squared_residuals[thread_idx] +=
232  local_col_squared_residual;
233  local_col_non_null_counts[thread_idx] += local_col_non_null_count;
234  });
235  });
236 
237  double col_sum_squared_residual = 0.0;
238  int64_t col_non_null_count = 0;
239 
240  for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
241  col_sum_squared_residual += local_col_squared_residuals[thread_idx];
242  col_non_null_count += local_col_non_null_counts[thread_idx];
243  }
244 
245  return col_non_null_count == 0 ? 0
246  : sqrt(col_sum_squared_residual / col_non_null_count);
247 }
248 
// Explicit instantiations of the raw-pointer get_column_std_dev.
template NEVER_INLINE HOST double get_column_std_dev(const int32_t* data,
                                                     const int64_t num_rows,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const int64_t* data,
                                                     const int64_t num_rows,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const float* data,
                                                     const int64_t num_rows,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const double* data,
                                                     const int64_t num_rows,
                                                     const double mean);
261 
262 template <typename T>
263 NEVER_INLINE HOST std::tuple<T, T, bool> get_column_metadata(const Column<T>& col) {
264  T col_min = std::numeric_limits<T>::max();
265  T col_max = std::numeric_limits<T>::lowest();
266  bool has_nulls = false;
267  const int64_t num_rows = col.size();
268  const size_t max_thread_count = std::thread::hardware_concurrency();
269  const size_t max_inputs_per_thread = 200000;
270  const size_t num_threads = std::min(
271  max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
272 
273  std::vector<T> local_col_mins(num_threads, std::numeric_limits<T>::max());
274  std::vector<T> local_col_maxes(num_threads, std::numeric_limits<T>::lowest());
275  std::vector<bool> local_col_has_nulls(num_threads, false);
276  tbb::task_arena limited_arena(num_threads);
277 
278  limited_arena.execute([&] {
279  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
280  [&](const tbb::blocked_range<int64_t>& r) {
281  const int64_t start_idx = r.begin();
282  const int64_t end_idx = r.end();
283  T local_col_min = std::numeric_limits<T>::max();
284  T local_col_max = std::numeric_limits<T>::lowest();
285  bool local_has_nulls = false;
286  for (int64_t r = start_idx; r < end_idx; ++r) {
287  if (col.isNull(r)) {
288  local_has_nulls = true;
289  continue;
290  }
291  if (col[r] < local_col_min) {
292  local_col_min = col[r];
293  }
294  if (col[r] > local_col_max) {
295  local_col_max = col[r];
296  }
297  }
298  const size_t thread_idx =
299  tbb::this_task_arena::current_thread_index();
300  if (local_has_nulls) {
301  local_col_has_nulls[thread_idx] = true;
302  }
303  if (local_col_min < local_col_mins[thread_idx]) {
304  local_col_mins[thread_idx] = local_col_min;
305  }
306  if (local_col_max > local_col_maxes[thread_idx]) {
307  local_col_maxes[thread_idx] = local_col_max;
308  }
309  });
310  });
311 
312  for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
313  if (local_col_has_nulls[thread_idx]) {
314  has_nulls = true;
315  }
316  if (local_col_mins[thread_idx] < col_min) {
317  col_min = local_col_mins[thread_idx];
318  }
319  if (local_col_maxes[thread_idx] > col_max) {
320  col_max = local_col_maxes[thread_idx];
321  }
322  }
323  return {col_min, col_max, has_nulls};
324 }
325 
// Explicit instantiations of get_column_metadata for all supported numeric
// element types.
template NEVER_INLINE HOST std::tuple<int8_t, int8_t, bool> get_column_metadata(
    const Column<int8_t>& col);
template NEVER_INLINE HOST std::tuple<int16_t, int16_t, bool> get_column_metadata(
    const Column<int16_t>& col);
template NEVER_INLINE HOST std::tuple<int32_t, int32_t, bool> get_column_metadata(
    const Column<int32_t>& col);
template NEVER_INLINE HOST std::tuple<int64_t, int64_t, bool> get_column_metadata(
    const Column<int64_t>& col);
template NEVER_INLINE HOST std::tuple<float, float, bool> get_column_metadata(
    const Column<float>& col);
template NEVER_INLINE HOST std::tuple<double, double, bool> get_column_metadata(
    const Column<double>& col);
338 
339 std::tuple<int32_t, int32_t, bool> get_column_metadata(
340  const Column<TextEncodingDict>& col) {
341  Column<int32_t> int_alias_col(reinterpret_cast<int32_t*>(col.getPtr()), col.size());
342  return get_column_metadata(int_alias_col);
343 }
344 
345 template <typename T>
346 void z_std_normalize_col(const T* input_data,
347  T* output_data,
348  const int64_t num_rows,
349  const double mean,
350  const double std_dev) {
351  if (std_dev <= 0.0) {
352  throw std::runtime_error("Standard deviation cannot be <= 0");
353  }
354  const double inv_std_dev = 1.0 / std_dev;
355 
356  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
357  [&](const tbb::blocked_range<int64_t>& r) {
358  const int64_t start_idx = r.begin();
359  const int64_t end_idx = r.end();
360  for (int64_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
361  output_data[row_idx] = (input_data[row_idx] - mean) * inv_std_dev;
362  }
363  });
364 }
365 
// Explicit instantiations of z_std_normalize_col for floating-point types.
template void z_std_normalize_col(const float* input_data,
                                  float* output_data,
                                  const int64_t num_rows,
                                  const double mean,
                                  const double std_dev);
template void z_std_normalize_col(const double* input_data,
                                  double* output_data,
                                  const int64_t num_rows,
                                  const double mean,
                                  const double std_dev);
376 
// Z-score normalizes each feature column independently: for every input
// pointer, computes its mean and standard deviation over `num_rows` rows and
// writes the normalized values into a freshly allocated vector.
template <typename T>
std::vector<std::vector<T>> z_std_normalize_data(const std::vector<T*>& input_data,
                                                 const int64_t num_rows) {
  const int64_t num_features = input_data.size();
  std::vector<std::vector<T>> normalized_data(num_features);
  for (int64_t feature_idx = 0; feature_idx < num_features; ++feature_idx) {
    const T* feature_col = input_data[feature_idx];
    auto& normalized_col = normalized_data[feature_idx];
    normalized_col.resize(num_rows);
    const auto col_mean = get_column_mean(feature_col, num_rows);
    const auto col_std_dev = get_column_std_dev(feature_col, num_rows, col_mean);
    z_std_normalize_col(
        feature_col, normalized_col.data(), num_rows, col_mean, col_std_dev);
  }
  return normalized_data;
}
394 
// Explicit instantiations of z_std_normalize_data for floating-point types.
template std::vector<std::vector<float>> z_std_normalize_data(
    const std::vector<float*>& input_data,
    const int64_t num_rows);
template std::vector<std::vector<double>> z_std_normalize_data(
    const std::vector<double*>& input_data,
    const int64_t num_rows);
401 
402 template <typename T1, typename T2>
404 distance_in_meters(const T1 fromlon, const T1 fromlat, const T2 tolon, const T2 tolat) {
405  T1 latitudeArc = (fromlat - tolat) * 0.017453292519943295769236907684886;
406  T1 longitudeArc = (fromlon - tolon) * 0.017453292519943295769236907684886;
407  T1 latitudeH = sin(latitudeArc * 0.5);
408  latitudeH *= latitudeH;
409  T1 lontitudeH = sin(longitudeArc * 0.5);
410  lontitudeH *= lontitudeH;
411  T1 tmp = cos(fromlat * 0.017453292519943295769236907684886) *
412  cos(tolat * 0.017453292519943295769236907684886);
413  return 6372797.560856 * (2.0 * asin(sqrt(latitudeH + tmp * lontitudeH)));
414 }
415 
// Explicit instantiations of distance_in_meters for all float/double
// combinations of the "from" and "to" coordinate pairs.
template NEVER_INLINE HOST float distance_in_meters(const float fromlon,
                                                    const float fromlat,
                                                    const float tolon,
                                                    const float tolat);

template NEVER_INLINE HOST float distance_in_meters(const float fromlon,
                                                    const float fromlat,
                                                    const double tolon,
                                                    const double tolat);

template NEVER_INLINE HOST double distance_in_meters(const double fromlon,
                                                     const double fromlat,
                                                     const float tolon,
                                                     const float tolat);

template NEVER_INLINE HOST double distance_in_meters(const double fromlon,
                                                     const double fromlat,
                                                     const double tolon,
                                                     const double tolat);
435 
436 bool DataBufferCache::isKeyCached(const std::string& key) const {
437  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
438  return data_cache_.count(key) > 0;
439 }
440 
441 bool DataBufferCache::isKeyCachedAndSameLength(const std::string& key,
442  const size_t num_bytes) const {
443  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
444  const auto& cached_data_itr = data_cache_.find(key);
445  if (cached_data_itr == data_cache_.end()) {
446  return false;
447  }
448  return num_bytes == cached_data_itr->second->num_bytes;
449 }
450 
451 template <typename T>
452 void DataBufferCache::getDataForKey(const std::string& key, T* dest_buffer) const {
453  auto timer = DEBUG_TIMER(__func__);
454  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
455  const auto& cached_data_itr = data_cache_.find(key);
456  if (cached_data_itr == data_cache_.end()) {
457  const std::string error_msg = "Data for key " + key + " not found in cache.";
458  throw std::runtime_error(error_msg);
459  }
460  copyData(reinterpret_cast<int8_t*>(dest_buffer),
461  cached_data_itr->second->data_buffer,
462  cached_data_itr->second->num_bytes);
463 }
464 
465 template <typename T>
466 const T& DataBufferCache::getDataRefForKey(const std::string& key) const {
467  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
468  const auto& cached_data_itr = data_cache_.find(key);
469  if (cached_data_itr == data_cache_.end()) {
470  const std::string error_msg{"Data for key " + key + " not found in cache."};
471  throw std::runtime_error(error_msg);
472  }
473  return *reinterpret_cast<const T*>(cached_data_itr->second->data_buffer);
474 }
475 
476 template <typename T>
477 const T* DataBufferCache::getDataPtrForKey(const std::string& key) const {
478  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
479  const auto& cached_data_itr = data_cache_.find(key);
480  if (cached_data_itr == data_cache_.end()) {
481  return nullptr;
482  }
483  return reinterpret_cast<const T* const>(cached_data_itr->second->data_buffer);
484 }
485 
486 template <typename T>
487 void DataBufferCache::putDataForKey(const std::string& key,
488  T* const data_buffer,
489  const size_t num_elements) {
490  auto timer = DEBUG_TIMER(__func__);
491  const size_t num_bytes(num_elements * sizeof(T));
492  auto cache_data = std::make_shared<CacheDataTf>(num_bytes);
493  copyData(cache_data->data_buffer, reinterpret_cast<int8_t*>(data_buffer), num_bytes);
494  std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
495  const auto& cached_data_itr = data_cache_.find(key);
496  if (data_cache_.find(key) != data_cache_.end()) {
497  const std::string warning_msg =
498  "Data for key " + key + " already exists in cache. Replacing.";
499  std::cout << warning_msg << std::endl;
500  cached_data_itr->second.reset();
501  cached_data_itr->second = cache_data;
502  return;
503  }
504  data_cache_.insert(std::make_pair(key, cache_data));
505 }
506 
508  const int8_t* source,
509  const size_t num_bytes) const {
510  if (num_bytes < parallel_copy_min_bytes) {
511  std::memcpy(dest, source, num_bytes);
512  return;
513  }
514  const size_t max_bytes_per_thread = parallel_copy_min_bytes;
515  const size_t num_threads =
516  (num_bytes + max_bytes_per_thread - 1) / max_bytes_per_thread;
518  tbb::blocked_range<size_t>(0, num_threads, 1),
519  [&](const tbb::blocked_range<size_t>& r) {
520  const size_t end_chunk_idx = r.end();
521  for (size_t chunk_idx = r.begin(); chunk_idx != end_chunk_idx; ++chunk_idx) {
522  const size_t start_byte = chunk_idx * max_bytes_per_thread;
523  const size_t length =
524  std::min(start_byte + max_bytes_per_thread, num_bytes) - start_byte;
525  std::memcpy(dest + start_byte, source + start_byte, length);
526  }
527  });
528 }
529 
530 /* Definitions for DataCache */
531 
532 template <typename T>
533 bool DataCache<T>::isKeyCached(const std::string& key) const {
534  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
535  return data_cache_.count(key) > 0;
536 }
537 
538 template <typename T>
539 std::shared_ptr<T> DataCache<T>::getDataForKey(const std::string& key) const {
540  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
541  const auto& cached_data_itr = data_cache_.find(key);
542  if (cached_data_itr == data_cache_.end()) {
543  const std::string error_msg{"Data for key " + key + " not found in cache."};
544  throw std::runtime_error(error_msg);
545  }
546  return cached_data_itr->second;
547 }
548 
549 template <typename T>
550 void DataCache<T>::putDataForKey(const std::string& key, std::shared_ptr<T> const data) {
551  std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
552  const auto& cached_data_itr = data_cache_.find(key);
553  if (cached_data_itr != data_cache_.end()) {
554  const std::string warning_msg =
555  "Data for key " + key + " already exists in cache. Replacing.";
556  std::cout << warning_msg << std::endl;
557  cached_data_itr->second.reset();
558  cached_data_itr->second = data;
559  }
560  data_cache_.insert(std::make_pair(key, data));
561 }
562 
563 namespace FileUtilities {
564 
565 // Following implementation taken from https://stackoverflow.com/a/65851545
566 
/**
 * @brief Converts a filename glob pattern ('*' and '?' wildcards) into an
 * equivalent std::regex, escaping all other regex metacharacters.
 *
 * @param glob Glob pattern, e.g. "*.csv".
 * @param case_sensitive When false (default), the regex is case-insensitive.
 * @return Compiled ECMAScript regex equivalent to the glob.
 */
std::regex glob_to_regex(const std::string& glob, bool case_sensitive = false) {
  // Note It is possible to automate checking if filesystem is case sensitive or not (e.g.
  // by performing a test first time this function is ran)
  std::string regex_string{glob};
  // Escape all regex special chars:
  regex_string = std::regex_replace(regex_string, std::regex("\\\\"), "\\\\");
  regex_string = std::regex_replace(regex_string, std::regex("\\^"), "\\^");
  regex_string = std::regex_replace(regex_string, std::regex("\\."), "\\.");
  regex_string = std::regex_replace(regex_string, std::regex("\\$"), "\\$");
  regex_string = std::regex_replace(regex_string, std::regex("\\|"), "\\|");
  regex_string = std::regex_replace(regex_string, std::regex("\\("), "\\(");
  regex_string = std::regex_replace(regex_string, std::regex("\\)"), "\\)");
  regex_string = std::regex_replace(regex_string, std::regex("\\{"), "\\{");
  // Bug fix: this line previously searched for '{' a second time (corrupting
  // the just-escaped '\{' into '\\}') and never escaped '}' at all.
  regex_string = std::regex_replace(regex_string, std::regex("\\}"), "\\}");
  regex_string = std::regex_replace(regex_string, std::regex("\\["), "\\[");
  regex_string = std::regex_replace(regex_string, std::regex("\\]"), "\\]");
  regex_string = std::regex_replace(regex_string, std::regex("\\+"), "\\+");
  regex_string = std::regex_replace(regex_string, std::regex("\\/"), "\\/");
  // Convert wildcard specific chars '*?' to their regex equivalents:
  regex_string = std::regex_replace(regex_string, std::regex("\\?"), ".");
  regex_string = std::regex_replace(regex_string, std::regex("\\*"), ".*");

  return std::regex(
      regex_string,
      case_sensitive ? std::regex_constants::ECMAScript : std::regex_constants::icase);
}
593 
594 std::vector<std::filesystem::path> get_fs_paths(const std::string& file_or_directory) {
595  const std::filesystem::path file_or_directory_path(file_or_directory);
596  const auto file_status = std::filesystem::status(file_or_directory_path);
597 
598  std::vector<std::filesystem::path> fs_paths;
599  if (std::filesystem::is_regular_file(file_status)) {
600  fs_paths.emplace_back(file_or_directory_path);
601  return fs_paths;
602  } else if (std::filesystem::is_directory(file_status)) {
603  for (std::filesystem::directory_entry const& entry :
604  std::filesystem::directory_iterator(file_or_directory_path)) {
605  if (std::filesystem::is_regular_file(std::filesystem::status(entry))) {
606  fs_paths.emplace_back(entry.path());
607  }
608  }
609  return fs_paths;
610  } else {
611  const auto parent_path = file_or_directory_path.parent_path();
612  const auto parent_status = std::filesystem::status(parent_path);
613  if (std::filesystem::is_directory(parent_status)) {
614  const auto file_glob = file_or_directory_path.filename();
615  const std::regex glob_regex{glob_to_regex(file_glob.string(), false)};
616 
617  for (std::filesystem::directory_entry const& entry :
618  std::filesystem::directory_iterator(parent_path)) {
619  if (std::filesystem::is_regular_file(std::filesystem::status(entry))) {
620  const auto entry_filename = entry.path().filename().string();
621  if (std::regex_match(entry_filename, glob_regex)) {
622  fs_paths.emplace_back(entry.path());
623  }
624  }
625  }
626  return fs_paths;
627  }
628  }
629  return fs_paths;
630 }
631 
632 } // namespace FileUtilities
633 
634 template <typename T>
636  const T bounds_val,
637  const BoundsType bounds_type,
638  const IntervalType interval_type) {
639  switch (bounds_type) {
640  case BoundsType::Min:
641  switch (interval_type) {
643  return input >= bounds_val;
645  return input > bounds_val;
646  default:
647  UNREACHABLE();
648  }
649  case BoundsType::Max:
650  switch (interval_type) {
652  return input <= bounds_val;
654  return input < bounds_val;
655  default:
656  UNREACHABLE();
657  }
658  break;
659  default:
660  UNREACHABLE();
661  }
662  UNREACHABLE();
663  return false; // To address compiler warning
664 }
665 
// Explicit instantiations of is_valid_tf_input for the numeric types used by
// table-function input validation.
template NEVER_INLINE HOST bool is_valid_tf_input(const int32_t input,
                                                  const int32_t bounds_val,
                                                  const BoundsType bounds_type,
                                                  const IntervalType interval_type);

template NEVER_INLINE HOST bool is_valid_tf_input(const int64_t input,
                                                  const int64_t bounds_val,
                                                  const BoundsType bounds_type,
                                                  const IntervalType interval_type);

template NEVER_INLINE HOST bool is_valid_tf_input(const float input,
                                                  const float bounds_val,
                                                  const BoundsType bounds_type,
                                                  const IntervalType interval_type);

template NEVER_INLINE HOST bool is_valid_tf_input(const double input,
                                                  const double bounds_val,
                                                  const BoundsType bounds_type,
                                                  const IntervalType interval_type);
685 
686 #endif // #ifndef __CUDACC__
bool isKeyCachedAndSameLength(const std::string &key, const size_t num_bytes) const
std::regex glob_to_regex(const std::string &glob, bool case_sensitive=false)
heavyai::shared_lock< heavyai::shared_mutex > read_lock
void copyData(int8_t *dest, const int8_t *source, const size_t num_bytes) const
bool isKeyCached(const std::string &key) const
NEVER_INLINE HOST std::pair< T, T > get_column_min_max(const Column< T > &col)
DEVICE int64_t size() const
Definition: heavydbTypes.h:592
#define UNREACHABLE()
Definition: Logger.h:266
DEVICE T * getPtr() const
Definition: heavydbTypes.h:591
void z_std_normalize_col(const T *input_data, T *output_data, const int64_t num_rows, const double mean, const double std_dev)
std::vector< std::filesystem::path > get_fs_paths(const std::string &file_or_directory)
heavyai::unique_lock< heavyai::shared_mutex > write_lock
const size_t max_inputs_per_thread
const T * getDataPtrForKey(const std::string &key) const
void putDataForKey(const std::string &key, T *const data_buffer, const size_t num_elements)
#define HOST
std::shared_ptr< T > getDataForKey(const std::string &key) const
bool isKeyCached(const std::string &key) const
void putDataForKey(const std::string &key, std::shared_ptr< T > const data)
void getDataForKey(const std::string &key, T *dest_buffer) const
DEVICE TextEncodingDict * getPtr() const
Definition: heavydbTypes.h:781
DEVICE bool isNull(int64_t index) const
Definition: heavydbTypes.h:595
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
EXTENSION_NOINLINE double distance_in_meters(const double fromlon, const double fromlat, const double tolon, const double tolat)
Computes the distance, in meters, between two WGS-84 positions.
const size_t parallel_copy_min_bytes
#define NEVER_INLINE
NEVER_INLINE HOST std::tuple< T, T, bool > get_column_metadata(const Column< T > &col)
std::vector< std::vector< T > > z_std_normalize_data(const std::vector< T * > &input_data, const int64_t num_rows)
#define DEBUG_TIMER(name)
Definition: Logger.h:371
NEVER_INLINE HOST double get_column_std_dev(const Column< T > &col, const double mean)
DEVICE int64_t size() const
Definition: heavydbTypes.h:782
std::vector< std::string > glob(const std::string &pattern)
const T & getDataRefForKey(const std::string &key) const
std::shared_mutex cache_mutex_
NEVER_INLINE HOST bool is_valid_tf_input(const T input, const T bounds_val, const BoundsType bounds_type, const IntervalType interval_type)
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
NEVER_INLINE HOST double get_column_mean(const T *data, const int64_t num_rows)