OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionsCommon.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef __CUDACC__
18 
19 #include "TableFunctionsCommon.h"
20 
#include <algorithm>  // std::min / std::max
#include <cmath>      // sqrt, sin, cos, asin
#include <cstring>    // std::memcpy
#include <filesystem>
#include <iostream>   // std::cout warnings in putDataForKey
#include <limits>     // std::numeric_limits
#include <memory>
#include <mutex>
#include <regex>
#include <shared_mutex>
#include <string>
#include <thread>     // std::thread::hardware_concurrency
#include <tuple>
#include <unordered_map>
#include <utility>    // std::pair, std::make_pair
#include <vector>

#include <tbb/parallel_for.h>
#include <tbb/task_arena.h>
32 
33 #define NANOSECONDS_PER_SECOND 1000000000
34 
35 template <typename T>
36 NEVER_INLINE HOST std::pair<T, T> get_column_min_max(const Column<T>& col) {
37  T col_min = std::numeric_limits<T>::max();
38  T col_max = std::numeric_limits<T>::lowest();
39  const int64_t num_rows = col.size();
40  const size_t max_thread_count = std::thread::hardware_concurrency();
41  const size_t max_inputs_per_thread = 200000;
42  const size_t num_threads = std::min(
43  max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
44 
45  std::vector<T> local_col_mins(num_threads, std::numeric_limits<T>::max());
46  std::vector<T> local_col_maxes(num_threads, std::numeric_limits<T>::lowest());
47  tbb::task_arena limited_arena(num_threads);
48 
49  limited_arena.execute([&] {
50  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
51  [&](const tbb::blocked_range<int64_t>& r) {
52  const int64_t start_idx = r.begin();
53  const int64_t end_idx = r.end();
54  T local_col_min = std::numeric_limits<T>::max();
55  T local_col_max = std::numeric_limits<T>::lowest();
56  for (int64_t r = start_idx; r < end_idx; ++r) {
57  if (col.isNull(r)) {
58  continue;
59  }
60  if (col[r] < local_col_min) {
61  local_col_min = col[r];
62  }
63  if (col[r] > local_col_max) {
64  local_col_max = col[r];
65  }
66  }
67  size_t thread_idx = tbb::this_task_arena::current_thread_index();
68  if (local_col_min < local_col_mins[thread_idx]) {
69  local_col_mins[thread_idx] = local_col_min;
70  }
71  if (local_col_max > local_col_maxes[thread_idx]) {
72  local_col_maxes[thread_idx] = local_col_max;
73  }
74  });
75  });
76 
77  for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
78  if (local_col_mins[thread_idx] < col_min) {
79  col_min = local_col_mins[thread_idx];
80  }
81  if (local_col_maxes[thread_idx] > col_max) {
82  col_max = local_col_maxes[thread_idx];
83  }
84  }
85  return std::make_pair(col_min, col_max);
86 }
87 
// Explicit instantiations of get_column_min_max for all numeric element types
// supported by table functions.
template NEVER_INLINE HOST std::pair<int8_t, int8_t> get_column_min_max(
    const Column<int8_t>& col);
template NEVER_INLINE HOST std::pair<int16_t, int16_t> get_column_min_max(
    const Column<int16_t>& col);
template NEVER_INLINE HOST std::pair<int32_t, int32_t> get_column_min_max(
    const Column<int32_t>& col);
template NEVER_INLINE HOST std::pair<int64_t, int64_t> get_column_min_max(
    const Column<int64_t>& col);
template NEVER_INLINE HOST std::pair<float, float> get_column_min_max(
    const Column<float>& col);
template NEVER_INLINE HOST std::pair<double, double> get_column_min_max(
    const Column<double>& col);
