OmniSciDB  4201147b46
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SessionizeTableFunctions.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #ifndef __CUDACC__
20 #ifdef HAVE_TBB
21 
23 #include "Shared/ThreadInfo.h"
24 
25 #include <tbb/parallel_for.h>
26 #include <tbb/parallel_reduce.h>
27 #include <tbb/parallel_sort.h>
28 #include <algorithm>
29 #include <vector>
30 
31 inline int32_t get_elapsed_seconds(const Timestamp& start, const Timestamp& end) {
32  return (end.time - start.time) / 1000000000;
33 }
34 
35 // clang-format off
36 /*
37  UDTF: tf_compute_dwell_times__cpu_template(TableFunctionManager,
38  Cursor<Column<I> entity_id, Column<S> session_id, Column<Timestamp> ts> data,
39  int64_t min_dwell_points | require="min_dwell_points >= 0",
40  int64_t min_dwell_seconds | require="min_dwell_seconds >= 0",
41  int64_t max_inactive_seconds | require="max_inactive_seconds >= 0") | filter_table_function_transpose=on ->
42  Column<I> entity_id | input_id=args<0>, Column<S> site_id | input_id=args<1>, Column<S> prev_site_id | input_id=args<1>,
43  Column<S> next_site_id | input_id=args<1>, Column<int32_t> session_id, Column<int32_t> start_seq_id,
44  Column<Timestamp> ts, Column<int32_t> dwell_time_sec, Column<int32_t> num_dwell_points,
45  I=[TextEncodingDict, int64_t], S=[TextEncodingDict, int64_t]
46  */
47 // clang-format on
48 
49 template <typename I, typename S>
50 NEVER_INLINE HOST int32_t
51 tf_compute_dwell_times__cpu_template(TableFunctionManager& mgr,
52  const Column<I>& input_id,
53  const Column<S>& input_site_id,
54  const Column<Timestamp>& input_ts,
55  const int64_t min_dwell_points,
56  const int64_t min_dwell_seconds,
57  const int64_t max_inactive_seconds,
58  Column<I>& output_id,
59  Column<S>& output_site_id,
60  Column<S>& output_prev_site_id,
61  Column<S>& output_next_site_id,
62  Column<int32_t>& output_session_id,
63  Column<int32_t>& output_start_seq_id,
64  Column<Timestamp>& output_start_ts,
65  Column<int32_t>& output_dwell_time_sec,
66  Column<int32_t>& output_dwell_points) {
67  auto func_timer = DEBUG_TIMER(__func__);
68 
69  const I id_null_val = inline_null_value<I>();
70  const S site_id_null_val = inline_null_value<S>();
71 
72  const int32_t num_input_rows = input_id.size();
73 
74  // Short circuit early both to avoid unneeded computation and
75  // also eliminate the need to ifguard against empty input sets
76  // below
77 
78  if (num_input_rows == 0) {
79  return 0;
80  }
81 
82  std::vector<int32_t> permutation_idxs(num_input_rows);
83  {
84  auto permute_creation_timer = DEBUG_TIMER("Create permutation index");
85  tbb::parallel_for(tbb::blocked_range<int32_t>(0, num_input_rows),
86  [&](const tbb::blocked_range<int32_t>& r) {
87  const int32_t r_end = r.end();
88  for (int32_t p = r.begin(); p < r_end; ++p) {
89  permutation_idxs[p] = p;
90  }
91  });
92  }
93 
94  {
95  auto permute_sort_timer = DEBUG_TIMER("Sort permutation index");
96  // Sort permutation_idx in ascending order
97  tbb::parallel_sort(permutation_idxs.begin(),
98  permutation_idxs.begin() + num_input_rows,
99  [&](const int32_t& a, const int32_t& b) {
100  return input_id[a] == input_id[b] ? input_ts[a] < input_ts[b]
101  : input_id[a] < input_id[b];
102  });
103  }
104 
105  constexpr int64_t target_rows_per_thread{20000};
106  const ThreadInfo thread_info(
107  std::thread::hardware_concurrency(), num_input_rows, target_rows_per_thread);
108  CHECK_GT(thread_info.num_threads, 0);
109  std::vector<int32_t> per_thread_session_counts(thread_info.num_threads, 0);
110  std::vector<std::pair<int32_t, int32_t>> per_thread_actual_idx_offsets(
111  thread_info.num_threads);
112  std::vector<std::future<void>> worker_threads;
113  int32_t start_row_idx = 0;
114  {
115  // First we count number of dwell sessions found and start and end input and
116  // output offsets per thread
117  auto pre_flight_dwell_count_timer = DEBUG_TIMER("Pre-flight Dwell Count");
118  for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
119  const int32_t end_row_idx =
120  std::min(start_row_idx + thread_info.num_elems_per_thread,
121  static_cast<int64_t>(num_input_rows));
122  worker_threads.emplace_back(std::async(
124  [&per_thread_session_counts,
125  &per_thread_actual_idx_offsets,
126  &input_id,
127  &input_site_id,
128  &input_ts,
129  &permutation_idxs,
130  &id_null_val,
131  &site_id_null_val,
132  min_dwell_points,
133  min_dwell_seconds,
134  num_input_rows,
135  max_inactive_seconds](
136  const auto start_idx, const auto end_idx, const auto thread_idx) {
137  int32_t thread_session_count = per_thread_session_counts[thread_idx];
138  // First find first new index
139  int32_t first_valid_idx = start_idx;
140  // First partition reads from beginning
141  if (start_idx > 0) {
142  I first_id = input_id[permutation_idxs[first_valid_idx]];
143  for (; first_valid_idx < end_idx; ++first_valid_idx) {
144  const int32_t permuted_idx = permutation_idxs[first_valid_idx];
145  if (!input_id.isNull(permuted_idx) &&
146  input_id[permuted_idx] != first_id) {
147  break;
148  }
149  }
150  }
151  per_thread_actual_idx_offsets[thread_idx].first = first_valid_idx;
152 
153  auto last_id = input_id[permutation_idxs[first_valid_idx]];
154  auto last_site_id = input_site_id[permutation_idxs[first_valid_idx]];
155 
156  int32_t i = first_valid_idx;
157  int32_t session_num_points = 0;
158  auto session_start_ts = input_ts[permutation_idxs[i]];
159  auto last_ts = input_ts[permutation_idxs[i]];
160  for (; i < end_idx; ++i) {
161  const auto permuted_idx = permutation_idxs[i];
162  const auto& id = input_id[permuted_idx];
163  const auto& site_id = input_site_id[permuted_idx];
164  const auto& current_ts = input_ts[permuted_idx];
165  if (id != last_id || site_id != last_site_id ||
166  get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
167  if (last_id != id_null_val && last_site_id != site_id_null_val) {
168  if (session_num_points >= min_dwell_points &&
169  get_elapsed_seconds(session_start_ts, last_ts) >=
170  min_dwell_seconds) {
171  thread_session_count++;
172  }
173  }
174  session_num_points = 1;
175  session_start_ts = current_ts;
176  } else {
177  session_num_points++;
178  }
179 
180  last_id = id;
181  last_site_id = site_id;
182  last_ts = current_ts;
183  }
184 
185  CHECK_EQ(i, end_idx);
186 
187  if (end_idx < num_input_rows) {
188  const int32_t max_transitions = (input_id[permutation_idxs[end_idx]] !=
189  input_id[permutation_idxs[end_idx - 1]])
190  ? 2
191  : 1;
192  int32_t transition_count = 0;
193  for (; i < num_input_rows; ++i) {
194  const auto permuted_idx = permutation_idxs[i];
195  const auto& id = input_id[permuted_idx];
196  const auto& site_id = input_site_id[permuted_idx];
197  const auto& current_ts = input_ts[permuted_idx];
198 
199  if (id != last_id || site_id != last_site_id ||
200  get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
201  if (id != last_id) {
202  transition_count++;
203  if (transition_count == max_transitions) {
204  break;
205  }
206  }
207  if (last_id != id_null_val && last_site_id != site_id_null_val) {
208  if (session_num_points >= min_dwell_points &&
209  get_elapsed_seconds(session_start_ts, last_ts) >=
210  min_dwell_seconds) {
211  thread_session_count++;
212  }
213  }
214  last_id = id;
215  last_site_id = site_id;
216  session_num_points = 1;
217  session_start_ts = current_ts;
218  } else {
219  session_num_points++;
220  }
221  last_ts = current_ts;
222  }
223  }
224  if (last_id != id_null_val && last_site_id != site_id_null_val) {
225  if (session_num_points >= min_dwell_points &&
226  get_elapsed_seconds(session_start_ts, last_ts) >= min_dwell_seconds) {
227  thread_session_count++;
228  }
229  }
230  per_thread_actual_idx_offsets[thread_idx].second = i;
231  per_thread_session_counts[thread_idx] = thread_session_count;
232  },
233  start_row_idx,
234  end_row_idx,
235  thread_idx));
236 
237  start_row_idx += thread_info.num_elems_per_thread;
238  }
239  }
240  for (auto& worker_thread : worker_threads) {
241  worker_thread.wait();
242  }
243  worker_threads.clear();
244 
245  // Now compute a prefix_sum
246  std::vector<int32_t> session_counts_prefix_sums(thread_info.num_threads + 1);
247  session_counts_prefix_sums[0] = 0;
248  for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
249  session_counts_prefix_sums[thread_idx + 1] =
250  session_counts_prefix_sums[thread_idx] + per_thread_session_counts[thread_idx];
251  }
252  const auto num_output_rows = session_counts_prefix_sums[thread_info.num_threads];
253  mgr.set_output_row_size(num_output_rows);
254  if (num_output_rows == 0) {
255  return num_output_rows;
256  }
257 
258  {
259  // Now actually compute the dwell times and other attributes, using the per-thread
260  // computed input and output offsets computed above
261  auto dwell_calc_timer = DEBUG_TIMER("Dwell Calc");
262  for (int32_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
263  const int32_t start_row_idx = per_thread_actual_idx_offsets[thread_idx].first;
264  const int32_t end_row_idx = per_thread_actual_idx_offsets[thread_idx].second;
265  const int32_t output_start_offset = session_counts_prefix_sums[thread_idx];
266  const int32_t num_sessions = session_counts_prefix_sums[thread_idx + 1] -
267  session_counts_prefix_sums[thread_idx];
268  worker_threads.emplace_back(std::async(
270  [&input_id,
271  &input_site_id,
272  &input_ts,
273  &output_id,
274  &output_site_id,
275  &output_session_id,
276  &output_start_seq_id,
277  &output_start_ts,
278  &output_dwell_time_sec,
279  &output_dwell_points,
280  &permutation_idxs,
281  &id_null_val,
282  &site_id_null_val,
283  min_dwell_points,
284  min_dwell_seconds,
285  max_inactive_seconds](const auto start_row_idx,
286  const auto end_row_idx,
287  const auto output_start_offset,
288  const auto num_sessions) {
289  if (!(end_row_idx > start_row_idx)) {
290  return;
291  }
292  int32_t output_offset = output_start_offset;
293  int32_t session_start_seq_id = 1;
294  int32_t session_seq_id = 1;
295  int32_t session_id = 1;
296  auto last_id = input_id[permutation_idxs[start_row_idx]];
297  auto last_site_id = input_site_id[permutation_idxs[start_row_idx]];
298  auto session_start_ts = input_ts[permutation_idxs[start_row_idx]];
299  auto last_ts = input_ts[permutation_idxs[start_row_idx]];
300  for (int32_t idx = start_row_idx; idx < end_row_idx; ++idx) {
301  const auto permuted_idx = permutation_idxs[idx];
302  const auto& id = input_id[permuted_idx];
303  const auto& site_id = input_site_id[permuted_idx];
304  const auto& current_ts = input_ts[permuted_idx];
305  if (id != last_id || site_id != last_site_id ||
306  get_elapsed_seconds(last_ts, current_ts) > max_inactive_seconds) {
307  if (last_id != id_null_val && last_site_id != site_id_null_val) {
308  const int32_t num_dwell_points = session_seq_id - session_start_seq_id;
309  const int32_t num_dwell_seconds =
310  get_elapsed_seconds(session_start_ts, last_ts);
311  if (num_dwell_points >= min_dwell_points &&
312  num_dwell_seconds >= min_dwell_seconds) {
313  output_id[output_offset] = last_id;
314  output_site_id[output_offset] = last_site_id;
315  output_session_id[output_offset] = session_id++;
316  output_start_seq_id[output_offset] = session_start_seq_id;
317  output_start_ts[output_offset] = session_start_ts;
318  output_dwell_time_sec[output_offset] = num_dwell_seconds;
319  output_dwell_points[output_offset] = num_dwell_points;
320  output_offset++;
321  }
322  }
323  last_site_id = site_id;
324  session_start_ts = input_ts[permuted_idx];
325  if (id != last_id) {
326  last_id = id;
327  session_start_seq_id = 1;
328  session_seq_id = 1;
329  session_id = 1;
330  } else {
331  session_start_seq_id = session_seq_id;
332  }
333  }
334  session_seq_id++;
335  last_ts = current_ts;
336  }
337  if (last_id != id_null_val && last_site_id != site_id_null_val) {
338  const int32_t num_dwell_points = session_seq_id - session_start_seq_id;
339  const int32_t num_dwell_seconds =
340  get_elapsed_seconds(session_start_ts, last_ts);
341  if (num_dwell_points >= min_dwell_points &&
342  num_dwell_seconds >= min_dwell_seconds) {
343  output_id[output_offset] = last_id;
344  output_site_id[output_offset] = last_site_id;
345  output_session_id[output_offset] = session_id++;
346  output_start_seq_id[output_offset] = session_start_seq_id;
347  output_start_ts[output_offset] = session_start_ts;
348  output_dwell_time_sec[output_offset] = num_dwell_seconds;
349  output_dwell_points[output_offset] = num_dwell_points;
350  output_offset++;
351  }
352  }
353  CHECK_EQ(output_offset - output_start_offset, num_sessions);
354  },
355  start_row_idx,
356  end_row_idx,
357  output_start_offset,
358  num_sessions));
359  }
360  }
361  for (auto& worker_thread : worker_threads) {
362  worker_thread.wait();
363  }
364 
365  {
366  output_prev_site_id[0] = site_id_null_val;
367  output_next_site_id[0] = num_output_rows > 1 && output_id[0] == output_id[1]
368  ? output_site_id[1]
369  : site_id_null_val;
370  output_prev_site_id[num_output_rows - 1] =
371  num_output_rows > 1 &&
372  output_id[num_output_rows - 1] == output_id[num_output_rows - 2]
373  ? output_site_id[num_output_rows - 2]
374  : site_id_null_val;
375  output_next_site_id[num_output_rows - 1] = site_id_null_val;
376 
377  auto permute_creation_timer = DEBUG_TIMER("Fill lagged and lead site ids");
378  tbb::parallel_for(tbb::blocked_range<int32_t>(1, num_output_rows - 1),
379  [&](const tbb::blocked_range<int32_t>& r) {
380  const int32_t r_end = r.end();
381  for (int32_t p = r.begin(); p < r_end; ++p) {
382  output_prev_site_id[p] = output_id[p] == output_id[p - 1]
383  ? output_site_id[p - 1]
384  : site_id_null_val;
385  output_next_site_id[p] = output_id[p] == output_id[p + 1]
386  ? output_site_id[p + 1]
387  : site_id_null_val;
388  }
389  });
390  }
391 
392  return num_output_rows;
393 }
394 
395 #endif // #ifdef HAVE_TBB
396 #endif // #ifndef __CUDACC__
void set_output_row_size(int64_t num_rows)
Definition: heavydbTypes.h:678
#define CHECK_EQ(x, y)
Definition: Logger.h:230
DEVICE int64_t size() const
Definition: heavydbTypes.h:468
int64_t time
Definition: heavydbTypes.h:212
#define CHECK_GT(x, y)
Definition: Logger.h:234
constexpr double a
Definition: Utm.h:32
#define HOST
future< Result > async(Fn &&fn, Args &&...args)
DEVICE bool isNull(int64_t index) const
Definition: heavydbTypes.h:470
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
#define NEVER_INLINE
#define DEBUG_TIMER(name)
Definition: Logger.h:369