OmniSciDB  91042dcc5b
ColumnarResults.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ColumnarResults.h"
#include "ErrorHandling.h"
#include "Execute.h"
#include "Shared/Intervals.h"
#include "Shared/likely.h"
#include "Shared/thread_count.h"

#include <atomic>
#include <future>
#include <numeric>

namespace {

inline int64_t fixed_encoding_nullable_val(const int64_t val,
                                           const SQLTypeInfo& type_info) {
  if (type_info.get_compression() != kENCODING_NONE) {
    CHECK(type_info.get_compression() == kENCODING_FIXED ||
          type_info.get_compression() == kENCODING_DICT);
    auto logical_ti = get_logical_type_info(type_info);
    if (val == inline_int_null_val(logical_ti)) {
      return inline_fixed_encoding_null_val(type_info);
    }
  }
  return val;
}

}  // namespace
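
// Illustrative sketch (not part of the original source): the helper above maps
// the *logical* NULL sentinel onto the *physical* fixed-encoding sentinel, and
// passes everything else through unchanged. Assuming a BIGINT column stored
// with kENCODING_FIXED at 32 bits (hypothetical setup; set_compression() and
// set_comp_param() are the usual SQLTypeInfo mutators):
//
//   SQLTypeInfo ti(kBIGINT, /*notnull=*/false);
//   ti.set_compression(kENCODING_FIXED);
//   ti.set_comp_param(32);
//   const int64_t logical_null = inline_int_null_val(get_logical_type_info(ti));
//   CHECK_EQ(inline_fixed_encoding_null_val(ti),
//            fixed_encoding_nullable_val(logical_null, ti));
//   CHECK_EQ(int64_t(42), fixed_encoding_nullable_val(42, ti));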

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const ResultSet& rows,
                                 const size_t num_columns,
                                 const std::vector<SQLTypeInfo>& target_types,
                                 const size_t executor_id,
                                 const size_t thread_idx,
                                 const bool is_parallel_execution_enforced)
    : column_buffers_(num_columns)
    , num_rows_(result_set::use_parallel_algorithms(rows) ||
                        rows.isDirectColumnarConversionPossible()
                    ? rows.entryCount()
                    : rows.rowCount())
    , target_types_(target_types)
    , parallel_conversion_(is_parallel_execution_enforced
                               ? true
                               : result_set::use_parallel_algorithms(rows))
    , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
    , thread_idx_(thread_idx) {
  auto timer = DEBUG_TIMER(__func__);
  column_buffers_.resize(num_columns);
  executor_ = Executor::getExecutor(executor_id);
  CHECK(executor_);
  for (size_t i = 0; i < num_columns; ++i) {
    const bool is_varlen = target_types[i].is_array() ||
                           (target_types[i].is_string() &&
                            target_types[i].get_compression() == kENCODING_NONE) ||
                           target_types[i].is_geometry();
    if (is_varlen) {
      throw ColumnarConversionNotSupported();
    }
    if (!isDirectColumnarConversionPossible() ||
        !rows.isZeroCopyColumnarConversionPossible(i)) {
      column_buffers_[i] = row_set_mem_owner->allocate(
          num_rows_ * target_types[i].get_size(), thread_idx_);
    }
  }

  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
    materializeAllColumnsDirectly(rows, num_columns);
  } else {
    materializeAllColumnsThroughIteration(rows, num_columns);
  }
}

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const int8_t* one_col_buffer,
                                 const size_t num_rows,
                                 const SQLTypeInfo& target_type,
                                 const size_t executor_id,
                                 const size_t thread_idx)
    : column_buffers_(1)
    , num_rows_(num_rows)
    , target_types_{target_type}
    , parallel_conversion_(false)
    , direct_columnar_conversion_(false)
    , thread_idx_(thread_idx) {
  auto timer = DEBUG_TIMER(__func__);
  const bool is_varlen =
      target_type.is_array() ||
      (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
      target_type.is_geometry();
  if (is_varlen) {
    throw ColumnarConversionNotSupported();
  }
  executor_ = Executor::getExecutor(executor_id);
  CHECK(executor_);
  const auto buf_size = num_rows * target_type.get_size();
  column_buffers_[0] =
      reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
  if (sub_results.empty()) {
    return nullptr;
  }
  const auto total_row_count = std::accumulate(
      sub_results.begin(),
      sub_results.end(),
      size_t(0),
      [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
        return init + result->size();
      });
  std::unique_ptr<ColumnarResults> merged_results(
      new ColumnarResults(total_row_count, sub_results[0]->target_types_));
  const auto col_count = sub_results[0]->column_buffers_.size();
  const auto nonempty_it = std::find_if(
      sub_results.begin(),
      sub_results.end(),
      [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
  if (nonempty_it == sub_results.end()) {
    return nullptr;
  }
  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    const auto byte_width = (*nonempty_it)->getColumnType(col_idx).get_size();
    auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
    merged_results->column_buffers_.push_back(write_ptr);
    for (auto& rs : sub_results) {
      CHECK_EQ(col_count, rs->column_buffers_.size());
      if (!rs->size()) {
        continue;
      }
      CHECK_EQ(byte_width, rs->getColumnType(col_idx).get_size());
      memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
      write_ptr += rs->size() * byte_width;
    }
  }
  return merged_results;
}
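
// Illustrative sketch (not part of the original source): mergeResults simply
// concatenates each column across sub-results, in sub-result order. With two
// sub-results holding rows {1, 2} and {3} for a 4-byte INT column, the merged
// buffer is laid out as [1][2][3] and size() == 3. A rough usage sketch, with
// getColumnBuffers() assumed from the header:
//
//   std::vector<std::unique_ptr<ColumnarResults>> parts;
//   ...  // fill `parts` from per-fragment conversions
//   auto merged = ColumnarResults::mergeResults(row_set_mem_owner, parts);
//   if (merged) { /* merged->getColumnBuffers()[0] now holds all rows */ }
//
// A null return means `parts` was empty or every sub-result had zero rows.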

void ColumnarResults::materializeAllColumnsThroughIteration(const ResultSet& rows,
                                                            const size_t num_columns) {
  std::atomic<size_t> row_idx{0};
  if (isParallelConversion()) {
    const size_t worker_count = cpu_threads();
    std::vector<std::future<void>> conversion_threads;
    const auto do_work = [num_columns, &rows, &row_idx, this](const size_t i) {
      const auto crt_row = rows.getRowAtNoTranslations(i);
      if (!crt_row.empty()) {
        auto cur_row_idx = row_idx.fetch_add(1);
        for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
          writeBackCell(crt_row[col_idx], cur_row_idx, col_idx);
        }
      }
    };
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work, this](const size_t start, const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                             executor_->checkNonKernelTimeInterrupted())) {
                  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
                }
                do_work(i);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work(i);
              }
            }
          },
          interval.begin,
          interval.end));
    }

    try {
      for (auto& child : conversion_threads) {
        child.wait();
      }
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      throw e;
    } catch (...) {
      throw;
    }

    num_rows_ = row_idx;
    rows.setCachedRowCount(num_rows_);
    return;
  }
  bool done = false;
  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
    const auto crt_row = rows.getNextRow(false, false);
    if (crt_row.empty()) {
      done = true;
      return;
    }
    for (size_t i = 0; i < num_columns; ++i) {
      writeBackCell(crt_row[i], row_idx, i);
    }
    ++row_idx;
  };
  if (g_enable_non_kernel_time_query_interrupt) {
    while (!done) {
      if (UNLIKELY((row_idx & 0xFFFF) == 0 &&
                   executor_->checkNonKernelTimeInterrupted())) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      do_work();
    }
  } else {
    while (!done) {
      do_work();
    }
  }

  rows.moveToBegin();
}
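
// Illustrative note (not part of the original source): the `local_idx & 0xFFFF`
// guard above amortizes the cost of the interrupt check, which may be
// comparatively expensive; only every 65536th iteration actually calls into
// the executor. A minimal standalone sketch of the same pattern, with
// interrupted() and process() as hypothetical stand-ins:
//
//   for (size_t i = start, n = 0; i < end; ++i, ++n) {
//     if (UNLIKELY((n & 0xFFFF) == 0 && interrupted())) {
//       throw std::runtime_error("interrupted");
//     }
//     process(i);
//   }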

/*
 * This function processes and decodes its input TargetValue
 * and writes it into the corresponding column buffer's cell (at the given
 * row and column indices).
 *
 * NOTE: this is not supposed to process varlen types; those should be
 * handled separately outside this function.
 */
inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
                                           const size_t row_idx,
                                           const size_t column_idx) {
  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
  CHECK(scalar_col_val);
  auto i64_p = boost::get<int64_t>(scalar_col_val);
  const auto& type_info = target_types_[column_idx];
  if (i64_p) {
    const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
    switch (target_types_[column_idx].get_size()) {
      case 1:
        ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
        break;
      case 2:
        ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
        break;
      case 4:
        ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
        break;
      case 8:
        ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
        break;
      default:
        CHECK(false);
    }
  } else {
    CHECK(target_types_[column_idx].is_fp());
    switch (target_types_[column_idx].get_type()) {
      case kFLOAT: {
        auto float_p = boost::get<float>(scalar_col_val);
        ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
        break;
      }
      case kDOUBLE: {
        auto double_p = boost::get<double>(scalar_col_val);
        ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
        break;
      }
      default:
        CHECK(false);
    }
  }
}

/*
 * A set of write functions to be used to directly write into final column_buffers_.
 * The read_from_function is used to read from the input result set's storage.
 * NOTE: currently only used for direct columnarization
 */
template <typename DATA_TYPE>
void ColumnarResults::writeBackCellDirect(const ResultSet& rows,
                                          const size_t input_buffer_entry_idx,
                                          const size_t output_buffer_entry_idx,
                                          const size_t target_idx,
                                          const size_t slot_idx,
                                          const ReadFunction& read_from_function) {
  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
      target_types_[target_idx]));
  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
      val;
}

template <>
void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
                                                 const size_t input_buffer_entry_idx,
                                                 const size_t output_buffer_entry_idx,
                                                 const size_t target_idx,
                                                 const size_t slot_idx,
                                                 const ReadFunction& read_from_function) {
  const int32_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
}

template <>
void ColumnarResults::writeBackCellDirect<double>(
    const ResultSet& rows,
    const size_t input_buffer_entry_idx,
    const size_t output_buffer_entry_idx,
    const size_t target_idx,
    const size_t slot_idx,
    const ReadFunction& read_from_function) {
  const int64_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
}
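
// Illustrative note (not part of the original source): the float/double
// specializations above exist because every ReadFunction funnels its slot
// through an int64_t. The payload is the *bit pattern* of the float, not a
// numeric value, so it must be recovered with a may_alias pointer round-trip
// rather than a static_cast:
//
//   float fval = 1.5f;
//   int32_t bits = *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
//   float back = *reinterpret_cast<float*>(may_alias_ptr(&bits));
//   // back == 1.5f; static_cast<float>(bits) would yield 1069547520.0f instead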

void ColumnarResults::materializeAllColumnsDirectly(const ResultSet& rows,
                                                    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  switch (rows.getQueryDescriptionType()) {
    case QueryDescriptionType::Projection: {
      materializeAllColumnsProjection(rows, num_columns);
      break;
    }
    case QueryDescriptionType::TableFunction: {
      materializeAllColumnsTableFunction(rows, num_columns);
      break;
    }
    case QueryDescriptionType::GroupByPerfectHash:
    case QueryDescriptionType::GroupByBaselineHash: {
      materializeAllColumnsGroupBy(rows, num_columns);
      break;
    }
    default:
      UNREACHABLE()
          << "Direct columnar conversion for this query type is not supported yet.";
  }
}

void ColumnarResults::materializeAllColumnsProjection(const ResultSet& rows,
                                                      const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.query_mem_desc_.getQueryDescriptionType() ==
         QueryDescriptionType::Projection));

  const auto& lazy_fetch_info = rows.getLazyFetchInfo();

  // We can directly copy each non-lazy column's content
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);

  // Only lazy columns are iterated through first and then materialized
  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
}

void ColumnarResults::materializeAllColumnsTableFunction(const ResultSet& rows,
                                                         const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.query_mem_desc_.getQueryDescriptionType() ==
         QueryDescriptionType::TableFunction));

  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
  // Lazy fetching is not currently allowed for table function outputs
  for (const auto& col_lazy_fetch_info : lazy_fetch_info) {
    CHECK(!col_lazy_fetch_info.is_lazily_fetched);
  }
  // We can directly copy each non-lazy column's content
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
}

/*
 * For all non-lazy columns, we can directly copy back the results of each column's
 * contents from different storages and put them into the corresponding output buffer.
 *
 * This function is parallelized by assigning each column to a CPU thread.
 */
void ColumnarResults::copyAllNonLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
    // an empty lazy_fetch_info means no column is lazily fetched
    if (lazy_fetch_info.empty()) {
      return true;
    } else {
      return !lazy_fetch_info[col_idx].is_lazily_fetched;
    }
  };

  // parallelized by assigning each column to a thread
  std::vector<std::future<void>> direct_copy_threads;
  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
    if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
      CHECK(!column_buffers_[col_idx]);
      column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
    } else if (is_column_non_lazily_fetched(col_idx)) {
      CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
              QueryDescriptionType::TableFunction));
      direct_copy_threads.push_back(std::async(
          std::launch::async,
          [&rows, this](const size_t column_index) {
            const size_t column_size = num_rows_ * target_types_[column_index].get_size();
            rows.copyColumnIntoBuffer(
                column_index, column_buffers_[column_index], column_size);
          },
          col_idx));
    }
  }

  for (auto& child : direct_copy_threads) {
    child.wait();
  }
}
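
// Illustrative note (not part of the original source): three paths exist per
// column above, in decreasing order of cheapness:
//   1. zero-copy: adopt the ResultSet's own columnar buffer (no memcpy at all);
//   2. direct copy: one bulk copyColumnIntoBuffer() per non-lazy column, with
//      one column assigned per thread;
//   3. lazy columns: skipped here and materialized row-wise afterwards by
//      materializeAllLazyColumns().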

void ColumnarResults::materializeAllLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
          QueryDescriptionType::TableFunction));
  const auto do_work_just_lazy_columns = [num_columns, &rows, this](
                                             const size_t row_idx,
                                             const std::vector<bool>& targets_to_skip) {
    const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
    for (size_t i = 0; i < num_columns; ++i) {
      if (!targets_to_skip.empty() && !targets_to_skip[i]) {
        writeBackCell(crt_row[i], row_idx, i);
      }
    }
  };

  const auto contains_lazy_fetched_column =
      [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
        for (auto& col_info : lazy_fetch_info) {
          if (col_info.is_lazily_fetched) {
            return true;
          }
        }
        return false;
      };

  // parallelized by assigning a chunk of rows to each thread
  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
  if (contains_lazy_fetched_column(lazy_fetch_info)) {
    const size_t worker_count =
        result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1;
    std::vector<std::future<void>> conversion_threads;
    std::vector<bool> targets_to_skip;
    if (skip_non_lazy_columns) {
      CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
      targets_to_skip.reserve(num_columns);
      for (size_t i = 0; i < num_columns; i++) {
        // we process lazy columns (i.e., skip non-lazy columns)
        targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
      }
    }
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work_just_lazy_columns, &targets_to_skip, this](const size_t start,
                                                               const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                             executor_->checkNonKernelTimeInterrupted())) {
                  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
                }
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            }
          },
          interval.begin,
          interval.end));
    }

    try {
      for (auto& child : conversion_threads) {
        child.wait();
      }
    } catch (QueryExecutionError& e) {
      if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      throw e;
    } catch (...) {
      throw;
    }
  }
}

void ColumnarResults::materializeAllColumnsGroupBy(const ResultSet& rows,
                                                   const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
  const size_t entry_count = rows.entryCount();
  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;

  // step 1: compute total non-empty elements and store a bitmap per thread
  std::vector<size_t> non_empty_per_thread(num_threads,
                                           0);  // number of non-empty entries per thread

  ColumnBitmap bitmap(size_per_thread, num_threads);

  locateAndCountEntries(
      rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);

  // step 2: go through the generated bitmap and copy/decode corresponding entries
  // into the output buffer
  compactAndCopyEntries(rows,
                        bitmap,
                        non_empty_per_thread,
                        num_columns,
                        entry_count,
                        num_threads,
                        size_per_thread);
}
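
// Illustrative walk-through (not part of the original source): with
// entry_count = 8, num_threads = 2, size_per_thread = 4, and non-empty hash
// slots marked 'x':
//
//   entries:   [x, -, x, x | -, x, -, x]
//   bitmap:     1  0  1  1   0  1  0  1    (one bank per thread)
//   non_empty_per_thread = {3, 2}
//
// Step 2 then turns {3, 2} into exclusive offsets {0, 3, 5}, so thread 0
// writes output rows 0..2 and thread 1 writes rows 3..4, with no contention
// between threads and no holes in the output.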

void ColumnarResults::locateAndCountEntries(const ResultSet& rows,
                                            ColumnBitmap& bitmap,
                                            std::vector<size_t>& non_empty_per_thread,
                                            const size_t entry_count,
                                            const size_t num_threads,
                                            const size_t size_per_thread) const {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());
  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
                                  const size_t local_idx,
                                  const size_t entry_idx,
                                  const size_t thread_idx) {
    if (!rows.isRowAtEmpty(entry_idx)) {
      total_non_empty++;
      bitmap.set(local_idx, thread_idx, true);
    }
  };
  auto locate_and_count_func =
      [&do_work, &non_empty_per_thread, this](
          size_t start_index, size_t end_index, size_t thread_idx) {
        size_t total_non_empty = 0;
        size_t local_idx = 0;
        if (g_enable_non_kernel_time_query_interrupt) {
          for (size_t entry_idx = start_index; entry_idx < end_index;
               entry_idx++, local_idx++) {
            if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                         executor_->checkNonKernelTimeInterrupted())) {
              throw QueryExecutionError(Executor::ERR_INTERRUPTED);
            }
            do_work(total_non_empty, local_idx, entry_idx, thread_idx);
          }
        } else {
          for (size_t entry_idx = start_index; entry_idx < end_index;
               entry_idx++, local_idx++) {
            do_work(total_non_empty, local_idx, entry_idx, thread_idx);
          }
        }
        non_empty_per_thread[thread_idx] = total_non_empty;
      };

  std::vector<std::future<void>> conversion_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    conversion_threads.push_back(std::async(
        std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : conversion_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

void ColumnarResults::compactAndCopyEntries(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());

  // compute the exclusive scan over all non-empty totals
  std::vector<size_t> global_offsets(num_threads + 1, 0);
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   std::next(global_offsets.begin()));

  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
      rows.getSupportedSingleSlotTargetBitmap();

  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
  // differently and accessed through the result set's iterator
  if (num_single_slot_targets < num_columns) {
    compactAndCopyEntriesWithTargetSkipping(rows,
                                            bitmap,
                                            non_empty_per_thread,
                                            global_offsets,
                                            single_slot_targets_to_skip,
                                            slot_idx_per_target_idx,
                                            num_columns,
                                            entry_count,
                                            num_threads,
                                            size_per_thread);
  } else {
    compactAndCopyEntriesWithoutTargetSkipping(rows,
                                               bitmap,
                                               non_empty_per_thread,
                                               global_offsets,
                                               slot_idx_per_target_idx,
                                               num_columns,
                                               entry_count,
                                               num_threads,
                                               size_per_thread);
  }
}
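
// Illustrative sketch (not part of the original source): writing the inclusive
// std::partial_sum one element past the front of a zero-initialized vector
// yields exactly the exclusive scan used for global_offsets above:
//
//   #include <numeric>
//   #include <vector>
//   std::vector<size_t> counts{3, 2, 4};  // non_empty_per_thread
//   std::vector<size_t> offsets(counts.size() + 1, 0);
//   std::partial_sum(counts.begin(), counts.end(), std::next(offsets.begin()));
//   // offsets == {0, 3, 5, 9}; thread i writes output rows
//   // [offsets[i], offsets[i + 1])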

void ColumnarResults::compactAndCopyEntriesWithTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<bool>& targets_to_skip,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);
  auto do_work = [this,
                  &bitmap,
                  &rows,
                  &slot_idx_per_target_idx,
                  &global_offsets,
                  &targets_to_skip,
                  &num_columns,
                  &write_functions = write_functions,
                  &read_functions = read_functions](size_t& non_empty_idx,
                                                    const size_t total_non_empty,
                                                    const size_t local_idx,
                                                    size_t& entry_idx,
                                                    const size_t thread_idx,
                                                    const size_t end_idx) {
    if (non_empty_idx >= total_non_empty) {
      // all non-empty entries have been written back
      entry_idx = end_idx;
      return;
    }
    const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
    if (bitmap.get(local_idx, thread_idx)) {
      // targets that are recovered from the result set iterators:
      const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
      for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
        if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
          writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
        }
      }
      // targets that are copied directly without any translation/decoding from
      // the result set
      for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
        if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
          continue;
        }
        write_functions[column_idx](rows,
                                    entry_idx,
                                    output_buffer_row_idx,
                                    column_idx,
                                    slot_idx_per_target_idx[column_idx],
                                    read_functions[column_idx]);
      }
      non_empty_idx++;
    }
  };

  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
                                 const size_t start_index,
                                 const size_t end_index,
                                 const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                     executor_->checkNonKernelTimeInterrupted())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(
            non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(
            non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : compaction_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}
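
// Illustrative note (not part of the original source): in this function
// `targets_to_skip[i] == true` means target i is a supported single-slot
// target and is copied via the direct read/write function pair, while `false`
// entries (e.g., the two-slot AVG aggregate) fall back to the slower
// getRowAtNoTranslations()/writeBackCell() iterator path for that row.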

void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);
  auto do_work = [&rows,
                  &bitmap,
                  &global_offsets,
                  &num_columns,
                  &slot_idx_per_target_idx,
                  &write_functions = write_functions,
                  &read_functions = read_functions](size_t& entry_idx,
                                                    size_t& non_empty_idx,
                                                    const size_t total_non_empty,
                                                    const size_t local_idx,
                                                    const size_t thread_idx,
                                                    const size_t end_idx) {
    if (non_empty_idx >= total_non_empty) {
      // all non-empty entries have been written back
      entry_idx = end_idx;
      return;
    }
    const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
    if (bitmap.get(local_idx, thread_idx)) {
      for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
        write_functions[column_idx](rows,
                                    entry_idx,
                                    output_buffer_row_idx,
                                    column_idx,
                                    slot_idx_per_target_idx[column_idx],
                                    read_functions[column_idx]);
      }
      non_empty_idx++;
    }
  };
  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
                                 const size_t start_index,
                                 const size_t end_index,
                                 const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                     executor_->checkNonKernelTimeInterrupted())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(
            entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(
            entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  try {
    for (auto& child : compaction_threads) {
      child.wait();
    }
  } catch (QueryExecutionError& e) {
    if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
      throw QueryExecutionError(Executor::ERR_INTERRUPTED);
    }
    throw e;
  } catch (...) {
    throw;
  }
}

std::vector<ColumnarResults::WriteFunction> ColumnarResults::initWriteFunctions(
    const ResultSet& rows,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  std::vector<WriteFunction> result;
  result.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      result.emplace_back([](const ResultSet& rows,
                             const size_t input_buffer_entry_idx,
                             const size_t output_buffer_entry_idx,
                             const size_t target_idx,
                             const size_t slot_idx,
                             const ReadFunction& read_function) {
        UNREACHABLE() << "Invalid write back function used.";
      });
      continue;
    }

    if (target_types_[target_idx].is_fp()) {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    } else {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 2:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 1:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    }
  }
  return result;
}
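
// Illustrative note (not part of the original source): each entry above is a
// WriteFunction bound to writeBackCellDirect<T>, where T is chosen from the
// target's physical size, e.g. a 4-byte integer target dispatches to
// writeBackCellDirect<int32_t>. An equivalent, arguably simpler binding with
// a lambda (a sketch, not the author's code):
//
//   result.emplace_back([this](const ResultSet& rows,
//                              const size_t in_idx,
//                              const size_t out_idx,
//                              const size_t target_idx,
//                              const size_t slot_idx,
//                              const ReadFunction& read_fn) {
//     writeBackCellDirect<int32_t>(
//         rows, in_idx, out_idx, target_idx, slot_idx, read_fn);
//   });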

namespace {

int64_t invalid_read_func(const ResultSet& rows,
                          const size_t input_buffer_entry_idx,
                          const size_t target_idx,
                          const size_t slot_idx) {
  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
  return static_cast<int64_t>(0);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_key_baseline(const ResultSet& rows,
                                const size_t input_buffer_entry_idx,
                                const size_t target_idx,
                                const size_t slot_idx) {
  // float keys in baseline hash are written as doubles in the buffer, so
  // the result must be properly cast before being written to the output
  // columns
  auto fval = static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx));
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int64_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int32_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int16_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int8_func(const ResultSet& rows,
                       const size_t input_buffer_entry_idx,
                       const size_t target_idx,
                       const size_t slot_idx) {
  return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_double_func(const ResultSet& rows,
                         const size_t input_buffer_entry_idx,
                         const size_t target_idx,
                         const size_t slot_idx) {
  auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
}

}  // namespace

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector<ColumnarResults::ReadFunction> ColumnarResults::initReadFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());

  std::vector<ReadFunction> read_functions;
  read_functions.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      // for targets that should be skipped, we use a placeholder function that
      // must never be called; the UNREACHABLE inside invalid_read_func enforces
      // that.
      read_functions.emplace_back(invalid_read_func);
      continue;
    }

    if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
      if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
        // for key columns only
        CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
        if (target_types_[target_idx].is_fp()) {
          CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
          switch (target_types_[target_idx].get_type()) {
            case kFLOAT:
              read_functions.emplace_back(
                  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case kDOUBLE:
              read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, floating point key).";
              break;
          }
        } else {
          switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
            case 8:
              read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case 4:
              read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, integer key).";
          }
        }
        continue;
      }
    }
    if (target_types_[target_idx].is_fp()) {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
          break;
      }
    } else {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 2:
          read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 1:
          read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (integer agg column).";
          break;
      }
    }
  }
  return read_functions;
}
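
// Illustrative note (not part of the original source): the dispatch above has
// two layers. Baseline-hash *key* columns report a padded slot width of 0 and
// are instead read at the group-by key's effective width; float keys are
// stored as 8-byte doubles there, hence the bits round-trip in
// read_float_key_baseline. All other targets are read at their padded slot
// width, e.g. a COUNT on a 4-byte slot resolves to
// read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>.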

std::tuple<std::vector<ColumnarResults::WriteFunction>,
           std::vector<ColumnarResults::ReadFunction>>
ColumnarResults::initAllConversionFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
         rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));

  auto write_functions = initWriteFunctions(rows, targets_to_skip);
  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  } else {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  }
}