100 
// Overload for dictionary-encoded text columns: the underlying dictionary ids
// are 32-bit ints, so the buffer is aliased as an int32_t column and the
// generic implementation reused. The returned min/max are dictionary ids,
// not strings.
std::pair<int32_t, int32_t> get_column_min_max(const Column<TextEncodingDict>& col) {
  Column<int32_t> int_alias_col;
  int_alias_col.ptr_ = reinterpret_cast<int32_t*>(col.ptr_);
  int_alias_col.size_ = col.size_;
  return get_column_min_max(int_alias_col);
}
107 
108 template <typename T>
110  return get_column_mean(col.ptr_, col.size_);
111 }
112 
// Explicit instantiations of the Column-based get_column_mean overload.
template NEVER_INLINE HOST double get_column_mean(const Column<int32_t>& col);
template NEVER_INLINE HOST double get_column_mean(const Column<int64_t>& col);
template NEVER_INLINE HOST double get_column_mean(const Column<float>& col);
template NEVER_INLINE HOST double get_column_mean(const Column<double>& col);
117 
118 template <typename T>
119 NEVER_INLINE HOST double get_column_mean(const T* data, const int64_t num_rows) {
120  // const int64_t num_rows = col.size();
121  const size_t max_thread_count = std::thread::hardware_concurrency();
122  const size_t max_inputs_per_thread = 200000;
123  const size_t num_threads = std::min(
124  max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
125 
126  std::vector<double> local_col_sums(num_threads, 0.);
127  std::vector<int64_t> local_col_non_null_counts(num_threads, 0L);
128  tbb::task_arena limited_arena(num_threads);
129 
130  limited_arena.execute([&] {
131  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
132  [&](const tbb::blocked_range<int64_t>& r) {
133  const int64_t start_idx = r.begin();
134  const int64_t end_idx = r.end();
135  double local_col_sum = 0.;
136  int64_t local_col_non_null_count = 0;
137  for (int64_t r = start_idx; r < end_idx; ++r) {
138  const T val = data[r];
139  if (val == inline_null_value<T>()) {
140  continue;
141  }
142  local_col_sum += data[r];
143  local_col_non_null_count++;
144  }
145  size_t thread_idx = tbb::this_task_arena::current_thread_index();
146  local_col_sums[thread_idx] += local_col_sum;
147  local_col_non_null_counts[thread_idx] += local_col_non_null_count;
148  });
149  });
150 
151  double col_sum = 0.0;
152  int64_t col_non_null_count = 0L;
153 
154  for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
155  col_sum += local_col_sums[thread_idx];
156  col_non_null_count += local_col_non_null_counts[thread_idx];
157  }
158 
159  return col_non_null_count == 0 ? 0 : col_sum / col_non_null_count;
160 }
161 
// Todo(todd): we should use a functor approach for gathering whatever stats
// a table function needs so we're not repeating boilerplate code (although
// we should confirm it doesn't have an adverse effect on performance).
// Leaving as a follow-up though until we have more examples of real-world
// usage patterns.
167 
// Explicit instantiations of the raw-pointer get_column_mean.
template NEVER_INLINE HOST double get_column_mean(const int32_t* data,
                                                  const int64_t num_rows);
template NEVER_INLINE HOST double get_column_mean(const int64_t* data,
                                                  const int64_t num_rows);
template NEVER_INLINE HOST double get_column_mean(const float* data,
                                                  const int64_t num_rows);
template NEVER_INLINE HOST double get_column_mean(const double* data,
                                                  const int64_t num_rows);
176 
// Convenience overload: computes the standard deviation of `col`'s non-null
// values (given their precomputed mean) by forwarding the column's raw
// buffer and row count to the pointer-based implementation below.
template <typename T>
NEVER_INLINE HOST double get_column_std_dev(const Column<T>& col, const double mean) {
  return get_column_std_dev(col.ptr_, col.size_, mean);
}
181 
// Explicit instantiations of the Column-based get_column_std_dev overload.
template NEVER_INLINE HOST double get_column_std_dev(const Column<int32_t>& col,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const Column<int64_t>& col,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const Column<float>& col,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const Column<double>& col,
                                                     const double mean);
