OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NullRowsRemoval.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef __CUDACC__
18 
19 #include "NullRowsRemoval.h"
20 #include <tbb/parallel_for.h>
21 #include "Shared/ThreadInfo.h"
22 
23 namespace TableFunctions_Namespace {
24 
25 template <typename T>
27  InputData<T> input_data;
28  input_data.num_rows = input_features.size();
29  input_data.null_val = inline_null_value<T>();
30  for (int64_t c = 0; c < input_features.numCols(); ++c) {
31  input_data.col_ptrs.push_back(reinterpret_cast<T*>(input_features.ptrs_[c]));
32  }
33  return input_data;
34 }
35 
36 template InputData<float> strip_column_metadata(const ColumnList<float>& input_features);
37 template InputData<double> strip_column_metadata(
38  const ColumnList<double>& input_features);
39 
40 template <typename T>
42  const ColumnList<T>& input_features) {
43  InputData<T> input_data;
44  input_data.num_rows = input_features.size();
45  CHECK_EQ(input_data.num_rows, input_labels.size());
46  input_data.null_val = inline_null_value<T>();
47  input_data.col_ptrs.push_back(input_labels.ptr_);
48  for (int64_t c = 0; c < input_features.numCols(); ++c) {
49  input_data.col_ptrs.push_back(reinterpret_cast<T*>(input_features.ptrs_[c]));
50  }
51  return input_data;
52 }
53 
54 template InputData<float> strip_column_metadata(const Column<float>& input_labels,
55  const ColumnList<float>& input_features);
56 
57 template InputData<double> strip_column_metadata(
58  const Column<double>& input_labels,
59  const ColumnList<double>& input_features);
60 
61 template <typename T>
63  MaskedData<T> masked_data;
64  const auto input_num_rows = input_data.num_rows;
65  masked_data.unmasked_num_rows = input_num_rows;
66  masked_data.index_map.resize(input_num_rows);
67  auto& index_map = masked_data.index_map;
68  const int32_t num_cols = input_data.col_ptrs.size();
69  int32_t valid_row_count = 0;
70  masked_data.reverse_index_map.reserve(masked_data.unmasked_num_rows);
71  auto& reverse_index_map = masked_data.reverse_index_map;
72  const auto null_val = input_data.null_val;
73  constexpr int64_t target_rows_per_thread{20000};
74  const ThreadInfo thread_info(
75  std::thread::hardware_concurrency(), input_num_rows, target_rows_per_thread);
76  CHECK_GE(thread_info.num_threads, 1L);
77  CHECK_GE(thread_info.num_elems_per_thread, 1L);
78  std::vector<std::vector<int32_t>> per_thread_reverse_index_maps(
79  thread_info.num_threads);
80  tbb::task_arena limited_arena(thread_info.num_threads);
81  limited_arena.execute([&] {
83  tbb::blocked_range<int64_t>(
84  0, input_num_rows, static_cast<int32_t>(thread_info.num_elems_per_thread)),
85  [&](const tbb::blocked_range<int64_t>& r) {
86  size_t thread_idx = tbb::this_task_arena::current_thread_index();
87  auto& local_reverse_index_map = per_thread_reverse_index_maps[thread_idx];
88  int32_t local_valid_row_count = 0;
89  const int32_t start_idx = r.begin();
90  const int32_t end_idx = r.end();
91  for (int32_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
92  int32_t col_idx = 0;
93  for (; col_idx < num_cols; ++col_idx) {
94  if (input_data.col_ptrs[col_idx][row_idx] == null_val) {
95  index_map[row_idx] = NULL_ROW_IDX;
96  break;
97  }
98  }
99  if (col_idx == num_cols) {
100  local_reverse_index_map.emplace_back(row_idx);
101  index_map[row_idx] = local_valid_row_count++;
102  }
103  }
104  },
105  tbb::simple_partitioner());
106  });
107 
108  for (const auto& per_thread_reverse_index_map : per_thread_reverse_index_maps) {
109  valid_row_count += per_thread_reverse_index_map.size();
110  }
111  masked_data.masked_num_rows = valid_row_count;
112 
113  masked_data.data.resize(num_cols, nullptr);
114  // Exit early if there are no nulls to avoid unneeded computation
115  if (masked_data.masked_num_rows == masked_data.unmasked_num_rows) {
116  for (int32_t col_idx = 0; col_idx < num_cols; ++col_idx) {
117  masked_data.data[col_idx] = input_data.col_ptrs[col_idx];
118  }
119  return masked_data;
120  }
121 
122  masked_data.reverse_index_map.resize(valid_row_count);
123 
124  int32_t start_reverse_index_offset = 0;
125  std::vector<std::future<void>> worker_threads;
126  for (const auto& per_thread_reverse_index_map : per_thread_reverse_index_maps) {
127  const int32_t local_reverse_index_map_size = per_thread_reverse_index_map.size();
128  worker_threads.emplace_back(std::async(
130  [&reverse_index_map](const auto& local_map,
131  const int32_t local_map_offset,
132  const int32_t local_map_size) {
133  for (int32_t map_idx = 0; map_idx < local_map_size; ++map_idx) {
134  reverse_index_map[map_idx + local_map_offset] = local_map[map_idx];
135  }
136  },
137  per_thread_reverse_index_map,
138  start_reverse_index_offset,
139  local_reverse_index_map_size));
140  start_reverse_index_offset += local_reverse_index_map_size;
141  }
142  for (auto& worker_thread : worker_threads) {
143  worker_thread.wait();
144  }
145  masked_data.data_allocations.resize(num_cols);
146  for (int32_t col_idx = 0; col_idx < num_cols; ++col_idx) {
147  masked_data.data_allocations[col_idx].resize(valid_row_count);
148  masked_data.data[col_idx] = masked_data.data_allocations[col_idx].data();
149  }
150 
151  auto& denulled_data = masked_data.data;
152 
153  tbb::parallel_for(tbb::blocked_range<int32_t>(0, valid_row_count),
154  [&](const tbb::blocked_range<int32_t>& r) {
155  const int32_t start_idx = r.begin();
156  const int32_t end_idx = r.end();
157  for (int32_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
158  const auto input_row_idx = reverse_index_map[row_idx];
159  for (int32_t col_idx = 0; col_idx < num_cols; ++col_idx) {
160  denulled_data[col_idx][row_idx] =
161  input_data.col_ptrs[col_idx][input_row_idx];
162  }
163  }
164  });
165  return masked_data;
166 }
167 
168 template MaskedData<float> remove_null_rows(const InputData<float>& input_data);
169 template MaskedData<double> remove_null_rows(const InputData<double>& input_data);
170 
171 template <typename T>
172 void unmask_data(const T* masked_input,
173  const std::vector<int32_t>& reverse_index_map,
174  T* unmasked_output,
175  const int64_t num_unmasked_rows,
176  const T null_val) {
177  // First fill data with nulls (as its currently initialized to 0)
178  // Todo(todd): Look at allowing override of default 0-initialization of output columns
179  // to avoid overhead of double initialization
180  tbb::parallel_for(tbb::blocked_range<size_t>(0, static_cast<size_t>(num_unmasked_rows)),
181  [&](const tbb::blocked_range<size_t>& r) {
182  const int32_t start_idx = r.begin();
183  const int32_t end_idx = r.end();
184  for (int32_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
185  unmasked_output[row_idx] = null_val;
186  }
187  });
188 
189  const auto num_masked_rows = reverse_index_map.size();
190  tbb::parallel_for(tbb::blocked_range<size_t>(0, num_masked_rows),
191  [&](const tbb::blocked_range<size_t>& r) {
192  const int32_t start_idx = r.begin();
193  const int32_t end_idx = r.end();
194  for (int32_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
195  unmasked_output[reverse_index_map[row_idx]] =
196  masked_input[row_idx];
197  }
198  });
199 }
200 
201 template void unmask_data(const int32_t* masked_input,
202  const std::vector<int32_t>& reverse_index_map,
203  int32_t* unmasked_output,
204  const int64_t num_unmasked_rows,
205  const int32_t null_val);
206 
207 template void unmask_data(const float* masked_input,
208  const std::vector<int32_t>& reverse_index_map,
209  float* unmasked_output,
210  const int64_t num_unmasked_rows,
211  const float null_val);
212 
213 template void unmask_data(const double* masked_input,
214  const std::vector<int32_t>& reverse_index_map,
215  double* unmasked_output,
216  const int64_t num_unmasked_rows,
217  const double null_val);
218 
219 } // namespace TableFunctions_Namespace
220 
221 #endif // __CUDACC__
#define CHECK_EQ(x, y)
Definition: Logger.h:301
int64_t num_elems_per_thread
Definition: ThreadInfo.h:23
DEVICE int64_t size() const
DEVICE int64_t numCols() const
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::vector< int32_t > reverse_index_map
future< Result > async(Fn &&fn, Args &&...args)
int64_t num_threads
Definition: ThreadInfo.h:22
int8_t ** ptrs_
void unmask_data(const T *masked_input, const std::vector< int32_t > &reverse_index_map, T *unmasked_output, const int64_t num_unmasked_rows, const T null_val)
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
constexpr int32_t NULL_ROW_IDX
MaskedData< T > remove_null_rows(const InputData< T > &input_data)
DEVICE int64_t size() const
InputData< T > strip_column_metadata(const ColumnList< T > &input_features)