OmniSciDB  cde582ebc3
ColumnarResults.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ColumnarResults.h"
#include "Descriptors/RowSetMemoryOwner.h"
#include "ErrorHandling.h"
#include "Execute.h"
#include "Shared/Intervals.h"
#include "Shared/likely.h"
#include "Shared/thread_count.h"

#include <atomic>
#include <future>
#include <numeric>

namespace {

inline int64_t fixed_encoding_nullable_val(const int64_t val,
                                           const SQLTypeInfo& type_info) {
  if (type_info.get_compression() != kENCODING_NONE) {
    CHECK(type_info.get_compression() == kENCODING_FIXED ||
          type_info.get_compression() == kENCODING_DICT);
    auto logical_ti = get_logical_type_info(type_info);
    if (val == inline_int_null_val(logical_ti)) {
      return inline_fixed_encoding_null_val(type_info);
    }
  }
  return val;
}
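
// Worked example (illustrative sketch; the setup below is hypothetical, not part
// of the original source): for a column typed INT ENCODING FIXED(16), a NULL cell
// arrives from the row iterator as the logical 32-bit NULL sentinel and must be
// remapped to the narrower physical sentinel before being stored in a 2-byte slot:
//
//   SQLTypeInfo ti(kINT, false);          // hypothetical setup
//   ti.set_compression(kENCODING_FIXED);
//   ti.set_comp_param(16);
//   const int64_t v = fixed_encoding_nullable_val(
//       inline_int_null_val(get_logical_type_info(ti)), ti);
//   // v == inline_fixed_encoding_null_val(ti), i.e. the 16-bit NULL sentinel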

std::vector<size_t> get_padded_target_sizes(
    const ResultSet& rows,
    const std::vector<SQLTypeInfo>& target_types) {
  std::vector<size_t> padded_target_sizes;
  // We have to check that the result set is valid, as one entry point to
  // columnar results effectively constructs a fake result set. In those cases
  // it is safe to fall back to the target types' logical widths.
  if (!rows.hasValidBuffer() ||
      rows.getQueryMemDesc().getColCount() < target_types.size()) {
    for (const auto& target_type : target_types) {
      padded_target_sizes.emplace_back(target_type.get_size());
    }
    return padded_target_sizes;
  }

  // Otherwise we have a valid result set, so use its QueryMemoryDescriptor's
  // padded widths.
  const auto col_context = rows.getQueryMemDesc().getColSlotContext();
  for (size_t col_idx = 0; col_idx < target_types.size(); col_idx++) {
    // Lazy fetch columns will have 0 as their padded width, so use the type's
    // logical width for those.
    const auto idx = col_context.getSlotsForCol(col_idx).front();
    const size_t padded_slot_width =
        static_cast<size_t>(rows.getPaddedSlotWidthBytes(idx));
    padded_target_sizes.emplace_back(
        padded_slot_width == 0UL ? target_types[col_idx].get_size() : padded_slot_width);
  }
  return padded_target_sizes;
}
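
// Illustrative note (a sketch, not original commentary): the padded width, not
// the logical type size, is what buffer sizing must use. For example, a 4-byte
// INT target may occupy an 8-byte padded slot in the query output buffer, while
// a lazily fetched column reports a padded width of 0 and falls back to
// get_size():
//
//   padded_slot_width == 0  ->  use target_types[col_idx].get_size()
//   padded_slot_width == 8  ->  use 8, even if the logical size is 4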

}  // namespace

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const ResultSet& rows,
                                 const size_t num_columns,
                                 const std::vector<SQLTypeInfo>& target_types,
                                 const size_t executor_id,
                                 const size_t thread_idx,
                                 const bool is_parallel_execution_enforced)
    : column_buffers_(num_columns)
    , num_rows_(result_set::use_parallel_algorithms(rows) ||
                        rows.isDirectColumnarConversionPossible()
                    ? rows.entryCount()
                    : rows.rowCount())
    , target_types_(target_types)
    , parallel_conversion_(is_parallel_execution_enforced
                               ? true
                               : result_set::use_parallel_algorithms(rows))
    , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
    , thread_idx_(thread_idx)
    , padded_target_sizes_(get_padded_target_sizes(rows, target_types)) {
  auto timer = DEBUG_TIMER(__func__);
  column_buffers_.resize(num_columns);
  executor_ = Executor::getExecutor(executor_id);
  CHECK(executor_);
  for (size_t i = 0; i < num_columns; ++i) {
    const bool is_varlen = target_types[i].is_array() ||
                           (target_types[i].is_string() &&
                            target_types[i].get_compression() == kENCODING_NONE) ||
                           target_types[i].is_geometry();
    if (is_varlen) {
      throw ColumnarConversionNotSupported();
    }
    CHECK_EQ(padded_target_sizes_.size(), target_types.size());
    if (!isDirectColumnarConversionPossible() ||
        !rows.isZeroCopyColumnarConversionPossible(i)) {
      column_buffers_[i] =
          row_set_mem_owner->allocate(num_rows_ * padded_target_sizes_[i], thread_idx_);
    }
  }

  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
    materializeAllColumnsDirectly(rows, num_columns);
  } else {
    materializeAllColumnsThroughIteration(rows, num_columns);
  }
}
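
// Minimal usage sketch (illustrative; `rs`, `mem_owner`, and `exec_id` are
// hypothetical, and the accessor name assumes the declaration in
// ColumnarResults.h):
//
//   std::vector<SQLTypeInfo> col_types = ...;  // one entry per output column
//   ColumnarResults columnar(mem_owner, rs, col_types.size(), col_types,
//                            exec_id, /*thread_idx=*/0);
//   const int8_t* col0 = columnar.getColumnBuffers()[0];  // contiguous column 0
//
// Whether the conversion iterates the rows or copies storage buffers directly is
// decided internally via isDirectColumnarConversionPossible().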

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const int8_t* one_col_buffer,
                                 const size_t num_rows,
                                 const SQLTypeInfo& target_type,
                                 const size_t executor_id,
                                 const size_t thread_idx)
    : column_buffers_(1)
    , num_rows_(num_rows)
    , target_types_{target_type}
    , parallel_conversion_(false)
    , direct_columnar_conversion_(false)
    , thread_idx_(thread_idx) {
  auto timer = DEBUG_TIMER(__func__);
  const bool is_varlen =
      target_type.is_array() ||
      (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
      target_type.is_geometry();
  if (is_varlen) {
    throw ColumnarConversionNotSupported();
  }
  executor_ = Executor::getExecutor(executor_id);
  padded_target_sizes_.emplace_back(target_type.get_size());
  CHECK(executor_);
  const auto buf_size = num_rows * target_type.get_size();
  column_buffers_[0] =
      reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
  if (sub_results.empty()) {
    return nullptr;
  }
  const auto total_row_count = std::accumulate(
      sub_results.begin(),
      sub_results.end(),
      size_t(0),
      [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
        return init + result->size();
      });
  std::unique_ptr<ColumnarResults> merged_results(
      new ColumnarResults(total_row_count,
                          sub_results[0]->target_types_,
                          sub_results[0]->padded_target_sizes_));
  const auto col_count = sub_results[0]->column_buffers_.size();
  const auto nonempty_it = std::find_if(
      sub_results.begin(),
      sub_results.end(),
      [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
  if (nonempty_it == sub_results.end()) {
    return nullptr;
  }
  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    const auto byte_width = merged_results->padded_target_sizes_[col_idx];
    auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
    merged_results->column_buffers_.push_back(write_ptr);
    for (auto& rs : sub_results) {
      CHECK_EQ(col_count, rs->column_buffers_.size());
      if (!rs->size()) {
        continue;
      }
      CHECK_EQ(byte_width, rs->padded_target_sizes_[col_idx]);
      memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
      write_ptr += rs->size() * byte_width;
    }
  }
  return merged_results;
}
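
// Behavior sketch (illustrative, not original commentary): merging is a
// per-column concatenation. Given two sub-results with 2 and 3 rows, each
// 8-byte column buffer is appended back to back:
//
//   sub[0] col0: [a, b]      sub[1] col0: [c, d, e]
//   merged col0: [a, b, c, d, e]          (total_row_count == 5)
//
// All sub-results must agree on column count and padded widths (CHECKed in the
// loop above), and an input whose sub-results are all empty yields nullptr.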

/*
 * This function iterates through the result set (via getRowAtNoTranslations /
 * getNextRow) and writes each row's cells back into the output column buffers.
 */
void ColumnarResults::materializeAllColumnsThroughIteration(const ResultSet& rows,
                                                            const size_t num_columns) {
  std::atomic<size_t> row_idx{0};
  if (isParallelConversion()) {
    const size_t worker_count = cpu_threads();
    std::vector<std::future<void>> conversion_threads;
    const auto do_work = [num_columns, &rows, &row_idx, this](const size_t i) {
      const auto crt_row = rows.getRowAtNoTranslations(i);
      if (!crt_row.empty()) {
        auto cur_row_idx = row_idx.fetch_add(1);
        for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
          writeBackCell(crt_row[col_idx], cur_row_idx, col_idx);
        }
      }
    };
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work, this](const size_t start, const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                             executor_->checkNonKernelTimeInterrupted())) {
                  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
                }
                do_work(i);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work(i);
              }
            }
          },
          interval.begin,
          interval.end));
    }

    try {
      for (auto& child : conversion_threads) {
        child.wait();
      }
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      throw e;
    } catch (...) {
      throw;
    }

    num_rows_ = row_idx;
    rows.setCachedRowCount(num_rows_);
    return;
  }
  bool done = false;
  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
    const auto crt_row = rows.getNextRow(false, false);
    if (crt_row.empty()) {
      done = true;
      return;
    }
    for (size_t i = 0; i < num_columns; ++i) {
      writeBackCell(crt_row[i], row_idx, i);
    }
    ++row_idx;
  };
  if (g_enable_non_kernel_time_query_interrupt) {
    while (!done) {
      if (UNLIKELY((row_idx & 0xFFFF) == 0 &&
                   executor_->checkNonKernelTimeInterrupted())) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      do_work();
    }
  } else {
    while (!done) {
      do_work();
    }
  }

  rows.moveToBegin();
}

/*
 * This function processes and decodes its input TargetValue and writes it into
 * the corresponding column buffer's cell (at the given row and column indices).
 *
 * NOTE: this is not supposed to process varlen types; those should be handled
 * differently outside this function.
 */
inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
                                           const size_t row_idx,
                                           const size_t column_idx) {
  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
  CHECK(scalar_col_val);
  auto i64_p = boost::get<int64_t>(scalar_col_val);
  const auto& type_info = target_types_[column_idx];
  if (i64_p) {
    const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
    switch (target_types_[column_idx].get_size()) {
      case 1:
        ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
        break;
      case 2:
        ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
        break;
      case 4:
        ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
        break;
      case 8:
        ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
        break;
      default:
        CHECK(false);
    }
  } else {
    CHECK(target_types_[column_idx].is_fp());
    switch (target_types_[column_idx].get_type()) {
      case kFLOAT: {
        auto float_p = boost::get<float>(scalar_col_val);
        ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
        break;
      }
      case kDOUBLE: {
        auto double_p = boost::get<double>(scalar_col_val);
        ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
        break;
      }
      default:
        CHECK(false);
    }
  }
}
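
// Worked example (illustrative sketch): for a SMALLINT target (get_size() == 2)
// holding the value 42, the int64_t branch above narrows on store:
//
//   ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(42);
//
// A DOUBLE target instead takes the floating-point branch, where the
// ScalarTargetValue already holds a double; varlen targets must never get here.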

/*
 * A set of write functions to be used to directly write into the final
 * column_buffers_. The read_from_function is used to read from the input result
 * set's storage.
 *
 * NOTE: currently only used for direct columnarization.
 */
template <typename DATA_TYPE>
void ColumnarResults::writeBackCellDirect(const ResultSet& rows,
                                          const size_t input_buffer_entry_idx,
                                          const size_t output_buffer_entry_idx,
                                          const size_t target_idx,
                                          const size_t slot_idx,
                                          const ReadFunction& read_from_function) {
  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
      target_types_[target_idx]));
  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
      val;
}

template <>
void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
                                                 const size_t input_buffer_entry_idx,
                                                 const size_t output_buffer_entry_idx,
                                                 const size_t target_idx,
                                                 const size_t slot_idx,
                                                 const ReadFunction& read_from_function) {
  const int32_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
}

template <>
void ColumnarResults::writeBackCellDirect<double>(
    const ResultSet& rows,
    const size_t input_buffer_entry_idx,
    const size_t output_buffer_entry_idx,
    const size_t target_idx,
    const size_t slot_idx,
    const ReadFunction& read_from_function) {
  const int64_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
}

/*
 * Materializes all output columns directly from the result set's storage,
 * dispatching to the appropriate routine based on the query description type.
 */
void ColumnarResults::materializeAllColumnsDirectly(const ResultSet& rows,
                                                    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  switch (rows.getQueryDescriptionType()) {
    case QueryDescriptionType::Projection: {
      materializeAllColumnsProjection(rows, num_columns);
      break;
    }
    case QueryDescriptionType::TableFunction: {
      materializeAllColumnsTableFunction(rows, num_columns);
      break;
    }
    case QueryDescriptionType::GroupByPerfectHash:
    case QueryDescriptionType::GroupByBaselineHash: {
      materializeAllColumnsGroupBy(rows, num_columns);
      break;
    }
    default:
      UNREACHABLE()
          << "Direct columnar conversion for this query type is not supported yet.";
  }
}

void ColumnarResults::materializeAllColumnsProjection(const ResultSet& rows,
                                                      const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.query_mem_desc_.getQueryDescriptionType() ==
         QueryDescriptionType::Projection));

  const auto& lazy_fetch_info = rows.getLazyFetchInfo();

  // We can directly copy each non-lazy column's content
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);

  // Only lazy columns are iterated through first and then materialized
  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
}

void ColumnarResults::materializeAllColumnsTableFunction(const ResultSet& rows,
                                                         const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.query_mem_desc_.getQueryDescriptionType() ==
         QueryDescriptionType::TableFunction));

  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
  // Lazy fetching is not currently allowed for table function outputs
  for (const auto& col_lazy_fetch_info : lazy_fetch_info) {
    CHECK(!col_lazy_fetch_info.is_lazily_fetched);
  }
  // We can directly copy each non-lazy column's content
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
}

/*
 * For all non-lazy columns, we can directly copy back the results of each column's
 * contents from the different storages and put them into the corresponding output
 * buffer.
 *
 * This function is parallelized by assigning each column to a CPU thread.
 */
void ColumnarResults::copyAllNonLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
    // an empty lazy_fetch_info means that all columns are non-lazily fetched
    if (lazy_fetch_info.empty()) {
      return true;
    } else {
      return !lazy_fetch_info[col_idx].is_lazily_fetched;
    }
  };

  // parallelized by assigning each column to a thread
  std::vector<std::future<void>> direct_copy_threads;
  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
    if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
      CHECK(!column_buffers_[col_idx]);
      column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
    } else if (is_column_non_lazily_fetched(col_idx)) {
      CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
              QueryDescriptionType::TableFunction));
      direct_copy_threads.push_back(std::async(
          std::launch::async,
          [&rows, this](const size_t column_index) {
            const size_t column_size = num_rows_ * padded_target_sizes_[column_index];
            rows.copyColumnIntoBuffer(
                column_index, column_buffers_[column_index], column_size);
          },
          col_idx));
    }
  }

  for (auto& child : direct_copy_threads) {
    child.wait();
  }
}

/*
 * For all lazy fetched columns, we iterate through the result set and
 * materialize each lazily fetched cell into its output column buffer.
 */
void ColumnarResults::materializeAllLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
          QueryDescriptionType::TableFunction));
  const auto do_work_just_lazy_columns = [num_columns, &rows, this](
                                             const size_t row_idx,
                                             const std::vector<bool>& targets_to_skip) {
    const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
    for (size_t i = 0; i < num_columns; ++i) {
      if (!targets_to_skip.empty() && !targets_to_skip[i]) {
        writeBackCell(crt_row[i], row_idx, i);
      }
    }
  };

  const auto contains_lazy_fetched_column =
      [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
        for (auto& col_info : lazy_fetch_info) {
          if (col_info.is_lazily_fetched) {
            return true;
          }
        }
        return false;
      };

  // parallelized by assigning a chunk of rows to each thread
  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
  if (contains_lazy_fetched_column(lazy_fetch_info)) {
    const size_t worker_count =
        result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1;
    std::vector<std::future<void>> conversion_threads;
    std::vector<bool> targets_to_skip;
    if (skip_non_lazy_columns) {
      CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
      targets_to_skip.reserve(num_columns);
      for (size_t i = 0; i < num_columns; i++) {
        // we process lazy columns (i.e., skip non-lazy columns)
        targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
      }
    }
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work_just_lazy_columns, &targets_to_skip, this](const size_t start,
                                                               const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                             executor_->checkNonKernelTimeInterrupted())) {
                  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
                }
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            }
          },
          interval.begin,
          interval.end));
    }

    try {
      for (auto& child : conversion_threads) {
        child.wait();
      }
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      throw e;
    } catch (...) {
      throw;
    }
  }
}

/*
 * This function directly columnarizes a result set for group by queries. Rather
 * than using the result set's iterators, it directly reads non-empty entries
 * from the result set's storage and then writes them into the output column
 * buffers.
 */
void ColumnarResults::materializeAllColumnsGroupBy(const ResultSet& rows,
                                                   const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
  const size_t entry_count = rows.entryCount();
  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;

  // step 1: compute total non-empty elements and store a bitmap per thread
  std::vector<size_t> non_empty_per_thread(num_threads,
                                           0);  // number of non-empty entries per thread

  ColumnBitmap bitmap(size_per_thread, num_threads);

  locateAndCountEntries(
      rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);

  // step 2: go through the generated bitmap and copy/decode corresponding entries
  // into the output buffer
  compactAndCopyEntries(rows,
                        bitmap,
                        non_empty_per_thread,
                        num_columns,
                        entry_count,
                        num_threads,
                        size_per_thread);
}
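
// Shape of the two-step algorithm above on a toy example (illustrative values,
// not from the original source): with entry_count == 8 and num_threads == 2,
// each thread scans 4 entries. If thread 0 finds non-empty entries at its local
// slots {0, 2} and thread 1 at {1}, then non_empty_per_thread == {2, 1} and the
// bitmap records the exact slots. Step 2 converts those counts into global
// write offsets ({0, 2} after the exclusive scan), so each thread writes its
// compacted entries without any synchronization.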

/*
 * This function goes through all the keys in the result set, and counts the
 * total number of non-empty keys. It also stores the location of non-empty keys
 * in a bitmap data structure for faster access later on.
 */
void ColumnarResults::locateAndCountEntries(const ResultSet& rows,
                                            ColumnBitmap& bitmap,
                                            std::vector<size_t>& non_empty_per_thread,
                                            const size_t entry_count,
                                            const size_t num_threads,
                                            const size_t size_per_thread) const {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());
  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
                                  const size_t local_idx,
                                  const size_t entry_idx,
                                  const size_t thread_idx) {
    if (!rows.isRowAtEmpty(entry_idx)) {
      total_non_empty++;
      bitmap.set(local_idx, thread_idx, true);
    }
  };
  auto locate_and_count_func =
      [&do_work, &non_empty_per_thread, this](
          size_t start_index, size_t end_index, size_t thread_idx) {
        size_t total_non_empty = 0;
        size_t local_idx = 0;
        if (g_enable_non_kernel_time_query_interrupt) {
          for (size_t entry_idx = start_index; entry_idx < end_index;
               entry_idx++, local_idx++) {
            if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                         executor_->checkNonKernelTimeInterrupted())) {
              throw QueryExecutionError(Executor::ERR_INTERRUPTED);
            }
            do_work(total_non_empty, local_idx, entry_idx, thread_idx);
          }
        } else {
          for (size_t entry_idx = start_index; entry_idx < end_index;
               entry_idx++, local_idx++) {
            do_work(total_non_empty, local_idx, entry_idx, thread_idx);
          }
        }
        non_empty_per_thread[thread_idx] = total_non_empty;
      };

  std::vector<std::future<void>> conversion_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    conversion_threads.push_back(std::async(
        std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : conversion_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

/*
 * This function goes through all non-empty entries (located by
 * locateAndCountEntries) and copies/decodes them into the output column buffers.
 */
void ColumnarResults::compactAndCopyEntries(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());

  // compute the exclusive scan over all non-empty totals
  std::vector<size_t> global_offsets(num_threads + 1, 0);
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   std::next(global_offsets.begin()));
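
  // Worked example (illustrative): non_empty_per_thread == {3, 0, 2} yields
  // global_offsets == {0, 3, 3, 5}. Thread t writes its k-th non-empty entry to
  // output row global_offsets[t] + k, and global_offsets.back() equals the
  // total number of materialized rows.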

  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
      rows.getSupportedSingleSlotTargetBitmap();

  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
  // differently and accessed through the result set's iterator
  if (num_single_slot_targets < num_columns) {
    compactAndCopyEntriesWithTargetSkipping(rows,
                                            bitmap,
                                            non_empty_per_thread,
                                            global_offsets,
                                            single_slot_targets_to_skip,
                                            slot_idx_per_target_idx,
                                            num_columns,
                                            entry_count,
                                            num_threads,
                                            size_per_thread);
  } else {
    compactAndCopyEntriesWithoutTargetSkipping(rows,
                                               bitmap,
                                               non_empty_per_thread,
                                               global_offsets,
                                               slot_idx_per_target_idx,
                                               num_columns,
                                               entry_count,
                                               num_threads,
                                               size_per_thread);
  }
}

/*
 * This version of the compaction is used when some targets (e.g., multi-slot
 * targets such as AVG) cannot be copied directly; those are skipped by the
 * read/write functions and recovered through the result set's iterator instead.
 */
void ColumnarResults::compactAndCopyEntriesWithTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<bool>& targets_to_skip,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);
  auto do_work = [this,
                  &bitmap,
                  &rows,
                  &slot_idx_per_target_idx,
                  &global_offsets,
                  &targets_to_skip,
                  &num_columns,
                  &write_functions = write_functions,
                  &read_functions = read_functions](size_t& non_empty_idx,
                                                    const size_t total_non_empty,
                                                    const size_t local_idx,
                                                    size_t& entry_idx,
                                                    const size_t thread_idx,
                                                    const size_t end_idx) {
    if (non_empty_idx >= total_non_empty) {
      // all non-empty entries have been written back
      entry_idx = end_idx;
    }
    const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
    if (bitmap.get(local_idx, thread_idx)) {
      // targets that are recovered from the result set iterators:
      const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
      for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
        if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
          writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
        }
      }
      // targets that are copied directly without any translation/decoding from
      // the result set
      for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
        if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
          continue;
        }
        write_functions[column_idx](rows,
                                    entry_idx,
                                    output_buffer_row_idx,
                                    column_idx,
                                    slot_idx_per_target_idx[column_idx],
                                    read_functions[column_idx]);
      }
      non_empty_idx++;
    }
  };

  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
                                 const size_t start_index,
                                 const size_t end_index,
                                 const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                     executor_->checkNonKernelTimeInterrupted())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(
            non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(
            non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : compaction_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

/*
 * This version of the compaction is used when all targets can be written back
 * directly, without recovering any of them through the result set's iterator.
 */
void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);
  auto do_work = [&rows,
                  &bitmap,
                  &global_offsets,
                  &num_columns,
                  &slot_idx_per_target_idx,
                  &write_functions = write_functions,
                  &read_functions = read_functions](size_t& entry_idx,
                                                    size_t& non_empty_idx,
                                                    const size_t total_non_empty,
                                                    const size_t local_idx,
                                                    const size_t thread_idx,
                                                    const size_t end_idx) {
    if (non_empty_idx >= total_non_empty) {
      // all non-empty entries have been written back
      entry_idx = end_idx;
      return;
    }
    const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
    if (bitmap.get(local_idx, thread_idx)) {
      for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
        write_functions[column_idx](rows,
                                    entry_idx,
                                    output_buffer_row_idx,
                                    column_idx,
                                    slot_idx_per_target_idx[column_idx],
                                    read_functions[column_idx]);
      }
      non_empty_idx++;
    }
  };
  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
                                 const size_t start_index,
                                 const size_t end_index,
                                 const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                     executor_->checkNonKernelTimeInterrupted())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(
            entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(
            entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : compaction_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

/*
 * Initializes a set of write functions, one per target (i.e., column). Each
 * target type's logical size is used to pick the correct write function, which
 * is then used for every row in the result set.
 */
std::vector<ColumnarResults::WriteFunction> ColumnarResults::initWriteFunctions(
    const ResultSet& rows,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  std::vector<WriteFunction> result;
  result.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      result.emplace_back([](const ResultSet& rows,
                             const size_t input_buffer_entry_idx,
                             const size_t output_buffer_entry_idx,
                             const size_t target_idx,
                             const size_t slot_idx,
                             const ReadFunction& read_function) {
        UNREACHABLE() << "Invalid write back function used.";
      });
      continue;
    }

    if (target_types_[target_idx].is_fp()) {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    } else {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 2:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 1:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    }
  }
  return result;
}

namespace {

int64_t invalid_read_func(const ResultSet& rows,
                          const size_t input_buffer_entry_idx,
                          const size_t target_idx,
                          const size_t slot_idx) {
  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
  return static_cast<int64_t>(0);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_key_baseline(const ResultSet& rows,
                                const size_t input_buffer_entry_idx,
                                const size_t target_idx,
                                const size_t slot_idx) {
  // float keys in baseline hash are written as doubles in the buffer, so the
  // result should be properly cast before being written to the output columns
  auto fval = static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx));
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int64_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int32_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int16_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int8_func(const ResultSet& rows,
                       const size_t input_buffer_entry_idx,
                       const size_t target_idx,
                       const size_t slot_idx) {
  return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_double_func(const ResultSet& rows,
                         const size_t input_buffer_entry_idx,
                         const size_t target_idx,
                         const size_t slot_idx) {
  auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
}
1152 } // namespace
1153 

/*
 * Initializes a set of read functions to properly access the contents of the
 * result set's storage buffer. Each particular read function is chosen based on
 * the data type and data size used to store that target in the result set's
 * storage buffer. These functions are then used for each row in the result set.
 */
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector<ColumnarResults::ReadFunction> ColumnarResults::initReadFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());

  std::vector<ReadFunction> read_functions;
  read_functions.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      // for targets that should be skipped, we use a placeholder function that
      // should never be called. The UNREACHABLE inside it makes sure that never
      // happens.
      read_functions.emplace_back(invalid_read_func);
      continue;
    }

    if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
      if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
        // for key columns only
        CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
        if (target_types_[target_idx].is_fp()) {
          CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
          switch (target_types_[target_idx].get_type()) {
            case kFLOAT:
              read_functions.emplace_back(
                  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case kDOUBLE:
              read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, floating point key).";
              break;
          }
        } else {
          switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
            case 8:
              read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case 4:
              read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, integer key).";
          }
        }
        continue;
      }
    }
    if (target_types_[target_idx].is_fp()) {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
          break;
      }
    } else {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 2:
          read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 1:
          read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (integer agg column).";
          break;
      }
    }
  }
  return read_functions;
}

/*
 * Goes through all target types and initializes the corresponding read and
 * write functions used to read each entry from the result set's storage and
 * write it into the output column buffers.
 */
std::tuple<std::vector<ColumnarResults::WriteFunction>,
           std::vector<ColumnarResults::ReadFunction>>
ColumnarResults::initAllConversionFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
         rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));

  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  } else {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  }
}