190 
191 template <typename T>
193  const int64_t num_rows,
194  const double mean) {
195  // const int64_t num_rows = col.size();
196  const size_t max_thread_count = std::thread::hardware_concurrency();
197  const size_t max_inputs_per_thread = 200000;
198  const size_t num_threads = std::min(
199  max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
200 
201  std::vector<double> local_col_squared_residuals(num_threads, 0.);
202  std::vector<int64_t> local_col_non_null_counts(num_threads, 0L);
203  tbb::task_arena limited_arena(num_threads);
204 
205  limited_arena.execute([&] {
206  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
207  [&](const tbb::blocked_range<int64_t>& r) {
208  const int64_t start_idx = r.begin();
209  const int64_t end_idx = r.end();
210  double local_col_squared_residual = 0.;
211  int64_t local_col_non_null_count = 0;
212  for (int64_t r = start_idx; r < end_idx; ++r) {
213  const T val = data[r];
214  if (val == inline_null_value<T>()) {
215  continue;
216  }
217  const double residual = val - mean;
218  local_col_squared_residual += (residual * residual);
219  local_col_non_null_count++;
220  }
221  size_t thread_idx = tbb::this_task_arena::current_thread_index();
222  local_col_squared_residuals[thread_idx] +=
223  local_col_squared_residual;
224  local_col_non_null_counts[thread_idx] += local_col_non_null_count;
225  });
226  });
227 
228  double col_sum_squared_residual = 0.0;
229  int64_t col_non_null_count = 0;
230 
231  for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
232  col_sum_squared_residual += local_col_squared_residuals[thread_idx];
233  col_non_null_count += local_col_non_null_counts[thread_idx];
234  }
235 
236  return col_non_null_count == 0 ? 0
237  : sqrt(col_sum_squared_residual / col_non_null_count);
238 }
239 
// Explicit instantiations of the raw-pointer get_column_std_dev.
template NEVER_INLINE HOST double get_column_std_dev(const int32_t* data,
                                                     const int64_t num_rows,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const int64_t* data,
                                                     const int64_t num_rows,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const float* data,
                                                     const int64_t num_rows,
                                                     const double mean);
template NEVER_INLINE HOST double get_column_std_dev(const double* data,
                                                     const int64_t num_rows,
                                                     const double mean);
252 
253 template <typename T>
254 NEVER_INLINE HOST std::tuple<T, T, bool> get_column_metadata(const Column<T>& col) {
255  T col_min = std::numeric_limits<T>::max();
256  T col_max = std::numeric_limits<T>::lowest();
257  bool has_nulls = false;
258  const int64_t num_rows = col.size();
259  const size_t max_thread_count = std::thread::hardware_concurrency();
260  const size_t max_inputs_per_thread = 200000;
261  const size_t num_threads = std::min(
262  max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
263 
264  std::vector<T> local_col_mins(num_threads, std::numeric_limits<T>::max());
265  std::vector<T> local_col_maxes(num_threads, std::numeric_limits<T>::lowest());
266  std::vector<bool> local_col_has_nulls(num_threads, false);
267  tbb::task_arena limited_arena(num_threads);
268 
269  limited_arena.execute([&] {
270  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
271  [&](const tbb::blocked_range<int64_t>& r) {
272  const int64_t start_idx = r.begin();
273  const int64_t end_idx = r.end();
274  T local_col_min = std::numeric_limits<T>::max();
275  T local_col_max = std::numeric_limits<T>::lowest();
276  bool local_has_nulls = false;
277  for (int64_t r = start_idx; r < end_idx; ++r) {
278  if (col.isNull(r)) {
279  local_has_nulls = true;
280  continue;
281  }
282  if (col[r] < local_col_min) {
283  local_col_min = col[r];
284  }
285  if (col[r] > local_col_max) {
286  local_col_max = col[r];
287  }
288  }
289  const size_t thread_idx =
290  tbb::this_task_arena::current_thread_index();
291  if (local_has_nulls) {
292  local_col_has_nulls[thread_idx] = true;
293  }
294  if (local_col_min < local_col_mins[thread_idx]) {
295  local_col_mins[thread_idx] = local_col_min;
296  }
297  if (local_col_max > local_col_maxes[thread_idx]) {
298  local_col_maxes[thread_idx] = local_col_max;
299  }
300  });
301  });
302 
303  for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
304  if (local_col_has_nulls[thread_idx]) {
305  has_nulls = true;
306  }
307  if (local_col_mins[thread_idx] < col_min) {
308  col_min = local_col_mins[thread_idx];
309  }
310  if (local_col_maxes[thread_idx] > col_max) {
311  col_max = local_col_maxes[thread_idx];
312  }
313  }
314  return {col_min, col_max, has_nulls};
315 }
316 
// Explicit instantiations of get_column_metadata for all numeric element
// types supported by table functions.
template NEVER_INLINE HOST std::tuple<int8_t, int8_t, bool> get_column_metadata(
    const Column<int8_t>& col);
