ColumnarResults.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ColumnarResults.h"
#include "Shared/Intervals.h"
#include "Shared/thread_count.h"

#include <atomic>
#include <future>
#include <numeric>

namespace {

inline int64_t fixed_encoding_nullable_val(const int64_t val,
                                           const SQLTypeInfo& type_info) {
  if (type_info.get_compression() != kENCODING_NONE) {
    CHECK(type_info.get_compression() == kENCODING_FIXED ||
          type_info.get_compression() == kENCODING_DICT);
    auto logical_ti = get_logical_type_info(type_info);
    if (val == inline_int_null_val(logical_ti)) {
      return inline_fixed_encoding_null_val(type_info);
    }
  }
  return val;
}

}  // namespace
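// Illustrative sketch (added commentary, not in the original source): for a
// hypothetical SMALLINT column stored with FIXED(8) encoding, the helper above remaps
// the 16-bit logical null sentinel to the 8-bit physical one and leaves every other
// value untouched:
//
//   SQLTypeInfo ti(kSMALLINT, kENCODING_FIXED, 8, kNULLT);
//   fixed_encoding_nullable_val(-32768, ti);  // returns -128, the 8-bit sentinel
//   fixed_encoding_nullable_val(42, ti);      // returns 42, unchanged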

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const ResultSet& rows,
                                 const size_t num_columns,
                                 const std::vector<SQLTypeInfo>& target_types,
                                 const bool is_parallel_execution_enforced)
    : column_buffers_(num_columns)
    , num_rows_(result_set::use_parallel_algorithms(rows) ||
                        rows.isDirectColumnarConversionPossible()
                    ? rows.entryCount()
                    : rows.rowCount())
    , target_types_(target_types)
    , parallel_conversion_(is_parallel_execution_enforced
                               ? true
                               : result_set::use_parallel_algorithms(rows))
    , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible()) {
  auto timer = DEBUG_TIMER(__func__);
  column_buffers_.resize(num_columns);
  for (size_t i = 0; i < num_columns; ++i) {
    const bool is_varlen = target_types[i].is_array() ||
                           (target_types[i].is_string() &&
                            target_types[i].get_compression() == kENCODING_NONE) ||
                           target_types[i].is_geometry();
    if (is_varlen) {
      throw ColumnarConversionNotSupported();
    }
    if (!isDirectColumnarConversionPossible() ||
        !rows.isZeroCopyColumnarConversionPossible(i)) {
      column_buffers_[i] =
          row_set_mem_owner->allocate(num_rows_ * target_types[i].get_size());
    }
  }

  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
    materializeAllColumnsDirectly(rows, num_columns);
  } else {
    materializeAllColumnsThroughIteration(rows, num_columns);
  }
}
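// Minimal usage sketch (added commentary; a ColumnFetcher-style call site, assuming
// the public ResultSet accessors colCount()/getColType() and
// ColumnarResults::getColumnBuffers()):
//
//   std::vector<SQLTypeInfo> col_types;
//   for (size_t i = 0; i < rows.colCount(); ++i) {
//     col_types.push_back(get_logical_type_info(rows.getColType(i)));
//   }
//   ColumnarResults columnar(row_set_mem_owner, rows, col_types.size(), col_types);
//   const int8_t* col0 = columnar.getColumnBuffers()[0];  // contiguous column 0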

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const int8_t* one_col_buffer,
                                 const size_t num_rows,
                                 const SQLTypeInfo& target_type)
    : column_buffers_(1)
    , num_rows_(num_rows)
    , target_types_{target_type}
    , parallel_conversion_(false)
    , direct_columnar_conversion_(false) {
  auto timer = DEBUG_TIMER(__func__);
  const bool is_varlen =
      target_type.is_array() ||
      (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
      target_type.is_geometry();
  if (is_varlen) {
    throw ColumnarConversionNotSupported();
  }
  const auto buf_size = num_rows * target_type.get_size();
  column_buffers_[0] = reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size));
  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
  if (sub_results.empty()) {
    return nullptr;
  }
  const auto total_row_count = std::accumulate(
      sub_results.begin(),
      sub_results.end(),
      size_t(0),
      [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
        return init + result->size();
      });
  std::unique_ptr<ColumnarResults> merged_results(
      new ColumnarResults(total_row_count, sub_results[0]->target_types_));
  const auto col_count = sub_results[0]->column_buffers_.size();
  const auto nonempty_it = std::find_if(
      sub_results.begin(),
      sub_results.end(),
      [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
  if (nonempty_it == sub_results.end()) {
    return nullptr;
  }
  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    const auto byte_width = (*nonempty_it)->getColumnType(col_idx).get_size();
    auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
    merged_results->column_buffers_.push_back(write_ptr);
    for (auto& rs : sub_results) {
      CHECK_EQ(col_count, rs->column_buffers_.size());
      if (!rs->size()) {
        continue;
      }
      CHECK_EQ(byte_width, rs->getColumnType(col_idx).get_size());
      memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
      write_ptr += rs->size() * byte_width;
    }
  }
  return merged_results;
}
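// Note (added commentary): mergeResults() concatenates sub-results column by column.
// Merging, e.g., two sub-results with 2 and 3 rows yields, for each column, one 5-row
// buffer laid out as [sub0_row0, sub0_row1, sub1_row0, sub1_row1, sub1_row2]. All
// sub-results must agree on column count and per-column byte width (CHECKed above).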

/*
 * Iterates through the result set (via getNextRow or getRowAtNoTranslations) and
 * writes back all columns, cell by cell, into the output column buffers.
 */
void ColumnarResults::materializeAllColumnsThroughIteration(const ResultSet& rows,
                                                            const size_t num_columns) {
  std::atomic<size_t> row_idx{0};
  const auto do_work = [num_columns, this](const std::vector<TargetValue>& crt_row,
                                           const size_t row_idx) {
    for (size_t i = 0; i < num_columns; ++i) {
      writeBackCell(crt_row[i], row_idx, i);
    }
  };
  if (isParallelConversion()) {
    const size_t worker_count = cpu_threads();
    std::vector<std::future<void>> conversion_threads;
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&rows, &do_work, &row_idx](const size_t start, const size_t end) {
            for (size_t i = start; i < end; ++i) {
              const auto crt_row = rows.getRowAtNoTranslations(i);
              if (!crt_row.empty()) {
                do_work(crt_row, row_idx.fetch_add(1));
              }
            }
          },
          interval.begin,
          interval.end));
    }
    for (auto& child : conversion_threads) {
      child.wait();
    }

    num_rows_ = row_idx;
    rows.setCachedRowCount(num_rows_);
    return;
  }
  while (true) {
    const auto crt_row = rows.getNextRow(false, false);
    if (crt_row.empty()) {
      break;
    }
    do_work(crt_row, row_idx);
    ++row_idx;
  }
  rows.moveToBegin();
}
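// Note (added commentary): in the parallel branch above, each worker claims output
// slots through the shared atomic row_idx (fetch_add), so non-empty rows are packed
// densely but their relative order across workers is not guaranteed to match the
// entry order in the result set. The serial branch preserves iteration order.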

/*
 * This function processes and decodes its input TargetValue
 * and writes it into its corresponding column buffer's cell (with corresponding
 * row and column indices).
 *
 * NOTE: this is not supposed to process varlen types, which should be
 * handled differently outside this function.
 */
inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
                                           const size_t row_idx,
                                           const size_t column_idx) {
  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
  CHECK(scalar_col_val);
  auto i64_p = boost::get<int64_t>(scalar_col_val);
  const auto& type_info = target_types_[column_idx];
  if (i64_p) {
    const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
    switch (target_types_[column_idx].get_size()) {
      case 1:
        ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
        break;
      case 2:
        ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
        break;
      case 4:
        ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
        break;
      case 8:
        ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
        break;
      default:
        CHECK(false);
    }
  } else {
    CHECK(target_types_[column_idx].is_fp());
    switch (target_types_[column_idx].get_type()) {
      case kFLOAT: {
        auto float_p = boost::get<float>(scalar_col_val);
        ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
        break;
      }
      case kDOUBLE: {
        auto double_p = boost::get<double>(scalar_col_val);
        ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
        break;
      }
      default:
        CHECK(false);
    }
  }
}
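// Note (added commentary): writeBackCell() dispatches on the *physical* byte width of
// the target type, so a dictionary-encoded TEXT or a FIXED-encoded integer lands in a
// 1/2/4-byte slot with its null sentinel already remapped by
// fixed_encoding_nullable_val(). Floating-point targets arrive as float/double
// variants rather than int64_t, hence the separate branch.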

template <typename DATA_TYPE>
void ColumnarResults::writeBackCellDirect(const ResultSet& rows,
                                          const size_t input_buffer_entry_idx,
                                          const size_t output_buffer_entry_idx,
                                          const size_t target_idx,
                                          const size_t slot_idx,
                                          const ReadFunction& read_from_function) {
  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
      target_types_[target_idx]));
  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
      val;
}

template <>
void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
                                                 const size_t input_buffer_entry_idx,
                                                 const size_t output_buffer_entry_idx,
                                                 const size_t target_idx,
                                                 const size_t slot_idx,
                                                 const ReadFunction& read_from_function) {
  const int32_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
}

template <>
void ColumnarResults::writeBackCellDirect<double>(
    const ResultSet& rows,
    const size_t input_buffer_entry_idx,
    const size_t output_buffer_entry_idx,
    const size_t target_idx,
    const size_t slot_idx,
    const ReadFunction& read_from_function) {
  const int64_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
}
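// Note (added commentary): read functions return int64_t regardless of the target
// type, so float/double values travel as their raw bit patterns inside the integer
// (see read_float_func/read_double_func below). The specializations above undo that
// transport by reinterpreting the low 32/64 bits back into float/double;
// may_alias_ptr() keeps the type punning safe under strict aliasing.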

/*
 * Materializes all columns directly (without using the result set's iterators),
 * dispatching on the query's description type.
 */
void ColumnarResults::materializeAllColumnsDirectly(const ResultSet& rows,
                                                    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  switch (rows.getQueryDescriptionType()) {
    case QueryDescriptionType::Projection: {
      materializeAllColumnsProjection(rows, num_columns);
      break;
    }
    case QueryDescriptionType::GroupByPerfectHash:
    case QueryDescriptionType::GroupByBaselineHash: {
      materializeAllColumnsGroupBy(rows, num_columns);
      break;
    }
    default:
      UNREACHABLE()
          << "Direct columnar conversion for this query type is not supported yet.";
  }
}

void ColumnarResults::materializeAllColumnsProjection(const ResultSet& rows,
                                                      const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        rows.query_mem_desc_.getQueryDescriptionType() ==
            QueryDescriptionType::Projection);

  const auto& lazy_fetch_info = rows.getLazyFetchInfo();

  // We can directly copy each non-lazy column's content
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);

  // Only lazy columns are iterated through first and then materialized
  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
}

/*
 * For all non-lazy columns, we can directly copy back the results of each column's
 * contents from different storages and put them into the corresponding output buffer.
 *
 * This function is parallelized through assigning each column to a CPU thread.
 */
void ColumnarResults::copyAllNonLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(rows.isDirectColumnarConversionPossible());
  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
    // an empty lazy_fetch_info means no column is lazily fetched
    if (lazy_fetch_info.empty()) {
      return true;
    } else {
      return !lazy_fetch_info[col_idx].is_lazily_fetched;
    }
  };

  // parallelized by assigning each column to a thread
  std::vector<std::future<void>> direct_copy_threads;
  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
    if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
      CHECK(!column_buffers_[col_idx]);
      column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
    } else if (is_column_non_lazily_fetched(col_idx)) {
      direct_copy_threads.push_back(std::async(
          std::launch::async,
          [&rows, this](const size_t column_index) {
            const size_t column_size = num_rows_ * target_types_[column_index].get_size();
            rows.copyColumnIntoBuffer(
                column_index, column_buffers_[column_index], column_size);
          },
          col_idx));
    }
  }

  for (auto& child : direct_copy_threads) {
    child.wait();
  }
}
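// Note (added commentary): copyAllNonLazyColumns() picks one of three paths per
// column: (1) zero-copy, where the output simply aliases the result set's own
// columnar buffer; (2) a parallel direct copy for non-lazy columns; or (3) nothing,
// leaving lazily fetched columns to materializeAllLazyColumns() below.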

void ColumnarResults::materializeAllLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(rows.isDirectColumnarConversionPossible());
  const auto do_work_just_lazy_columns = [num_columns, this](
                                             const std::vector<TargetValue>& crt_row,
                                             const size_t row_idx,
                                             const std::vector<bool>& targets_to_skip) {
    for (size_t i = 0; i < num_columns; ++i) {
      if (!targets_to_skip.empty() && !targets_to_skip[i]) {
        writeBackCell(crt_row[i], row_idx, i);
      }
    }
  };

  const auto contains_lazy_fetched_column =
      [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
        for (auto& col_info : lazy_fetch_info) {
          if (col_info.is_lazily_fetched) {
            return true;
          }
        }
        return false;
      };

  // parallelized by assigning a chunk of rows to each thread
  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
  if (contains_lazy_fetched_column(lazy_fetch_info)) {
    const size_t worker_count =
        result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1;
    std::vector<std::future<void>> conversion_threads;
    std::vector<bool> targets_to_skip;
    if (skip_non_lazy_columns) {
      CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
      targets_to_skip.reserve(num_columns);
      for (size_t i = 0; i < num_columns; i++) {
        // we process lazy columns (i.e., skip non-lazy columns)
        targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
      }
    }
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&rows, &do_work_just_lazy_columns, &targets_to_skip](const size_t start,
                                                                const size_t end) {
            for (size_t i = start; i < end; ++i) {
              const auto crt_row = rows.getRowAtNoTranslations(i, targets_to_skip);
              do_work_just_lazy_columns(crt_row, i, targets_to_skip);
            }
          },
          interval.begin,
          interval.end));
    }

    for (auto& child : conversion_threads) {
      child.wait();
    }
  }
}
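// Note (added commentary): targets_to_skip follows a "true means skip" convention:
// entries are true for non-lazy columns (already copied by copyAllNonLazyColumns) and
// false for lazy ones, so do_work_just_lazy_columns only writes back the lazy subset.
// The same vector is also handed to getRowAtNoTranslations() so that skipped targets
// are never decoded in the first place.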

void ColumnarResults::materializeAllColumnsGroupBy(const ResultSet& rows,
                                                   const size_t num_columns) {
  CHECK(rows.isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
  const size_t entry_count = rows.entryCount();
  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;

  // step 1: compute total non-empty elements and store a bitmap per thread
  std::vector<size_t> non_empty_per_thread(num_threads,
                                           0);  // number of non-empty entries per thread

  ColumnBitmap bitmap(size_per_thread, num_threads);

  locateAndCountEntries(
      rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);

  // step 2: go through the generated bitmap and copy/decode corresponding entries
  // into the output buffer
  compactAndCopyEntries(rows,
                        bitmap,
                        non_empty_per_thread,
                        num_columns,
                        entry_count,
                        num_threads,
                        size_per_thread);
}

/*
 * Iterates through the result set's entries and locates the non-empty ones: each
 * thread counts its non-empty entries and marks their positions in a per-thread
 * bitmap.
 */
void ColumnarResults::locateAndCountEntries(const ResultSet& rows,
                                            ColumnBitmap& bitmap,
                                            std::vector<size_t>& non_empty_per_thread,
                                            const size_t entry_count,
                                            const size_t num_threads,
                                            const size_t size_per_thread) const {
  CHECK(rows.isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());
  auto locate_and_count_func =
      [&rows, &bitmap, &non_empty_per_thread](
          size_t start_index, size_t end_index, size_t thread_idx) {
        size_t total_non_empty = 0;
        size_t local_idx = 0;
        for (size_t entry_idx = start_index; entry_idx < end_index;
             entry_idx++, local_idx++) {
          if (!rows.isRowAtEmpty(entry_idx)) {
            total_non_empty++;
            bitmap.set(local_idx, thread_idx, true);
          }
        }
        non_empty_per_thread[thread_idx] = total_non_empty;
      };

  std::vector<std::future<void>> conversion_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    conversion_threads.push_back(std::async(
        std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
  }

  for (auto& child : conversion_threads) {
    child.wait();
  }
}
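// Note (added commentary): the bitmap is effectively a num_threads x size_per_thread
// grid: thread t records whether its local entry l (global entry
// t * size_per_thread + l) is non-empty. Keeping the flags and counts per thread lets
// step 2 compute output positions without any cross-thread synchronization.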

/*
 * Takes the non-empty bitmap and per-thread counts computed in step 1 and writes only
 * the non-empty entries, compacted, into the output column buffers.
 */
void ColumnarResults::compactAndCopyEntries(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(rows.isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());

  // compute the exclusive scan over all non-empty totals
  std::vector<size_t> global_offsets(num_threads + 1, 0);
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   std::next(global_offsets.begin()));
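  // Worked example (added commentary): with non_empty_per_thread = {3, 0, 5},
  // global_offsets becomes {0, 3, 3, 8}; thread t writes its non-empty entries to
  // output rows [global_offsets[t], global_offsets[t] + non_empty_per_thread[t]),
  // so the compacted output has no holes.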

  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
      rows.getSupportedSingleSlotTargetBitmap();

  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
  // differently and accessed through the result set's iterator
  if (num_single_slot_targets < num_columns) {
    compactAndCopyEntriesWithTargetSkipping(rows,
                                            bitmap,
                                            non_empty_per_thread,
                                            global_offsets,
                                            single_slot_targets_to_skip,
                                            slot_idx_per_target_idx,
                                            num_columns,
                                            entry_count,
                                            num_threads,
                                            size_per_thread);
  } else {
    compactAndCopyEntriesWithoutTargetSkipping(rows,
                                               bitmap,
                                               non_empty_per_thread,
                                               global_offsets,
                                               slot_idx_per_target_idx,
                                               num_columns,
                                               entry_count,
                                               num_threads,
                                               size_per_thread);
  }
}

/*
 * Compacts and copies entries when some targets (e.g., multi-slot ones such as AVG)
 * must be skipped by the direct path and recovered through the result set iterator.
 */
void ColumnarResults::compactAndCopyEntriesWithTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<bool>& targets_to_skip,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(rows.isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);

  auto compact_buffer_func = [this,
                              &rows,
                              &bitmap,
                              &global_offsets,
                              &non_empty_per_thread,
                              &num_columns,
                              &targets_to_skip,
                              &slot_idx_per_target_idx,
                              &write_functions = write_functions,
                              &read_functions = read_functions](const size_t start_index,
                                                                const size_t end_index,
                                                                const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    for (size_t entry_idx = start_index; entry_idx < end_index;
         entry_idx++, local_idx++) {
      if (non_empty_idx >= total_non_empty) {
        // all non-empty entries have been written back
        break;
      }
      const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
      if (bitmap.get(local_idx, thread_idx)) {
        // targets that are recovered from the result set iterators:
        const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
        for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
          if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
            writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
          }
        }
        // targets that are copied directly without any translation/decoding from
        // the result set
        for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
          if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
            continue;
          }
          write_functions[column_idx](rows,
                                      entry_idx,
                                      output_buffer_row_idx,
                                      column_idx,
                                      slot_idx_per_target_idx[column_idx],
                                      read_functions[column_idx]);
        }
        non_empty_idx++;
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  for (auto& child : compaction_threads) {
    child.wait();
  }
}

/*
 * Compacts and copies entries when all targets can be written back directly, with no
 * skipped targets and no use of the result set iterator.
 */
void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(rows.isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);

  auto compact_buffer_func = [&rows,
                              &bitmap,
                              &global_offsets,
                              &non_empty_per_thread,
                              &num_columns,
                              &slot_idx_per_target_idx,
                              &write_functions = write_functions,
                              &read_functions = read_functions](const size_t start_index,
                                                                const size_t end_index,
                                                                const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    for (size_t entry_idx = start_index; entry_idx < end_index;
         entry_idx++, local_idx++) {
      if (non_empty_idx >= total_non_empty) {
        // all non-empty entries have been written back
        break;
      }
      const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
      if (bitmap.get(local_idx, thread_idx)) {
        for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
          write_functions[column_idx](rows,
                                      entry_idx,
                                      output_buffer_row_idx,
                                      column_idx,
                                      slot_idx_per_target_idx[column_idx],
                                      read_functions[column_idx]);
        }
        non_empty_idx++;
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  for (auto& child : compaction_threads) {
    child.wait();
  }
}

/*
 * Initializes a write function per target (i.e., column): each target's type and
 * physical size dictate which writeBackCellDirect instantiation to use.
 */
std::vector<ColumnarResults::WriteFunction> ColumnarResults::initWriteFunctions(
    const ResultSet& rows,
    const std::vector<bool>& targets_to_skip) {
  CHECK(rows.isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  std::vector<WriteFunction> result;
  result.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      result.emplace_back([](const ResultSet& rows,
                             const size_t input_buffer_entry_idx,
                             const size_t output_buffer_entry_idx,
                             const size_t target_idx,
                             const size_t slot_idx,
                             const ReadFunction& read_function) {
        UNREACHABLE() << "Invalid write back function used.";
      });
      continue;
    }

    if (target_types_[target_idx].is_fp()) {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    } else {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 2:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 1:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    }
  }
  return result;
}
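// Note (added commentary): the std::bind calls above simply adapt the member template
// writeBackCellDirect<T> to the six-argument WriteFunction signature; an equivalent
// modern spelling (a sketch, not the file's actual code) would be a lambda such as:
//
//   result.emplace_back([this](const ResultSet& rows, size_t in, size_t out,
//                              size_t target, size_t slot, const ReadFunction& read) {
//     writeBackCellDirect<int64_t>(rows, in, out, target, slot, read);
//   });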

namespace {

int64_t invalid_read_func(const ResultSet& rows,
                          const size_t input_buffer_entry_idx,
                          const size_t target_idx,
                          const size_t slot_idx) {
  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
  return static_cast<int64_t>(0);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_key_baseline(const ResultSet& rows,
                                const size_t input_buffer_entry_idx,
                                const size_t target_idx,
                                const size_t slot_idx) {
  // float keys in baseline hash are written as doubles in the buffer, so the result
  // must be properly cast back to float before being written to the output columns
  auto fval = static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx));
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int64_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int32_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int16_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int8_func(const ResultSet& rows,
                       const size_t input_buffer_entry_idx,
                       const size_t target_idx,
                       const size_t slot_idx) {
  return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_double_func(const ResultSet& rows,
                         const size_t input_buffer_entry_idx,
                         const size_t target_idx,
                         const size_t slot_idx) {
  auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
}

}  // namespace

/*
 * Initializes a read function per target (i.e., column), based on the query type,
 * the output layout, and each target's type and padded slot width.
 */
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector<ColumnarResults::ReadFunction> ColumnarResults::initReadFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(rows.isDirectColumnarConversionPossible());
  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());

  std::vector<ReadFunction> read_functions;
  read_functions.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      // for targets that should be skipped, we use a placeholder function that should
      // never be called. The UNREACHABLE() inside it makes sure that never happens.
      read_functions.emplace_back(invalid_read_func);
      continue;
    }

    if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
      if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
        // for key columns only
        CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
        if (target_types_[target_idx].is_fp()) {
          CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
          switch (target_types_[target_idx].get_type()) {
            case kFLOAT:
              read_functions.emplace_back(
                  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case kDOUBLE:
              read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, floating point key).";
              break;
          }
        } else {
          switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
            case 8:
              read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case 4:
              read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, integer key).";
          }
        }
        continue;
      }
    }
    if (target_types_[target_idx].is_fp()) {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
          break;
      }
    } else {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 2:
          read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 1:
          read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (integer agg column).";
          break;
      }
    }
  }
  return read_functions;
}

/*
 * Initializes the write and read functions for all targets in a single utility, based
 * on the query's description type and output layout.
 */
std::tuple<std::vector<ColumnarResults::WriteFunction>,
           std::vector<ColumnarResults::ReadFunction>>
ColumnarResults::initAllConversionFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(rows.isDirectColumnarConversionPossible() &&
        (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
         rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));

  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  } else {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  }
}
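// Note (added commentary): the group-by direct path thus composes as:
//   locateAndCountEntries()   ->  per-thread bitmap + non-empty counts
//   exclusive scan            ->  global output offsets per thread
//   compactAndCopyEntries*()  ->  write_functions[col](rows, in, out, col, slot,
//                                                      read_functions[col])
// where each (read, write) pair is chosen once per column up front, so the per-entry
// inner loop is free of type dispatch.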