26 #include <shared_mutex>
28 #include <unordered_map>
30 #include <tbb/parallel_for.h>
31 #include <tbb/task_arena.h>
33 #define NANOSECONDS_PER_SECOND 1000000000
37 T col_min = std::numeric_limits<T>::max();
38 T col_max = std::numeric_limits<T>::lowest();
39 const int64_t num_rows = col.
size();
40 const size_t max_thread_count = std::thread::hardware_concurrency();
42 const size_t num_threads = std::min(
43 max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
45 std::vector<T> local_col_mins(num_threads, std::numeric_limits<T>::max());
46 std::vector<T> local_col_maxes(num_threads, std::numeric_limits<T>::lowest());
47 tbb::task_arena limited_arena(num_threads);
49 limited_arena.execute([&] {
51 [&](
const tbb::blocked_range<int64_t>& r) {
52 const int64_t start_idx = r.begin();
53 const int64_t end_idx = r.end();
54 T local_col_min = std::numeric_limits<T>::max();
55 T local_col_max = std::numeric_limits<T>::lowest();
56 for (int64_t r = start_idx; r < end_idx; ++r) {
60 if (col[r] < local_col_min) {
61 local_col_min = col[r];
63 if (col[r] > local_col_max) {
64 local_col_max = col[r];
67 size_t thread_idx = tbb::this_task_arena::current_thread_index();
68 if (local_col_min < local_col_mins[thread_idx]) {
69 local_col_mins[thread_idx] = local_col_min;
71 if (local_col_max > local_col_maxes[thread_idx]) {
72 local_col_maxes[thread_idx] = local_col_max;
77 for (
size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
78 if (local_col_mins[thread_idx] < col_min) {
79 col_min = local_col_mins[thread_idx];
81 if (local_col_maxes[thread_idx] > col_max) {
82 col_max = local_col_maxes[thread_idx];
85 return std::make_pair(col_min, col_max);
103 int_alias_col.
ptr_ =
reinterpret_cast<int32_t*
>(col.
ptr_);
108 template <
typename T>
118 template <
typename T>
121 const size_t max_thread_count = std::thread::hardware_concurrency();
123 const size_t num_threads = std::min(
124 max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
126 std::vector<double> local_col_sums(num_threads, 0.);
127 std::vector<int64_t> local_col_non_null_counts(num_threads, 0L);
128 tbb::task_arena limited_arena(num_threads);
130 limited_arena.execute([&] {
132 [&](
const tbb::blocked_range<int64_t>& r) {
133 const int64_t start_idx = r.begin();
134 const int64_t end_idx = r.end();
135 double local_col_sum = 0.;
136 int64_t local_col_non_null_count = 0;
137 for (int64_t r = start_idx; r < end_idx; ++r) {
138 const T val = data[r];
139 if (val == inline_null_value<T>()) {
142 local_col_sum += data[r];
143 local_col_non_null_count++;
145 size_t thread_idx = tbb::this_task_arena::current_thread_index();
146 local_col_sums[thread_idx] += local_col_sum;
147 local_col_non_null_counts[thread_idx] += local_col_non_null_count;
151 double col_sum = 0.0;
152 int64_t col_non_null_count = 0L;
154 for (
size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
155 col_sum += local_col_sums[thread_idx];
156 col_non_null_count += local_col_non_null_counts[thread_idx];
159 return col_non_null_count == 0 ? 0 : col_sum / col_non_null_count;
169 const int64_t num_rows);
171 const int64_t num_rows);
173 const int64_t num_rows);
175 const int64_t num_rows);
177 template <
typename T>
191 template <
typename T>
193 const int64_t num_rows,
196 const size_t max_thread_count = std::thread::hardware_concurrency();
198 const size_t num_threads = std::min(
199 max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
201 std::vector<double> local_col_squared_residuals(num_threads, 0.);
202 std::vector<int64_t> local_col_non_null_counts(num_threads, 0L);
203 tbb::task_arena limited_arena(num_threads);
205 limited_arena.execute([&] {
207 [&](
const tbb::blocked_range<int64_t>& r) {
208 const int64_t start_idx = r.begin();
209 const int64_t end_idx = r.end();
210 double local_col_squared_residual = 0.;
211 int64_t local_col_non_null_count = 0;
212 for (int64_t r = start_idx; r < end_idx; ++r) {
213 const T val = data[r];
214 if (val == inline_null_value<T>()) {
217 const double residual = val - mean;
218 local_col_squared_residual += (residual * residual);
219 local_col_non_null_count++;
221 size_t thread_idx = tbb::this_task_arena::current_thread_index();
222 local_col_squared_residuals[thread_idx] +=
223 local_col_squared_residual;
224 local_col_non_null_counts[thread_idx] += local_col_non_null_count;
228 double col_sum_squared_residual = 0.0;
229 int64_t col_non_null_count = 0;
231 for (
size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
232 col_sum_squared_residual += local_col_squared_residuals[thread_idx];
233 col_non_null_count += local_col_non_null_counts[thread_idx];
236 return col_non_null_count == 0 ? 0
237 : sqrt(col_sum_squared_residual / col_non_null_count);
241 const int64_t num_rows,
244 const int64_t num_rows,
247 const int64_t num_rows,
250 const int64_t num_rows,
253 template <
typename T>
255 T col_min = std::numeric_limits<T>::max();
256 T col_max = std::numeric_limits<T>::lowest();
257 bool has_nulls =
false;
258 const int64_t num_rows = col.
size();
259 const size_t max_thread_count = std::thread::hardware_concurrency();
261 const size_t num_threads = std::min(
262 max_thread_count, ((num_rows + max_inputs_per_thread - 1) / max_inputs_per_thread));
264 std::vector<T> local_col_mins(num_threads, std::numeric_limits<T>::max());
265 std::vector<T> local_col_maxes(num_threads, std::numeric_limits<T>::lowest());
266 std::vector<bool> local_col_has_nulls(num_threads,
false);
267 tbb::task_arena limited_arena(num_threads);
269 limited_arena.execute([&] {
271 [&](
const tbb::blocked_range<int64_t>& r) {
272 const int64_t start_idx = r.begin();
273 const int64_t end_idx = r.end();
274 T local_col_min = std::numeric_limits<T>::max();
275 T local_col_max = std::numeric_limits<T>::lowest();
276 bool local_has_nulls =
false;
277 for (int64_t r = start_idx; r < end_idx; ++r) {
279 local_has_nulls =
true;
282 if (col[r] < local_col_min) {
283 local_col_min = col[r];
285 if (col[r] > local_col_max) {
286 local_col_max = col[r];
289 const size_t thread_idx =
290 tbb::this_task_arena::current_thread_index();
291 if (local_has_nulls) {
292 local_col_has_nulls[thread_idx] =
true;
294 if (local_col_min < local_col_mins[thread_idx]) {
295 local_col_mins[thread_idx] = local_col_min;
297 if (local_col_max > local_col_maxes[thread_idx]) {
298 local_col_maxes[thread_idx] = local_col_max;
303 for (
size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
304 if (local_col_has_nulls[thread_idx]) {
307 if (local_col_mins[thread_idx] < col_min) {
308 col_min = local_col_mins[thread_idx];
310 if (local_col_maxes[thread_idx] > col_max) {
311 col_max = local_col_maxes[thread_idx];
314 return {col_min, col_max, has_nulls};
333 int_alias_col.
ptr_ =
reinterpret_cast<int32_t*
>(col.
ptr_);
338 template <
typename T>
341 const int64_t num_rows,
343 const double std_dev) {
344 if (std_dev <= 0.0) {
345 throw std::runtime_error(
"Standard deviation cannot be <= 0");
347 const double inv_std_dev = 1.0 / std_dev;
350 [&](
const tbb::blocked_range<int64_t>& r) {
351 const int64_t start_idx = r.begin();
352 const int64_t end_idx = r.end();
353 for (int64_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
354 output_data[row_idx] = (input_data[row_idx] - mean) * inv_std_dev;
361 const int64_t num_rows,
363 const double std_dev);
366 const int64_t num_rows,
368 const double std_dev);
370 template <
typename T>
372 const int64_t num_rows) {
373 const int64_t num_features = input_data.size();
374 std::vector<std::vector<T>> normalized_data(num_features);
375 for (int64_t feature_idx = 0; feature_idx < num_features; ++feature_idx) {
378 normalized_data[feature_idx].resize(num_rows);
380 normalized_data[feature_idx].data(),
385 return normalized_data;
389 const std::vector<float*>& input_data,
390 const int64_t num_rows);
392 const std::vector<double*>& input_data,
393 const int64_t num_rows);
395 template <
typename T1,
typename T2>
398 T1 latitudeArc = (fromlat - tolat) * 0.017453292519943295769236907684886;
399 T1 longitudeArc = (fromlon - tolon) * 0.017453292519943295769236907684886;
400 T1 latitudeH = sin(latitudeArc * 0.5);
401 latitudeH *= latitudeH;
402 T1 lontitudeH = sin(longitudeArc * 0.5);
403 lontitudeH *= lontitudeH;
404 T1 tmp = cos(fromlat * 0.017453292519943295769236907684886) *
405 cos(tolat * 0.017453292519943295769236907684886);
406 return 6372797.560856 * (2.0 * asin(sqrt(latitudeH + tmp * lontitudeH)));
420 const double fromlat,
425 const double fromlat,
435 const size_t num_bytes)
const {
437 const auto& cached_data_itr =
data_cache_.find(key);
441 return num_bytes == cached_data_itr->second->num_bytes;
444 template <
typename T>
448 const auto& cached_data_itr =
data_cache_.find(key);
450 const std::string error_msg =
"Data for key " + key +
" not found in cache.";
451 throw std::runtime_error(error_msg);
453 copyData(reinterpret_cast<int8_t*>(dest_buffer),
454 cached_data_itr->second->data_buffer,
455 cached_data_itr->second->num_bytes);
458 template <
typename T>
461 const auto& cached_data_itr =
data_cache_.find(key);
463 const std::string error_msg{
"Data for key " + key +
" not found in cache."};
464 throw std::runtime_error(error_msg);
466 return *
reinterpret_cast<const T*
>(cached_data_itr->second->data_buffer);
469 template <
typename T>
472 const auto& cached_data_itr =
data_cache_.find(key);
476 return reinterpret_cast<const T* const
>(cached_data_itr->second->data_buffer);
479 template <
typename T>
481 T*
const data_buffer,
482 const size_t num_elements) {
484 const size_t num_bytes(num_elements *
sizeof(
T));
485 auto cache_data = std::make_shared<CacheDataTf>(num_bytes);
486 copyData(cache_data->data_buffer, reinterpret_cast<int8_t*>(data_buffer), num_bytes);
488 const auto& cached_data_itr =
data_cache_.find(key);
490 const std::string warning_msg =
491 "Data for key " + key +
" already exists in cache. Replacing.";
492 std::cout << warning_msg << std::endl;
493 cached_data_itr->second.reset();
494 cached_data_itr->second = cache_data;
497 data_cache_.insert(std::make_pair(key, cache_data));
501 const int8_t* source,
502 const size_t num_bytes)
const {
504 std::memcpy(dest, source, num_bytes);
508 const size_t num_threads =
509 (num_bytes + max_bytes_per_thread - 1) / max_bytes_per_thread;
511 tbb::blocked_range<size_t>(0, num_threads, 1),
512 [&](
const tbb::blocked_range<size_t>& r) {
513 const size_t end_chunk_idx = r.end();
514 for (
size_t chunk_idx = r.begin(); chunk_idx != end_chunk_idx; ++chunk_idx) {
515 const size_t start_byte = chunk_idx * max_bytes_per_thread;
516 const size_t length =
517 std::min(start_byte + max_bytes_per_thread, num_bytes) - start_byte;
518 std::memcpy(dest + start_byte, source + start_byte, length);
525 template <
typename T>
527 std::shared_lock<std::shared_mutex>
read_lock(cache_mutex_);
528 return data_cache_.count(key) > 0;
531 template <
typename T>
533 std::shared_lock<std::shared_mutex>
read_lock(cache_mutex_);
534 const auto& cached_data_itr = data_cache_.find(key);
535 if (cached_data_itr == data_cache_.end()) {
536 const std::string error_msg{
"Data for key " + key +
" not found in cache."};
537 throw std::runtime_error(error_msg);
539 return cached_data_itr->second;
542 template <
typename T>
544 std::unique_lock<std::shared_mutex>
write_lock(cache_mutex_);
545 const auto& cached_data_itr = data_cache_.find(key);
546 if (cached_data_itr != data_cache_.end()) {
547 const std::string warning_msg =
548 "Data for key " + key +
" already exists in cache. Replacing.";
549 std::cout << warning_msg << std::endl;
550 cached_data_itr->second.reset();
551 cached_data_itr->second = data;
553 data_cache_.insert(std::make_pair(key, data));
556 namespace FileUtilities {
/**
 * Converts a shell-style file glob into an equivalent std::regex.
 *
 * Glob wildcards are translated as follows:
 *   '*' -> ".*"  (any run of characters, including none)
 *   '?' -> "."   (exactly one character)
 * Every other character that is special in the ECMAScript regex grammar
 * ( \ ^ . $ | ( ) { } [ ] + ) and '/' is backslash-escaped so it matches
 * literally. The wildcards are not limited to one path component: '*' and
 * '?' may also match '/'.
 *
 * @param glob            glob pattern, e.g. "*.csv"
 * @param case_sensitive  when false (the default) the regex is compiled with
 *                        std::regex_constants::icase
 * @return                ECMAScript regex for the glob (get_fs_paths uses it
 *                        with std::regex_match against a whole file name)
 */
std::regex glob_to_regex(const std::string& glob, bool case_sensitive = false) {
  std::string regex_string;
  // Worst case every character is escaped (or '*' expands to ".*").
  regex_string.reserve(glob.size() * 2);

  // Single pass: each input character is translated exactly once, so an
  // escape emitted for one character can never be re-processed by a later
  // rule. (The former chain of regex_replace calls escaped '{' twice and
  // never escaped '}'.)
  for (const char c : glob) {
    switch (c) {
      case '*':
        regex_string += ".*";
        break;
      case '?':
        regex_string += '.';
        break;
      case '\\':
      case '^':
      case '.':
      case '$':
      case '|':
      case '(':
      case ')':
      case '{':
      case '}':
      case '[':
      case ']':
      case '+':
      case '/':
        regex_string += '\\';
        regex_string += c;
        break;
      default:
        regex_string += c;
        break;
    }
  }

  // Name the grammar explicitly rather than relying on the "no grammar flag
  // means ECMAScript" default when icase is requested.
  std::regex_constants::syntax_option_type flags = std::regex_constants::ECMAScript;
  if (!case_sensitive) {
    flags |= std::regex_constants::icase;
  }
  return std::regex(regex_string, flags);
}
587 std::vector<std::filesystem::path>
get_fs_paths(
const std::string& file_or_directory) {
588 const std::filesystem::path file_or_directory_path(file_or_directory);
589 const auto file_status = std::filesystem::status(file_or_directory_path);
591 std::vector<std::filesystem::path> fs_paths;
592 if (std::filesystem::is_regular_file(file_status)) {
593 fs_paths.emplace_back(file_or_directory_path);
595 }
else if (std::filesystem::is_directory(file_status)) {
596 for (std::filesystem::directory_entry
const& entry :
597 std::filesystem::directory_iterator(file_or_directory_path)) {
598 if (std::filesystem::is_regular_file(std::filesystem::status(entry))) {
599 fs_paths.emplace_back(entry.path());
604 const auto parent_path = file_or_directory_path.parent_path();
605 const auto parent_status = std::filesystem::status(parent_path);
606 if (std::filesystem::is_directory(parent_status)) {
607 const auto file_glob = file_or_directory_path.filename();
608 const std::regex glob_regex{
glob_to_regex(file_glob.string(),
false)};
610 for (std::filesystem::directory_entry
const& entry :
611 std::filesystem::directory_iterator(parent_path)) {
612 if (std::filesystem::is_regular_file(std::filesystem::status(entry))) {
613 const auto entry_filename = entry.path().filename().string();
614 if (std::regex_match(entry_filename, glob_regex)) {
615 fs_paths.emplace_back(entry.path());
627 template <
typename T>
632 switch (bounds_type) {
634 switch (interval_type) {
636 return input >= bounds_val;
638 return input > bounds_val;
643 switch (interval_type) {
645 return input <= bounds_val;
647 return input < bounds_val;
660 const int32_t bounds_val,
665 const int64_t bounds_val,
670 const float bounds_val,
675 const double bounds_val,
679 #endif // #ifndef __CUDACC__
bool isKeyCachedAndSameLength(const std::string &key, const size_t num_bytes) const
std::regex glob_to_regex(const std::string &glob, bool case_sensitive=false)
heavyai::shared_lock< heavyai::shared_mutex > read_lock
void copyData(int8_t *dest, const int8_t *source, const size_t num_bytes) const
bool isKeyCached(const std::string &key) const
NEVER_INLINE HOST std::pair< T, T > get_column_min_max(const Column< T > &col)
DEVICE int64_t size() const
void z_std_normalize_col(const T *input_data, T *output_data, const int64_t num_rows, const double mean, const double std_dev)
std::vector< std::filesystem::path > get_fs_paths(const std::string &file_or_directory)
NEVER_INLINE HOST double get_column_mean(const Column< T > &col)
heavyai::unique_lock< heavyai::shared_mutex > write_lock
const size_t max_inputs_per_thread
const T * getDataPtrForKey(const std::string &key) const
void putDataForKey(const std::string &key, T *const data_buffer, const size_t num_elements)
std::shared_ptr< T > getDataForKey(const std::string &key) const
bool isKeyCached(const std::string &key) const
void putDataForKey(const std::string &key, std::shared_ptr< T > const data)
void getDataForKey(const std::string &key, T *dest_buffer) const
DEVICE bool isNull(int64_t index) const
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
EXTENSION_NOINLINE double distance_in_meters(const double fromlon, const double fromlat, const double tolon, const double tolat)
Computes the distance, in meters, between two WGS-84 positions.
const size_t parallel_copy_min_bytes
NEVER_INLINE HOST std::tuple< T, T, bool > get_column_metadata(const Column< T > &col)
std::vector< std::vector< T > > z_std_normalize_data(const std::vector< T * > &input_data, const int64_t num_rows)
#define DEBUG_TIMER(name)
NEVER_INLINE HOST double get_column_std_dev(const Column< T > &col, const double mean)
std::vector< std::string > glob(const std::string &pattern)
const T & getDataRefForKey(const std::string &key) const
std::shared_mutex cache_mutex_
NEVER_INLINE HOST bool is_valid_tf_input(const T input, const T bounds_val, const BoundsType bounds_type, const IntervalType interval_type)
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_