template NEVER_INLINE HOST std::tuple<int16_t, int16_t, bool> get_column_metadata(
    const Column<int16_t>& col);
template NEVER_INLINE HOST std::tuple<int32_t, int32_t, bool> get_column_metadata(
    const Column<int32_t>& col);
template NEVER_INLINE HOST std::tuple<int64_t, int64_t, bool> get_column_metadata(
    const Column<int64_t>& col);
template NEVER_INLINE HOST std::tuple<float, float, bool> get_column_metadata(
    const Column<float>& col);
template NEVER_INLINE HOST std::tuple<double, double, bool> get_column_metadata(
    const Column<double>& col);
329 
// Overload for dictionary-encoded text columns: aliases the 32-bit
// dictionary-id buffer as an int32_t column and reuses the generic
// implementation. The returned min/max are dictionary ids, not strings.
std::tuple<int32_t, int32_t, bool> get_column_metadata(
    const Column<TextEncodingDict>& col) {
  Column<int32_t> int_alias_col;
  int_alias_col.ptr_ = reinterpret_cast<int32_t*>(col.ptr_);
  int_alias_col.size_ = col.size_;
  return get_column_metadata(int_alias_col);
}
337 
338 template <typename T>
339 void z_std_normalize_col(const T* input_data,
340  T* output_data,
341  const int64_t num_rows,
342  const double mean,
343  const double std_dev) {
344  if (std_dev <= 0.0) {
345  throw std::runtime_error("Standard deviation cannot be <= 0");
346  }
347  const double inv_std_dev = 1.0 / std_dev;
348 
349  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
350  [&](const tbb::blocked_range<int64_t>& r) {
351  const int64_t start_idx = r.begin();
352  const int64_t end_idx = r.end();
353  for (int64_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
354  output_data[row_idx] = (input_data[row_idx] - mean) * inv_std_dev;
355  }
356  });
357 }
358 
// Explicit instantiations of z_std_normalize_col for floating-point types.
template void z_std_normalize_col(const float* input_data,
                                  float* output_data,
                                  const int64_t num_rows,
                                  const double mean,
                                  const double std_dev);
template void z_std_normalize_col(const double* input_data,
                                  double* output_data,
                                  const int64_t num_rows,
                                  const double mean,
                                  const double std_dev);
369 
// Z-score normalizes each feature column independently (each column gets its
// own mean and standard deviation) and returns the normalized copies; the
// input buffers are left untouched.
template <typename T>
std::vector<std::vector<T>> z_std_normalize_data(const std::vector<T*>& input_data,
                                                 const int64_t num_rows) {
  const int64_t num_features = input_data.size();
  std::vector<std::vector<T>> normalized_data(num_features);
  for (int64_t feature_idx = 0; feature_idx < num_features; ++feature_idx) {
    const T* feature_col = input_data[feature_idx];
    const auto feature_mean = get_column_mean(feature_col, num_rows);
    const auto feature_std_dev =
        get_column_std_dev(feature_col, num_rows, feature_mean);
    auto& normalized_col = normalized_data[feature_idx];
    normalized_col.resize(num_rows);
    z_std_normalize_col(
        feature_col, normalized_col.data(), num_rows, feature_mean, feature_std_dev);
  }
  return normalized_data;
}
387 
// Explicit instantiations of z_std_normalize_data for floating-point types.
template std::vector<std::vector<float>> z_std_normalize_data(
    const std::vector<float*>& input_data,
    const int64_t num_rows);
