OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SessionizeTableFunctions.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #ifndef __CUDACC__
20 #ifdef HAVE_TBB
21 
#include "Shared/ThreadInfo.h"

#include <tbb/parallel_for.h>
#include <tbb/parallel_reduce.h>
#include <tbb/parallel_sort.h>
#include <algorithm>
#include <cstdint>
#include <future>
#include <thread>
#include <utility>
#include <vector>
30 
31 inline int32_t get_elapsed_seconds(const Timestamp& start, const Timestamp& end) {
32  return (end.time - start.time) / 1000000000;
33 }
34 
35 // clang-format off
36 /*
37  UDTF: tf_compute_dwell_times__cpu_template(TableFunctionManager,
38  Cursor<Column<I> entity_id, Column<S> site_id, Column<Timestamp> ts> data,
39  int64_t min_dwell_points | require="min_dwell_points >= 0" | default=1,
40  int64_t min_dwell_seconds | require="min_dwell_seconds >= 0" | default=0,
41  int64_t max_inactive_seconds | require="max_inactive_seconds >= 0" | default=999999999999) | filter_table_function_transpose=on ->
42  Column<I> entity_id | input_id=args<0>, Column<S> site_id | input_id=args<1>, Column<S> prev_site_id | input_id=args<1>,
43  Column<S> next_site_id | input_id=args<1>, Column<int32_t> session_id, Column<int32_t> start_seq_id,
44  Column<Timestamp> ts, Column<int32_t> dwell_time_sec, Column<int32_t> num_dwell_points,
45  I=[TextEncodingDict, int64_t], S=[TextEncodingDict, int64_t]
46  */
47 // clang-format on
48 
49 template <typename I, typename S>
50 NEVER_INLINE HOST int32_t
51 tf_compute_dwell_times__cpu_template(TableFunctionManager& mgr,
52  const Column<I>& input_id,
53  const Column<S>& input_site_id,
54  const Column<Timestamp>& input_ts,
55  const int64_t min_dwell_points,
56  const int64_t min_dwell_seconds,
57  const int64_t max_inactive_seconds,
58  Column<I>& output_id,
59  Column<S>& output_site_id,
60  Column<S>& output_prev_site_id,
61  Column<S>& output_next_site_id,
62  Column<int32_t>& output_session_id,
63  Column<int32_t>& output_start_seq_id,
64  Column<Timestamp>& output_start_ts,
65  Column<int32_t>& output_dwell_time_sec,
66  Column<int32_t>& output_dwell_points) {
67  auto func_timer = DEBUG_TIMER(__func__);
68 
69  const I id_null_val = inline_null_value<I>();
70  const S site_id_null_val = inline_null_value<S>();
71 
72  const int32_t num_input_rows = input_id.size();
73 
74  // Short circuit early both to avoid unneeded computation and
75  // also eliminate the need to ifguard against empty input sets
76  // below
77 
78  if (num_input_rows == 0) {
79  return 0;
80  }
81 
82  std::vector<int32_t> permutation_idxs(num_input_rows);
83  {
84  auto permute_creation_timer = DEBUG_TIMER("Create permutation index");
85  tbb::parallel_for(tbb::blocked_range<int32_t>(0, num_input_rows),
86  [&](const tbb::blocked_range<int32_t>& r) {
87  const int32_t r_end = r.end();
88  for (int32_t p = r.begin(); p < r_end; ++p) {
89  permutation_idxs[p] = p;
90  }
91  });
92  }
93 
94  {
95  auto permute_sort_timer = DEBUG_TIMER("Sort permutation index");
96  // Sort permutation_idx in ascending order
97  tbb::parallel_sort(permutation_idxs.begin(),
98  permutation_idxs.begin() + num_input_rows,
99  [&](const int32_t& a, const int32_t& b) {
100  return input_id[a] == input_id[b] ? input_ts[a] < input_ts[b]
101  : input_id[a] < input_id[b];
102  });
103  }
104 
105  constexpr int64_t target_rows_per_thread{20000};
106  const ThreadInfo thread_info(
107  std::thread::hardware_concurrency(), num_input_rows, target_rows_per_thread);
108  CHECK_GT(thread_info.num_threads, 0);
109  std::vector<int32_t> per_thread_session_counts(thread_info.num_threads, 0);
110  std::vector<std::pair<int32_t, int32_t>> per_thread_actual_idx_offsets(
111  thread_info.num_threads);
112  std::vector<std::future<void>> worker_threads;
113  int32_t start_row_idx = 0;
114  {
115  // First we count number of dwell sessions found and start and end input and
116  // output offsets per thread
117  auto pre_flight_dwell_count_timer = DEBUG_TIMER("Pre-flight Dwell Count");
118  for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
119  const int32_t end_row_idx =
120  std::min(start_row_idx + thread_info.num_elems_per_thread,
121  static_cast<int64_t>(num_input_rows));
122  worker_threads.emplace_back(std::async(
124  [&, min_dwell_points, min_dwell_seconds, num_input_rows, max_inactive_seconds](
125  const auto start_idx, const auto end_idx, const auto thread_idx) {
126  int32_t thread_session_count = per_thread_session_counts[thread_idx];
127  // First find first new index
128  int32_t first_valid_idx = start_idx;
129  // First partition reads from beginning
130  if (start_idx > 0) {
131  I first_id = input_id[permutation_idxs[first_valid_idx]];
132  for (; first_valid_idx < end_idx; ++first_valid_idx) {
133  const int32_t permuted_idx = permutation_idxs[first_valid_idx];
134  if (!input_id.isNull(permuted_idx) &&
135  input_id[permuted_idx] != first_id) {
136  break;
137  }
138  }
139  }
140  per_thread_actual_idx_offsets[thread_idx].first = first_valid_idx;
141 
142  auto last_id = input_id[permutation_idxs[first_valid_idx]];
143  auto last_site_id = input_site_id[permutation_idxs[first_valid_idx]];
144 
145  int32_t i = first_valid_idx;
146  int32_t session_num_points = 0;
147  auto session_start_ts = input_ts[permutation_idxs[i]];
148  auto last_ts = input_ts[permutation_idxs[i]];
149  for (; i < end_idx; ++i) {
150  const auto permuted_idx = permutation_idxs[i];
151  const auto& id = input_id[permuted_idx];
152  const auto& site_id = input_site_id[permuted_idx];
153  const auto& current_ts = input_ts[permuted_idx];
154  if (id != last_id || site_id != last_site_id ||
155  get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
156  if (last_id != id_null_val && last_site_id != site_id_null_val) {
157  if (session_num_points >= min_dwell_points &&
158  get_elapsed_seconds(session_start_ts, last_ts) >=
159  min_dwell_seconds) {
160  thread_session_count++;
161  }
162  }
163  session_num_points = 1;
164  session_start_ts = current_ts;
165  } else {
166  session_num_points++;
167  }
168 
169  last_id = id;
170  last_site_id = site_id;
171  last_ts = current_ts;
172  }
173 
174  CHECK_EQ(i, end_idx);
175 
176  if (end_idx < num_input_rows) {
177  const int32_t max_transitions = (input_id[permutation_idxs[end_idx]] !=
178  input_id[permutation_idxs[end_idx - 1]])
179  ? 2
180  : 1;
181  int32_t transition_count = 0;
182  for (; i < num_input_rows; ++i) {
183  const auto permuted_idx = permutation_idxs[i];
184  const auto& id = input_id[permuted_idx];
185  const auto& site_id = input_site_id[permuted_idx];
186  const auto& current_ts = input_ts[permuted_idx];
187 
188  if (id != last_id || site_id != last_site_id ||
189  get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
190  if (id != last_id) {
191  transition_count++;
192  if (transition_count == max_transitions) {
193  break;
194  }
195  }
196  if (last_id != id_null_val && last_site_id != site_id_null_val) {
197  if (session_num_points >= min_dwell_points &&
198  get_elapsed_seconds(session_start_ts, last_ts) >=
199  min_dwell_seconds) {
200  thread_session_count++;
201  }
202  }
203  last_id = id;
204  last_site_id = site_id;
205  session_num_points = 1;
206  session_start_ts = current_ts;
207  } else {
208  session_num_points++;
209  }
210  last_ts = current_ts;
211  }
212  }
213  if (last_id != id_null_val && last_site_id != site_id_null_val) {
214  if (session_num_points >= min_dwell_points &&
215  get_elapsed_seconds(session_start_ts, last_ts) >= min_dwell_seconds) {
216  thread_session_count++;
217  }
218  }
219  per_thread_actual_idx_offsets[thread_idx].second = i;
220  per_thread_session_counts[thread_idx] = thread_session_count;
221  },
222  start_row_idx,
223  end_row_idx,
224  thread_idx));
225 
226  start_row_idx += thread_info.num_elems_per_thread;
227  }
228  }
229  for (auto& worker_thread : worker_threads) {
230  worker_thread.wait();
231  }
232  worker_threads.clear();
233 
234  // Now compute a prefix_sum
235  std::vector<int32_t> session_counts_prefix_sums(thread_info.num_threads + 1);
236  session_counts_prefix_sums[0] = 0;
237  for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
238  session_counts_prefix_sums[thread_idx + 1] =
239  session_counts_prefix_sums[thread_idx] + per_thread_session_counts[thread_idx];
240  }
241  const auto num_output_rows = session_counts_prefix_sums[thread_info.num_threads];
242  mgr.set_output_row_size(num_output_rows);
243  if (num_output_rows == 0) {
244  return num_output_rows;
245  }
246 
247  {
248  // Now actually compute the dwell times and other attributes, using the per-thread
249  // computed input and output offsets computed above
250  auto dwell_calc_timer = DEBUG_TIMER("Dwell Calc");
251  for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
252  const int32_t start_row_idx = per_thread_actual_idx_offsets[thread_idx].first;
253  const int32_t end_row_idx = per_thread_actual_idx_offsets[thread_idx].second;
254  const int32_t output_start_offset = session_counts_prefix_sums[thread_idx];
255  const int32_t num_sessions = session_counts_prefix_sums[thread_idx + 1] -
256  session_counts_prefix_sums[thread_idx];
257  worker_threads.emplace_back(std::async(
259  [&input_id,
260  &input_site_id,
261  &input_ts,
262  &output_id,
263  &output_site_id,
264  &output_session_id,
265  &output_start_seq_id,
266  &output_start_ts,
267  &output_dwell_time_sec,
268  &output_dwell_points,
269  &permutation_idxs,
270  &id_null_val,
271  &site_id_null_val,
272  min_dwell_points,
273  min_dwell_seconds,
274  max_inactive_seconds](const auto start_row_idx,
275  const auto end_row_idx,
276  const auto output_start_offset,
277  const auto num_sessions) {
278  if (!(end_row_idx > start_row_idx)) {
279  return;
280  }
281  int32_t output_offset = output_start_offset;
282  int32_t session_start_seq_id = 1;
283  int32_t session_seq_id = 1;
284  int32_t session_id = 1;
285  auto last_id = input_id[permutation_idxs[start_row_idx]];
286  auto last_site_id = input_site_id[permutation_idxs[start_row_idx]];
287  auto session_start_ts = input_ts[permutation_idxs[start_row_idx]];
288  auto last_ts = input_ts[permutation_idxs[start_row_idx]];
289  for (int32_t idx = start_row_idx; idx < end_row_idx; ++idx) {
290  const auto permuted_idx = permutation_idxs[idx];
291  const auto& id = input_id[permuted_idx];
292  const auto& site_id = input_site_id[permuted_idx];
293  const auto& current_ts = input_ts[permuted_idx];
294  if (id != last_id || site_id != last_site_id ||
295  get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
296  if (last_id != id_null_val && last_site_id != site_id_null_val) {
297  const int32_t num_dwell_points = session_seq_id - session_start_seq_id;
298  const int32_t num_dwell_seconds =
299  get_elapsed_seconds(session_start_ts, last_ts);
300  if (num_dwell_points >= min_dwell_points &&
301  num_dwell_seconds >= min_dwell_seconds) {
302  output_id[output_offset] = last_id;
303  output_site_id[output_offset] = last_site_id;
304  output_session_id[output_offset] = session_id++;
305  output_start_seq_id[output_offset] = session_start_seq_id;
306  output_start_ts[output_offset] = session_start_ts;
307  output_dwell_time_sec[output_offset] = num_dwell_seconds;
308  output_dwell_points[output_offset] = num_dwell_points;
309  output_offset++;
310  }
311  }
312  last_site_id = site_id;
313  session_start_ts = input_ts[permuted_idx];
314  if (id != last_id) {
315  last_id = id;
316  session_start_seq_id = 1;
317  session_seq_id = 1;
318  session_id = 1;
319  } else {
320  session_start_seq_id = session_seq_id;
321  }
322  }
323  session_seq_id++;
324  last_ts = current_ts;
325  }
326  if (last_id != id_null_val && last_site_id != site_id_null_val) {
327  const int32_t num_dwell_points = session_seq_id - session_start_seq_id;
328  const int32_t num_dwell_seconds =
329  get_elapsed_seconds(session_start_ts, last_ts);
330  if (num_dwell_points >= min_dwell_points &&
331  num_dwell_seconds >= min_dwell_seconds) {
332  output_id[output_offset] = last_id;
333  output_site_id[output_offset] = last_site_id;
334  output_session_id[output_offset] = session_id++;
335  output_start_seq_id[output_offset] = session_start_seq_id;
336  output_start_ts[output_offset] = session_start_ts;
337  output_dwell_time_sec[output_offset] = num_dwell_seconds;
338  output_dwell_points[output_offset] = num_dwell_points;
339  output_offset++;
340  }
341  }
342  CHECK_EQ(output_offset - output_start_offset, num_sessions);
343  },
344  start_row_idx,
345  end_row_idx,
346  output_start_offset,
347  num_sessions));
348  }
349  }
350  for (auto& worker_thread : worker_threads) {
351  worker_thread.wait();
352  }
353 
354  {
355  output_prev_site_id[0] = site_id_null_val;
356  output_next_site_id[0] = num_output_rows > 1 && output_id[0] == output_id[1]
357  ? output_site_id[1]
358  : site_id_null_val;
359  output_prev_site_id[num_output_rows - 1] =
360  num_output_rows > 1 &&
361  output_id[num_output_rows - 1] == output_id[num_output_rows - 2]
362  ? output_site_id[num_output_rows - 2]
363  : site_id_null_val;
364  output_next_site_id[num_output_rows - 1] = site_id_null_val;
365 
366  auto permute_creation_timer = DEBUG_TIMER("Fill lagged and lead site ids");
367  tbb::parallel_for(tbb::blocked_range<int32_t>(1, num_output_rows - 1),
368  [&](const tbb::blocked_range<int32_t>& r) {
369  const int32_t r_end = r.end();
370  for (int32_t p = r.begin(); p < r_end; ++p) {
371  output_prev_site_id[p] = output_id[p] == output_id[p - 1]
372  ? output_site_id[p - 1]
373  : site_id_null_val;
374  output_next_site_id[p] = output_id[p] == output_id[p + 1]
375  ? output_site_id[p + 1]
376  : site_id_null_val;
377  }
378  });
379  }
380 
381  return num_output_rows;
382 }
383 
384 #endif // #ifdef HAVE_TBB
385 #endif // #ifndef __CUDACC__
void set_output_row_size(int64_t num_rows)
Definition: heavydbTypes.h:373
#define CHECK_EQ(x, y)
Definition: Logger.h:301
DEVICE int64_t size() const
int64_t time
Definition: heavydbTypes.h:699
#define CHECK_GT(x, y)
Definition: Logger.h:305
constexpr double a
Definition: Utm.h:32
#define HOST
future< Result > async(Fn &&fn, Args &&...args)
DEVICE bool isNull(int64_t index) const
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
#define NEVER_INLINE
#define DEBUG_TIMER(name)
Definition: Logger.h:412