ColumnarResults.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ColumnarResults.h"
#include "Descriptors/RowSetMemoryOwner.h"
#include "ErrorHandling.h"
#include "Execute.h"
#include "Shared/Intervals.h"
#include "Shared/likely.h"
#include "Shared/thread_count.h"

#include <atomic>
#include <future>
#include <numeric>
namespace {

inline int64_t fixed_encoding_nullable_val(const int64_t val,
                                           const SQLTypeInfo& type_info) {
  if (type_info.get_compression() != kENCODING_NONE) {
    CHECK(type_info.get_compression() == kENCODING_FIXED ||
          type_info.get_compression() == kENCODING_DICT);
    auto logical_ti = get_logical_type_info(type_info);
    if (val == inline_int_null_val(logical_ti)) {
      return inline_fixed_encoding_null_val(type_info);
    }
  }
  return val;
}

}  // namespace

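The helper above remaps the logical type's null sentinel to the sentinel of the narrower fixed encoding. Below is a minimal, self-contained sketch of the same idea, assuming hypothetical widths (a 32-bit logical integer narrowed to a 16-bit fixed encoding, with each width's minimum value serving as its null sentinel); `remap_null_sentinel` and both constants are illustrative stand-ins, not part of the OmniSciDB API:

```cpp
#include <cstdint>
#include <iostream>
#include <limits>

// Hypothetical stand-ins for inline_int_null_val()/inline_fixed_encoding_null_val():
// the null sentinel is the minimum value representable at each width.
constexpr int64_t kLogicalNull = std::numeric_limits<int32_t>::min();  // 32-bit null
constexpr int64_t kEncodedNull = std::numeric_limits<int16_t>::min();  // 16-bit null

int64_t remap_null_sentinel(int64_t val) {
  // A value equal to the logical null must be stored as the fixed-encoding null,
  // otherwise it would round-trip as a legitimate (wrong) number.
  return val == kLogicalNull ? kEncodedNull : val;
}

int main() {
  std::cout << remap_null_sentinel(42) << '\n';            // 42
  std::cout << remap_null_sentinel(kLogicalNull) << '\n';  // -32768
}
```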
ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const ResultSet& rows,
                                 const size_t num_columns,
                                 const std::vector<SQLTypeInfo>& target_types,
                                 const size_t executor_id,
                                 const size_t thread_idx,
                                 const bool is_parallel_execution_enforced)
    : column_buffers_(num_columns)
    , num_rows_(result_set::use_parallel_algorithms(rows) ||
                        rows.isDirectColumnarConversionPossible()
                    ? rows.entryCount()
                    : rows.rowCount())
    , target_types_(target_types)
    , parallel_conversion_(is_parallel_execution_enforced
                               ? true
                               : result_set::use_parallel_algorithms(rows))
    , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
    , thread_idx_(thread_idx) {
  auto timer = DEBUG_TIMER(__func__);
  column_buffers_.resize(num_columns);
  executor_ = Executor::getExecutor(executor_id);
  CHECK(executor_);
  for (size_t i = 0; i < num_columns; ++i) {
    const bool is_varlen = target_types[i].is_array() ||
                           (target_types[i].is_string() &&
                            target_types[i].get_compression() == kENCODING_NONE) ||
                           target_types[i].is_geometry();
    if (is_varlen) {
      throw ColumnarConversionNotSupported();
    }
    if (!isDirectColumnarConversionPossible() ||
        !rows.isZeroCopyColumnarConversionPossible(i)) {
      column_buffers_[i] = row_set_mem_owner->allocate(
          num_rows_ * target_types[i].get_size(), thread_idx_);
    }
  }

  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
    materializeAllColumnsDirectly(rows, num_columns);
  } else {
    materializeAllColumnsThroughIteration(rows, num_columns);
  }
}

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const int8_t* one_col_buffer,
                                 const size_t num_rows,
                                 const SQLTypeInfo& target_type,
                                 const size_t executor_id,
                                 const size_t thread_idx)
    : column_buffers_(1)
    , num_rows_(num_rows)
    , target_types_{target_type}
    , parallel_conversion_(false)
    , direct_columnar_conversion_(false)
    , thread_idx_(thread_idx) {
  auto timer = DEBUG_TIMER(__func__);
  const bool is_varlen =
      target_type.is_array() ||
      (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
      target_type.is_geometry();
  if (is_varlen) {
    throw ColumnarConversionNotSupported();
  }
  executor_ = Executor::getExecutor(executor_id);
  CHECK(executor_);
  const auto buf_size = num_rows * target_type.get_size();
  column_buffers_[0] =
      reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
  if (sub_results.empty()) {
    return nullptr;
  }
  const auto total_row_count = std::accumulate(
      sub_results.begin(),
      sub_results.end(),
      size_t(0),
      [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
        return init + result->size();
      });
  std::unique_ptr<ColumnarResults> merged_results(
      new ColumnarResults(total_row_count, sub_results[0]->target_types_));
  const auto col_count = sub_results[0]->column_buffers_.size();
  const auto nonempty_it = std::find_if(
      sub_results.begin(),
      sub_results.end(),
      [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
  if (nonempty_it == sub_results.end()) {
    return nullptr;
  }
  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    const auto byte_width = (*nonempty_it)->getColumnType(col_idx).get_size();
    auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
    merged_results->column_buffers_.push_back(write_ptr);
    for (auto& rs : sub_results) {
      CHECK_EQ(col_count, rs->column_buffers_.size());
      if (!rs->size()) {
        continue;
      }
      CHECK_EQ(byte_width, rs->getColumnType(col_idx).get_size());
      memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
      write_ptr += rs->size() * byte_width;
    }
  }
  return merged_results;
}
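mergeResults concatenates each fixed-width column across sub-results with a running write pointer. A self-contained sketch of that append pattern over plain byte buffers (the function name and use of std::vector here are illustrative, not the OmniSciDB API):

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Concatenate several fixed-width column fragments into one contiguous buffer,
// mirroring mergeResults' memcpy-and-advance loop over sub_results.
std::vector<int8_t> concat_column(const std::vector<std::vector<int8_t>>& fragments) {
  std::size_t total_bytes = 0;
  for (const auto& f : fragments) {
    total_bytes += f.size();
  }
  std::vector<int8_t> merged(total_bytes);
  int8_t* write_ptr = merged.data();
  for (const auto& f : fragments) {
    if (f.empty()) {
      continue;  // empty sub-results contribute nothing, as in mergeResults
    }
    std::memcpy(write_ptr, f.data(), f.size());
    write_ptr += f.size();  // advance past what was just appended
  }
  return merged;
}

int main() {
  std::vector<std::vector<int8_t>> frags = {{1, 2, 3}, {}, {4, 5}};
  auto merged = concat_column(frags);  // merged == {1, 2, 3, 4, 5}
}
```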

/*
 * Iterates through the input result set and writes the results back into the output
 * column buffers. The parallel path hands out output slots through an atomic counter,
 * so the relative order of rows in the output is not guaranteed.
 */
void ColumnarResults::materializeAllColumnsThroughIteration(const ResultSet& rows,
                                                            const size_t num_columns) {
  std::atomic<size_t> row_idx{0};
  if (isParallelConversion()) {
    const size_t worker_count = cpu_threads();
    std::vector<std::future<void>> conversion_threads;
    const auto do_work = [num_columns, &rows, &row_idx, this](const size_t i) {
      const auto crt_row = rows.getRowAtNoTranslations(i);
      if (!crt_row.empty()) {
        auto cur_row_idx = row_idx.fetch_add(1);
        for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
          writeBackCell(crt_row[col_idx], cur_row_idx, col_idx);
        }
      }
    };
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work, this](const size_t start, const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                             executor_->checkNonKernelTimeInterrupted())) {
                  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
                }
                do_work(i);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work(i);
              }
            }
          },
          interval.begin,
          interval.end));
    }

    try {
      for (auto& child : conversion_threads) {
        child.wait();
      }
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      throw e;
    } catch (...) {
      throw;
    }

    num_rows_ = row_idx;
    rows.setCachedRowCount(num_rows_);
    return;
  }
  bool done = false;
  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
    const auto crt_row = rows.getNextRow(false, false);
    if (crt_row.empty()) {
      done = true;
      return;
    }
    for (size_t i = 0; i < num_columns; ++i) {
      writeBackCell(crt_row[i], row_idx, i);
    }
    ++row_idx;
  };
  if (g_enable_non_kernel_time_query_interrupt) {
    while (!done) {
      if (UNLIKELY((row_idx & 0xFFFF) == 0 &&
                   executor_->checkNonKernelTimeInterrupted())) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      do_work();
    }
  } else {
    while (!done) {
      do_work();
    }
  }

  rows.moveToBegin();
}
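The parallel path above partitions the entry range into per-worker intervals and lets each worker claim output rows through an atomic counter. A minimal self-contained sketch of that pattern, assuming a hypothetical even split in place of makeIntervals:

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <future>
#include <vector>

int main() {
  constexpr std::size_t kEntries = 1000;
  constexpr std::size_t kWorkers = 4;
  std::vector<int> output(kEntries);
  std::atomic<std::size_t> next_out{0};  // workers claim output slots atomically

  std::vector<std::future<void>> workers;
  const std::size_t chunk = (kEntries + kWorkers - 1) / kWorkers;
  for (std::size_t w = 0; w < kWorkers; ++w) {
    const std::size_t start = w * chunk;
    const std::size_t end = std::min(start + chunk, kEntries);
    workers.push_back(std::async(std::launch::async, [start, end, &output, &next_out] {
      for (std::size_t i = start; i < end; ++i) {
        // claim the next free output row; output order is therefore nondeterministic
        output[next_out.fetch_add(1)] = static_cast<int>(i);
      }
    }));
  }
  for (auto& f : workers) {
    f.wait();  // join all workers before the output is consumed
  }
}
```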

/*
 * This function processes and decodes its input TargetValue
 * and writes it into its corresponding column buffer's cell (with corresponding
 * row and column indices)
 *
 * NOTE: this is not supposed to be processing varlen types, and they should be
 * handled differently outside this function.
 */
inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
                                           const size_t row_idx,
                                           const size_t column_idx) {
  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
  CHECK(scalar_col_val);
  auto i64_p = boost::get<int64_t>(scalar_col_val);
  const auto& type_info = target_types_[column_idx];
  if (i64_p) {
    const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
    switch (target_types_[column_idx].get_size()) {
      case 1:
        ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
        break;
      case 2:
        ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
        break;
      case 4:
        ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
        break;
      case 8:
        ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
        break;
      default:
        CHECK(false);
    }
  } else {
    CHECK(target_types_[column_idx].is_fp());
    switch (target_types_[column_idx].get_type()) {
      case kFLOAT: {
        auto float_p = boost::get<float>(scalar_col_val);
        ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
        break;
      }
      case kDOUBLE: {
        auto double_p = boost::get<double>(scalar_col_val);
        ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
        break;
      }
      default:
        CHECK(false);
    }
  }
}
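writeBackCell dispatches on the target's physical byte width and stores through a type-erased int8_t* buffer. A self-contained sketch of that width switch, assuming an illustrative free function and buffer layout (not the member function above):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Store a decoded 64-bit value into a type-erased column buffer at the given row,
// truncating to the column's physical width, as writeBackCell's switch does.
void write_cell(int8_t* buffer, std::size_t row_idx, std::size_t byte_width,
                int64_t val) {
  switch (byte_width) {
    case 1:
      reinterpret_cast<int8_t*>(buffer)[row_idx] = static_cast<int8_t>(val);
      break;
    case 2:
      reinterpret_cast<int16_t*>(buffer)[row_idx] = static_cast<int16_t>(val);
      break;
    case 4:
      reinterpret_cast<int32_t*>(buffer)[row_idx] = static_cast<int32_t>(val);
      break;
    case 8:
      reinterpret_cast<int64_t*>(buffer)[row_idx] = val;
      break;
    default:
      assert(false && "unsupported width");
  }
}

int main() {
  std::vector<int8_t> col(10 * sizeof(int32_t));  // a 4-byte column with 10 rows
  write_cell(col.data(), 3, 4, 42);
  assert(reinterpret_cast<int32_t*>(col.data())[3] == 42);
}
```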

/*
 * Directly writes a cell into the output column buffer, bypassing the result set's
 * iterator: the value is fetched with the supplied read function and stored at the
 * given output position. Float and double have specializations below because their
 * bits travel through the read functions inside integers.
 */
template <typename DATA_TYPE>
void ColumnarResults::writeBackCellDirect(const ResultSet& rows,
                                          const size_t input_buffer_entry_idx,
                                          const size_t output_buffer_entry_idx,
                                          const size_t target_idx,
                                          const size_t slot_idx,
                                          const ReadFunction& read_from_function) {
  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
      target_types_[target_idx]));
  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
      val;
}

template <>
void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
                                                 const size_t input_buffer_entry_idx,
                                                 const size_t output_buffer_entry_idx,
                                                 const size_t target_idx,
                                                 const size_t slot_idx,
                                                 const ReadFunction& read_from_function) {
  const int32_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
}

template <>
void ColumnarResults::writeBackCellDirect<double>(
    const ResultSet& rows,
    const size_t input_buffer_entry_idx,
    const size_t output_buffer_entry_idx,
    const size_t target_idx,
    const size_t slot_idx,
    const ReadFunction& read_from_function) {
  const int64_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
}
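The float/double specializations exist because the read functions return raw bits inside an int32_t/int64_t; the value must be reinterpreted, not numerically converted. A self-contained sketch of that bit-preserving round trip using std::memcpy (the portable equivalent of the may_alias_ptr casts above):

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

int main() {
  const double original = 3.5;

  // Pack: transport the double's bit pattern inside an int64_t.
  int64_t bits;
  std::memcpy(&bits, &original, sizeof(bits));

  // Unpack: reinterpret the bits as a double again; no numeric conversion happens.
  double restored;
  std::memcpy(&restored, &bits, sizeof(restored));

  assert(restored == original);
  // Note: static_cast<double>(bits) would instead convert the huge integer value
  // numerically, which is exactly the bug the specializations avoid.
}
```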

/*
 * Materializes all columns directly from the result set's storage, dispatching on the
 * query description type. Only projections and group-by queries (perfect or baseline
 * hash) are supported by the direct conversion path.
 */
void ColumnarResults::materializeAllColumnsDirectly(const ResultSet& rows,
                                                    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  switch (rows.getQueryDescriptionType()) {
    case QueryDescriptionType::Projection: {
      materializeAllColumnsProjection(rows, num_columns);
      break;
    }
    case QueryDescriptionType::GroupByPerfectHash:
    case QueryDescriptionType::GroupByBaselineHash: {
      materializeAllColumnsGroupBy(rows, num_columns);
      break;
    }
    default:
      UNREACHABLE()
          << "Direct columnar conversion for this query type is not supported yet.";
  }
}

/*
 * Materializes all columns of a columnar projection: non-lazy columns are copied (or
 * zero-copy adopted) directly from the result set's storage, while lazy columns are
 * decoded through the result set's iterators.
 */
void ColumnarResults::materializeAllColumnsProjection(const ResultSet& rows,
                                                      const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        rows.query_mem_desc_.getQueryDescriptionType() ==
            QueryDescriptionType::Projection);

  const auto& lazy_fetch_info = rows.getLazyFetchInfo();

  // We can directly copy each non-lazy column's content
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);

  // Only lazy columns are iterated through first and then materialized
  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
}

/*
 * For all non-lazy columns, we can directly copy back the results of each column's
 * contents from different storages and put them into the corresponding output buffer.
 *
 * This function is parallelized through assigning each column to a CPU thread.
 */
void ColumnarResults::copyAllNonLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
    // an empty lazy_fetch_info means that no column is lazily fetched
    if (lazy_fetch_info.empty()) {
      return true;
    } else {
      return !lazy_fetch_info[col_idx].is_lazily_fetched;
    }
  };

  // parallelized by assigning each column to a thread
  std::vector<std::future<void>> direct_copy_threads;
  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
    if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
      CHECK(!column_buffers_[col_idx]);
      column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
    } else if (is_column_non_lazily_fetched(col_idx)) {
      direct_copy_threads.push_back(std::async(
          std::launch::async,
          [&rows, this](const size_t column_index) {
            const size_t column_size = num_rows_ * target_types_[column_index].get_size();
            rows.copyColumnIntoBuffer(
                column_index, column_buffers_[column_index], column_size);
          },
          col_idx));
    }
  }

  for (auto& child : direct_copy_threads) {
    child.wait();
  }
}
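Here parallelism is per column rather than per row chunk: each non-lazy column gets its own std::async task. A self-contained sketch of that fan-out/join shape, assuming illustrative buffers and a plain memcpy in place of copyColumnIntoBuffer:

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <future>
#include <vector>

int main() {
  constexpr std::size_t kNumRows = 1024;
  std::vector<std::vector<int64_t>> source(3, std::vector<int64_t>(kNumRows, 7));
  std::vector<std::vector<int64_t>> dest(3, std::vector<int64_t>(kNumRows));

  // one task per column: columns are independent, so no synchronization is needed
  std::vector<std::future<void>> copy_tasks;
  for (std::size_t col = 0; col < source.size(); ++col) {
    copy_tasks.push_back(std::async(std::launch::async, [col, &source, &dest] {
      std::memcpy(dest[col].data(), source[col].data(), kNumRows * sizeof(int64_t));
    }));
  }
  for (auto& task : copy_tasks) {
    task.wait();  // join before the destination buffers are consumed
  }
}
```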

/*
 * For all lazy-fetched columns, iterate through the result set and materialize the
 * proper values before storing them into the output column buffers.
 *
 * This function is parallelized through dividing total rows among all existing threads.
 */
void ColumnarResults::materializeAllLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  const auto do_work_just_lazy_columns = [num_columns, &rows, this](
                                             const size_t row_idx,
                                             const std::vector<bool>& targets_to_skip) {
    const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
    for (size_t i = 0; i < num_columns; ++i) {
      if (!targets_to_skip.empty() && !targets_to_skip[i]) {
        writeBackCell(crt_row[i], row_idx, i);
      }
    }
  };

  const auto contains_lazy_fetched_column =
      [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
        for (auto& col_info : lazy_fetch_info) {
          if (col_info.is_lazily_fetched) {
            return true;
          }
        }
        return false;
      };

  // parallelized by assigning a chunk of rows to each thread
  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
  if (contains_lazy_fetched_column(lazy_fetch_info)) {
    const size_t worker_count =
        result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1;
    std::vector<std::future<void>> conversion_threads;
    std::vector<bool> targets_to_skip;
    if (skip_non_lazy_columns) {
      CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
      targets_to_skip.reserve(num_columns);
      for (size_t i = 0; i < num_columns; i++) {
        // we process lazy columns (i.e., skip non-lazy columns)
        targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
      }
    }
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work_just_lazy_columns, &targets_to_skip, this](const size_t start,
                                                               const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                             executor_->checkNonKernelTimeInterrupted())) {
                  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
                }
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            }
          },
          interval.begin,
          interval.end));
    }

    try {
      for (auto& child : conversion_threads) {
        child.wait();
      }
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      throw e;
    } catch (...) {
      throw;
    }
  }
}

/*
 * Materializes all columns of a group-by result set without using its iterators: it
 * first locates all non-empty entries in the storage, then copies/decodes only those
 * entries into compacted output buffers.
 */
void ColumnarResults::materializeAllColumnsGroupBy(const ResultSet& rows,
                                                   const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
  const size_t entry_count = rows.entryCount();
  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;

  // step 1: compute total non-empty elements and store a bitmap per thread
  std::vector<size_t> non_empty_per_thread(num_threads,
                                           0);  // number of non-empty entries per thread

  ColumnBitmap bitmap(size_per_thread, num_threads);

  locateAndCountEntries(
      rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);

  // step 2: go through the generated bitmap and copy/decode corresponding entries
  // into the output buffer
  compactAndCopyEntries(rows,
                        bitmap,
                        non_empty_per_thread,
                        num_columns,
                        entry_count,
                        num_threads,
                        size_per_thread);
}
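The group-by path is a two-phase compaction: phase one marks non-empty entries in a per-thread bitmap and counts them; phase two copies only the marked entries. A self-contained sketch of phase one, assuming a hypothetical sentinel-based "empty" test and a vector of bool banks standing in for ColumnBitmap:

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

int main() {
  // hypothetical storage: an entry is "empty" when it holds the sentinel -1
  const std::vector<int> storage = {5, -1, 7, -1, -1, 9, 2, -1};
  constexpr std::size_t kThreads = 2;
  const std::size_t per_thread = (storage.size() + kThreads - 1) / kThreads;

  // one bitmap bank and one non-empty counter per thread, as in locateAndCountEntries
  std::vector<std::vector<bool>> bitmap(kThreads, std::vector<bool>(per_thread, false));
  std::vector<std::size_t> non_empty_per_thread(kThreads, 0);

  for (std::size_t t = 0; t < kThreads; ++t) {
    const std::size_t start = t * per_thread;
    const std::size_t end = std::min(start + per_thread, storage.size());
    for (std::size_t i = start; i < end; ++i) {
      if (storage[i] != -1) {
        bitmap[t][i - start] = true;  // remember where the non-empty entries live
        ++non_empty_per_thread[t];
      }
    }
  }
  // non_empty_per_thread == {2, 2}; phase two turns these counts into write offsets
}
```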

/*
 * Goes through all entries in the result set's storage, counts the number of non-empty
 * entries per thread, and marks their positions in a bitmap for later access.
 */
void ColumnarResults::locateAndCountEntries(const ResultSet& rows,
                                            ColumnBitmap& bitmap,
                                            std::vector<size_t>& non_empty_per_thread,
                                            const size_t entry_count,
                                            const size_t num_threads,
                                            const size_t size_per_thread) const {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());
  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
                                  const size_t local_idx,
                                  const size_t entry_idx,
                                  const size_t thread_idx) {
    if (!rows.isRowAtEmpty(entry_idx)) {
      total_non_empty++;
      bitmap.set(local_idx, thread_idx, true);
    }
  };
  auto locate_and_count_func =
      [&do_work, &non_empty_per_thread, this](
          size_t start_index, size_t end_index, size_t thread_idx) {
        size_t total_non_empty = 0;
        size_t local_idx = 0;
        if (g_enable_non_kernel_time_query_interrupt) {
          for (size_t entry_idx = start_index; entry_idx < end_index;
               entry_idx++, local_idx++) {
            if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                         executor_->checkNonKernelTimeInterrupted())) {
              throw QueryExecutionError(Executor::ERR_INTERRUPTED);
            }
            do_work(total_non_empty, local_idx, entry_idx, thread_idx);
          }
        } else {
          for (size_t entry_idx = start_index; entry_idx < end_index;
               entry_idx++, local_idx++) {
            do_work(total_non_empty, local_idx, entry_idx, thread_idx);
          }
        }
        non_empty_per_thread[thread_idx] = total_non_empty;
      };

  std::vector<std::future<void>> conversion_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    conversion_threads.push_back(std::async(
        std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : conversion_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

/*
 * Takes the per-thread bitmap of non-empty entries and copies those entries back into
 * the output column buffers, compacted (no holes). Exclusive-scan offsets over the
 * per-thread non-empty counts determine where each thread writes its output.
 */
void ColumnarResults::compactAndCopyEntries(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());

  // compute the exclusive scan over all non-empty totals
  std::vector<size_t> global_offsets(num_threads + 1, 0);
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   std::next(global_offsets.begin()));

  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
      rows.getSupportedSingleSlotTargetBitmap();

  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
  // differently and accessed through result set's iterator
  if (num_single_slot_targets < num_columns) {
    compactAndCopyEntriesWithTargetSkipping(rows,
                                            bitmap,
                                            non_empty_per_thread,
                                            global_offsets,
                                            single_slot_targets_to_skip,
                                            slot_idx_per_target_idx,
                                            num_columns,
                                            entry_count,
                                            num_threads,
                                            size_per_thread);
  } else {
    compactAndCopyEntriesWithoutTargetSkipping(rows,
                                               bitmap,
                                               non_empty_per_thread,
                                               global_offsets,
                                               slot_idx_per_target_idx,
                                               num_columns,
                                               entry_count,
                                               num_threads,
                                               size_per_thread);
  }
}
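global_offsets is an exclusive scan of the per-thread counts: thread t writes its first non-empty entry at global_offsets[t]. A small self-contained illustration with made-up counts:

```cpp
#include <cstddef>
#include <iostream>
#include <iterator>
#include <numeric>
#include <vector>

int main() {
  // per-thread non-empty counts, e.g. as produced by locateAndCountEntries
  const std::vector<std::size_t> non_empty_per_thread = {3, 0, 2, 4};

  // exclusive scan: offsets[0] stays 0, offsets[t + 1] = offsets[t] + counts[t]
  std::vector<std::size_t> global_offsets(non_empty_per_thread.size() + 1, 0);
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   std::next(global_offsets.begin()));

  for (std::size_t off : global_offsets) {
    std::cout << off << ' ';  // prints: 0 3 3 5 9
  }
  std::cout << '\n';
  // thread 0 writes output rows [0,3), thread 1 nothing, thread 2 [3,5), thread 3 [5,9)
}
```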

/*
 * Takes the bitmap of non-empty entries and compacts/copies them into the output
 * column buffers. In this variation, multi-slot targets (e.g., AVG) are decoded
 * through the result set's iterators, while everything else is directly columnarized.
 */
void ColumnarResults::compactAndCopyEntriesWithTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<bool>& targets_to_skip,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);
  auto do_work = [this,
                  &bitmap,
                  &rows,
                  &slot_idx_per_target_idx,
                  &global_offsets,
                  &targets_to_skip,
                  &num_columns,
                  &write_functions = write_functions,
                  &read_functions = read_functions](size_t& non_empty_idx,
                                                    const size_t total_non_empty,
                                                    const size_t local_idx,
                                                    size_t& entry_idx,
                                                    const size_t thread_idx,
                                                    const size_t end_idx) {
    if (non_empty_idx >= total_non_empty) {
      // all non-empty entries have been written back
      entry_idx = end_idx;
    }
    const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
    if (bitmap.get(local_idx, thread_idx)) {
      // targets that are recovered from the result set iterators:
      const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
      for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
        if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
          writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
        }
      }
      // targets that are copied directly without any translation/decoding from
      // result set
      for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
        if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
          continue;
        }
        write_functions[column_idx](rows,
                                    entry_idx,
                                    output_buffer_row_idx,
                                    column_idx,
                                    slot_idx_per_target_idx[column_idx],
                                    read_functions[column_idx]);
      }
      non_empty_idx++;
    }
  };

  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
                                 const size_t start_index,
                                 const size_t end_index,
                                 const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                     executor_->checkNonKernelTimeInterrupted())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(
            non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(
            non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : compaction_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

/*
 * Takes the bitmap of non-empty entries and compacts/copies them into the output
 * column buffers. In this variation, all targets are single-slot and are directly
 * columnarized through the read/write function pairs.
 */
void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);
  auto do_work = [&rows,
                  &bitmap,
                  &global_offsets,
                  &num_columns,
                  &slot_idx_per_target_idx,
                  &write_functions = write_functions,
                  &read_functions = read_functions](size_t& entry_idx,
                                                    size_t& non_empty_idx,
                                                    const size_t total_non_empty,
                                                    const size_t local_idx,
                                                    const size_t thread_idx,
                                                    const size_t end_idx) {
    if (non_empty_idx >= total_non_empty) {
      // all non-empty entries have been written back
      entry_idx = end_idx;
      return;
    }
    const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
    if (bitmap.get(local_idx, thread_idx)) {
      for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
        write_functions[column_idx](rows,
                                    entry_idx,
                                    output_buffer_row_idx,
                                    column_idx,
                                    slot_idx_per_target_idx[column_idx],
                                    read_functions[column_idx]);
      }
      non_empty_idx++;
    }
  };
  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
                                 const size_t start_index,
                                 const size_t end_index,
                                 const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                     executor_->checkNonKernelTimeInterrupted())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(
            entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(
            entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : compaction_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

/*
 * Initializes a set of write functions per target (i.e., column). The target type's
 * physical size selects the proper instantiation of writeBackCellDirect.
 */
std::vector<ColumnarResults::WriteFunction> ColumnarResults::initWriteFunctions(
    const ResultSet& rows,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  std::vector<WriteFunction> result;
  result.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      result.emplace_back([](const ResultSet& rows,
                             const size_t input_buffer_entry_idx,
                             const size_t output_buffer_entry_idx,
                             const size_t target_idx,
                             const size_t slot_idx,
                             const ReadFunction& read_function) {
        UNREACHABLE() << "Invalid write back function used.";
      });
      continue;
    }

    if (target_types_[target_idx].is_fp()) {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    } else {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 2:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 1:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    }
  }
  return result;
}

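initWriteFunctions builds a dispatch table: one std::function per column, bound to the right writeBackCellDirect instantiation. A minimal self-contained sketch of the same table-of-bound-member-functions idea, with an illustrative `Writer` type and widths (not the OmniSciDB types):

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

struct Writer {
  template <typename T>
  void write(int64_t val) {
    std::cout << "writing " << sizeof(T) << "-byte value " << val << '\n';
  }
};

int main() {
  Writer writer;
  using WriteFn = std::function<void(int64_t)>;

  // choose an instantiation per column width, as initWriteFunctions does per target
  std::vector<std::size_t> column_widths = {8, 4, 2};
  std::vector<WriteFn> table;
  for (std::size_t width : column_widths) {
    switch (width) {
      case 8:
        table.emplace_back(
            std::bind(&Writer::write<int64_t>, &writer, std::placeholders::_1));
        break;
      case 4:
        table.emplace_back(
            std::bind(&Writer::write<int32_t>, &writer, std::placeholders::_1));
        break;
      default:
        table.emplace_back(
            std::bind(&Writer::write<int16_t>, &writer, std::placeholders::_1));
        break;
    }
  }
  table[1](42);  // dispatches to the 4-byte writer without a per-call switch
}
```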
namespace {

int64_t invalid_read_func(const ResultSet& rows,
                          const size_t input_buffer_entry_idx,
                          const size_t target_idx,
                          const size_t slot_idx) {
  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
  return static_cast<int64_t>(0);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_key_baseline(const ResultSet& rows,
                                const size_t input_buffer_entry_idx,
                                const size_t target_idx,
                                const size_t slot_idx) {
  // float keys in baseline hash are written as doubles in the buffer, so
  // the result should be properly cast before being written in the output
  // columns
  auto fval = static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx));
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int64_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int32_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int16_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int8_func(const ResultSet& rows,
                       const size_t input_buffer_entry_idx,
                       const size_t target_idx,
                       const size_t slot_idx) {
  return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_double_func(const ResultSet& rows,
                         const size_t input_buffer_entry_idx,
                         const size_t target_idx,
                         const size_t slot_idx) {
  auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
}

}  // namespace

/*
 * Initializes a set of read functions to properly access the contents of the result
 * set's storage buffer. Each particular read function is chosen based on the data type
 * and data size used to store that target in the result set's storage buffer.
 */
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector<ColumnarResults::ReadFunction> ColumnarResults::initReadFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());

  std::vector<ReadFunction> read_functions;
  read_functions.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      // for targets that should be skipped, we use a placeholder function that should
      // never be called. The CHECKs inside it make sure that never happens.
      read_functions.emplace_back(invalid_read_func);
      continue;
    }

    if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
      if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
        // for key columns only
        CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
        if (target_types_[target_idx].is_fp()) {
          CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
          switch (target_types_[target_idx].get_type()) {
            case kFLOAT:
              read_functions.emplace_back(
                  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case kDOUBLE:
              read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, floating point key).";
              break;
          }
        } else {
          switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
            case 8:
              read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case 4:
              read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, integer key).";
          }
        }
        continue;
      }
    }
    if (target_types_[target_idx].is_fp()) {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
          break;
      }
    } else {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 2:
          read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 1:
          read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (integer agg column).";
          break;
      }
    }
  }
  return read_functions;
}

/*
 * Chooses the appropriate write and read function pair for each target, so that each
 * row can be converted by simply invoking these functions per target. Read functions
 * fetch a cell's content from the result set's storage; write functions store it into
 * the output column buffers.
 */
std::tuple<std::vector<ColumnarResults::WriteFunction>,
           std::vector<ColumnarResults::ReadFunction>>
ColumnarResults::initAllConversionFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
         rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));

  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  } else {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  }
}
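The nested branching above maps two runtime properties (query description type, columnar output) onto compile-time template parameters, so the per-row read functions carry no runtime branching. A self-contained sketch of that dispatch idiom, with an illustrative enum and functions:

```cpp
#include <iostream>

enum class Layout { RowWise, Columnar };

// The hot-path function is instantiated per layout; its branch resolves at compile time.
template <Layout LAYOUT>
long read_entry(long entry_idx) {
  if constexpr (LAYOUT == Layout::Columnar) {
    return entry_idx * 2;  // stand-in for columnar addressing
  } else {
    return entry_idx * 3;  // stand-in for row-wise addressing
  }
}

// One runtime branch at setup time picks the instantiation, just as
// initAllConversionFunctions branches once on didOutputColumnar().
long dispatch(Layout layout, long entry_idx) {
  return layout == Layout::Columnar ? read_entry<Layout::Columnar>(entry_idx)
                                    : read_entry<Layout::RowWise>(entry_idx);
}

int main() {
  std::cout << dispatch(Layout::Columnar, 10) << '\n';  // 20
  std::cout << dispatch(Layout::RowWise, 10) << '\n';   // 30
}
```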