25 #include <tbb/parallel_for.h>
26 #include <tbb/parallel_reduce.h>
27 #include <tbb/parallel_sort.h>
// Yields the difference between the `time` members of `end` and `start`,
// divided by 1,000,000,000.
// NOTE(review): presumably `time` is in nanoseconds, making the result whole
// seconds -- the declaration of `time` is not visible here; confirm.
32 return (end.
time - start.
time) / 1000000000;
// Template parameters: `I` is the value type used for ids (see `id_null_val`
// below) and `S` the value type used for site ids (see `site_id_null_val`).
49 template <
typename I,
typename S>
// Session thresholds, as used further down in this function:
//  - min_dwell_points: a session is only counted/emitted when its point count
//    is >= this value.
//  - min_dwell_seconds: ...and when the elapsed seconds between the session's
//    start timestamp and its last timestamp is >= this value.
//  - max_inactive_seconds: a gap between consecutive rows larger than this
//    many seconds starts a new session.
55 const int64_t min_dwell_points,
56 const int64_t min_dwell_seconds,
57 const int64_t max_inactive_seconds,
// Null sentinels for the id and site-id types; sessions whose id or site id
// equals these values are not counted or emitted.
69 const I id_null_val = inline_null_value<I>();
70 const S site_id_null_val = inline_null_value<S>();
// NOTE(review): the row count is held in an int32_t; if size() returns a
// 64-bit count this narrows -- confirm inputs cannot exceed int32 range.
72 const int32_t num_input_rows = input_id.
size();
// Empty-input guard; the body of this branch is not visible in this listing.
78 if (num_input_rows == 0) {
// One index slot per input row; rows are accessed through this indirection
// for the rest of the function instead of being reordered themselves.
82 std::vector<int32_t> permutation_idxs(num_input_rows);
84 auto permute_creation_timer =
DEBUG_TIMER(
"Create permutation index");
// Range body that writes the identity mapping (permutation_idxs[p] = p) for
// every p in [r.begin(), r.end()). The call this lambda is passed to is not
// visible here (presumably tbb::parallel_for -- confirm).
86 [&](
const tbb::blocked_range<int32_t>& r) {
87 const int32_t r_end = r.end();
88 for (int32_t p = r.begin(); p < r_end; ++p) {
89 permutation_idxs[p] = p;
95 auto permute_sort_timer =
DEBUG_TIMER(
"Sort permutation index");
// Sort the indices so that rows are ordered by input_id ascending and, for
// rows with equal input_id, by input_ts ascending.
97 tbb::parallel_sort(permutation_idxs.begin(),
98 permutation_idxs.begin() + num_input_rows,
99 [&](
const int32_t&
a,
const int32_t& b) {
100 return input_id[
a] == input_id[b] ? input_ts[
a] < input_ts[b]
101 : input_id[
a] < input_id[b];
// Pass 1: split the sorted rows into per-thread chunks and count, per chunk,
// how many sessions satisfy the dwell thresholds, so that output offsets can
// be computed before anything is written.
105 constexpr int64_t target_rows_per_thread{20000};
// Arguments of a call whose beginning is not visible; its result is used below
// as `thread_info` (with `num_threads` and `num_elems_per_thread` members).
107 std::thread::hardware_concurrency(), num_input_rows, target_rows_per_thread);
108 CHECK_GT(thread_info.num_threads, 0);
// Per-thread results of this pass: the number of qualifying sessions, and the
// pair (first, second) of row positions each thread actually processed
// (.first / .second are assigned at the end of the worker lambda below).
109 std::vector<int32_t> per_thread_session_counts(thread_info.num_threads, 0);
110 std::vector<std::pair<int32_t, int32_t>> per_thread_actual_idx_offsets(
111 thread_info.num_threads);
112 std::vector<std::future<void>> worker_threads;
113 int32_t start_row_idx = 0;
117 auto pre_flight_dwell_count_timer =
DEBUG_TIMER(
"Pre-flight Dwell Count");
118 for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
// Nominal chunk end, clamped to the number of input rows.
119 const int32_t end_row_idx =
120 std::min(start_row_idx + thread_info.num_elems_per_thread,
121 static_cast<int64_t>(num_input_rows));
// Worker lambda (the launch call and part of the capture list are not visible
// in this listing).
124 [&per_thread_session_counts,
125 &per_thread_actual_idx_offsets,
135 max_inactive_seconds](
136 const auto start_idx,
const auto end_idx,
const auto thread_idx) {
137 int32_t thread_session_count = per_thread_session_counts[thread_idx];
139 int32_t first_valid_idx = start_idx;
// Scan forward from the nominal chunk start for the first row whose id is
// non-null and differs from the id found at the chunk start. What happens on a
// match, and any condition guarding this scan, are not visible here
// (presumably the scan stops at the match -- confirm).
142 I first_id = input_id[permutation_idxs[first_valid_idx]];
143 for (; first_valid_idx < end_idx; ++first_valid_idx) {
144 const int32_t permuted_idx = permutation_idxs[first_valid_idx];
145 if (!input_id.
isNull(permuted_idx) &&
146 input_id[permuted_idx] != first_id) {
// Record where this thread really starts.
151 per_thread_actual_idx_offsets[thread_idx].first = first_valid_idx;
153 auto last_id = input_id[permutation_idxs[first_valid_idx]];
154 auto last_site_id = input_site_id[permutation_idxs[first_valid_idx]];
156 int32_t i = first_valid_idx;
157 int32_t session_num_points = 0;
158 auto session_start_ts = input_ts[permutation_idxs[i]];
159 auto last_ts = input_ts[permutation_idxs[i]];
// Main scan over this thread's rows.
160 for (; i < end_idx; ++i) {
161 const auto permuted_idx = permutation_idxs[i];
162 const auto&
id = input_id[permuted_idx];
163 const auto& site_id = input_site_id[permuted_idx];
164 const auto& current_ts = input_ts[permuted_idx];
// A session ends when the id changes, the site id changes, or the gap since
// the previous row exceeds max_inactive_seconds.
165 if (
id != last_id || site_id != last_site_id ||
166 get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
// Only sessions with a non-null id and site id that meet the point-count and
// elapsed-time thresholds are counted.
167 if (last_id != id_null_val && last_site_id != site_id_null_val) {
168 if (session_num_points >= min_dwell_points &&
169 get_elapsed_seconds(session_start_ts, last_ts) >=
171 thread_session_count++;
// Start a new session at the current row.
174 session_num_points = 1;
175 session_start_ts = current_ts;
// Otherwise the current row extends the open session.
177 session_num_points++;
181 last_site_id = site_id;
182 last_ts = current_ts;
// If this chunk does not reach the end of the input, keep scanning past
// end_idx so the session open at the chunk boundary is completed by this
// thread. max_transitions is chosen from whether the id at end_idx differs
// from the id just before it (the two candidate values are not visible here).
187 if (end_idx < num_input_rows) {
188 const int32_t max_transitions = (input_id[permutation_idxs[end_idx]] !=
189 input_id[permutation_idxs[end_idx - 1]])
192 int32_t transition_count = 0;
193 for (; i < num_input_rows; ++i) {
194 const auto permuted_idx = permutation_idxs[i];
195 const auto&
id = input_id[permuted_idx];
196 const auto& site_id = input_site_id[permuted_idx];
197 const auto& current_ts = input_ts[permuted_idx];
199 if (
id != last_id || site_id != last_site_id ||
200 get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
// Compare the transitions seen so far against the limit; the action taken when
// they are equal is not visible here (presumably the overrun scan stops).
203 if (transition_count == max_transitions) {
207 if (last_id != id_null_val && last_site_id != site_id_null_val) {
208 if (session_num_points >= min_dwell_points &&
209 get_elapsed_seconds(session_start_ts, last_ts) >=
211 thread_session_count++;
215 last_site_id = site_id;
216 session_num_points = 1;
217 session_start_ts = current_ts;
219 session_num_points++;
221 last_ts = current_ts;
// Account for the session still open when scanning stopped.
224 if (last_id != id_null_val && last_site_id != site_id_null_val) {
225 if (session_num_points >= min_dwell_points &&
226 get_elapsed_seconds(session_start_ts, last_ts) >= min_dwell_seconds) {
227 thread_session_count++;
// Publish where this thread stopped and how many sessions it counted.
230 per_thread_actual_idx_offsets[thread_idx].second = i;
231 per_thread_session_counts[thread_idx] = thread_session_count;
// Advance to the next thread's nominal chunk start.
237 start_row_idx += thread_info.num_elems_per_thread;
// Block until every worker has finished, then drop the futures so the vector
// can be reused.
240 for (
auto& worker_thread : worker_threads) {
241 worker_thread.wait();
243 worker_threads.clear();
// Exclusive prefix sums over the per-thread session counts: entry t is the
// output offset at which thread t starts writing, and the final entry is the
// total number of output rows.
246 std::vector<int32_t> session_counts_prefix_sums(thread_info.num_threads + 1);
247 session_counts_prefix_sums[0] = 0;
248 for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
249 session_counts_prefix_sums[thread_idx + 1] =
250 session_counts_prefix_sums[thread_idx] + per_thread_session_counts[thread_idx];
252 const auto num_output_rows = session_counts_prefix_sums[thread_info.num_threads];
// No qualifying sessions: return 0 rows without running the output pass.
254 if (num_output_rows == 0) {
255 return num_output_rows;
// Pass 2: each thread re-walks the row range recorded in pass 1 and writes
// its qualifying sessions into the output columns, starting at its prefix-sum
// offset.
262 for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
263 const int32_t start_row_idx = per_thread_actual_idx_offsets[thread_idx].first;
264 const int32_t end_row_idx = per_thread_actual_idx_offsets[thread_idx].second;
265 const int32_t output_start_offset = session_counts_prefix_sums[thread_idx];
// Number of sessions pass 1 counted for this thread.
266 const int32_t num_sessions = session_counts_prefix_sums[thread_idx + 1] -
267 session_counts_prefix_sums[thread_idx];
// Worker lambda (the launch call and part of the capture list are not visible
// in this listing).
276 &output_start_seq_id,
278 &output_dwell_time_sec,
279 &output_dwell_points,
285 max_inactive_seconds](
const auto start_row_idx,
286 const auto end_row_idx,
287 const auto output_start_offset,
288 const auto num_sessions) {
// Guard for an empty row range; the body of this branch is not visible here.
289 if (!(end_row_idx > start_row_idx)) {
292 int32_t output_offset = output_start_offset;
// Sequence and session counters all start at 1.
293 int32_t session_start_seq_id = 1;
294 int32_t session_seq_id = 1;
295 int32_t session_id = 1;
296 auto last_id = input_id[permutation_idxs[start_row_idx]];
297 auto last_site_id = input_site_id[permutation_idxs[start_row_idx]];
298 auto session_start_ts = input_ts[permutation_idxs[start_row_idx]];
299 auto last_ts = input_ts[permutation_idxs[start_row_idx]];
300 for (int32_t idx = start_row_idx; idx < end_row_idx; ++idx) {
301 const auto permuted_idx = permutation_idxs[idx];
302 const auto&
id = input_id[permuted_idx];
303 const auto& site_id = input_site_id[permuted_idx];
304 const auto& current_ts = input_ts[permuted_idx];
// Same session-boundary test as in the counting pass: id change, site id
// change, or an inactivity gap above max_inactive_seconds.
305 if (
id != last_id || site_id != last_site_id ||
306 get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
307 if (last_id != id_null_val && last_site_id != site_id_null_val) {
// Point count is the distance between the current and the session-start
// sequence ids; dwell time is measured from session start to the last row.
308 const int32_t num_dwell_points = session_seq_id - session_start_seq_id;
309 const int32_t num_dwell_seconds =
310 get_elapsed_seconds(session_start_ts, last_ts);
// Emit one output row for a session that meets both thresholds. The advance
// of output_offset is not visible in this listing.
311 if (num_dwell_points >= min_dwell_points &&
312 num_dwell_seconds >= min_dwell_seconds) {
313 output_id[output_offset] = last_id;
314 output_site_id[output_offset] = last_site_id;
315 output_session_id[output_offset] = session_id++;
316 output_start_seq_id[output_offset] = session_start_seq_id;
317 output_start_ts[output_offset] = session_start_ts;
318 output_dwell_time_sec[output_offset] = num_dwell_seconds;
319 output_dwell_points[output_offset] = num_dwell_points;
// Begin the next session at the current row.
323 last_site_id = site_id;
324 session_start_ts = input_ts[permuted_idx];
// The session-start sequence id is either reset to 1 or set to the current
// sequence id; the conditions selecting between the two are not visible here.
327 session_start_seq_id = 1;
331 session_start_seq_id = session_seq_id;
335 last_ts = current_ts;
// Flush the session still open after the last row of the range.
337 if (last_id != id_null_val && last_site_id != site_id_null_val) {
338 const int32_t num_dwell_points = session_seq_id - session_start_seq_id;
339 const int32_t num_dwell_seconds =
340 get_elapsed_seconds(session_start_ts, last_ts);
341 if (num_dwell_points >= min_dwell_points &&
342 num_dwell_seconds >= min_dwell_seconds) {
343 output_id[output_offset] = last_id;
344 output_site_id[output_offset] = last_site_id;
345 output_session_id[output_offset] = session_id++;
346 output_start_seq_id[output_offset] = session_start_seq_id;
347 output_start_ts[output_offset] = session_start_ts;
348 output_dwell_time_sec[output_offset] = num_dwell_seconds;
349 output_dwell_points[output_offset] = num_dwell_points;
// Consistency check: rows written here must equal the count from pass 1.
353 CHECK_EQ(output_offset - output_start_offset, num_sessions);
// Block until every output worker has finished.
361 for (
auto& worker_thread : worker_threads) {
362 worker_thread.wait();
// Boundary rows are filled serially: the first output row has no previous
// site, and the last output row has no next site.
366 output_prev_site_id[0] = site_id_null_val;
// Next site for row 0 depends on there being a second row with the same id;
// the two result values of this conditional are not visible in this listing.
367 output_next_site_id[0] = num_output_rows > 1 && output_id[0] == output_id[1]
// Previous site for the last row is the preceding row's site id when both rows
// share an id (the alternative value is not visible here).
370 output_prev_site_id[num_output_rows - 1] =
371 num_output_rows > 1 &&
372 output_id[num_output_rows - 1] == output_id[num_output_rows - 2]
373 ? output_site_id[num_output_rows - 2]
375 output_next_site_id[num_output_rows - 1] = site_id_null_val;
// NOTE(review): this timer variable reuses the name `permute_creation_timer`
// although its label is "Fill lagged and lead site ids".
377 auto permute_creation_timer =
DEBUG_TIMER(
"Fill lagged and lead site ids");
// Range body for the remaining rows: row p takes its previous/next site id
// from row p - 1 / p + 1 when that neighbour has the same output_id (the
// alternative values are not visible here). Because it reads p - 1 and p + 1,
// the range it is invoked over must exclude the first and last rows; the
// invoking call is not visible in this listing -- confirm.
379 [&](
const tbb::blocked_range<int32_t>& r) {
380 const int32_t r_end = r.end();
381 for (int32_t p = r.begin(); p < r_end; ++p) {
382 output_prev_site_id[p] = output_id[p] == output_id[p - 1]
383 ? output_site_id[p - 1]
385 output_next_site_id[p] = output_id[p] == output_id[p + 1]
386 ? output_site_id[p + 1]
// Return the total number of output rows.
392 return num_output_rows;
395 #endif // #ifdef HAVE_TBB
396 #endif // #ifndef __CUDACC__
void set_output_row_size(int64_t num_rows)
DEVICE int64_t size() const
future< Result > async(Fn &&fn, Args &&...args)
DEVICE bool isNull(int64_t index) const
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
#define DEBUG_TIMER(name)