OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
UtilityTableFunctions.cpp File Reference
#include <string>
#include "Shared/ThreadInfo.h"
#include "UtilityTableFunctions.h"
#include <chrono>
#include <random>
#include <thread>
+ Include dependency graph for UtilityTableFunctions.cpp:

Go to the source code of this file.

Functions

EXTENSION_NOINLINE_HOST int32_t generate_series_parallel (const int64_t start, const int64_t stop, const int64_t step, Column< int64_t > &series_output)
 
EXTENSION_NOINLINE_HOST int32_t generate_series__cpu_1 (TableFunctionManager &mgr, const int64_t start, const int64_t stop, const int64_t step, Column< int64_t > &series_output)
 
EXTENSION_NOINLINE_HOST int32_t generate_series__cpu_2 (TableFunctionManager &mgr, const int64_t start, const int64_t stop, Column< int64_t > &series_output)
 
HOST std::string gen_random_str (std::mt19937 &generator, const int64_t str_len)
 
EXTENSION_NOINLINE_HOST int32_t generate_random_strings__cpu_ (TableFunctionManager &mgr, const int64_t num_strings, const int64_t string_length, Column< int64_t > &output_id, Column< TextEncodingDict > &output_strings)
 

Function Documentation

HOST std::string gen_random_str ( std::mt19937 &  generator,
const int64_t  str_len 
)

Definition at line 108 of file UtilityTableFunctions.cpp.

Referenced by generate_random_strings__cpu_().

// Builds a random string of str_len characters drawn from [0-9A-Za-z], using
// the caller-supplied Mersenne Twister engine (whose state is advanced).
// Returns an empty string when str_len is zero or negative.
HOST std::string gen_random_str(std::mt19937& generator, const int64_t str_len) {
  constexpr char alphanum_lookup_table[] =
      "0123456789"
      "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
      "abcdefghijklmnopqrstuvwxyz";
  // sizeof() counts the literal's trailing '\0'; drop it to get the number of
  // selectable characters.
  constexpr int32_t num_chars = static_cast<int32_t>(sizeof(alphanum_lookup_table)) - 1;

  if (str_len <= 0) {
    // A negative length would otherwise reach reserve() as a huge unsigned value.
    return {};
  }

  // uniform_int_distribution draws from the closed interval [a, b], so the upper
  // bound must be the last valid index; using num_chars itself would
  // occasionally select the terminating '\0'.
  std::uniform_int_distribution<int32_t> rand_distribution(0, num_chars - 1);

  std::string tmp_s;
  tmp_s.reserve(static_cast<size_t>(str_len));
  for (int64_t i = 0; i < str_len; ++i) {
    tmp_s += alphanum_lookup_table[rand_distribution(generator)];
  }
  return tmp_s;
}

+ Here is the caller graph for this function:

EXTENSION_NOINLINE_HOST int32_t generate_random_strings__cpu_ ( TableFunctionManager &  mgr,
const int64_t  num_strings,
const int64_t  string_length,
Column< int64_t > &  output_id,
Column< TextEncodingDict > &  output_strings 
)

Definition at line 130 of file UtilityTableFunctions.cpp.

References CHECK_LE, DEBUG_TIMER, gen_random_str(), ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, threading_serial::parallel_for(), and TableFunctionManager::set_output_row_size().

134  {
135  auto timer = DEBUG_TIMER(__func__);
136  // Check for out-of-range errors for the input parameters
137  // in the function instead of with require due to issue encountered
138  // with require over multiple variables
139  constexpr int64_t max_strings{10000000L};
140  constexpr int64_t max_str_len{10000L};
141  if (num_strings > max_strings) {
142  return mgr.ERROR_MESSAGE(
143  "generate_random_strings: num_strings must be between 0 and 10,000,000.");
144  }
145  if (string_length > max_str_len) {
146  return mgr.ERROR_MESSAGE(
147  "generate_random_strings: string_length must be between 1 and 10,000.");
148  }
149  if (num_strings == 0L) {
150  // Bail early as there is no work to be done
151  return 0;
152  }
153 
154  mgr.set_output_row_size(num_strings);
155  constexpr int64_t target_strings_per_thread{5000};
156  const ThreadInfo thread_info(
157  std::thread::hardware_concurrency(), num_strings, target_strings_per_thread);
158  std::vector<std::mt19937> per_thread_rand_generators;
159  per_thread_rand_generators.reserve(thread_info.num_threads);
160  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
161  const uint64_t seed = std::chrono::duration_cast<std::chrono::nanoseconds>(
162  std::chrono::system_clock::now().time_since_epoch())
163  .count() +
164  thread_idx * 971;
165  per_thread_rand_generators.emplace_back(seed);
166  }
167  std::vector<std::string> rand_strings(num_strings);
168  tbb::task_arena limited_arena(thread_info.num_threads);
169  limited_arena.execute([&] {
170  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
172  tbb::blocked_range<int64_t>(0, num_strings, thread_info.num_elems_per_thread),
173  [&](const tbb::blocked_range<int64_t>& r) {
174  const int64_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
175  const int64_t start_out_idx = r.begin();
176  const int64_t end_out_idx = r.end();
177  for (int64_t out_idx = start_out_idx; out_idx != end_out_idx; ++out_idx) {
178  rand_strings[out_idx] =
179  gen_random_str(per_thread_rand_generators[tbb_thread_idx], string_length);
180  }
181  },
182  tbb::simple_partitioner());
183  });
184  const std::vector<int32_t> rand_string_ids =
185  output_strings.string_dict_proxy_->getOrAddTransientBulk(rand_strings);
186  for (int64_t row_idx = 0; row_idx < num_strings; row_idx++) {
187  output_id[row_idx] = row_idx;
188  output_strings[row_idx] = rand_string_ids[row_idx];
189  }
190  return num_strings;
191 }
void set_output_row_size(int64_t num_rows)
StringDictionaryProxy * string_dict_proxy_
Definition: heavydbTypes.h:751
#define CHECK_LE(x, y)
Definition: Logger.h:233
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
std::vector< int32_t > getOrAddTransientBulk(const std::vector< std::string > &strings)
#define DEBUG_TIMER(name)
Definition: Logger.h:371
HOST std::string gen_random_str(std::mt19937 &generator, const int64_t str_len)

