OmniSciDB  91042dcc5b
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionsCommon.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef __CUDACC__
18 
#include <algorithm>
#include <cstring>  // std::memcpy
#include <filesystem>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <regex>
#include <shared_mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include <tbb/parallel_for.h>
#include <tbb/task_arena.h>
#include <tbb/task_group.h>

#include "TableFunctionsCommon.h"
33 
34 #define NANOSECONDS_PER_SECOND 1000000000
35 
36 template <typename T>
37 TEMPLATE_NOINLINE std::pair<T, T> get_column_min_max(const Column<T>& col) {
38  T col_min = std::numeric_limits<T>::max();
39  T col_max = std::numeric_limits<T>::lowest();
40  const int64_t num_rows = col.size();
41  const size_t max_thread_count = std::thread::hardware_concurrency();
42  const size_t max_inputs_per_thread = 200000;
43  const size_t num_threads = std::min(
44  max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
45 
46  std::vector<T> local_col_mins(num_threads, std::numeric_limits<T>::max());
47  std::vector<T> local_col_maxes(num_threads, std::numeric_limits<T>::lowest());
48  tbb::task_arena limited_arena(num_threads);
49  tbb::task_group tg;
50 
51  limited_arena.execute([&] {
52  tg.run([&] {
53  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
54  [&](const tbb::blocked_range<int64_t>& r) {
55  const int64_t start_idx = r.begin();
56  const int64_t end_idx = r.end();
57  T local_col_min = std::numeric_limits<T>::max();
58  T local_col_max = std::numeric_limits<T>::lowest();
59  for (int64_t r = start_idx; r < end_idx; ++r) {
60  if (col.isNull(r)) {
61  continue;
62  }
63  if (col[r] < local_col_min) {
64  local_col_min = col[r];
65  }
66  if (col[r] > local_col_max) {
67  local_col_max = col[r];
68  }
69  }
70  size_t thread_idx =
71  tbb::this_task_arena::current_thread_index();
72  if (local_col_min < local_col_mins[thread_idx]) {
73  local_col_mins[thread_idx] = local_col_min;
74  }
75  if (local_col_max > local_col_maxes[thread_idx]) {
76  local_col_maxes[thread_idx] = local_col_max;
77  }
78  });
79  });
80  });
81 
82  limited_arena.execute([&] { tg.wait(); });
83 
84  for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
85  if (local_col_mins[thread_idx] < col_min) {
86  col_min = local_col_mins[thread_idx];
87  }
88  if (local_col_maxes[thread_idx] > col_max) {
89  col_max = local_col_maxes[thread_idx];
90  }
91  }
92  return std::make_pair(col_min, col_max);
93 }
94 
95 std::pair<int32_t, int32_t> get_column_min_max(const Column<TextEncodingDict>& col) {
96  Column<int32_t> int_alias_col;
97  int_alias_col.ptr_ = reinterpret_cast<int32_t*>(col.ptr_);
98  int_alias_col.size_ = col.size_;
99  return get_column_min_max(int_alias_col);
100 }
101 
// Great-circle distance between (fromlon, fromlat) and (tolon, tolat) using the
// haversine formula:
//   d = R * 2 * asin(sqrt(sin^2(dlat / 2) + cos(lat1) * cos(lat2) * sin^2(dlon / 2)))
// Inputs are in degrees: 0.017453292519943295769236907684886 is pi / 180 and
// converts degrees to radians. R = 6372797.560856 is an Earth radius in meters,
// so the result is in meters.
// NOTE(review): the line carrying this template's return type / attributes
// (doxygen line 103) is missing from this rendering and is not reconstructed
// here. Intermediates are stored as T1, so T1 presumably is a floating-point type.
// NOTE(review): sin/cos/asin/sqrt are called unqualified; confirm the headers in
// scope provide overloads for T1.
102 template <typename T1, typename T2>
104 distance_in_meters(const T1 fromlon, const T1 fromlat, const T2 tolon, const T2 tolat) {
// Latitude / longitude differences, converted from degrees to radians.
105  T1 latitudeArc = (fromlat - tolat) * 0.017453292519943295769236907684886;
106  T1 longitudeArc = (fromlon - tolon) * 0.017453292519943295769236907684886;
// Haversine terms sin^2(dlat / 2) and sin^2(dlon / 2).
107  T1 latitudeH = sin(latitudeArc * 0.5);
108  latitudeH *= latitudeH;
109  T1 lontitudeH = sin(longitudeArc * 0.5);
110  lontitudeH *= lontitudeH;
// cos(lat1) * cos(lat2), with both latitudes converted to radians.
111  T1 tmp = cos(fromlat * 0.017453292519943295769236907684886) *
112  cos(tolat * 0.017453292519943295769236907684886);
// Central angle 2 * asin(sqrt(a)) scaled by the Earth radius in meters.
113  return 6372797.560856 * (2.0 * asin(sqrt(latitudeH + tmp * lontitudeH)));
114 }
115 
116 bool DataBufferCache::isKeyCached(const std::string& key) const {
117  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
118  return data_cache_.count(key) > 0;
119 }
120 
121 bool DataBufferCache::isKeyCachedAndSameLength(const std::string& key,
122  const size_t num_bytes) const {
123  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
124  const auto& cached_data_itr = data_cache_.find(key);
125  if (cached_data_itr == data_cache_.end()) {
126  return false;
127  }
128  return num_bytes == cached_data_itr->second->num_bytes;
129 }
130 
131 template <typename T>
132 void DataBufferCache::getDataForKey(const std::string& key, T* dest_buffer) const {
133  auto timer = DEBUG_TIMER(__func__);
134  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
135  const auto& cached_data_itr = data_cache_.find(key);
136  if (cached_data_itr == data_cache_.end()) {
137  const std::string error_msg = "Data for key " + key + " not found in cache.";
138  throw std::runtime_error(error_msg);
139  }
140  copyData(reinterpret_cast<int8_t*>(dest_buffer),
141  cached_data_itr->second->data_buffer,
142  cached_data_itr->second->num_bytes);
143 }
144 
145 template <typename T>
146 const T& DataBufferCache::getDataRefForKey(const std::string& key) const {
147  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
148  const auto& cached_data_itr = data_cache_.find(key);
149  if (cached_data_itr == data_cache_.end()) {
150  const std::string error_msg{"Data for key " + key + " not found in cache."};
151  throw std::runtime_error(error_msg);
152  }
153  return *reinterpret_cast<const T*>(cached_data_itr->second->data_buffer);
154 }
155 
156 template <typename T>
157 const T* DataBufferCache::getDataPtrForKey(const std::string& key) const {
158  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
159  const auto& cached_data_itr = data_cache_.find(key);
160  if (cached_data_itr == data_cache_.end()) {
161  return nullptr;
162  }
163  return reinterpret_cast<const T* const>(cached_data_itr->second->data_buffer);
164 }
165 
166 template <typename T>
167 void DataBufferCache::putDataForKey(const std::string& key,
168  T* const data_buffer,
169  const size_t num_elements) {
170  auto timer = DEBUG_TIMER(__func__);
171  const size_t num_bytes(num_elements * sizeof(T));
172  auto cache_data = std::make_shared<CacheDataTf>(num_bytes);
173  copyData(cache_data->data_buffer, reinterpret_cast<int8_t*>(data_buffer), num_bytes);
174  std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
175  const auto& cached_data_itr = data_cache_.find(key);
176  if (data_cache_.find(key) != data_cache_.end()) {
177  const std::string warning_msg =
178  "Data for key " + key + " already exists in cache. Replacing.";
179  std::cout << warning_msg << std::endl;
180  cached_data_itr->second.reset();
181  cached_data_itr->second = cache_data;
182  return;
183  }
184  data_cache_.insert(std::make_pair(key, cache_data));
185 }
186 
// Copies num_bytes bytes from `source` to `dest`.
// Copies smaller than parallel_copy_min_bytes use a single std::memcpy. Larger
// copies are split into ceil(num_bytes / parallel_copy_min_bytes) chunks of
// parallel_copy_min_bytes bytes (the last chunk may be shorter). Each chunk is
// copied with its own std::memcpy inside the parallel loop body; the
// blocked_range grain size of 1 lets the range be split down to single chunks.
// As with std::memcpy, `source` and `dest` must not overlap.
// NOTE(review): the line that opens the parallel call (doxygen line 197,
// presumably `tbb::parallel_for(`) is missing from this rendering; restore it
// before building.
187 void DataBufferCache::copyData(int8_t* dest,
188  const int8_t* source,
189  const size_t num_bytes) const {
// Below the threshold a single serial memcpy is used.
190  if (num_bytes < parallel_copy_min_bytes) {
191  std::memcpy(dest, source, num_bytes);
192  return;
193  }
// The chunk size equals the serial-copy threshold. Despite its name,
// num_threads is the number of chunks (ceiling division), not a thread count.
194  const size_t max_bytes_per_thread = parallel_copy_min_bytes;
195  const size_t num_threads =
196  (num_bytes + max_bytes_per_thread - 1) / max_bytes_per_thread;
198  tbb::blocked_range<size_t>(0, num_threads, 1),
199  [&](const tbb::blocked_range<size_t>& r) {
200  const size_t end_chunk_idx = r.end();
201  for (size_t chunk_idx = r.begin(); chunk_idx != end_chunk_idx; ++chunk_idx) {
202  const size_t start_byte = chunk_idx * max_bytes_per_thread;
// Clamp the final chunk so it does not run past num_bytes.
203  const size_t length =
204  std::min(start_byte + max_bytes_per_thread, num_bytes) - start_byte;
205  std::memcpy(dest + start_byte, source + start_byte, length);
206  }
207  });
208 }
209 
210 /* Definitions for DataCache */
211 
212 template <typename T>
213 bool DataCache<T>::isKeyCached(const std::string& key) const {
214  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
215  return data_cache_.count(key) > 0;
216 }
217 
218 template <typename T>
219 std::shared_ptr<T> DataCache<T>::getDataForKey(const std::string& key) const {
220  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
221  const auto& cached_data_itr = data_cache_.find(key);
222  if (cached_data_itr == data_cache_.end()) {
223  const std::string error_msg{"Data for key " + key + " not found in cache."};
224  throw std::runtime_error(error_msg);
225  }
226  return cached_data_itr->second;
227 }
228 
229 template <typename T>
230 void DataCache<T>::putDataForKey(const std::string& key, std::shared_ptr<T> const data) {
231  std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
232  const auto& cached_data_itr = data_cache_.find(key);
233  if (cached_data_itr != data_cache_.end()) {
234  const std::string warning_msg =
235  "Data for key " + key + " already exists in cache. Replacing.";
236  std::cout << warning_msg << std::endl;
237  cached_data_itr->second.reset();
238  cached_data_itr->second = data;
239  }
240  data_cache_.insert(std::make_pair(key, data));
241 }
242 
namespace FileUtilities {

// glob_to_regex is adapted from https://stackoverflow.com/a/65851545

/**
 * Translates a shell-style file glob into an equivalent std::regex.
 *
 * '*' becomes ".*" (any run of characters) and '?' becomes "." (exactly one
 * character). The regex metacharacters \ ^ . $ | ( ) { } [ ] + and '/' are
 * backslash-escaped so they match themselves literally.
 *
 * @param glob           Glob pattern, e.g. "*.csv" or "part_??.parquet".
 * @param case_sensitive When false (the default) the regex is compiled with
 *                       std::regex_constants::icase.
 * @return Regex intended for whole-string matching with std::regex_match.
 */
std::regex glob_to_regex(const std::string& glob, bool case_sensitive = false) {
  // Note: it is possible to automate checking whether the filesystem is case
  // sensitive (e.g. by probing it the first time this function is run).
  std::string regex_string;
  regex_string.reserve(glob.size() * 2);
  // Single pass over the glob. Escaping each metacharacter exactly once also
  // fixes the former '}' rule, which matched '{' and emitted "\\}" instead of
  // escaping '}'.
  for (const char c : glob) {
    switch (c) {
      case '\\':
      case '^':
      case '.':
      case '$':
      case '|':
      case '(':
      case ')':
      case '{':
      case '}':
      case '[':
      case ']':
      case '+':
      case '/':
        regex_string += '\\';
        regex_string += c;
        break;
      case '?':
        regex_string += '.';
        break;
      case '*':
        regex_string += ".*";
        break;
      default:
        regex_string += c;
        break;
    }
  }

  return std::regex(
      regex_string,
      case_sensitive ? std::regex_constants::ECMAScript : std::regex_constants::icase);
}

/**
 * Expands a file, directory, or glob specification into a list of regular files.
 *
 * - An existing regular file yields just that path.
 * - An existing directory yields every regular file directly inside it
 *   (non-recursive).
 * - Otherwise the last path component is treated as a case-insensitive glob
 *   (see glob_to_regex) over the regular files of its parent directory. A bare
 *   pattern with no directory part (e.g. "*.csv") is resolved against the
 *   current working directory and yields bare relative filenames.
 *
 * Directory iteration order is unspecified, so the order of the result is too.
 *
 * @return Matching paths; empty if nothing matches or the parent is not a directory.
 * @throws std::filesystem::filesystem_error if a directory cannot be iterated.
 */
std::vector<std::filesystem::path> get_fs_paths(const std::string& file_or_directory) {
  namespace fs = std::filesystem;
  const fs::path file_or_directory_path(file_or_directory);
  const auto file_status = fs::status(file_or_directory_path);

  std::vector<fs::path> fs_paths;
  if (fs::is_regular_file(file_status)) {
    fs_paths.emplace_back(file_or_directory_path);
    return fs_paths;
  }

  if (fs::is_directory(file_status)) {
    for (const fs::directory_entry& entry : fs::directory_iterator(file_or_directory_path)) {
      if (entry.is_regular_file()) {
        fs_paths.emplace_back(entry.path());
      }
    }
    return fs_paths;
  }

  // Treat the final component as a glob over its parent directory. An empty
  // parent (bare pattern such as "*.csv") means the current directory;
  // fs::status("") reports not-found, so such patterns previously matched nothing.
  const fs::path parent_path = file_or_directory_path.parent_path();
  const fs::path search_dir = parent_path.empty() ? fs::path(".") : parent_path;
  if (!fs::is_directory(fs::status(search_dir))) {
    return fs_paths;
  }

  const std::regex glob_regex{
      glob_to_regex(file_or_directory_path.filename().string(), false)};
  for (const fs::directory_entry& entry : fs::directory_iterator(search_dir)) {
    if (!entry.is_regular_file()) {
      continue;
    }
    const fs::path entry_filename = entry.path().filename();
    if (std::regex_match(entry_filename.string(), glob_regex)) {
      // Equals entry.path() when parent_path is non-empty; yields a bare
      // relative filename when searching the current directory.
      fs_paths.emplace_back(parent_path / entry_filename);
    }
  }
  return fs_paths;
}

}  // namespace FileUtilities
313 
// Checks `input` against a single bound `bounds_val`.
//   BoundsType::Min -> the result is `input >= bounds_val` or `input > bounds_val`
//   BoundsType::Max -> the result is `input <= bounds_val` or `input < bounds_val`
// interval_type selects between the two comparisons in each case; presumably the
// non-strict form (>=, <=) is the inclusive interval and the strict form (>, <)
// the exclusive one.
// NOTE(review): the inner `case` labels (doxygen lines 322, 324, 331, 333) are
// missing from this rendering, so the enumerator-to-comparison mapping cannot be
// confirmed here; restore them before building.
// Unhandled enumerator values reach UNREACHABLE().
314 template <typename T>
315 bool is_valid_tf_input(const T input,
316  const T bounds_val,
317  const BoundsType bounds_type,
318  const IntervalType interval_type) {
319  switch (bounds_type) {
320  case BoundsType::Min:
321  switch (interval_type) {
323  return input >= bounds_val;
325  return input > bounds_val;
326  default:
327  UNREACHABLE();
328  }
// NOTE(review): no `break` here. This relies on UNREACHABLE() not returning
// (presumably it aborts — confirm); otherwise control would fall into the Max case.
329  case BoundsType::Max:
330  switch (interval_type) {
332  return input <= bounds_val;
334  return input < bounds_val;
335  default:
336  UNREACHABLE();
337  }
338  break;
339  default:
340  UNREACHABLE();
341  }
// Only reachable via the UNREACHABLE() paths above; the return silences
// missing-return warnings.
342  UNREACHABLE();
343  return false; // To address compiler warning
344 }
345 
346 #endif // __CUDACC__
bool isKeyCachedAndSameLength(const std::string &key, const size_t num_bytes) const
std::regex glob_to_regex(const std::string &glob, bool case_sensitive=false)
bool isKeyCached(const std::string &key) const
#define const
DEVICE int64_t size() const
Definition: OmniSciTypes.h:248
#define TEMPLATE_NOINLINE
Definition: OmniSciTypes.h:36
#define UNREACHABLE()
Definition: Logger.h:255
T * ptr_
Definition: OmniSciTypes.h:229
TEMPLATE_NOINLINE std::pair< T, T > get_column_min_max(const Column< T > &col)
std::vector< std::filesystem::path > get_fs_paths(const std::string &file_or_directory)
const size_t max_inputs_per_thread
const T * getDataPtrForKey(const std::string &key) const
void putDataForKey(const std::string &key, T *const data_buffer, const size_t num_elements)
std::vector< std::string > glob(const std::string &pattern)
std::shared_ptr< T > getDataForKey(const std::string &key) const
bool isKeyCached(const std::string &key) const
bool is_valid_tf_input(const T input, const T bounds_val, const BoundsType bounds_type, const IntervalType interval_type)
void putDataForKey(const std::string &key, std::shared_ptr< T > const data)
void getDataForKey(const std::string &key, T *dest_buffer) const
DEVICE bool isNull(int64_t index) const
Definition: OmniSciTypes.h:250
int64_t size_
Definition: OmniSciTypes.h:230
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
EXTENSION_NOINLINE double distance_in_meters(const double fromlon, const double fromlat, const double tolon, const double tolat)
Computes the distance, in meters, between two WGS-84 positions.
mapd_shared_lock< mapd_shared_mutex > read_lock
#define DEBUG_TIMER(name)
Definition: Logger.h:358
mapd_unique_lock< mapd_shared_mutex > write_lock
const T & getDataRefForKey(const std::string &key) const