template std::vector<std::vector<double>> z_std_normalize_data(
    const std::vector<double*>& input_data,
    const int64_t num_rows);
394 
395 template <typename T1, typename T2>
397 distance_in_meters(const T1 fromlon, const T1 fromlat, const T2 tolon, const T2 tolat) {
398  T1 latitudeArc = (fromlat - tolat) * 0.017453292519943295769236907684886;
399  T1 longitudeArc = (fromlon - tolon) * 0.017453292519943295769236907684886;
400  T1 latitudeH = sin(latitudeArc * 0.5);
401  latitudeH *= latitudeH;
402  T1 lontitudeH = sin(longitudeArc * 0.5);
403  lontitudeH *= lontitudeH;
404  T1 tmp = cos(fromlat * 0.017453292519943295769236907684886) *
405  cos(tolat * 0.017453292519943295769236907684886);
406  return 6372797.560856 * (2.0 * asin(sqrt(latitudeH + tmp * lontitudeH)));
407 }
408 
// Explicit instantiations of distance_in_meters for all float/double
// from/to coordinate type combinations.
template NEVER_INLINE HOST float distance_in_meters(const float fromlon,
                                                    const float fromlat,
                                                    const float tolon,
                                                    const float tolat);

template NEVER_INLINE HOST float distance_in_meters(const float fromlon,
                                                    const float fromlat,
                                                    const double tolon,
                                                    const double tolat);

template NEVER_INLINE HOST double distance_in_meters(const double fromlon,
                                                     const double fromlat,
                                                     const float tolon,
                                                     const float tolat);

template NEVER_INLINE HOST double distance_in_meters(const double fromlon,
                                                     const double fromlat,
                                                     const double tolon,
                                                     const double tolat);