+ Here is the call graph for this function:

EXTENSION_NOINLINE_HOST int32_t generate_series__cpu_1 ( TableFunctionManager &  mgr,
const int64_t  start,
const int64_t  stop,
const int64_t  step,
Column< int64_t > &  series_output 
)

Definition at line 59 of file UtilityTableFunctions.cpp.

References generate_series_parallel(), TableFunctionManager::set_output_row_size(), and to_string().

Referenced by generate_series__cpu_2().

63  {
64  const int64_t MAX_ROWS{1L << 30};
65  const int64_t PARALLEL_THRESHOLD{10000L};
66  const int64_t num_rows = ((stop - start) / step) + 1;
67  if (num_rows <= 0) {
68  mgr.set_output_row_size(0);
69  return 0;
70  }
71  mgr.set_output_row_size(num_rows);
72 
73  if (num_rows > MAX_ROWS) {
74  return mgr.ERROR_MESSAGE(
75  "Invocation of generate_series would result in " + std::to_string(num_rows) +
76  " rows, which exceeds the max limit of " + std::to_string(MAX_ROWS) + " rows.");
77  }
78 
79 #ifdef HAVE_TBB
80  if (num_rows > PARALLEL_THRESHOLD) {
81  return generate_series_parallel(start, stop, step, series_output);
82  }
83 #endif
84 
85  for (int64_t out_idx = 0; out_idx != num_rows; ++out_idx) {
86  series_output[out_idx] = start + out_idx * step;
87  }
88  return num_rows;
89 }
void set_output_row_size(int64_t num_rows)
std::string to_string(char const *&&v)
EXTENSION_NOINLINE_HOST int32_t generate_series_parallel(const int64_t start, const int64_t stop, const int64_t step, Column< int64_t > &series_output)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

EXTENSION_NOINLINE_HOST int32_t generate_series__cpu_2 ( TableFunctionManager &  mgr,
const int64_t  start,
const int64_t  stop,
Column< int64_t > &  series_output 
)

Definition at line 97 of file UtilityTableFunctions.cpp.

References generate_series__cpu_1().

100  {
101  return generate_series__cpu_1(mgr, start, stop, 1, series_output);
102 }
EXTENSION_NOINLINE_HOST int32_t generate_series__cpu_1(TableFunctionManager &mgr, const int64_t start, const int64_t stop, const int64_t step, Column< int64_t > &series_output)

+ Here is the call graph for this function:

EXTENSION_NOINLINE_HOST int32_t generate_series_parallel ( const int64_t  start,
const int64_t  stop,
const int64_t  step,
Column< int64_t > &  series_output 
)

Definition at line 35 of file UtilityTableFunctions.cpp.

References threading_serial::parallel_for().

Referenced by generate_series__cpu_1().

38  {
39  const int64_t num_rows = ((stop - start) / step) + 1;
40 
41  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_rows),
42  [&](const tbb::blocked_range<int64_t>& r) {
43  const int64_t start_out_idx = r.begin();
44  const int64_t end_out_idx = r.end();
45  for (int64_t out_idx = start_out_idx; out_idx != end_out_idx;
46  ++out_idx) {
47  series_output[out_idx] = start + out_idx * step;
48  }
49  });
50  return num_rows;
51 }
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())

+ Here is the call graph for this function:

+ Here is the caller graph for this function: