OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
UtilityTableFunctions.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef __CUDACC__
18 
19 #include <string>
20 
21 #ifdef HAVE_TBB
22 #include <tbb/parallel_for.h>
23 #include <tbb/task_arena.h>
24 #endif
25 
26 #include "Shared/ThreadInfo.h"
27 #include "UtilityTableFunctions.h"
28 
29 int64_t numStepsBetween(int64_t start, int64_t stop, int64_t step) {
30  return (stop - start) / step;
31 }
32 
33 template <typename T>
34 int64_t numStepsBetween(Timestamp start, Timestamp stop, T step) {
35  return step.numStepsBetween(start, stop);
36 }
37 
38 template <typename T, typename K>
39 int32_t generate_series_parallel(const T start,
40  const T stop,
41  const K step,
42  Column<T>& series_output) {
43  const int64_t num_rows = numStepsBetween(start, stop, step) + 1;
44 
45  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
46  [&](const tbb::blocked_range<int64_t>& r) {
47  const int64_t start_out_idx = r.begin();
48  const int64_t end_out_idx = r.end();
49  for (int64_t out_idx = start_out_idx; out_idx != end_out_idx;
50  ++out_idx) {
51  series_output[out_idx] = start + (step * out_idx);
52  }
53  });
54  return num_rows;
55 }
56 
57 template <typename T, typename K>
59  const T start,
60  const T stop,
61  const K step,
62  Column<T>& series_output) {
63  const int64_t MAX_ROWS{1L << 30};
64  const int64_t PARALLEL_THRESHOLD{10000L};
65  const int64_t num_rows = numStepsBetween(start, stop, step) + 1;
66  if (num_rows <= 0) {
67  mgr.set_output_row_size(0);
68  return 0;
69  }
70  mgr.set_output_row_size(num_rows);
71 
72  if (num_rows > MAX_ROWS) {
73  return mgr.ERROR_MESSAGE(
74  "Invocation of generate_series would result in " + std::to_string(num_rows) +
75  " rows, which exceeds the max limit of " + std::to_string(MAX_ROWS) + " rows.");
76  }
77 
78 #ifdef HAVE_TBB
79  if (num_rows > PARALLEL_THRESHOLD) {
80  return generate_series_parallel(start, stop, step, series_output);
81  }
82 #endif
83 
84  for (int64_t out_idx = 0; out_idx != num_rows; ++out_idx) {
85  series_output[out_idx] = start + (step * out_idx);
86  }
87  return num_rows;
88 }
89 
90 template <typename T>
92  const T start,
93  const T stop,
94  Column<T>& series_output) {
96  mgr, start, stop, static_cast<int64_t>(1), series_output);
97 }
98 
99 #include <chrono>
100 #include <random>
101 #include <thread>
102 
103 HOST std::string gen_random_str(std::mt19937& generator, const int64_t str_len) {
104  constexpr char alphanum_lookup_table[] =
105  "0123456789"
106  "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
107  "abcdefghijklmnopqrstuvwxyz";
108  constexpr size_t char_mod = sizeof(alphanum_lookup_table) - 1;
109  std::uniform_int_distribution<int32_t> rand_distribution(0, char_mod);
110 
111  std::string tmp_s;
112  tmp_s.reserve(str_len);
113  for (int i = 0; i < str_len; ++i) {
114  tmp_s += alphanum_lookup_table[rand_distribution(generator)];
115  }
116  return tmp_s;
117 }
118 
120 #ifdef _WIN32
121 #pragma comment(linker "/INCLUDE:generate_random_strings__cpu_")
122 #else
123 __attribute__((__used__))
124 #endif
126  const int64_t num_strings,
127  const int64_t string_length,
128  Column<int64_t>& output_id,
129  Column<TextEncodingDict>& output_strings) {
130  auto timer = DEBUG_TIMER(__func__);
131  // Check for out-of-range errors for the input parameters
132  // in the function instead of with require due to issue encountered
133  // with require over multiple variables
134  constexpr int64_t max_strings{10000000L};
135  constexpr int64_t max_str_len{10000L};
136  if (num_strings > max_strings) {
137  return mgr.ERROR_MESSAGE(
138  "generate_random_strings: num_strings must be between 0 and 10,000,000.");
139  }
140  if (string_length > max_str_len) {
141  return mgr.ERROR_MESSAGE(
142  "generate_random_strings: string_length must be between 1 and 10,000.");
143  }
144  if (num_strings == 0L) {
145  // Bail early as there is no work to be done
146  return 0;
147  }
148 
149  mgr.set_output_row_size(num_strings);
150  constexpr int64_t target_strings_per_thread{5000};
151  const ThreadInfo thread_info(
152  std::thread::hardware_concurrency(), num_strings, target_strings_per_thread);
153  std::vector<std::mt19937> per_thread_rand_generators;
154  per_thread_rand_generators.reserve(thread_info.num_threads);
155  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
156  const uint64_t seed = std::chrono::duration_cast<std::chrono::nanoseconds>(
157  std::chrono::system_clock::now().time_since_epoch())
158  .count() +
159  thread_idx * 971;
160  per_thread_rand_generators.emplace_back(seed);
161  }
162  std::vector<std::string> rand_strings(num_strings);
163  tbb::task_arena limited_arena(thread_info.num_threads);
164  limited_arena.execute([&] {
165  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
167  tbb::blocked_range<int64_t>(0, num_strings, thread_info.num_elems_per_thread),
168  [&](const tbb::blocked_range<int64_t>& r) {
169  const int64_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
170  const int64_t start_out_idx = r.begin();
171  const int64_t end_out_idx = r.end();
172  for (int64_t out_idx = start_out_idx; out_idx != end_out_idx; ++out_idx) {
173  rand_strings[out_idx] =
174  gen_random_str(per_thread_rand_generators[tbb_thread_idx], string_length);
175  }
176  },
177  tbb::simple_partitioner());
178  });
179  const std::vector<int32_t> rand_string_ids =
180  output_strings.string_dict_proxy_->getOrAddTransientBulk(rand_strings);
181  for (int64_t row_idx = 0; row_idx < num_strings; row_idx++) {
182  output_id[row_idx] = row_idx;
183  output_strings[row_idx] = rand_string_ids[row_idx];
184  }
185  return num_strings;
186 }
187 
188 // Explicit template instantiations
189 
190 // wrappers for step calculation
193 
194 // parallel implementations
195 template int32_t generate_series_parallel(int64_t, int64_t, int64_t, Column<int64_t>&);
196 template int32_t generate_series_parallel(Timestamp,
197  Timestamp,
200 template int32_t generate_series_parallel(Timestamp,
201  Timestamp,
204 
205 // non-default step implementations
207  int64_t,
208  int64_t,
209  int64_t,
210  Column<int64_t>&);
212  Timestamp,
213  Timestamp,
217  Timestamp,
218  Timestamp,
221 
222 // default step implementations
224  int64_t,
225  int64_t,
226  Column<int64_t>&);
227 
228 #endif //__CUDACC__
void set_output_row_size(int64_t num_rows)
int64_t num_elems_per_thread
Definition: ThreadInfo.h:23
EXTENSION_NOINLINE_HOST int32_t generate_random_strings__cpu_(TableFunctionManager &mgr, const int64_t num_strings, const int64_t string_length, Column< int64_t > &output_id, Column< TextEncodingDict > &output_strings)
std::string to_string(char const *&&v)
#define HOST
FORCE_INLINE T __attribute__((__may_alias__))*may_alias_ptr(T *ptr)
Definition: TypePunning.h:32
int64_t num_threads
Definition: ThreadInfo.h:22
#define EXTENSION_NOINLINE_HOST
Definition: heavydbTypes.h:49
#define CHECK_LE(x, y)
Definition: Logger.h:304
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
int64_t numStepsBetween(int64_t start, int64_t stop, int64_t step)
#define NEVER_INLINE
#define DEBUG_TIMER(name)
Definition: Logger.h:411
HOST std::string gen_random_str(std::mt19937 &generator, const int64_t str_len)
NEVER_INLINE HOST int32_t generate_series__cpu_template(TableFunctionManager &mgr, const T start, const T stop, const K step, Column< T > &series_output)
int32_t generate_series_parallel(const T start, const T stop, const K step, Column< T > &series_output)