428 
429 bool DataBufferCache::isKeyCached(const std::string& key) const {
430  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
431  return data_cache_.count(key) > 0;
432 }
433 
434 bool DataBufferCache::isKeyCachedAndSameLength(const std::string& key,
435  const size_t num_bytes) const {
436  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
437  const auto& cached_data_itr = data_cache_.find(key);
438  if (cached_data_itr == data_cache_.end()) {
439  return false;
440  }
441  return num_bytes == cached_data_itr->second->num_bytes;
442 }
443 
// Copies the cached buffer for `key` into dest_buffer, which the caller must
// have sized to hold the entry's num_bytes. Throws std::runtime_error when
// the key is absent. Thread-safe: takes a shared (read) lock.
template <typename T>
void DataBufferCache::getDataForKey(const std::string& key, T* dest_buffer) const {
  auto timer = DEBUG_TIMER(__func__);
  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
  const auto& cached_data_itr = data_cache_.find(key);
  if (cached_data_itr == data_cache_.end()) {
    const std::string error_msg = "Data for key " + key + " not found in cache.";
    throw std::runtime_error(error_msg);
  }
  // copyData parallelizes the memcpy for buffers above the
  // parallel_copy_min_bytes threshold.
  copyData(reinterpret_cast<int8_t*>(dest_buffer),
           cached_data_itr->second->data_buffer,
           cached_data_itr->second->num_bytes);
}
457 
// Returns a const reference to the cached buffer for `key`, reinterpreted as
// a T. Throws std::runtime_error when the key is absent. The referenced
// storage stays owned by the cache, so the reference is valid only while the
// entry remains cached. Thread-safe lookup: takes a shared (read) lock.
template <typename T>
const T& DataBufferCache::getDataRefForKey(const std::string& key) const {
  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
  const auto& cached_data_itr = data_cache_.find(key);
  if (cached_data_itr == data_cache_.end()) {
    const std::string error_msg{"Data for key " + key + " not found in cache."};
    throw std::runtime_error(error_msg);
  }
  return *reinterpret_cast<const T*>(cached_data_itr->second->data_buffer);
}
468 
469 template <typename T>
470 const T* DataBufferCache::getDataPtrForKey(const std::string& key) const {
471  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
472  const auto& cached_data_itr = data_cache_.find(key);
473  if (cached_data_itr == data_cache_.end()) {
474  return nullptr;
475  }
476  return reinterpret_cast<const T* const>(cached_data_itr->second->data_buffer);
477 }
478 
479 template <typename T>
480 void DataBufferCache::putDataForKey(const std::string& key,
481  T* const data_buffer,
482  const size_t num_elements) {
483  auto timer = DEBUG_TIMER(__func__);
484  const size_t num_bytes(num_elements * sizeof(T));
485  auto cache_data = std::make_shared<CacheDataTf>(num_bytes);
486  copyData(cache_data->data_buffer, reinterpret_cast<int8_t*>(data_buffer), num_bytes);
487  std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
488  const auto& cached_data_itr = data_cache_.find(key);
489  if (data_cache_.find(key) != data_cache_.end()) {
490  const std::string warning_msg =
491  "Data for key " + key + " already exists in cache. Replacing.";
492  std::cout << warning_msg << std::endl;
493  cached_data_itr->second.reset();
494  cached_data_itr->second = cache_data;
495  return;
496  }
497  data_cache_.insert(std::make_pair(key, cache_data));
498 }
499 
501  const int8_t* source,
502  const size_t num_bytes) const {
503  if (num_bytes < parallel_copy_min_bytes) {
504  std::memcpy(dest, source, num_bytes);
505  return;
506  }
507  const size_t max_bytes_per_thread = parallel_copy_min_bytes;
508  const size_t num_threads =
509  (num_bytes + max_bytes_per_thread - 1) / max_bytes_per_thread;
511  tbb::blocked_range<size_t>(0, num_threads, 1),
512  [&](const tbb::blocked_range<size_t>& r) {
513  const size_t end_chunk_idx = r.end();
514  for (size_t chunk_idx = r.begin(); chunk_idx != end_chunk_idx; ++chunk_idx) {
515  const size_t start_byte = chunk_idx * max_bytes_per_thread;
516  const size_t length =
517  std::min(start_byte + max_bytes_per_thread, num_bytes) - start_byte;
518  std::memcpy(dest + start_byte, source + start_byte, length);
519  }
520  });
521 }
522 
523 /* Definitions for DataCache */
524 
525 template <typename T>
526 bool DataCache<T>::isKeyCached(const std::string& key) const {
527  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
528  return data_cache_.count(key) > 0;
529 }
530 
// Returns the cached shared_ptr for `key` (sharing ownership with the
// cache). Throws std::runtime_error when the key is absent.
// Thread-safe: takes a shared (read) lock.
template <typename T>
std::shared_ptr<T> DataCache<T>::getDataForKey(const std::string& key) const {
  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
  const auto& cached_data_itr = data_cache_.find(key);
  if (cached_data_itr == data_cache_.end()) {
    const std::string error_msg{"Data for key " + key + " not found in cache."};
    throw std::runtime_error(error_msg);
  }
  return cached_data_itr->second;
}
541 
542 template <typename T>
543 void DataCache<T>::putDataForKey(const std::string& key, std::shared_ptr<T> const data) {
544  std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
545  const auto& cached_data_itr = data_cache_.find(key);
546  if (cached_data_itr != data_cache_.end()) {
547  const std::string warning_msg =
548  "Data for key " + key + " already exists in cache. Replacing.";
549  std::cout << warning_msg << std::endl;
550  cached_data_itr->second.reset();
551  cached_data_itr->second = data;
552  }
553  data_cache_.insert(std::make_pair(key, data));
554 }
555 
namespace FileUtilities {

// Following implementation taken from https://stackoverflow.com/a/65851545

// Translates a filesystem glob pattern (with '*' and '?' wildcards) into an
// equivalent std::regex. The regex is case-insensitive unless case_sensitive
// is true.
std::regex glob_to_regex(const std::string& glob, bool case_sensitive = false) {
  // Note It is possible to automate checking if filesystem is case sensitive or not (e.g.
  // by performing a test first time this function is ran)
  std::string regex_string{glob};
  // Escape all regex special chars:
  regex_string = std::regex_replace(regex_string, std::regex("\\\\"), "\\\\");
  regex_string = std::regex_replace(regex_string, std::regex("\\^"), "\\^");
  regex_string = std::regex_replace(regex_string, std::regex("\\."), "\\.");
  regex_string = std::regex_replace(regex_string, std::regex("\\$"), "\\$");
  regex_string = std::regex_replace(regex_string, std::regex("\\|"), "\\|");
  regex_string = std::regex_replace(regex_string, std::regex("\\("), "\\(");
  regex_string = std::regex_replace(regex_string, std::regex("\\)"), "\\)");
  regex_string = std::regex_replace(regex_string, std::regex("\\{"), "\\{");
  // Fixed: this step previously re-matched '{' (corrupting the just-escaped
  // "\{" into "\\}") and never escaped '}' at all.
  regex_string = std::regex_replace(regex_string, std::regex("\\}"), "\\}");
  regex_string = std::regex_replace(regex_string, std::regex("\\["), "\\[");
  regex_string = std::regex_replace(regex_string, std::regex("\\]"), "\\]");
  regex_string = std::regex_replace(regex_string, std::regex("\\+"), "\\+");
  regex_string = std::regex_replace(regex_string, std::regex("\\/"), "\\/");
  // Convert wildcard specific chars '*?' to their regex equivalents:
  regex_string = std::regex_replace(regex_string, std::regex("\\?"), ".");
  regex_string = std::regex_replace(regex_string, std::regex("\\*"), ".*");

  return std::regex(
      regex_string,
      case_sensitive ? std::regex_constants::ECMAScript : std::regex_constants::icase);
}

// Expands `file_or_directory` into a list of regular-file paths:
// - a regular file yields itself;
// - a directory yields its immediate regular-file children (non-recursive);
// - otherwise the final path component is treated as a glob and matched
//   case-insensitively against the files in the parent directory.
// Returns an empty vector when nothing matches or the parent path is not a
// directory.
std::vector<std::filesystem::path> get_fs_paths(const std::string& file_or_directory) {
  const std::filesystem::path file_or_directory_path(file_or_directory);
  const auto file_status = std::filesystem::status(file_or_directory_path);

  std::vector<std::filesystem::path> fs_paths;
  if (std::filesystem::is_regular_file(file_status)) {
    fs_paths.emplace_back(file_or_directory_path);
    return fs_paths;
  } else if (std::filesystem::is_directory(file_status)) {
    for (std::filesystem::directory_entry const& entry :
         std::filesystem::directory_iterator(file_or_directory_path)) {
      if (std::filesystem::is_regular_file(std::filesystem::status(entry))) {
        fs_paths.emplace_back(entry.path());
      }
    }
    return fs_paths;
  } else {
    const auto parent_path = file_or_directory_path.parent_path();
    const auto parent_status = std::filesystem::status(parent_path);
    if (std::filesystem::is_directory(parent_status)) {
      const auto file_glob = file_or_directory_path.filename();
      const std::regex glob_regex{glob_to_regex(file_glob.string(), false)};

      for (std::filesystem::directory_entry const& entry :
           std::filesystem::directory_iterator(parent_path)) {
        if (std::filesystem::is_regular_file(std::filesystem::status(entry))) {
          const auto entry_filename = entry.path().filename().string();
          if (std::regex_match(entry_filename, glob_regex)) {
            fs_paths.emplace_back(entry.path());
          }
        }
      }
      return fs_paths;
    }
  }
  return fs_paths;
}

}  // namespace FileUtilities
626 
627 template <typename T>
629  const T bounds_val,
630  const BoundsType bounds_type,
631  const IntervalType interval_type) {
632  switch (bounds_type) {
633  case BoundsType::Min:
634  switch (interval_type) {
636  return input >= bounds_val;
638  return input > bounds_val;
639  default:
640  UNREACHABLE();
641  }
642  case BoundsType::Max:
643  switch (interval_type) {
645  return input <= bounds_val;
647  return input < bounds_val;
648  default:
649  UNREACHABLE();
650  }
651  break;
652  default:
653  UNREACHABLE();
654  }
655  UNREACHABLE();
656  return false; // To address compiler warning
657 }
658 
// Explicit instantiations of is_valid_tf_input for the numeric types used by
// table function input validation.
template NEVER_INLINE HOST bool is_valid_tf_input(const int32_t input,
                                                  const int32_t bounds_val,
                                                  const BoundsType bounds_type,
                                                  const IntervalType interval_type);

