22 #include <tbb/parallel_for.h>
23 #include <tbb/task_arena.h>
// Linker comment directive naming the generate_series_parallel symbol with the
// /INCLUDE option.
// NOTE(review): the documented MSVC form is `#pragma comment(linker, "...")`, with a
// comma after `linker` -- confirm the comma-less spelling is accepted by the toolchain.
31 #pragma comment(linker "/INCLUDE:generate_series_parallel")
// Fragment of the generate_series_parallel body; its signature, the call that receives
// the lambda below, and the closing lines are not part of this listing.
// Row count is (stop - start) / step + 1, using integer division.
// NOTE(review): this divides by step; no zero-step guard is visible in the lines shown.
39 const int64_t num_rows = ((stop - start) / step) + 1;
// Callback taking a tbb::blocked_range: it fills output rows [r.begin(), r.end()).
42 [&](
const tbb::blocked_range<int64_t>& r) {
43 const int64_t start_out_idx = r.begin();
44 const int64_t end_out_idx = r.end();
45 for (int64_t out_idx = start_out_idx; out_idx != end_out_idx;
// Row out_idx receives the out_idx-th term of the arithmetic series starting at start.
47 series_output[out_idx] = start + out_idx * step;
// Linker comment directive naming the generate_series__cpu_1 symbol with the
// /INCLUDE option.
// NOTE(review): the documented MSVC form is `#pragma comment(linker, "...")`, with a
// comma after `linker` -- confirm the comma-less spelling is accepted by the toolchain.
55 #pragma comment(linker "/INCLUDE:generate_series__cpu_1")
// Fragment of the generate_series__cpu_1 body; the signature and several lines
// (including whatever sits between the row-count computation and the limit check)
// are not part of this listing.
// Largest number of rows one invocation may produce (1 << 30).
64 const int64_t MAX_ROWS{1L << 30};
// Row count above which the branch at the `num_rows > PARALLEL_THRESHOLD` test is taken.
65 const int64_t PARALLEL_THRESHOLD{10000L};
// Row count is (stop - start) / step + 1, using integer division.
// NOTE(review): this divides by step; any zero-step guard would have to live in the
// lines missing from this listing -- confirm.
66 const int64_t num_rows = ((stop - start) / step) + 1;
// Reject requests above MAX_ROWS: the message built here is handed to
// mgr.ERROR_MESSAGE and its result is returned to the caller.
73 if (num_rows > MAX_ROWS) {
74 return mgr.ERROR_MESSAGE(
75 "Invocation of generate_series would result in " +
std::to_string(num_rows) +
76 " rows, which exceeds the max limit of " +
std::to_string(MAX_ROWS) +
" rows.");
// Large outputs take this branch; its body is not shown (presumably it dispatches to
// generate_series_parallel -- confirm).
80 if (num_rows > PARALLEL_THRESHOLD) {
// Serial fill: row out_idx receives start + out_idx * step.
85 for (int64_t out_idx = 0; out_idx != num_rows; ++out_idx) {
86 series_output[out_idx] = start + out_idx * step;
93 #pragma comment(linker "/INCLUDE:generate_series__cpu_2")
// Fragment of the gen_random_str body (signature, the declaration of tmp_s and the
// return are not part of this listing). It appends str_len characters, each drawn
// from alphanum_lookup_table using the caller-supplied generator, to tmp_s.
// Characters a generated string may contain.
109 constexpr
char alphanum_lookup_table[] =
111 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
112 "abcdefghijklmnopqrstuvwxyz";
// Number of usable characters: sizeof counts the literal's terminating '\0', hence - 1.
113 constexpr
size_t char_mod =
sizeof(alphanum_lookup_table) - 1;
// std::uniform_int_distribution draws from the CLOSED interval [a, b], so the upper
// bound must be the last valid index (char_mod - 1). Using char_mod itself would
// occasionally select the terminating '\0' and embed NUL bytes in the result.
114 std::uniform_int_distribution<int32_t> rand_distribution(0, char_mod - 1);
// Allocate once up front; the loop below appends exactly str_len characters.
117 tmp_s.reserve(str_len);
// The index uses the same 64-bit type as str_len to avoid a mixed-width comparison.
118 for (
int64_t i = 0; i < str_len; ++i) {
119 tmp_s += alphanum_lookup_table[rand_distribution(generator)];
// Linker comment directive naming the generate_random_strings__cpu_ symbol with the
// /INCLUDE option.
// NOTE(review): the documented MSVC form is `#pragma comment(linker, "...")`, with a
// comma after `linker` -- confirm the comma-less spelling is accepted by the toolchain.
126 #pragma comment(linker "/INCLUDE:generate_random_strings__cpu_")
// Fragment of generate_random_strings__cpu_: the remaining parameters, several
// statements and all closing braces are not part of this listing.
// num_strings   -- number of random strings (and output rows) requested.
// string_length -- length of every generated string.
131 const int64_t num_strings,
132 const int64_t string_length,
// Limits quoted in the error messages below.
139 constexpr int64_t max_strings{10000000L};
140 constexpr int64_t max_str_len{10000L};
// Enforce the full range promised by the message, including the lower bound: a
// negative count would otherwise reach the std::vector size constructor below.
141 if (num_strings < 0 || num_strings > max_strings) {
142 return mgr.ERROR_MESSAGE(
143 "generate_random_strings: num_strings must be between 0 and 10,000,000.");
// Likewise reject lengths below 1, as the message states; a negative length would
// otherwise be passed on to gen_random_str.
145 if (string_length < 1 || string_length > max_str_len) {
146 return mgr.ERROR_MESSAGE(
147 "generate_random_strings: string_length must be between 1 and 10,000.");
// Zero strings requested: handled separately (the branch body is not shown).
149 if (num_strings == 0L) {
// Desired amount of work per thread, passed to the call that produces thread_info.
155 constexpr int64_t target_strings_per_thread{5000};
// Arguments of that call (its head is not shown): hardware thread count, total
// number of strings, and the per-thread target.
157 std::thread::hardware_concurrency(), num_strings, target_strings_per_thread);
// One std::mt19937 per thread counted in thread_info.num_threads.
158 std::vector<std::mt19937> per_thread_rand_generators;
159 per_thread_rand_generators.reserve(thread_info.
num_threads);
160 for (int64_t thread_idx = 0; thread_idx < thread_info.
num_threads; ++thread_idx) {
// Seed derived from the system clock in nanoseconds; the rest of the expression is
// not shown.
161 const uint64_t seed = std::chrono::duration_cast<std::chrono::nanoseconds>(
162 std::chrono::system_clock::now().time_since_epoch())
165 per_thread_rand_generators.emplace_back(seed);
// Staging buffer holding one generated string per output row.
167 std::vector<std::string> rand_strings(num_strings);
// Run the generation inside an arena constructed with thread_info.num_threads.
168 tbb::task_arena limited_arena(thread_info.
num_threads);
169 limited_arena.execute([&] {
// Range callback: fills rand_strings[r.begin(), r.end()).
173 [&](
const tbb::blocked_range<int64_t>& r) {
// The generator is selected by the current arena thread index (presumably so that
// concurrently running tasks do not share RNG state -- confirm).
174 const int64_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
175 const int64_t start_out_idx = r.begin();
176 const int64_t end_out_idx = r.end();
177 for (int64_t out_idx = start_out_idx; out_idx != end_out_idx; ++out_idx) {
178 rand_strings[out_idx] =
179 gen_random_str(per_thread_rand_generators[tbb_thread_idx], string_length);
182 tbb::simple_partitioner());
// Register all strings with the output column's dictionary proxy in one bulk call;
// the returned ids are indexed per row below.
184 const std::vector<int32_t> rand_string_ids =
185 output_strings.string_dict_proxy_->getOrAddTransientBulk(rand_strings);
// Emit the row index as the id column and the dictionary id as the string column.
186 for (int64_t row_idx = 0; row_idx < num_strings; row_idx++) {
187 output_id[row_idx] = row_idx;
188 output_strings[row_idx] = rand_string_ids[row_idx];
void set_output_row_size(int64_t num_rows)
int64_t num_elems_per_thread
EXTENSION_NOINLINE_HOST int32_t generate_random_strings__cpu_(TableFunctionManager &mgr, const int64_t num_strings, const int64_t string_length, Column< int64_t > &output_id, Column< TextEncodingDict > &output_strings)
EXTENSION_NOINLINE_HOST int32_t generate_series_parallel(const int64_t start, const int64_t stop, const int64_t step, Column< int64_t > &series_output)
FORCE_INLINE T __attribute__((__may_alias__))*may_alias_ptr(T *ptr)
#define EXTENSION_NOINLINE_HOST
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
EXTENSION_NOINLINE_HOST int32_t generate_series__cpu_2(TableFunctionManager &mgr, const int64_t start, const int64_t stop, Column< int64_t > &series_output)
#define DEBUG_TIMER(name)
HOST std::string gen_random_str(std::mt19937 &generator, const int64_t str_len)
EXTENSION_NOINLINE_HOST int32_t generate_series__cpu_1(TableFunctionManager &mgr, const int64_t start, const int64_t stop, const int64_t step, Column< int64_t > &series_output)