ColumnarResults.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ColumnarResults.h"
19 #include "ErrorHandling.h"
20 #include "Execute.h"
21 #include "Shared/Intervals.h"
22 #include "Shared/likely.h"
23 #include "Shared/sqltypes.h"
24 #include "Shared/thread_count.h"
25 
26 #include <tbb/parallel_reduce.h>
27 #include <atomic>
28 #include <future>
29 #include <numeric>
30 
31 namespace {
32 
33 inline int64_t fixed_encoding_nullable_val(const int64_t val,
34  const SQLTypeInfo& type_info) {
35  if (type_info.get_compression() != kENCODING_NONE) {
36  CHECK(type_info.get_compression() == kENCODING_FIXED ||
37  type_info.get_compression() == kENCODING_DICT);
38  auto logical_ti = get_logical_type_info(type_info);
39  if (val == inline_int_null_val(logical_ti)) {
40  return inline_fixed_encoding_null_val(type_info);
41  }
42  }
43  return val;
44 }
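// [Editor's illustrative sketch, not part of the original file] The helper above only
// remaps the NULL sentinel: ordinary values pass through unchanged, while a value equal
// to inline_int_null_val() of the *logical* type is rewritten to the sentinel of the
// narrower fixed/dict encoding. Roughly (the SQLTypeInfo setup below is assumed, not
// taken from this file):
//
//   SQLTypeInfo ti(kBIGINT, false);
//   ti.set_compression(kENCODING_FIXED);
//   ti.set_comp_param(32);  // BIGINT stored in 32 bits
//   const int64_t logical_null = inline_int_null_val(get_logical_type_info(ti));
//   CHECK_EQ(inline_fixed_encoding_null_val(ti),
//            fixed_encoding_nullable_val(logical_null, ti));      // NULL is remapped
//   CHECK_EQ(int64_t(42), fixed_encoding_nullable_val(42, ti));   // values pass through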
45 
46 std::vector<size_t> get_padded_target_sizes(
47  const ResultSet& rows,
48  const std::vector<SQLTypeInfo>& target_types) {
49  std::vector<size_t> padded_target_sizes;
50  // We have to check that the result set is valid, as one entry point
51  // to ColumnarResults effectively constructs a fake result set.
52  // In that case it is safe to fall back to the targets' logical type
53  // widths.
54  if (!rows.hasValidBuffer() ||
55  rows.getQueryMemDesc().getColCount() < target_types.size()) {
56  for (const auto& target_type : target_types) {
57  padded_target_sizes.emplace_back(target_type.get_size());
58  }
59  return padded_target_sizes;
60  }
61 
62  // If we get here we have a valid result set, so use its QMD padded widths
63  const auto col_context = rows.getQueryMemDesc().getColSlotContext();
64  for (size_t col_idx = 0; col_idx < target_types.size(); col_idx++) {
65  // Lazy fetch columns will have 0 as their padded width, so use the type's
66  // logical width for those
67  const auto idx = col_context.getSlotsForCol(col_idx).front();
68  const size_t padded_slot_width =
69  static_cast<size_t>(rows.getPaddedSlotWidthBytes(idx));
70  padded_target_sizes.emplace_back(
71  padded_slot_width == 0UL ? target_types[col_idx].get_size() : padded_slot_width);
72  }
73  return padded_target_sizes;
74 }
75 
76 int64_t toBuffer(const TargetValue& col_val, const SQLTypeInfo& type_info, int8_t* buf) {
77  if (type_info.is_array()) {
78  const auto array_col_val = boost::get<ArrayTargetValue>(&col_val);
79  CHECK(array_col_val);
80  const auto& vec = array_col_val->get();
81  int64_t offset = 0;
82  const auto elem_type_info = type_info.get_elem_type();
83  for (const auto& item : vec) {
84  offset += toBuffer(item, elem_type_info, buf + offset);
85  }
86  return offset;
87  } else if (type_info.is_fp()) {
88  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
89  switch (type_info.get_type()) {
90  case kFLOAT: {
91  auto float_p = boost::get<float>(scalar_col_val);
92  *((float*)buf) = static_cast<float>(*float_p);
93  return 4;
94  }
95  case kDOUBLE: {
96  auto double_p = boost::get<double>(scalar_col_val);
97  *((double*)buf) = static_cast<double>(*double_p);
98  return 8;
99  }
100  default:
101  CHECK(false);
102  }
103  } else {
104  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
105  CHECK(scalar_col_val);
106  auto i64_p = boost::get<int64_t>(scalar_col_val);
107  const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
108  switch (type_info.get_size()) {
109  case 1:
110  *buf = static_cast<int8_t>(val);
111  return 1;
112  case 2:
113  *((int16_t*)buf) = static_cast<int16_t>(val);
114  return 2;
115  case 4:
116  *((int32_t*)buf) = static_cast<int32_t>(val);
117  return 4;
118  case 8:
119  *((int64_t*)buf) = static_cast<int64_t>(val);
120  return 8;
121  default:
122  UNREACHABLE();
123  }
124  }
125  return 0;
126 }
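// [Editor's illustrative sketch, not part of the original file] toBuffer() serializes a
// single TargetValue into a raw cell and returns the number of bytes written; for array
// values it recurses over the elements and returns the summed element bytes. A minimal
// usage sketch for a 4-byte integer target (types and sizes below are assumed):
//
//   SQLTypeInfo int_ti(kINT, false);
//   int8_t cell[4];
//   TargetValue tv = ScalarTargetValue(int64_t(7));
//   CHECK_EQ(int64_t(4), toBuffer(tv, int_ti, cell));  // writes 7 as int32, returns 4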
127 
128 int64_t countNumberOfValues(const ResultSet& rows, const size_t column_idx) {
129  return tbb::parallel_reduce(
130  tbb::blocked_range<int64_t>(0, rows.rowCount()),
131  static_cast<int64_t>(0),
132  [&](tbb::blocked_range<int64_t> r, int64_t running_count) {
133  for (int i = r.begin(); i < r.end(); ++i) {
134  const auto crt_row = rows.getRowAtNoTranslations(i);
135  const auto arr_tv = boost::get<ArrayTargetValue>(&crt_row[column_idx]);
136  CHECK(arr_tv);
137  if (arr_tv->is_initialized()) {
138  const auto& vec = arr_tv->get();
139  running_count += vec.size();
140  }
141  }
142  return running_count;
143  },
144  std::plus<int64_t>());
145 }
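// [Editor's illustrative sketch, not part of the original file] countNumberOfValues()
// is a tbb::parallel_reduce over the row indices: each sub-range sums the element
// counts of the initialized array cells it visits, and the partial sums are combined
// with std::plus<int64_t>(). A serial equivalent, for clarity only:
//
//   int64_t total = 0;
//   for (size_t i = 0; i < rows.rowCount(); ++i) {
//     const auto crt_row = rows.getRowAtNoTranslations(i);
//     const auto arr_tv = boost::get<ArrayTargetValue>(&crt_row[column_idx]);
//     if (arr_tv && arr_tv->is_initialized()) {
//       total += arr_tv->get().size();
//     }
//   }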
146 
147 } // namespace
148 
149 ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
150  const ResultSet& rows,
151  const size_t num_columns,
152  const std::vector<SQLTypeInfo>& target_types,
153  const size_t executor_id,
154  const size_t thread_idx,
155  const bool is_parallel_execution_enforced)
156  : column_buffers_(num_columns)
157  , num_rows_(result_set::use_parallel_algorithms(rows) ||
158  rows.isDirectColumnarConversionPossible()
159  ? rows.entryCount()
160  : rows.rowCount())
161  , target_types_(target_types)
162  , parallel_conversion_(is_parallel_execution_enforced
163  ? true
164  : result_set::use_parallel_algorithms(rows))
165  , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
166  , thread_idx_(thread_idx)
167  , padded_target_sizes_(get_padded_target_sizes(rows, target_types)) {
168  auto timer = DEBUG_TIMER(__func__);
169  column_buffers_.resize(num_columns);
170  executor_ = Executor::getExecutor(executor_id);
171  CHECK(executor_);
172  CHECK_EQ(padded_target_sizes_.size(), target_types.size());
173  for (size_t i = 0; i < num_columns; ++i) {
174  const auto ti = target_types[i];
175  if (ti.is_array()) {
176  if (isDirectColumnarConversionPossible() &&
177  rows.isZeroCopyColumnarConversionPossible(i)) {
178  const int8_t* col_buf = rows.getColumnarBuffer(i);
179  column_buffers_[i] = const_cast<int8_t*>(col_buf);
180  } else {
181  int64_t values_count = countNumberOfValues(rows, i);
182  const int64_t flatbuffer_size =
183  getVarlenArrayBufferSize(num_rows_, values_count, ti);
184  column_buffers_[i] = row_set_mem_owner->allocate(flatbuffer_size, thread_idx_);
185  FlatBufferManager m{column_buffers_[i]};
186  initializeVarlenArray(m, num_rows_, values_count, ti);
187  }
188  } else {
189  const bool is_varlen =
190  (ti.is_string() && ti.get_compression() == kENCODING_NONE) || ti.is_geometry();
191  if (is_varlen) {
192  throw ColumnarConversionNotSupported();
193  }
194  if (!isDirectColumnarConversionPossible() ||
195  !rows.isZeroCopyColumnarConversionPossible(i)) {
196  column_buffers_[i] =
197  row_set_mem_owner->allocate(num_rows_ * padded_target_sizes_[i], thread_idx_);
198  }
199  }
200  }
201 
202  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
203  materializeAllColumnsDirectly(rows, num_columns);
204  } else {
205  materializeAllColumnsThroughIteration(rows, num_columns);
206  }
207 }
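// [Editor's illustrative sketch, not part of the original file] Typical call shape for
// the constructor above when a columnar view of an intermediate ResultSet is needed
// (all surrounding variables are assumed to exist in the caller):
//
//   auto columnar = std::make_unique<ColumnarResults>(row_set_mem_owner,
//                                                     *result_set,
//                                                     result_set->colCount(),
//                                                     target_types,
//                                                     executor_id,
//                                                     /*thread_idx=*/0);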
208 
209 ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
210  const int8_t* one_col_buffer,
211  const size_t num_rows,
212  const SQLTypeInfo& target_type,
213  const size_t executor_id,
214  const size_t thread_idx)
215  : column_buffers_(1)
216  , num_rows_(num_rows)
217  , target_types_{target_type}
218  , parallel_conversion_(false)
219  , direct_columnar_conversion_(false)
220  , thread_idx_(thread_idx) {
221  auto timer = DEBUG_TIMER(__func__);
222  const bool is_varlen =
223  target_type.is_array() ||
224  (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
225  target_type.is_geometry();
226  if (is_varlen) {
227  throw ColumnarConversionNotSupported();
228  }
229  executor_ = Executor::getExecutor(executor_id);
230  padded_target_sizes_.emplace_back(target_type.get_size());
231  CHECK(executor_);
232  const auto buf_size = num_rows * target_type.get_size();
233  column_buffers_[0] =
234  reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
235  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
236 }
237 
238 std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
239  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
240  const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
241  if (sub_results.empty()) {
242  return nullptr;
243  }
244  const auto total_row_count = std::accumulate(
245  sub_results.begin(),
246  sub_results.end(),
247  size_t(0),
248  [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
249  return init + result->size();
250  });
251  std::unique_ptr<ColumnarResults> merged_results(
252  new ColumnarResults(total_row_count,
253  sub_results[0]->target_types_,
254  sub_results[0]->padded_target_sizes_));
255  const auto col_count = sub_results[0]->column_buffers_.size();
256  const auto nonempty_it = std::find_if(
257  sub_results.begin(),
258  sub_results.end(),
259  [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
260  if (nonempty_it == sub_results.end()) {
261  return nullptr;
262  }
263  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
264  const auto byte_width = merged_results->padded_target_sizes_[col_idx];
265  auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
266  merged_results->column_buffers_.push_back(write_ptr);
267  for (auto& rs : sub_results) {
268  CHECK_EQ(col_count, rs->column_buffers_.size());
269  if (!rs->size()) {
270  continue;
271  }
272  CHECK_EQ(byte_width, rs->padded_target_sizes_[col_idx]);
273  memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
274  write_ptr += rs->size() * byte_width;
275  }
276  }
277  return merged_results;
278 }
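// [Editor's illustrative sketch, not part of the original file] mergeResults()
// concatenates already-converted sub-results column by column into one buffer per
// column. A usage sketch (the per-fragment loop and names below are assumed):
//
//   std::vector<std::unique_ptr<ColumnarResults>> sub_results;
//   for (const auto& frag_rows : per_fragment_result_sets) {
//     sub_results.push_back(std::make_unique<ColumnarResults>(
//         row_set_mem_owner, *frag_rows, num_cols, target_types, executor_id, 0));
//   }
//   auto merged = ColumnarResults::mergeResults(row_set_mem_owner, sub_results);
//   // merged->size() == sum of sub_results[i]->size()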
279 
284 void ColumnarResults::materializeAllColumnsThroughIteration(const ResultSet& rows,
285  const size_t num_columns) {
286  std::atomic<size_t> row_idx{0};
287  if (isParallelConversion()) {
288  const size_t worker_count = cpu_threads();
289  std::vector<std::future<void>> conversion_threads;
290  std::mutex write_mutex;
291  const auto do_work =
292  [num_columns, &rows, &row_idx, &write_mutex, this](const size_t i) {
293  const auto crt_row = rows.getRowAtNoTranslations(i);
294  if (!crt_row.empty()) {
295  auto cur_row_idx = row_idx.fetch_add(1);
296  for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
297  writeBackCell(crt_row[col_idx], cur_row_idx, col_idx, &write_mutex);
298  }
299  }
300  };
301  for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
302  conversion_threads.push_back(std::async(
303  std::launch::async,
304  [&do_work, this](const size_t start, const size_t end) {
305  if (g_enable_non_kernel_time_query_interrupt) {
306  size_t local_idx = 0;
307  for (size_t i = start; i < end; ++i, ++local_idx) {
308  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
309  executor_->checkNonKernelTimeInterrupted())) {
310  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
311  }
312  do_work(i);
313  }
314  } else {
315  for (size_t i = start; i < end; ++i) {
316  do_work(i);
317  }
318  }
319  },
320  interval.begin,
321  interval.end));
322  }
323 
324  try {
325  for (auto& child : conversion_threads) {
326  child.wait();
327  }
328  } catch (QueryExecutionError& e) {
329  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
330  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
331  }
332  throw e;
333  } catch (...) {
334  throw;
335  }
336 
337  num_rows_ = row_idx;
338  rows.setCachedRowCount(num_rows_);
339  return;
340  }
341  bool done = false;
342  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
343  const auto crt_row = rows.getNextRow(false, false);
344  if (crt_row.empty()) {
345  done = true;
346  return;
347  }
348  for (size_t i = 0; i < num_columns; ++i) {
349  writeBackCell(crt_row[i], row_idx, i);
350  }
351  ++row_idx;
352  };
353  if (g_enable_non_kernel_time_query_interrupt) {
354  while (!done) {
355  if (UNLIKELY((row_idx & 0xFFFF) == 0 &&
356  executor_->checkNonKernelTimeInterrupted())) {
357  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
358  }
359  do_work();
360  }
361  } else {
362  while (!done) {
363  do_work();
364  }
365  }
366 
367  rows.moveToBegin();
368 }
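// [Editor's note, not part of the original file] The (local_idx & 0xFFFF) == 0 tests in
// the loops above amortize the interrupt check: the executor flag is consulted only
// once every 65536 rows, e.g.
//
//   constexpr size_t kInterruptCheckMask = 0xFFFF;  // hypothetical name; check every 65536 iterations
//   if (UNLIKELY((local_idx & kInterruptCheckMask) == 0 &&
//                executor_->checkNonKernelTimeInterrupted())) {
//     throw QueryExecutionError(Executor::ERR_INTERRUPTED);
//   }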
369 
370 /*
371  * This function processes and decodes its input TargetValue
372  * and writes it into its corresponding column buffer's cell (with corresponding
373  * row and column indices)
374  *
375  * NOTE: this is not supposed to be processing varlen types (except
376  * FlatBuffer supported varlen types such as Array), and they should
377  * be handled differently outside this function.
378  */
379 inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
380  const size_t row_idx,
381  const size_t column_idx,
382  std::mutex* write_mutex) {
383  auto& type_info = target_types_[column_idx];
384  if (type_info.is_array()) {
385  CHECK(FlatBufferManager::isFlatBuffer(column_buffers_[column_idx]));
386  FlatBufferManager m{column_buffers_[column_idx]};
387  const auto arr_tv = boost::get<ArrayTargetValue>(&col_val);
388  CHECK(arr_tv);
389  if (arr_tv->is_initialized()) {
390  const auto& vec = arr_tv->get();
391  auto array_item_size = type_info.get_elem_type().get_size();
392  // setEmptyItemNoValidation reserves a buffer in the FlatBuffer
393  // instance that corresponds to the varlen array at row index
394  // row_idx. A pointer to the reserved buffer is
395  // stored in buf:
396  int8_t* buf = nullptr;
397  FlatBufferManager::Status status{};
398  {
399  auto lock_scope =
400  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
401  : std::unique_lock<std::mutex>(*write_mutex));
402  status = m.setEmptyItemNoValidation(row_idx, vec.size() * array_item_size, &buf);
403  }
404  CHECK_EQ(status, FlatBufferManager::Status::Success);
405  CHECK(buf);
406  // toBuffer initializes varlen array buffer buf using the
407  // result set row with row_idx row index:
408  toBuffer(col_val, type_info, buf);
409  } else {
410  auto lock_scope =
411  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
412  : std::unique_lock<std::mutex>(*write_mutex));
413  m.setNullNoValidation(row_idx);
414  }
415 
416  } else {
417  int8_t* buf = column_buffers_[column_idx];
418  toBuffer(col_val, type_info, buf + type_info.get_size() * row_idx);
419  }
420 }
421 
427 template <typename DATA_TYPE>
428 void ColumnarResults::writeBackCellDirect(const ResultSet& rows,
429  const size_t input_buffer_entry_idx,
430  const size_t output_buffer_entry_idx,
431  const size_t target_idx,
432  const size_t slot_idx,
433  const ReadFunction& read_from_function) {
434  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
435  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
436  target_types_[target_idx]));
437  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
438  val;
439 }
440 
441 template <>
442 void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
443  const size_t input_buffer_entry_idx,
444  const size_t output_buffer_entry_idx,
445  const size_t target_idx,
446  const size_t slot_idx,
447  const ReadFunction& read_from_function) {
448  const int32_t ival =
449  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
450  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
451  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
452 }
453 
454 template <>
455 void ColumnarResults::writeBackCellDirect<double>(
456  const ResultSet& rows,
457  const size_t input_buffer_entry_idx,
458  const size_t output_buffer_entry_idx,
459  const size_t target_idx,
460  const size_t slot_idx,
461  const ReadFunction& read_from_function) {
462  const int64_t ival =
463  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
464  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
465  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
466 }
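// [Editor's note, not part of the original file] The float/double specializations
// above exist because the read functions below return raw slot bytes as an int64_t;
// the bytes are reinterpreted (not numerically converted) back into the floating point
// value, mirroring read_float_func()/read_double_func(). The round trip looks like:
//
//   float fval = 1.5f;
//   const int32_t bits = *reinterpret_cast<const int32_t*>(may_alias_ptr(&fval));
//   const float back = *reinterpret_cast<const float*>(may_alias_ptr(&bits));
//   CHECK_EQ(fval, back);  // exact: same bit pattern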
467 
477 void ColumnarResults::materializeAllColumnsDirectly(const ResultSet& rows,
478  const size_t num_columns) {
479  CHECK(isDirectColumnarConversionPossible());
480  switch (rows.getQueryDescriptionType()) {
481  case QueryDescriptionType::Projection: {
482  materializeAllColumnsProjection(rows, num_columns);
483  break;
484  }
485  case QueryDescriptionType::TableFunction: {
486  materializeAllColumnsTableFunction(rows, num_columns);
487  break;
488  }
489  case QueryDescriptionType::GroupByPerfectHash:
490  case QueryDescriptionType::GroupByBaselineHash: {
491  materializeAllColumnsGroupBy(rows, num_columns);
492  break;
493  }
494  default:
495  UNREACHABLE()
496  << "Direct columnar conversion for this query type is not supported yet.";
497  }
498 }
499 
507 void ColumnarResults::materializeAllColumnsProjection(const ResultSet& rows,
508  const size_t num_columns) {
509  CHECK(rows.query_mem_desc_.didOutputColumnar());
510  CHECK(isDirectColumnarConversionPossible() &&
511  (rows.query_mem_desc_.getQueryDescriptionType() ==
512  QueryDescriptionType::Projection));
513 
514  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
515 
516  // We can directly copy each non-lazy column's content
517  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
518 
519  // Only lazy columns are iterated through first and then materialized
520  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
521 }
522 
523 void ColumnarResults::materializeAllColumnsTableFunction(const ResultSet& rows,
524  const size_t num_columns) {
525  CHECK(rows.query_mem_desc_.didOutputColumnar());
526  CHECK(isDirectColumnarConversionPossible() &&
527  (rows.query_mem_desc_.getQueryDescriptionType() ==
528  QueryDescriptionType::TableFunction));
529 
530  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
531  // Lazy fetching is not currently allowed for table function outputs
532  for (const auto& col_lazy_fetch_info : lazy_fetch_info) {
533  CHECK(!col_lazy_fetch_info.is_lazily_fetched);
534  }
535  // We can directly copy each non-lazy column's content
536  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
537 }
538 
539 /*
540  * For all non-lazy columns, we can directly copy back the results of each column's
541  * contents from different storages and put them into the corresponding output buffer.
542  *
543  * This function is parallelized through assigning each column to a CPU thread.
544  */
545 void ColumnarResults::copyAllNonLazyColumns(
546  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
547  const ResultSet& rows,
548  const size_t num_columns) {
549  CHECK(isDirectColumnarConversionPossible());
550  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
551  // an empty lazy_fetch_info means no column is lazily fetched
552  if (lazy_fetch_info.empty()) {
553  return true;
554  } else {
555  return !lazy_fetch_info[col_idx].is_lazily_fetched;
556  }
557  };
558 
559  // parallelized by assigning each column to a thread
560  std::vector<std::future<void>> direct_copy_threads;
561  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
562  if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
563  CHECK(!column_buffers_[col_idx]);
564  // The name of the method implies a copy but this is not a copy!!
565  column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
566  } else if (is_column_non_lazily_fetched(col_idx)) {
567  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
568  QueryDescriptionType::TableFunction));
569  direct_copy_threads.push_back(std::async(
570  std::launch::async,
571  [&rows, this](const size_t column_index) {
572  const size_t column_size = num_rows_ * padded_target_sizes_[column_index];
573  rows.copyColumnIntoBuffer(
574  column_index, column_buffers_[column_index], column_size);
575  },
576  col_idx));
577  }
578  }
579 
580  for (auto& child : direct_copy_threads) {
581  child.wait();
582  }
583 }
584 
594 void ColumnarResults::materializeAllLazyColumns(
595  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
596  const ResultSet& rows,
597  const size_t num_columns) {
598  CHECK(isDirectColumnarConversionPossible());
599  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
600  QueryDescriptionType::TableFunction));
601  std::mutex write_mutex;
602  const auto do_work_just_lazy_columns = [num_columns, &rows, &write_mutex, this](
603  const size_t row_idx,
604  const std::vector<bool>& targets_to_skip) {
605  const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
606  for (size_t i = 0; i < num_columns; ++i) {
607  if (!targets_to_skip.empty() && !targets_to_skip[i]) {
608  writeBackCell(crt_row[i], row_idx, i, &write_mutex);
609  }
610  }
611  };
612 
613  const auto contains_lazy_fetched_column =
614  [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
615  for (auto& col_info : lazy_fetch_info) {
616  if (col_info.is_lazily_fetched) {
617  return true;
618  }
619  }
620  return false;
621  };
622 
623  // parallelized by assigning a chunk of rows to each thread
624  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
625  if (contains_lazy_fetched_column(lazy_fetch_info)) {
626  const size_t worker_count =
627  result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1;
628  std::vector<std::future<void>> conversion_threads;
629  std::vector<bool> targets_to_skip;
630  if (skip_non_lazy_columns) {
631  CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
632  targets_to_skip.reserve(num_columns);
633  for (size_t i = 0; i < num_columns; i++) {
634  // we process lazy columns (i.e., skip non-lazy columns)
635  targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
636  }
637  }
638  for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
639  conversion_threads.push_back(std::async(
640  std::launch::async,
641  [&do_work_just_lazy_columns, &targets_to_skip, this](const size_t start,
642  const size_t end) {
643  if (g_enable_non_kernel_time_query_interrupt) {
644  size_t local_idx = 0;
645  for (size_t i = start; i < end; ++i, ++local_idx) {
646  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
647  executor_->checkNonKernelTimeInterrupted())) {
648  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
649  }
650  do_work_just_lazy_columns(i, targets_to_skip);
651  }
652  } else {
653  for (size_t i = start; i < end; ++i) {
654  do_work_just_lazy_columns(i, targets_to_skip);
655  }
656  }
657  },
658  interval.begin,
659  interval.end));
660  }
661 
662  try {
663  for (auto& child : conversion_threads) {
664  child.wait();
665  }
666  } catch (QueryExecutionError& e) {
667  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
668  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
669  }
670  throw e;
671  } catch (...) {
672  throw;
673  }
674  }
675 }
676 
683 void ColumnarResults::materializeAllColumnsGroupBy(const ResultSet& rows,
684  const size_t num_columns) {
685  CHECK(isDirectColumnarConversionPossible());
686  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
687  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
688 
689  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
690  const size_t entry_count = rows.entryCount();
691  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;
692 
693  // step 1: compute total non-empty elements and store a bitmap per thread
694  std::vector<size_t> non_empty_per_thread(num_threads,
695  0); // number of non-empty entries per thread
696 
697  ColumnBitmap bitmap(size_per_thread, num_threads);
698 
699  locateAndCountEntries(
700  rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);
701 
702  // step 2: go through the generated bitmap and copy/decode corresponding entries
703  // into the output buffer
704  compactAndCopyEntries(rows,
705  bitmap,
706  non_empty_per_thread,
707  num_columns,
708  entry_count,
709  num_threads,
710  size_per_thread);
711 }
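// [Editor's worked example, not part of the original file] With entry_count = 8 and
// num_threads = 2 (so size_per_thread = 4): if thread 0 finds 3 non-empty entries in
// its local slots {0, 2, 3} and thread 1 finds 1 in slot {1}, step 1 records
// non_empty_per_thread = {3, 1} and sets those positions in the per-thread bitmap;
// step 2 then writes thread 0's rows to output positions 0..2 and thread 1's row to
// position 3, producing a dense output even though the group-by hash table is sparse.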
712 
718 void ColumnarResults::locateAndCountEntries(const ResultSet& rows,
719  ColumnBitmap& bitmap,
720  std::vector<size_t>& non_empty_per_thread,
721  const size_t entry_count,
722  const size_t num_threads,
723  const size_t size_per_thread) const {
724  CHECK(isDirectColumnarConversionPossible());
725  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
726  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
727  CHECK_EQ(num_threads, non_empty_per_thread.size());
728  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
729  const size_t local_idx,
730  const size_t entry_idx,
731  const size_t thread_idx) {
732  if (!rows.isRowAtEmpty(entry_idx)) {
733  total_non_empty++;
734  bitmap.set(local_idx, thread_idx, true);
735  }
736  };
737  auto locate_and_count_func =
738  [&do_work, &non_empty_per_thread, this](
739  size_t start_index, size_t end_index, size_t thread_idx) {
740  size_t total_non_empty = 0;
741  size_t local_idx = 0;
742  if (g_enable_non_kernel_time_query_interrupt) {
743  for (size_t entry_idx = start_index; entry_idx < end_index;
744  entry_idx++, local_idx++) {
745  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
746  executor_->checkNonKernelTimeInterrupted())) {
747  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
748  }
749  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
750  }
751  } else {
752  for (size_t entry_idx = start_index; entry_idx < end_index;
753  entry_idx++, local_idx++) {
754  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
755  }
756  }
757  non_empty_per_thread[thread_idx] = total_non_empty;
758  };
759 
760  std::vector<std::future<void>> conversion_threads;
761  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
762  const size_t start_entry = thread_idx * size_per_thread;
763  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
764  conversion_threads.push_back(std::async(
765  std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
766  }
767 
768  try {
769  for (auto& child : conversion_threads) {
770  child.wait();
771  }
772  } catch (QueryExecutionError& e) {
773  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
774  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
775  }
776  throw e;
777  } catch (...) {
778  throw;
779  }
780 }
781 
791 void ColumnarResults::compactAndCopyEntries(
792  const ResultSet& rows,
793  const ColumnBitmap& bitmap,
794  const std::vector<size_t>& non_empty_per_thread,
795  const size_t num_columns,
796  const size_t entry_count,
797  const size_t num_threads,
798  const size_t size_per_thread) {
799  CHECK(isDirectColumnarConversionPossible());
800  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
801  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
802  CHECK_EQ(num_threads, non_empty_per_thread.size());
803 
804  // compute the exclusive scan over all non-empty totals
805  std::vector<size_t> global_offsets(num_threads + 1, 0);
806  std::partial_sum(non_empty_per_thread.begin(),
807  non_empty_per_thread.end(),
808  std::next(global_offsets.begin()));
809 
810  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
811  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
812  rows.getSupportedSingleSlotTargetBitmap();
813 
814  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
815  // differently and accessed through the result set's iterators
816  if (num_single_slot_targets < num_columns) {
817  compactAndCopyEntriesWithTargetSkipping(rows,
818  bitmap,
819  non_empty_per_thread,
820  global_offsets,
821  single_slot_targets_to_skip,
822  slot_idx_per_target_idx,
823  num_columns,
824  entry_count,
825  num_threads,
826  size_per_thread);
827  } else {
828  compactAndCopyEntriesWithoutTargetSkipping(rows,
829  bitmap,
830  non_empty_per_thread,
831  global_offsets,
832  slot_idx_per_target_idx,
833  num_columns,
834  entry_count,
835  num_threads,
836  size_per_thread);
837  }
838 }
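// [Editor's illustrative sketch, not part of the original file] The global_offsets
// computation above is an exclusive prefix sum: std::partial_sum writes into a
// zero-initialized vector that is one element longer than the input, e.g.
//
//   std::vector<size_t> non_empty = {3, 0, 5, 2};
//   std::vector<size_t> offsets(non_empty.size() + 1, 0);
//   std::partial_sum(non_empty.begin(), non_empty.end(), std::next(offsets.begin()));
//   // offsets == {0, 3, 3, 8, 10}; thread i starts writing at offsets[i]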
839 
846 void ColumnarResults::compactAndCopyEntriesWithTargetSkipping(
847  const ResultSet& rows,
848  const ColumnBitmap& bitmap,
849  const std::vector<size_t>& non_empty_per_thread,
850  const std::vector<size_t>& global_offsets,
851  const std::vector<bool>& targets_to_skip,
852  const std::vector<size_t>& slot_idx_per_target_idx,
853  const size_t num_columns,
854  const size_t entry_count,
855  const size_t num_threads,
856  const size_t size_per_thread) {
857  CHECK(isDirectColumnarConversionPossible());
858  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
859  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
860 
861  const auto [write_functions, read_functions] =
862  initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
863  CHECK_EQ(write_functions.size(), num_columns);
864  CHECK_EQ(read_functions.size(), num_columns);
865  std::mutex write_mutex;
866  auto do_work = [this,
867  &bitmap,
868  &rows,
869  &slot_idx_per_target_idx,
870  &global_offsets,
871  &targets_to_skip,
872  &num_columns,
873  &write_mutex,
874  &write_functions = write_functions,
875  &read_functions = read_functions](size_t& non_empty_idx,
876  const size_t total_non_empty,
877  const size_t local_idx,
878  size_t& entry_idx,
879  const size_t thread_idx,
880  const size_t end_idx) {
881  if (non_empty_idx >= total_non_empty) {
882  // all non-empty entries have been written back
883  entry_idx = end_idx;
884  }
885  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
886  if (bitmap.get(local_idx, thread_idx)) {
887  // targets that are recovered from the result set iterators:
888  const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
889  for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
890  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
891  writeBackCell(
892  crt_row[column_idx], output_buffer_row_idx, column_idx, &write_mutex);
893  }
894  }
895  // targets that are copied directly without any translation/decoding from
896  // result set
897  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
898  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
899  continue;
900  }
901  write_functions[column_idx](rows,
902  entry_idx,
903  output_buffer_row_idx,
904  column_idx,
905  slot_idx_per_target_idx[column_idx],
906  read_functions[column_idx]);
907  }
908  non_empty_idx++;
909  }
910  };
911 
912  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
913  const size_t start_index,
914  const size_t end_index,
915  const size_t thread_idx) {
916  const size_t total_non_empty = non_empty_per_thread[thread_idx];
917  size_t non_empty_idx = 0;
918  size_t local_idx = 0;
919  if (g_enable_non_kernel_time_query_interrupt) {
920  for (size_t entry_idx = start_index; entry_idx < end_index;
921  entry_idx++, local_idx++) {
922  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
923  executor_->checkNonKernelTimeInterrupted())) {
924  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
925  }
926  do_work(
927  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
928  }
929  } else {
930  for (size_t entry_idx = start_index; entry_idx < end_index;
931  entry_idx++, local_idx++) {
932  do_work(
933  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
934  }
935  }
936  };
937 
938  std::vector<std::future<void>> compaction_threads;
939  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
940  const size_t start_entry = thread_idx * size_per_thread;
941  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
942  compaction_threads.push_back(std::async(
943  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
944  }
945 
946  try {
947  for (auto& child : compaction_threads) {
948  child.wait();
949  }
950  } catch (QueryExecutionError& e) {
951  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
952  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
953  }
954  throw e;
955  } catch (...) {
956  throw;
957  }
958 }
959 
966 void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping(
967  const ResultSet& rows,
968  const ColumnBitmap& bitmap,
969  const std::vector<size_t>& non_empty_per_thread,
970  const std::vector<size_t>& global_offsets,
971  const std::vector<size_t>& slot_idx_per_target_idx,
972  const size_t num_columns,
973  const size_t entry_count,
974  const size_t num_threads,
975  const size_t size_per_thread) {
976  CHECK(isDirectColumnarConversionPossible());
977  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
978  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
979 
980  const auto [write_functions, read_functions] =
981  initAllConversionFunctions(rows, slot_idx_per_target_idx);
982  CHECK_EQ(write_functions.size(), num_columns);
983  CHECK_EQ(read_functions.size(), num_columns);
984  auto do_work = [&rows,
985  &bitmap,
986  &global_offsets,
987  &num_columns,
988  &slot_idx_per_target_idx,
989  &write_functions = write_functions,
990  &read_functions = read_functions](size_t& entry_idx,
991  size_t& non_empty_idx,
992  const size_t total_non_empty,
993  const size_t local_idx,
994  const size_t thread_idx,
995  const size_t end_idx) {
996  if (non_empty_idx >= total_non_empty) {
997  // all non-empty entries have been written back
998  entry_idx = end_idx;
999  return;
1000  }
1001  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
1002  if (bitmap.get(local_idx, thread_idx)) {
1003  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
1004  write_functions[column_idx](rows,
1005  entry_idx,
1006  output_buffer_row_idx,
1007  column_idx,
1008  slot_idx_per_target_idx[column_idx],
1009  read_functions[column_idx]);
1010  }
1011  non_empty_idx++;
1012  }
1013  };
1014  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
1015  const size_t start_index,
1016  const size_t end_index,
1017  const size_t thread_idx) {
1018  const size_t total_non_empty = non_empty_per_thread[thread_idx];
1019  size_t non_empty_idx = 0;
1020  size_t local_idx = 0;
1021  if (g_enable_non_kernel_time_query_interrupt) {
1022  for (size_t entry_idx = start_index; entry_idx < end_index;
1023  entry_idx++, local_idx++) {
1024  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
1025  executor_->checkNonKernelTimeInterrupted())) {
1026  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
1027  }
1028  do_work(
1029  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
1030  }
1031  } else {
1032  for (size_t entry_idx = start_index; entry_idx < end_index;
1033  entry_idx++, local_idx++) {
1034  do_work(
1035  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
1036  }
1037  }
1038  };
1039 
1040  std::vector<std::future<void>> compaction_threads;
1041  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
1042  const size_t start_entry = thread_idx * size_per_thread;
1043  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
1044  compaction_threads.push_back(std::async(
1045  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
1046  }
1047 
1048  try {
1049  for (auto& child : compaction_threads) {
1050  child.wait();
1051  }
1052  } catch (QueryExecutionError& e) {
1053  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
1054  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
1055  }
1056  throw e;
1057  } catch (...) {
1058  throw;
1059  }
1060 }
1061 
1067 std::vector<ColumnarResults::WriteFunction> ColumnarResults::initWriteFunctions(
1068  const ResultSet& rows,
1069  const std::vector<bool>& targets_to_skip) {
1070  CHECK(isDirectColumnarConversionPossible());
1071  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1072  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1073 
1074  std::vector<WriteFunction> result;
1075  result.reserve(target_types_.size());
1076 
1077  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
1078  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1079  result.emplace_back([](const ResultSet& rows,
1080  const size_t input_buffer_entry_idx,
1081  const size_t output_buffer_entry_idx,
1082  const size_t target_idx,
1083  const size_t slot_idx,
1084  const ReadFunction& read_function) {
1085  UNREACHABLE() << "Invalid write back function used.";
1086  });
1087  continue;
1088  }
1089 
1090  if (target_types_[target_idx].is_fp()) {
1091  switch (target_types_[target_idx].get_size()) {
1092  case 8:
1093  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
1094  this,
1095  std::placeholders::_1,
1096  std::placeholders::_2,
1097  std::placeholders::_3,
1098  std::placeholders::_4,
1099  std::placeholders::_5,
1100  std::placeholders::_6));
1101  break;
1102  case 4:
1103  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
1104  this,
1105  std::placeholders::_1,
1106  std::placeholders::_2,
1107  std::placeholders::_3,
1108  std::placeholders::_4,
1109  std::placeholders::_5,
1110  std::placeholders::_6));
1111  break;
1112  default:
1113  UNREACHABLE() << "Invalid target type encountered.";
1114  break;
1115  }
1116  } else {
1117  switch (target_types_[target_idx].get_size()) {
1118  case 8:
1119  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
1120  this,
1121  std::placeholders::_1,
1122  std::placeholders::_2,
1123  std::placeholders::_3,
1124  std::placeholders::_4,
1125  std::placeholders::_5,
1126  std::placeholders::_6));
1127  break;
1128  case 4:
1129  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
1130  this,
1131  std::placeholders::_1,
1132  std::placeholders::_2,
1133  std::placeholders::_3,
1134  std::placeholders::_4,
1135  std::placeholders::_5,
1136  std::placeholders::_6));
1137  break;
1138  case 2:
1139  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
1140  this,
1141  std::placeholders::_1,
1142  std::placeholders::_2,
1143  std::placeholders::_3,
1144  std::placeholders::_4,
1145  std::placeholders::_5,
1146  std::placeholders::_6));
1147  break;
1148  case 1:
1149  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
1150  this,
1151  std::placeholders::_1,
1152  std::placeholders::_2,
1153  std::placeholders::_3,
1154  std::placeholders::_4,
1155  std::placeholders::_5,
1156  std::placeholders::_6));
1157  break;
1158  default:
1159  UNREACHABLE() << "Invalid target type encountered.";
1160  break;
1161  }
1162  }
1163  }
1164  return result;
1165 }
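// [Editor's note, not part of the original file] initWriteFunctions() builds a
// per-target dispatch table so the compaction loops never branch on type: each entry is
// writeBackCellDirect<T> bound to this, chosen once from the target's size and
// fp-ness. A lambda equivalent of one 8-byte integer entry would be:
//
//   result.emplace_back([this](const ResultSet& rows,
//                              const size_t in_entry_idx,
//                              const size_t out_entry_idx,
//                              const size_t target_idx,
//                              const size_t slot_idx,
//                              const ReadFunction& read_fn) {
//     writeBackCellDirect<int64_t>(
//         rows, in_entry_idx, out_entry_idx, target_idx, slot_idx, read_fn);
//   });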
1166 
1167 namespace {
1168 
1169 int64_t invalid_read_func(const ResultSet& rows,
1170  const size_t input_buffer_entry_idx,
1171  const size_t target_idx,
1172  const size_t slot_idx) {
1173  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
1174  return static_cast<int64_t>(0);
1175 }
1176 
1177 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1178 int64_t read_float_key_baseline(const ResultSet& rows,
1179  const size_t input_buffer_entry_idx,
1180  const size_t target_idx,
1181  const size_t slot_idx) {
1182  // float keys in baseline hash are written as doubles in the buffer, so
1183  // the result should be properly cast before being written to the output
1184  // columns
1185  auto fval = static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
1186  input_buffer_entry_idx, target_idx, slot_idx));
1187  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
1188 }
1189 
1190 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1191 int64_t read_int64_func(const ResultSet& rows,
1192  const size_t input_buffer_entry_idx,
1193  const size_t target_idx,
1194  const size_t slot_idx) {
1195  return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1196  input_buffer_entry_idx, target_idx, slot_idx);
1197 }
1198 
1199 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1200 int64_t read_int32_func(const ResultSet& rows,
1201  const size_t input_buffer_entry_idx,
1202  const size_t target_idx,
1203  const size_t slot_idx) {
1204  return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1205  input_buffer_entry_idx, target_idx, slot_idx);
1206 }
1207 
1208 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1209 int64_t read_int16_func(const ResultSet& rows,
1210  const size_t input_buffer_entry_idx,
1211  const size_t target_idx,
1212  const size_t slot_idx) {
1213  return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1214  input_buffer_entry_idx, target_idx, slot_idx);
1215 }
1216 
1217 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1218 int64_t read_int8_func(const ResultSet& rows,
1219  const size_t input_buffer_entry_idx,
1220  const size_t target_idx,
1221  const size_t slot_idx) {
1222  return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1223  input_buffer_entry_idx, target_idx, slot_idx);
1224 }
1225 
1226 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1227 int64_t read_float_func(const ResultSet& rows,
1228  const size_t input_buffer_entry_idx,
1229  const size_t target_idx,
1230  const size_t slot_idx) {
1231  auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
1232  input_buffer_entry_idx, target_idx, slot_idx);
1233  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
1234 }
1235 
1236 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1237 int64_t read_double_func(const ResultSet& rows,
1238  const size_t input_buffer_entry_idx,
1239  const size_t target_idx,
1240  const size_t slot_idx) {
1241  auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
1242  input_buffer_entry_idx, target_idx, slot_idx);
1243  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
1244 }
1245 
1246 } // namespace
1247 
1254 template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
1255 std::vector<ColumnarResults::ReadFunction> ColumnarResults::initReadFunctions(
1256  const ResultSet& rows,
1257  const std::vector<size_t>& slot_idx_per_target_idx,
1258  const std::vector<bool>& targets_to_skip) {
1259  CHECK(isDirectColumnarConversionPossible());
1260  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
1261  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());
1262 
1263  std::vector<ReadFunction> read_functions;
1264  read_functions.reserve(target_types_.size());
1265 
1266  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
1267  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1268  // for targets that should be skipped, we use a placeholder function that should
1269  // never be called. The UNREACHABLE() inside it makes sure that never happens.
1270  read_functions.emplace_back(invalid_read_func);
1271  continue;
1272  }
1273 
1274  if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
1275  if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
1276  // for key columns only
1277  CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
1278  if (target_types_[target_idx].is_fp()) {
1279  CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
1280  switch (target_types_[target_idx].get_type()) {
1281  case kFLOAT:
1282  read_functions.emplace_back(
1283  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
1284  break;
1285  case kDOUBLE:
1286  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1287  break;
1288  default:
1289  UNREACHABLE()
1290  << "Invalid data type encountered (BaselineHash, floating point key).";
1291  break;
1292  }
1293  } else {
1294  switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
1295  case 8:
1296  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1297  break;
1298  case 4:
1299  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1300  break;
1301  default:
1302  UNREACHABLE()
1303  << "Invalid data type encountered (BaselineHash, integer key).";
1304  }
1305  }
1306  continue;
1307  }
1308  }
1309  if (target_types_[target_idx].is_fp()) {
1310  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1311  case 8:
1312  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1313  break;
1314  case 4:
1315  read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1316  break;
1317  default:
1318  UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
1319  break;
1320  }
1321  } else {
1322  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1323  case 8:
1324  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1325  break;
1326  case 4:
1327  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1328  break;
1329  case 2:
1330  read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1331  break;
1332  case 1:
1333  read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1334  break;
1335  default:
1336  UNREACHABLE() << "Invalid data type encountered (integer agg column).";
1337  break;
1338  }
1339  }
1340  }
1341  return read_functions;
1342 }
1343 
1351 std::tuple<std::vector<ColumnarResults::WriteFunction>,
1352  std::vector<ColumnarResults::ReadFunction>>
1353 ColumnarResults::initAllConversionFunctions(
1354  const ResultSet& rows,
1355  const std::vector<size_t>& slot_idx_per_target_idx,
1356  const std::vector<bool>& targets_to_skip) {
1357  CHECK(isDirectColumnarConversionPossible() &&
1358  (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1359  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));
1360 
1361  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
1362  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
1363  if (rows.didOutputColumnar()) {
1364  return std::make_tuple(
1365  std::move(write_functions),
1366  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
1367  rows, slot_idx_per_target_idx, targets_to_skip));
1368  } else {
1369  return std::make_tuple(
1370  std::move(write_functions),
1371  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
1372  rows, slot_idx_per_target_idx, targets_to_skip));
1373  }
1374  } else {
1375  if (rows.didOutputColumnar()) {
1376  return std::make_tuple(
1377  std::move(write_functions),
1378  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
1379  rows, slot_idx_per_target_idx, targets_to_skip));
1380  } else {
1381  return std::make_tuple(
1382  std::move(write_functions),
1383  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
1384  rows, slot_idx_per_target_idx, targets_to_skip));
1385  }
1386  }
1387 }