ColumnarResults.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ColumnarResults.h"
#include "ErrorHandling.h"
#include "Execute.h"
#include "Shared/Intervals.h"
#include "Shared/likely.h"
#include "Shared/thread_count.h"

#include <atomic>
#include <future>
#include <numeric>

namespace {

inline int64_t fixed_encoding_nullable_val(const int64_t val,
                                           const SQLTypeInfo& type_info) {
  if (type_info.get_compression() != kENCODING_NONE) {
    CHECK(type_info.get_compression() == kENCODING_FIXED ||
          type_info.get_compression() == kENCODING_DICT);
    auto logical_ti = get_logical_type_info(type_info);
    if (val == inline_int_null_val(logical_ti)) {
      return inline_fixed_encoding_null_val(type_info);
    }
  }
  return val;
}

}  // namespace
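
// A minimal standalone sketch (guarded out of the build) of the null-sentinel
// remapping fixed_encoding_nullable_val() performs above: the 64-bit logical
// NULL is rewritten to the narrow fixed-width type's own sentinel. The toy_
// name, the int16_t target width, and the use of numeric_limits minima as
// sentinels are assumptions for illustration only.
#if 0
#include <cstdint>
#include <limits>

int64_t toy_fixed_encoding_nullable_val(const int64_t val) {
  constexpr int64_t logical_null = std::numeric_limits<int64_t>::min();
  constexpr int16_t fixed_null = std::numeric_limits<int16_t>::min();
  // a NULL keeps its sentinel encoding, but in the narrow domain
  return val == logical_null ? static_cast<int64_t>(fixed_null) : val;
}
#endif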

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const ResultSet& rows,
                                 const size_t num_columns,
                                 const std::vector<SQLTypeInfo>& target_types,
                                 const size_t thread_idx,
                                 const bool is_parallel_execution_enforced)
    : column_buffers_(num_columns)
    , num_rows_(result_set::use_parallel_algorithms(rows) ||
                        rows.isDirectColumnarConversionPossible()
                    ? rows.entryCount()
                    : rows.rowCount())
    , target_types_(target_types)
    , parallel_conversion_(is_parallel_execution_enforced
                               ? true
                               : result_set::use_parallel_algorithms(rows))
    , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
    , thread_idx_(thread_idx) {
  auto timer = DEBUG_TIMER(__func__);
  column_buffers_.resize(num_columns);
  for (size_t i = 0; i < num_columns; ++i) {
    const bool is_varlen = target_types[i].is_array() ||
                           (target_types[i].is_string() &&
                            target_types[i].get_compression() == kENCODING_NONE) ||
                           target_types[i].is_geometry();
    if (is_varlen) {
      throw ColumnarConversionNotSupported();
    }
    if (!isDirectColumnarConversionPossible() ||
        !rows.isZeroCopyColumnarConversionPossible(i)) {
      column_buffers_[i] = row_set_mem_owner->allocate(
          num_rows_ * target_types[i].get_size(), thread_idx_);
    }
  }

  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
    materializeAllColumnsDirectly(rows, num_columns);
  } else {
    materializeAllColumnsThroughIteration(rows, num_columns);
  }
}

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const int8_t* one_col_buffer,
                                 const size_t num_rows,
                                 const SQLTypeInfo& target_type,
                                 const size_t thread_idx)
    : column_buffers_(1)
    , num_rows_(num_rows)
    , target_types_{target_type}
    , parallel_conversion_(false)
    , direct_columnar_conversion_(false)
    , thread_idx_(thread_idx) {
  auto timer = DEBUG_TIMER(__func__);
  const bool is_varlen =
      target_type.is_array() ||
      (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
      target_type.is_geometry();
  if (is_varlen) {
    throw ColumnarConversionNotSupported();
  }
  const auto buf_size = num_rows * target_type.get_size();
  column_buffers_[0] =
      reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
  if (sub_results.empty()) {
    return nullptr;
  }
  const auto total_row_count = std::accumulate(
      sub_results.begin(),
      sub_results.end(),
      size_t(0),
      [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
        return init + result->size();
      });
  std::unique_ptr<ColumnarResults> merged_results(
      new ColumnarResults(total_row_count, sub_results[0]->target_types_));
  const auto col_count = sub_results[0]->column_buffers_.size();
  const auto nonempty_it = std::find_if(
      sub_results.begin(),
      sub_results.end(),
      [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
  if (nonempty_it == sub_results.end()) {
    return nullptr;
  }
  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    const auto byte_width = (*nonempty_it)->getColumnType(col_idx).get_size();
    auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
    merged_results->column_buffers_.push_back(write_ptr);
    for (auto& rs : sub_results) {
      CHECK_EQ(col_count, rs->column_buffers_.size());
      if (!rs->size()) {
        continue;
      }
      CHECK_EQ(byte_width, rs->getColumnType(col_idx).get_size());
      memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
      write_ptr += rs->size() * byte_width;
    }
  }
  return merged_results;
}
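
// A minimal standalone sketch (guarded out of the build) of the column-wise
// concatenation pattern mergeResults() uses above: for each column, the
// per-sub-result buffers are appended back-to-back with memcpy into one
// contiguous output buffer. concat_column and its vector-backed buffers are
// illustrative stand-ins for the RowSetMemoryOwner-allocated buffers.
#if 0
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

std::vector<int8_t> concat_column(const std::vector<std::vector<int8_t>>& sub_buffers) {
  size_t total_bytes = 0;
  for (const auto& sub : sub_buffers) {
    total_bytes += sub.size();
  }
  std::vector<int8_t> merged(total_bytes);
  int8_t* write_ptr = merged.data();
  for (const auto& sub : sub_buffers) {
    std::memcpy(write_ptr, sub.data(), sub.size());  // append this sub-result
    write_ptr += sub.size();
  }
  return merged;
}
#endif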

/*
 * Iterates through the result set (via getRowAt / getNextRow) and writes back
 * each row's cells into the output column buffers.
 */
void ColumnarResults::materializeAllColumnsThroughIteration(const ResultSet& rows,
                                                            const size_t num_columns) {
  std::atomic<size_t> row_idx{0};
  if (isParallelConversion()) {
    const size_t worker_count = cpu_threads();
    std::vector<std::future<void>> conversion_threads;
    const auto do_work = [num_columns, &rows, &row_idx, this](const size_t i) {
      const auto crt_row = rows.getRowAtNoTranslations(i);
      if (!crt_row.empty()) {
        auto cur_row_idx = row_idx.fetch_add(1);
        for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
          writeBackCell(crt_row[col_idx], cur_row_idx, col_idx);
        }
      }
    };
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work](const size_t start, const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 && check_interrupt())) {
                  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
                }
                do_work(i);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work(i);
              }
            }
          },
          interval.begin,
          interval.end));
    }

    try {
      for (auto& child : conversion_threads) {
        child.wait();
      }
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      throw e;
    } catch (...) {
      throw;
    }

    num_rows_ = row_idx;
    rows.setCachedRowCount(num_rows_);
    return;
  }
  bool done = false;
  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
    const auto crt_row = rows.getNextRow(false, false);
    if (crt_row.empty()) {
      done = true;
      return;
    }
    for (size_t i = 0; i < num_columns; ++i) {
      writeBackCell(crt_row[i], row_idx, i);
    }
    ++row_idx;
  };
  if (g_enable_non_kernel_time_query_interrupt) {
    while (!done) {
      if (UNLIKELY((row_idx & 0xFFFF) == 0 && check_interrupt())) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      do_work();
    }
  } else {
    while (!done) {
      do_work();
    }
  }

  rows.moveToBegin();
}
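
// A minimal standalone sketch (guarded out of the build) of the parallel
// conversion pattern above: the entry range is split into per-worker chunks,
// and each worker claims compacted output rows through a shared atomic
// counter, so non-empty rows land densely in the output regardless of gaps in
// the input. parallel_compact, row_is_empty, and write_row are illustrative
// stand-ins.
#if 0
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <functional>
#include <future>
#include <vector>

void parallel_compact(const size_t entry_count,
                      const size_t worker_count,
                      const std::function<bool(size_t)>& row_is_empty,
                      const std::function<void(size_t, size_t)>& write_row) {
  std::atomic<size_t> out_idx{0};
  std::vector<std::future<void>> workers;
  const size_t chunk = (entry_count + worker_count - 1) / worker_count;
  for (size_t start = 0; start < entry_count; start += chunk) {
    const size_t end = std::min(start + chunk, entry_count);
    workers.push_back(std::async(std::launch::async, [&, start, end] {
      for (size_t i = start; i < end; ++i) {
        if (!row_is_empty(i)) {
          write_row(i, out_idx.fetch_add(1));  // claim the next dense output slot
        }
      }
    }));
  }
  for (auto& w : workers) {
    w.wait();
  }
}
#endif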

/*
 * This function processes and decodes its input TargetValue
 * and writes it into the corresponding column buffer's cell (at the given
 * row and column indices).
 *
 * NOTE: this is not supposed to be processing varlen types, and they should be
 * handled differently outside this function.
 */
inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
                                           const size_t row_idx,
                                           const size_t column_idx) {
  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
  CHECK(scalar_col_val);
  auto i64_p = boost::get<int64_t>(scalar_col_val);
  const auto& type_info = target_types_[column_idx];
  if (i64_p) {
    const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
    switch (target_types_[column_idx].get_size()) {
      case 1:
        ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
        break;
      case 2:
        ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
        break;
      case 4:
        ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
        break;
      case 8:
        ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
        break;
      default:
        CHECK(false);
    }
  } else {
    CHECK(target_types_[column_idx].is_fp());
    switch (target_types_[column_idx].get_type()) {
      case kFLOAT: {
        auto float_p = boost::get<float>(scalar_col_val);
        ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
        break;
      }
      case kDOUBLE: {
        auto double_p = boost::get<double>(scalar_col_val);
        ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
        break;
      }
      default:
        CHECK(false);
    }
  }
}
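
// A minimal standalone sketch (guarded out of the build) of the variant
// probing writeBackCell() relies on above: boost::get with a pointer argument
// returns nullptr when the alternative does not match, so the integer case is
// tried first and the floating-point alternatives are handled in the fallback.
// ToyScalarValue is an illustrative stand-in for ScalarTargetValue.
#if 0
#include <boost/variant.hpp>
#include <cstdint>
#include <cstdio>

using ToyScalarValue = boost::variant<int64_t, float, double>;

void probe(const ToyScalarValue& v) {
  if (auto i64_p = boost::get<int64_t>(&v)) {
    std::printf("int64 cell: %lld\n", static_cast<long long>(*i64_p));
  } else if (auto float_p = boost::get<float>(&v)) {
    std::printf("float cell: %f\n", static_cast<double>(*float_p));
  } else {
    std::printf("double cell: %f\n", *boost::get<double>(&v));
  }
}
#endif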

/*
 * Directly writes a cell into the output column buffer, reading it from the
 * result set's storage through the given read function (used only for direct
 * columnarization).
 */
template <typename DATA_TYPE>
void ColumnarResults::writeBackCellDirect(const ResultSet& rows,
                                          const size_t input_buffer_entry_idx,
                                          const size_t output_buffer_entry_idx,
                                          const size_t target_idx,
                                          const size_t slot_idx,
                                          const ReadFunction& read_from_function) {
  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
      target_types_[target_idx]));
  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
      val;
}

template <>
void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
                                                 const size_t input_buffer_entry_idx,
                                                 const size_t output_buffer_entry_idx,
                                                 const size_t target_idx,
                                                 const size_t slot_idx,
                                                 const ReadFunction& read_from_function) {
  const int32_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
}

template <>
void ColumnarResults::writeBackCellDirect<double>(
    const ResultSet& rows,
    const size_t input_buffer_entry_idx,
    const size_t output_buffer_entry_idx,
    const size_t target_idx,
    const size_t slot_idx,
    const ReadFunction& read_from_function) {
  const int64_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
}
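
// A minimal standalone sketch (guarded out of the build) of the bit-preserving
// transport used by the two specializations above: a float value read out of
// an integer slot is reinterpreted bit-for-bit, never value-converted. memcpy
// is the strictly portable equivalent of the may_alias_ptr() cast.
#if 0
#include <cstdint>
#include <cstring>

float int_bits_to_float(const int32_t ival) {
  float fval;
  std::memcpy(&fval, &ival, sizeof(fval));  // reinterpret the same 4 bytes
  return fval;
}

int32_t float_to_int_bits(const float fval) {
  int32_t ival;
  std::memcpy(&ival, &fval, sizeof(ival));
  return ival;
}
#endif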

/*
 * Materializes all columns directly from the result set's storage, dispatching
 * to the query-type-specific strategy, without using the result set's iterators.
 */
void ColumnarResults::materializeAllColumnsDirectly(const ResultSet& rows,
                                                    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  switch (rows.getQueryDescriptionType()) {
    case QueryDescriptionType::Projection: {
      materializeAllColumnsProjection(rows, num_columns);
      break;
    }
    case QueryDescriptionType::GroupByPerfectHash:
    case QueryDescriptionType::GroupByBaselineHash: {
      materializeAllColumnsGroupBy(rows, num_columns);
      break;
    }
    default:
      UNREACHABLE()
          << "Direct columnar conversion for this query type is not supported yet.";
  }
}

/*
 * Materializes projection results: non-lazy columns are copied directly, and
 * lazy columns are then materialized through the result set's iterators.
 */
void ColumnarResults::materializeAllColumnsProjection(const ResultSet& rows,
                                                      const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        rows.query_mem_desc_.getQueryDescriptionType() ==
            QueryDescriptionType::Projection);

  const auto& lazy_fetch_info = rows.getLazyFetchInfo();

  // We can directly copy each non-lazy column's content
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);

  // Only lazy columns are iterated through first and then materialized
  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
}

/*
 * For all non-lazy columns, we can directly copy back the results of each column's
 * contents from different storages and put them into the corresponding output buffer.
 *
 * This function is parallelized through assigning each column to a CPU thread.
 */
void ColumnarResults::copyAllNonLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(rows.isDirectColumnarConversionPossible());
  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
    // an empty lazy_fetch_info means no column is lazily fetched
    if (lazy_fetch_info.empty()) {
      return true;
    } else {
      return !lazy_fetch_info[col_idx].is_lazily_fetched;
    }
  };

  // parallelized by assigning each column to a thread
  std::vector<std::future<void>> direct_copy_threads;
  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
    if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
      CHECK(!column_buffers_[col_idx]);
      column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
    } else if (is_column_non_lazily_fetched(col_idx)) {
      direct_copy_threads.push_back(std::async(
          std::launch::async,
          [&rows, this](const size_t column_index) {
            const size_t column_size = num_rows_ * target_types_[column_index].get_size();
            rows.copyColumnIntoBuffer(
                column_index, column_buffers_[column_index], column_size);
          },
          col_idx));
    }
  }

  for (auto& child : direct_copy_threads) {
    child.wait();
  }
}

/*
 * Materializes the lazily fetched columns by iterating through the result set
 * and writing back only their cells, parallelized by assigning a chunk of rows
 * to each thread.
 */
void ColumnarResults::materializeAllLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(rows.isDirectColumnarConversionPossible());
  const auto do_work_just_lazy_columns = [num_columns, &rows, this](
                                             const size_t row_idx,
                                             const std::vector<bool>& targets_to_skip) {
    const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
    for (size_t i = 0; i < num_columns; ++i) {
      if (!targets_to_skip.empty() && !targets_to_skip[i]) {
        writeBackCell(crt_row[i], row_idx, i);
      }
    }
  };

  const auto contains_lazy_fetched_column =
      [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
        for (auto& col_info : lazy_fetch_info) {
          if (col_info.is_lazily_fetched) {
            return true;
          }
        }
        return false;
      };

  // parallelized by assigning a chunk of rows to each thread
  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
  if (contains_lazy_fetched_column(lazy_fetch_info)) {
    const size_t worker_count =
        result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1;
    std::vector<std::future<void>> conversion_threads;
    std::vector<bool> targets_to_skip;
    if (skip_non_lazy_columns) {
      CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
      targets_to_skip.reserve(num_columns);
      for (size_t i = 0; i < num_columns; i++) {
        // we process lazy columns (i.e., skip non-lazy columns)
        targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
      }
    }
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work_just_lazy_columns, &targets_to_skip](const size_t start,
                                                         const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 && check_interrupt())) {
                  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
                }
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            }
          },
          interval.begin,
          interval.end));
    }

    try {
      for (auto& child : conversion_threads) {
        child.wait();
      }
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      throw e;
    } catch (...) {
      throw;
    }
  }
}

/*
 * Materializes group-by results (GroupByPerfectHash and GroupByBaselineHash
 * only) in two steps: locating the non-empty entries, then compacting them
 * into the output buffers.
 */
void ColumnarResults::materializeAllColumnsGroupBy(const ResultSet& rows,
                                                   const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
  const size_t entry_count = rows.entryCount();
  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;

  // step 1: compute total non-empty elements and store a bitmap per thread
  std::vector<size_t> non_empty_per_thread(num_threads,
                                           0);  // number of non-empty entries per thread

  ColumnBitmap bitmap(size_per_thread, num_threads);

  locateAndCountEntries(
      rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);

  // step 2: go through the generated bitmap and copy/decode corresponding entries
  // into the output buffer
  compactAndCopyEntries(rows,
                        bitmap,
                        non_empty_per_thread,
                        num_columns,
                        entry_count,
                        num_threads,
                        size_per_thread);
}
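
// A minimal standalone sketch (guarded out of the build) of step 2's offset
// computation: an exclusive scan over the per-thread non-empty counts gives
// each thread the starting row of its contiguous region in the compacted
// output, which is what the global_offsets vector below holds.
// exclusive_offsets is an illustrative stand-in.
#if 0
#include <cstddef>
#include <numeric>
#include <vector>

std::vector<size_t> exclusive_offsets(const std::vector<size_t>& non_empty_per_thread) {
  std::vector<size_t> offsets(non_empty_per_thread.size() + 1, 0);
  // e.g. counts {3, 0, 5} -> offsets {0, 3, 3, 8}
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   offsets.begin() + 1);
  return offsets;  // thread i writes its rows starting at offsets[i]
}
#endif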

/*
 * Counts the number of non-empty entries per thread and sets the corresponding
 * bits in a bitmap so that the compaction step can later locate them.
 */
void ColumnarResults::locateAndCountEntries(const ResultSet& rows,
                                            ColumnBitmap& bitmap,
                                            std::vector<size_t>& non_empty_per_thread,
                                            const size_t entry_count,
                                            const size_t num_threads,
                                            const size_t size_per_thread) const {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());
  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
                                  const size_t local_idx,
                                  const size_t entry_idx,
                                  const size_t thread_idx) {
    if (!rows.isRowAtEmpty(entry_idx)) {
      total_non_empty++;
      bitmap.set(local_idx, thread_idx, true);
    }
  };
  auto locate_and_count_func = [&do_work, &non_empty_per_thread](size_t start_index,
                                                                 size_t end_index,
                                                                 size_t thread_idx) {
    size_t total_non_empty = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 && check_interrupt())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(total_non_empty, local_idx, entry_idx, thread_idx);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(total_non_empty, local_idx, entry_idx, thread_idx);
      }
    }
    non_empty_per_thread[thread_idx] = total_non_empty;
  };

  std::vector<std::future<void>> conversion_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    conversion_threads.push_back(std::async(
        std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : conversion_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

/*
 * Goes through the bitmap produced by locateAndCountEntries and copies/decodes
 * all non-empty entries into the output buffers, choosing between the
 * target-skipping and non-skipping variants.
 */
void ColumnarResults::compactAndCopyEntries(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());

  // compute the exclusive scan over all non-empty totals
  std::vector<size_t> global_offsets(num_threads + 1, 0);
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   std::next(global_offsets.begin()));

  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
      rows.getSupportedSingleSlotTargetBitmap();

  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
  // differently and accessed through the result set's iterator
  if (num_single_slot_targets < num_columns) {
    compactAndCopyEntriesWithTargetSkipping(rows,
                                            bitmap,
                                            non_empty_per_thread,
                                            global_offsets,
                                            single_slot_targets_to_skip,
                                            slot_idx_per_target_idx,
                                            num_columns,
                                            entry_count,
                                            num_threads,
                                            size_per_thread);
  } else {
    compactAndCopyEntriesWithoutTargetSkipping(rows,
                                               bitmap,
                                               non_empty_per_thread,
                                               global_offsets,
                                               slot_idx_per_target_idx,
                                               num_columns,
                                               entry_count,
                                               num_threads,
                                               size_per_thread);
  }
}

/*
 * Compacts and copies all non-empty entries while skipping multi-slot targets
 * (e.g., AVG); the skipped targets are recovered through the result set's
 * iterator instead.
 */
void ColumnarResults::compactAndCopyEntriesWithTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<bool>& targets_to_skip,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);
  auto do_work = [this,
                  &bitmap,
                  &rows,
                  &slot_idx_per_target_idx,
                  &global_offsets,
                  &targets_to_skip,
                  &num_columns,
                  &write_functions = write_functions,
                  &read_functions = read_functions](size_t& non_empty_idx,
                                                    const size_t total_non_empty,
                                                    const size_t local_idx,
                                                    size_t& entry_idx,
                                                    const size_t thread_idx,
                                                    const size_t end_idx) {
    if (non_empty_idx >= total_non_empty) {
      // all non-empty entries have been written back
      entry_idx = end_idx;
    }
    const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
    if (bitmap.get(local_idx, thread_idx)) {
      // targets that are recovered from the result set iterators:
      const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
      for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
        if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
          writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
        }
      }
      // targets that are copied directly without any translation/decoding from
      // the result set
      for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
        if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
          continue;
        }
        write_functions[column_idx](rows,
                                    entry_idx,
                                    output_buffer_row_idx,
                                    column_idx,
                                    slot_idx_per_target_idx[column_idx],
                                    read_functions[column_idx]);
      }
      non_empty_idx++;
    }
  };

  auto compact_buffer_func = [&non_empty_per_thread, &do_work](const size_t start_index,
                                                               const size_t end_index,
                                                               const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 && check_interrupt())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(
            non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(
            non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : compaction_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

/*
 * Compacts and copies all non-empty entries when every target can be read and
 * written directly, without consulting the result set's iterator.
 */
void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);
  auto do_work = [&rows,
                  &bitmap,
                  &global_offsets,
                  &num_columns,
                  &slot_idx_per_target_idx,
                  &write_functions = write_functions,
                  &read_functions = read_functions](size_t& entry_idx,
                                                    size_t& non_empty_idx,
                                                    const size_t total_non_empty,
                                                    const size_t local_idx,
                                                    const size_t thread_idx,
                                                    const size_t end_idx) {
    if (non_empty_idx >= total_non_empty) {
      // all non-empty entries have been written back
      entry_idx = end_idx;
      return;
    }
    const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
    if (bitmap.get(local_idx, thread_idx)) {
      for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
        write_functions[column_idx](rows,
                                    entry_idx,
                                    output_buffer_row_idx,
                                    column_idx,
                                    slot_idx_per_target_idx[column_idx],
                                    read_functions[column_idx]);
      }
      non_empty_idx++;
    }
  };
  auto compact_buffer_func = [&non_empty_per_thread, &do_work](const size_t start_index,
                                                               const size_t end_index,
                                                               const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 && check_interrupt())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(
            entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(
            entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : compaction_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

/*
 * Initializes a set of write functions (one per target), each of which writes
 * a cell of the proper physical width into the corresponding output column buffer.
 */
std::vector<ColumnarResults::WriteFunction> ColumnarResults::initWriteFunctions(
    const ResultSet& rows,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  std::vector<WriteFunction> result;
  result.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      result.emplace_back([](const ResultSet& rows,
                             const size_t input_buffer_entry_idx,
                             const size_t output_buffer_entry_idx,
                             const size_t target_idx,
                             const size_t slot_idx,
                             const ReadFunction& read_function) {
        UNREACHABLE() << "Invalid write back function used.";
      });
      continue;
    }

    if (target_types_[target_idx].is_fp()) {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    } else {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 2:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 1:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    }
  }
  return result;
}
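
// A minimal standalone sketch (guarded out of the build) of the dispatch-table
// construction above: std::bind over a member function template turns each
// byte width into a uniform std::function entry chosen once per column, so the
// per-row loop pays no further type dispatch. ToyWriter and make_writers are
// illustrative stand-ins.
#if 0
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

struct ToyWriter {
  template <typename T>
  void write(const int64_t val) {
    (void)static_cast<T>(val);  // ... narrow and store the cell ...
  }
};

std::vector<std::function<void(int64_t)>> make_writers(
    ToyWriter& writer,
    const std::vector<size_t>& byte_widths) {
  std::vector<std::function<void(int64_t)>> writers;
  for (const auto width : byte_widths) {
    switch (width) {
      case 4:
        writers.emplace_back(
            std::bind(&ToyWriter::write<int32_t>, &writer, std::placeholders::_1));
        break;
      case 8:
        writers.emplace_back(
            std::bind(&ToyWriter::write<int64_t>, &writer, std::placeholders::_1));
        break;
    }
  }
  return writers;
}
#endif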

namespace {

int64_t invalid_read_func(const ResultSet& rows,
                          const size_t input_buffer_entry_idx,
                          const size_t target_idx,
                          const size_t slot_idx) {
  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
  return static_cast<int64_t>(0);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_key_baseline(const ResultSet& rows,
                                const size_t input_buffer_entry_idx,
                                const size_t target_idx,
                                const size_t slot_idx) {
  // float keys in baseline hash are written as doubles in the buffer, so
  // the result should properly be cast before being written into the output
  // columns
  auto fval = static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx));
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int64_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int32_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int16_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int8_func(const ResultSet& rows,
                       const size_t input_buffer_entry_idx,
                       const size_t target_idx,
                       const size_t slot_idx) {
  return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_double_func(const ResultSet& rows,
                         const size_t input_buffer_entry_idx,
                         const size_t target_idx,
                         const size_t slot_idx) {
  auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
}

}  // namespace

/*
 * Initializes a set of read functions (one per target) based on the query's
 * description type, the result set's output layout, and each target's slot width.
 */
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector<ColumnarResults::ReadFunction> ColumnarResults::initReadFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());

  std::vector<ReadFunction> read_functions;
  read_functions.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      // for targets that should be skipped, we use a placeholder function that
      // should never be called. The UNREACHABLE() inside it makes sure that
      // never happens.
      read_functions.emplace_back(invalid_read_func);
      continue;
    }

    if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
      if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
        // for key columns only
        CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
        if (target_types_[target_idx].is_fp()) {
          CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
          switch (target_types_[target_idx].get_type()) {
            case kFLOAT:
              read_functions.emplace_back(
                  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case kDOUBLE:
              read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, floating point key).";
              break;
          }
        } else {
          switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
            case 8:
              read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case 4:
              read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, integer key).";
          }
        }
        continue;
      }
    }
    if (target_types_[target_idx].is_fp()) {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
          break;
      }
    } else {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 2:
          read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 1:
          read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (integer agg column).";
          break;
      }
    }
  }
  return read_functions;
}

/*
 * Initializes the matching pair of write and read functions for each target,
 * dispatching on the query description type and the output layout.
 */
std::tuple<std::vector<ColumnarResults::WriteFunction>,
           std::vector<ColumnarResults::ReadFunction>>
ColumnarResults::initAllConversionFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
         rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));

  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  } else {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  }
}
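
// A minimal standalone sketch (guarded out of the build) of the branching
// pattern above: runtime values (query type, columnar output) are lifted into
// template parameters by enumerating the combinations once, so the hot read
// path compiles to specialized code. toy_read_path and toy_dispatch are
// illustrative stand-ins.
#if 0
template <bool COLUMNAR_OUTPUT>
int toy_read_path() {
  // COLUMNAR_OUTPUT is a compile-time constant inside each instantiation
  return COLUMNAR_OUTPUT ? 1 : 0;
}

int toy_dispatch(const bool columnar) {
  // the runtime flag picks the instantiation exactly once
  return columnar ? toy_read_path<true>() : toy_read_path<false>();
}
#endif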