template NEVER_INLINE HOST bool is_valid_tf_input(const int64_t input,
                                                  const int64_t bounds_val,
                                                  const BoundsType bounds_type,
                                                  const IntervalType interval_type);

template NEVER_INLINE HOST bool is_valid_tf_input(const float input,
                                                  const float bounds_val,
                                                  const BoundsType bounds_type,
                                                  const IntervalType interval_type);

template NEVER_INLINE HOST bool is_valid_tf_input(const double input,
                                                  const double bounds_val,
                                                  const BoundsType bounds_type,
                                                  const IntervalType interval_type);
678 
679 #endif // #ifndef __CUDACC__
bool isKeyCachedAndSameLength(const std::string &key, const size_t num_bytes) const
std::regex glob_to_regex(const std::string &glob, bool case_sensitive=false)
heavyai::shared_lock< heavyai::shared_mutex > read_lock
void copyData(int8_t *dest, const int8_t *source, const size_t num_bytes) const
bool isKeyCached(const std::string &key) const
NEVER_INLINE HOST std::pair< T, T > get_column_min_max(const Column< T > &col)
DEVICE int64_t size() const
Definition: heavydbTypes.h:469
#define UNREACHABLE()
Definition: Logger.h:266
T * ptr_
Definition: heavydbTypes.h:454
void z_std_normalize_col(const T *input_data, T *output_data, const int64_t num_rows, const double mean, const double std_dev)
std::vector< std::filesystem::path > get_fs_paths(const std::string &file_or_directory)
NEVER_INLINE HOST double get_column_mean(const Column< T > &col)
heavyai::unique_lock< heavyai::shared_mutex > write_lock
const size_t max_inputs_per_thread
const T * getDataPtrForKey(const std::string &key) const
void putDataForKey(const std::string &key, T *const data_buffer, const size_t num_elements)
#define HOST
std::shared_ptr< T > getDataForKey(const std::string &key) const
bool isKeyCached(const std::string &key) const
void putDataForKey(const std::string &key, std::shared_ptr< T > const data)
void getDataForKey(const std::string &key, T *dest_buffer) const
TextEncodingDict * ptr_
Definition: heavydbTypes.h:502
DEVICE bool isNull(int64_t index) const
Definition: heavydbTypes.h:471
int64_t size_
Definition: heavydbTypes.h:455
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
EXTENSION_NOINLINE double distance_in_meters(const double fromlon, const double fromlat, const double tolon, const double tolat)
Computes the distance, in meters, between two WGS-84 positions.
const size_t parallel_copy_min_bytes
#define NEVER_INLINE
NEVER_INLINE HOST std::tuple< T, T, bool > get_column_metadata(const Column< T > &col)
std::vector< std::vector< T > > z_std_normalize_data(const std::vector< T * > &input_data, const int64_t num_rows)
#define DEBUG_TIMER(name)
Definition: Logger.h:371
NEVER_INLINE HOST double get_column_std_dev(const Column< T > &col, const double mean)
std::vector< std::string > glob(const std::string &pattern)
const T & getDataRefForKey(const std::string &key) const
std::shared_mutex cache_mutex_
NEVER_INLINE HOST bool is_valid_tf_input(const T input, const T bounds_val, const BoundsType bounds_type, const IntervalType interval_type)
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_