ColumnarResults.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ColumnarResults.h"
#include "Shared/Intervals.h"
#include "Shared/thread_count.h"

#include <atomic>
#include <future>
#include <numeric>

namespace {

inline int64_t fixed_encoding_nullable_val(const int64_t val,
                                           const SQLTypeInfo& type_info) {
  if (type_info.get_compression() != kENCODING_NONE) {
    CHECK(type_info.get_compression() == kENCODING_FIXED ||
          type_info.get_compression() == kENCODING_DICT);
    auto logical_ti = get_logical_type_info(type_info);
    if (val == inline_int_null_val(logical_ti)) {
      return inline_fixed_encoding_null_val(type_info);
    }
  }
  return val;
}

}  // namespace
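
The helper above only remaps null sentinels: a value equal to the logical type's inline null is rewritten to the null sentinel of the narrower physical encoding; all other values pass through unchanged. A minimal standalone sketch of the idea, with hard-coded integer widths standing in for `SQLTypeInfo` (all names here are illustrative, not OmniSci APIs):

```cpp
#include <cstdint>
#include <iostream>
#include <limits>

// Hypothetical stand-ins: a 64-bit logical null vs. a 16-bit physical null.
constexpr int64_t kLogicalNull = std::numeric_limits<int64_t>::min();
constexpr int16_t kPhysicalNull16 = std::numeric_limits<int16_t>::min();

// Same shape as fixed_encoding_nullable_val: ordinary values pass through,
// but the logical null sentinel is rewritten to the narrow encoding's sentinel.
int64_t remap_null(int64_t val) {
  return val == kLogicalNull ? kPhysicalNull16 : val;
}

int main() {
  std::cout << remap_null(42) << '\n';            // 42
  std::cout << remap_null(kLogicalNull) << '\n';  // -32768
}
```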

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const ResultSet& rows,
                                 const size_t num_columns,
                                 const std::vector<SQLTypeInfo>& target_types,
                                 const bool is_parallel_execution_enforced)
    : column_buffers_(num_columns)
    , num_rows_(use_parallel_algorithms(rows) || rows.isDirectColumnarConversionPossible()
                    ? rows.entryCount()
                    : rows.rowCount())
    , target_types_(target_types)
    , parallel_conversion_(is_parallel_execution_enforced ? true
                                                          : use_parallel_algorithms(rows))
    , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible()) {
  auto timer = DEBUG_TIMER(__func__);
  column_buffers_.resize(num_columns);
  for (size_t i = 0; i < num_columns; ++i) {
    const bool is_varlen = target_types[i].is_array() ||
                           (target_types[i].is_string() &&
                            target_types[i].get_compression() == kENCODING_NONE) ||
                           target_types[i].is_geometry();
    if (is_varlen) {
      throw ColumnarConversionNotSupported();
    }
    if (!isDirectColumnarConversionPossible() ||
        !rows.isZeroCopyColumnarConversionPossible(i)) {
      column_buffers_[i] =
          row_set_mem_owner->allocate(num_rows_ * target_types[i].get_size());
    }
  }

  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
    materializeAllColumnsDirectly(rows, num_columns);
  } else {
    materializeAllColumnsThroughIteration(rows, num_columns);
  }
}

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const int8_t* one_col_buffer,
                                 const size_t num_rows,
                                 const SQLTypeInfo& target_type)
    : column_buffers_(1)
    , num_rows_(num_rows)
    , target_types_{target_type}
    , parallel_conversion_(false)
    , direct_columnar_conversion_(false) {
  auto timer = DEBUG_TIMER(__func__);
  const bool is_varlen =
      target_type.is_array() ||
      (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
      target_type.is_geometry();
  if (is_varlen) {
    throw ColumnarConversionNotSupported();
  }
  const auto buf_size = num_rows * target_type.get_size();
  column_buffers_[0] = reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size));
  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
  if (sub_results.empty()) {
    return nullptr;
  }
  const auto total_row_count = std::accumulate(
      sub_results.begin(),
      sub_results.end(),
      size_t(0),
      [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
        return init + result->size();
      });
  std::unique_ptr<ColumnarResults> merged_results(
      new ColumnarResults(total_row_count, sub_results[0]->target_types_));
  const auto col_count = sub_results[0]->column_buffers_.size();
  const auto nonempty_it = std::find_if(
      sub_results.begin(),
      sub_results.end(),
      [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
  if (nonempty_it == sub_results.end()) {
    return nullptr;
  }
  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    const auto byte_width = (*nonempty_it)->getColumnType(col_idx).get_size();
    auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
    merged_results->column_buffers_.push_back(write_ptr);
    for (auto& rs : sub_results) {
      CHECK_EQ(col_count, rs->column_buffers_.size());
      if (!rs->size()) {
        continue;
      }
      CHECK_EQ(byte_width, rs->getColumnType(col_idx).get_size());
      memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
      write_ptr += rs->size() * byte_width;
    }
  }
  return merged_results;
}
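
mergeResults concatenates the per-column buffers of several sub-results into a single allocation per column, advancing a raw write pointer past each sub-result's bytes. A self-contained sketch of that copy loop over plain byte vectors (`ColumnBytes` and `merge_column` are invented for the example):

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <numeric>
#include <vector>

// One column of fixed-width values from a sub-result, as raw bytes.
using ColumnBytes = std::vector<int8_t>;

// Concatenate the same column across sub-results, as mergeResults does
// with memcpy over one allocation sized by an accumulate over sub-results.
ColumnBytes merge_column(const std::vector<ColumnBytes>& sub_columns) {
  const size_t total = std::accumulate(
      sub_columns.begin(), sub_columns.end(), size_t(0),
      [](size_t init, const ColumnBytes& c) { return init + c.size(); });
  ColumnBytes merged(total);
  int8_t* write_ptr = merged.data();
  for (const auto& c : sub_columns) {
    if (c.empty()) {
      continue;  // empty sub-results are skipped, as in the real loop
    }
    std::memcpy(write_ptr, c.data(), c.size());
    write_ptr += c.size();
  }
  return merged;
}

int main() {
  std::vector<ColumnBytes> subs = {{1, 2}, {}, {3, 4, 5}};
  for (auto v : merge_column(subs)) {
    std::cout << int(v) << ' ';  // 1 2 3 4 5
  }
}
```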

void ColumnarResults::materializeAllColumnsThroughIteration(const ResultSet& rows,
                                                            const size_t num_columns) {
  std::atomic<size_t> row_idx{0};
  const auto do_work = [num_columns, this](const std::vector<TargetValue>& crt_row,
                                           const size_t row_idx) {
    for (size_t i = 0; i < num_columns; ++i) {
      writeBackCell(crt_row[i], row_idx, i);
    }
  };
  if (isParallelConversion()) {
    const size_t worker_count = cpu_threads();
    std::vector<std::future<void>> conversion_threads;
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&rows, &do_work, &row_idx](const size_t start, const size_t end) {
            for (size_t i = start; i < end; ++i) {
              const auto crt_row = rows.getRowAtNoTranslations(i);
              if (!crt_row.empty()) {
                do_work(crt_row, row_idx.fetch_add(1));
              }
            }
          },
          interval.begin,
          interval.end));
    }
    for (auto& child : conversion_threads) {
      child.wait();
    }

    num_rows_ = row_idx;
    rows.setCachedRowCount(num_rows_);
    return;
  }
  while (true) {
    const auto crt_row = rows.getNextRow(false, false);
    if (crt_row.empty()) {
      break;
    }
    do_work(crt_row, row_idx);
    ++row_idx;
  }
  rows.moveToBegin();
}
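
In the parallel branch above, each worker owns a contiguous interval of entries, and the atomic `fetch_add` hands out dense output slots across all workers; this is also why output row order is not preserved. A runnable sketch of the same shape, with a fake emptiness predicate in place of `getRowAtNoTranslations` (`kEntries`, `kWorkers`, and `is_empty` are stand-ins, and the chunking mimics what `makeIntervals` is assumed to produce):

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <future>
#include <iostream>
#include <vector>

int main() {
  constexpr size_t kEntries = 10;  // stand-in for rows.entryCount()
  constexpr size_t kWorkers = 4;   // stand-in for cpu_threads()
  auto is_empty = [](size_t i) { return i % 3 == 0; };  // fake empty entries

  std::atomic<size_t> out_idx{0};
  std::vector<size_t> output(kEntries, 0);
  std::vector<std::future<void>> workers;

  // Contiguous [start, end) chunks per worker.
  const size_t chunk = (kEntries + kWorkers - 1) / kWorkers;
  for (size_t w = 0; w < kWorkers; ++w) {
    const size_t start = w * chunk;
    const size_t end = std::min(start + chunk, kEntries);
    workers.push_back(std::async(std::launch::async, [&, start, end] {
      for (size_t i = start; i < end; ++i) {
        if (!is_empty(i)) {
          // fetch_add claims a unique, dense output slot across workers.
          output[out_idx.fetch_add(1)] = i;
        }
      }
    }));
  }
  for (auto& w : workers) {
    w.wait();
  }
  std::cout << "non-empty rows: " << out_idx << '\n';  // 6
}
```

As in the real code, only the final count is deterministic; the compacted order depends on thread interleaving.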

/*
 * This function processes and decodes its input TargetValue and writes it into
 * the corresponding column buffer's cell (at the given row and column indices).
 *
 * NOTE: this function is not supposed to handle varlen types; those must be
 * handled separately, outside this function.
 */
inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
                                           const size_t row_idx,
                                           const size_t column_idx) {
  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
  CHECK(scalar_col_val);
  auto i64_p = boost::get<int64_t>(scalar_col_val);
  const auto& type_info = target_types_[column_idx];
  if (i64_p) {
    const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
    switch (target_types_[column_idx].get_size()) {
      case 1:
        ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
        break;
      case 2:
        ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
        break;
      case 4:
        ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
        break;
      case 8:
        ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
        break;
      default:
        CHECK(false);
    }
  } else {
    CHECK(target_types_[column_idx].is_fp());
    switch (target_types_[column_idx].get_type()) {
      case kFLOAT: {
        auto float_p = boost::get<float>(scalar_col_val);
        ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
        break;
      }
      case kDOUBLE: {
        auto double_p = boost::get<double>(scalar_col_val);
        ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
        break;
      }
      default:
        CHECK(false);
    }
  }
}
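
The integer path above boils down to a switch on the column's physical byte width that truncates the decoded 64-bit value into the right slot of a raw buffer. A standalone sketch (`write_cell` is hypothetical, not an OmniSci function):

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Write a 64-bit value into a raw columnar buffer at the given row,
// truncating to the column's physical width -- the same switch as writeBackCell.
void write_cell(int8_t* buffer, size_t row_idx, size_t byte_width, int64_t val) {
  switch (byte_width) {
    case 1:
      reinterpret_cast<int8_t*>(buffer)[row_idx] = static_cast<int8_t>(val);
      break;
    case 2:
      reinterpret_cast<int16_t*>(buffer)[row_idx] = static_cast<int16_t>(val);
      break;
    case 4:
      reinterpret_cast<int32_t*>(buffer)[row_idx] = static_cast<int32_t>(val);
      break;
    case 8:
      reinterpret_cast<int64_t*>(buffer)[row_idx] = val;
      break;
  }
}

int main() {
  std::vector<int8_t> col(4 * sizeof(int16_t));  // a 2-byte column, 4 rows
  write_cell(col.data(), 2, sizeof(int16_t), 1234);
  int16_t out;
  std::memcpy(&out, col.data() + 2 * sizeof(int16_t), sizeof(out));
  std::cout << out << '\n';  // 1234
}
```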

template <typename DATA_TYPE>
void ColumnarResults::writeBackCellDirect(const ResultSet& rows,
                                          const size_t input_buffer_entry_idx,
                                          const size_t output_buffer_entry_idx,
                                          const size_t target_idx,
                                          const size_t slot_idx,
                                          const ReadFunction& read_from_function) {
  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
      target_types_[target_idx]));
  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
      val;
}

template <>
void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
                                                 const size_t input_buffer_entry_idx,
                                                 const size_t output_buffer_entry_idx,
                                                 const size_t target_idx,
                                                 const size_t slot_idx,
                                                 const ReadFunction& read_from_function) {
  const int32_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
}

template <>
void ColumnarResults::writeBackCellDirect<double>(
    const ResultSet& rows,
    const size_t input_buffer_entry_idx,
    const size_t output_buffer_entry_idx,
    const size_t target_idx,
    const size_t slot_idx,
    const ReadFunction& read_from_function) {
  const int64_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
}
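
The float/double specializations exist because every ReadFunction returns an `int64_t`: floating-point values travel through that channel as raw bit patterns and must be reinterpreted, not numerically cast, on the write side. A portable sketch of the round trip using `std::memcpy` in place of `may_alias_ptr` (function names here are illustrative):

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>

// Pack a float's bit pattern into the low 32 bits of an int64_t "transport"
// value, then unpack it on the write side.
int64_t float_to_transport(float f) {
  int32_t bits;
  std::memcpy(&bits, &f, sizeof(bits));  // portable stand-in for may_alias_ptr
  return bits;
}

float transport_to_float(int64_t transport) {
  const int32_t bits = static_cast<int32_t>(transport);
  float f;
  std::memcpy(&f, &bits, sizeof(f));
  return f;
}

int main() {
  const int64_t t = float_to_transport(3.14f);
  std::cout << transport_to_float(t) << '\n';  // 3.14
}
```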

void ColumnarResults::materializeAllColumnsDirectly(const ResultSet& rows,
                                                    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  switch (rows.getQueryDescriptionType()) {
    case QueryDescriptionType::Projection: {
      materializeAllColumnsProjection(rows, num_columns);
      break;
    }
    case QueryDescriptionType::GroupByPerfectHash:
    case QueryDescriptionType::GroupByBaselineHash: {
      materializeAllColumnsGroupBy(rows, num_columns);
      break;
    }
    default:
      UNREACHABLE()
          << "Direct columnar conversion for this query type is not supported yet.";
  }
}

void ColumnarResults::materializeAllColumnsProjection(const ResultSet& rows,
                                                      const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        rows.query_mem_desc_.getQueryDescriptionType() ==
            QueryDescriptionType::Projection);

  const auto& lazy_fetch_info = rows.getLazyFetchInfo();

  // We can directly copy each non-lazy column's content
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);

  // Only lazy columns are iterated through first and then materialized
  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
}

/*
 * For all non-lazy columns, we can directly copy back the results of each column's
 * contents from different storages and put them into the corresponding output buffer.
 *
 * This function is parallelized through assigning each column to a CPU thread.
 */
void ColumnarResults::copyAllNonLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
    // an empty lazy_fetch_info means no column is lazily fetched
    if (lazy_fetch_info.empty()) {
      return true;
    } else {
      return !lazy_fetch_info[col_idx].is_lazily_fetched;
    }
  };

  // parallelized by assigning each column to a thread
  std::vector<std::future<void>> direct_copy_threads;
  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
    if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
      CHECK(!column_buffers_[col_idx]);
      column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
    } else if (is_column_non_lazily_fetched(col_idx)) {
      direct_copy_threads.push_back(std::async(
          std::launch::async,
          [&rows, this](const size_t column_index) {
            const size_t column_size = num_rows_ * target_types_[column_index].get_size();
            rows.copyColumnIntoBuffer(
                column_index, column_buffers_[column_index], column_size);
          },
          col_idx));
    }
  }

  for (auto& child : direct_copy_threads) {
    child.wait();
  }
}
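
A condensed, self-contained sketch of the per-column parallelization pattern above, with plain vectors standing in for the result-set storage and `copyColumnIntoBuffer` (everything here is illustrative):

```cpp
#include <cstring>
#include <future>
#include <iostream>
#include <vector>

int main() {
  // Fake "result set storage": one source buffer per column.
  const std::vector<std::vector<int>> source = {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}};
  std::vector<std::vector<int>> output(source.size());

  // One future per column, mirroring copyAllNonLazyColumns' strategy of
  // parallelizing across columns rather than across rows.
  std::vector<std::future<void>> copies;
  for (size_t col = 0; col < source.size(); ++col) {
    copies.push_back(std::async(std::launch::async, [&, col] {
      output[col].resize(source[col].size());
      std::memcpy(output[col].data(), source[col].data(),
                  source[col].size() * sizeof(int));
    }));
  }
  for (auto& c : copies) {
    c.wait();
  }
  std::cout << output[1][2] << '\n';  // 6
}
```

Per-column futures are safe here because each task touches a disjoint output buffer, so no synchronization beyond the final `wait()` is needed.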

void ColumnarResults::materializeAllLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  const auto do_work_just_lazy_columns = [num_columns, this](
                                             const std::vector<TargetValue>& crt_row,
                                             const size_t row_idx,
                                             const std::vector<bool>& targets_to_skip) {
    for (size_t i = 0; i < num_columns; ++i) {
      if (!targets_to_skip.empty() && !targets_to_skip[i]) {
        writeBackCell(crt_row[i], row_idx, i);
      }
    }
  };

  const auto contains_lazy_fetched_column =
      [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
        for (auto& col_info : lazy_fetch_info) {
          if (col_info.is_lazily_fetched) {
            return true;
          }
        }
        return false;
      };

  // parallelized by assigning a chunk of rows to each thread
  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
  if (contains_lazy_fetched_column(lazy_fetch_info)) {
    const size_t worker_count = use_parallel_algorithms(rows) ? cpu_threads() : 1;
    std::vector<std::future<void>> conversion_threads;
    std::vector<bool> targets_to_skip;
    if (skip_non_lazy_columns) {
      CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
      targets_to_skip.reserve(num_columns);
      for (size_t i = 0; i < num_columns; i++) {
        // we process lazy columns (i.e., skip non-lazy columns)
        targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
      }
    }
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&rows, &do_work_just_lazy_columns, &targets_to_skip](const size_t start,
                                                                const size_t end) {
            for (size_t i = start; i < end; ++i) {
              const auto crt_row = rows.getRowAtNoTranslations(i, targets_to_skip);
              do_work_just_lazy_columns(crt_row, i, targets_to_skip);
            }
          },
          interval.begin,
          interval.end));
    }

    for (auto& child : conversion_threads) {
      child.wait();
    }
  }
}

void ColumnarResults::materializeAllColumnsGroupBy(const ResultSet& rows,
                                                   const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
  const size_t entry_count = rows.entryCount();
  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;

  // step 1: compute total non-empty elements and store a bitmap per thread
  std::vector<size_t> non_empty_per_thread(num_threads,
                                           0);  // number of non-empty entries per thread

  ColumnBitmap bitmap(size_per_thread, num_threads);

  locateAndCountEntries(
      rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);

  // step 2: go through the generated bitmap and copy/decode corresponding entries
  // into the output buffer
  compactAndCopyEntries(rows,
                        bitmap,
                        non_empty_per_thread,
                        num_columns,
                        entry_count,
                        num_threads,
                        size_per_thread);
}

void ColumnarResults::locateAndCountEntries(const ResultSet& rows,
                                            ColumnBitmap& bitmap,
                                            std::vector<size_t>& non_empty_per_thread,
                                            const size_t entry_count,
                                            const size_t num_threads,
                                            const size_t size_per_thread) const {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());
  auto locate_and_count_func =
      [&rows, &bitmap, &non_empty_per_thread](
          size_t start_index, size_t end_index, size_t thread_idx) {
        size_t total_non_empty = 0;
        size_t local_idx = 0;
        for (size_t entry_idx = start_index; entry_idx < end_index;
             entry_idx++, local_idx++) {
          if (!rows.isRowAtEmpty(entry_idx)) {
            total_non_empty++;
            bitmap.set(local_idx, thread_idx, true);
          }
        }
        non_empty_per_thread[thread_idx] = total_non_empty;
      };

  std::vector<std::future<void>> conversion_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    conversion_threads.push_back(std::async(
        std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
  }

  for (auto& child : conversion_threads) {
    child.wait();
  }
}
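
A runnable sketch of this first pass, with an int vector standing in for the result set and zero marking an empty slot (both are assumptions of the example, not the real storage format):

```cpp
#include <algorithm>
#include <cstdint>
#include <future>
#include <iostream>
#include <vector>

int main() {
  // Fake hash-table entries: 0 means "empty slot" here (illustrative only).
  const std::vector<int> entries = {0, 7, 0, 3, 9, 0, 1, 0};
  const size_t num_threads = 2;
  const size_t per_thread = (entries.size() + num_threads - 1) / num_threads;

  // Per-thread bitmap slice plus a non-empty counter, as in locateAndCountEntries.
  std::vector<std::vector<uint8_t>> bitmap(num_threads,
                                           std::vector<uint8_t>(per_thread, 0));
  std::vector<size_t> non_empty_per_thread(num_threads, 0);

  std::vector<std::future<void>> workers;
  for (size_t t = 0; t < num_threads; ++t) {
    workers.push_back(std::async(std::launch::async, [&, t] {
      const size_t start = t * per_thread;
      const size_t end = std::min(start + per_thread, entries.size());
      size_t count = 0;
      for (size_t i = start, local = 0; i < end; ++i, ++local) {
        if (entries[i] != 0) {  // stand-in for !rows.isRowAtEmpty(i)
          bitmap[t][local] = 1;
          ++count;
        }
      }
      non_empty_per_thread[t] = count;
    }));
  }
  for (auto& w : workers) {
    w.wait();
  }
  std::cout << non_empty_per_thread[0] << ' '
            << non_empty_per_thread[1] << '\n';  // 2 2
}
```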

void ColumnarResults::compactAndCopyEntries(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());

  // compute the exclusive scan over all non-empty totals
  std::vector<size_t> global_offsets(num_threads + 1, 0);
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   std::next(global_offsets.begin()));

  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
      rows.getSupportedSingleSlotTargetBitmap();

  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
  // differently and accessed through the result set's iterator
  if (num_single_slot_targets < num_columns) {
    compactAndCopyEntriesWithTargetSkipping(rows,
                                            bitmap,
                                            non_empty_per_thread,
                                            global_offsets,
                                            single_slot_targets_to_skip,
                                            slot_idx_per_target_idx,
                                            num_columns,
                                            entry_count,
                                            num_threads,
                                            size_per_thread);
  } else {
    compactAndCopyEntriesWithoutTargetSkipping(rows,
                                               bitmap,
                                               non_empty_per_thread,
                                               global_offsets,
                                               slot_idx_per_target_idx,
                                               num_columns,
                                               entry_count,
                                               num_threads,
                                               size_per_thread);
  }
}
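
The `global_offsets` computation is an exclusive prefix sum: writing `std::partial_sum` one slot to the right leaves `global_offsets[t]` equal to the number of non-empty entries owned by all threads before `t`, so each thread can write its compacted rows without further coordination. A minimal sketch:

```cpp
#include <iostream>
#include <numeric>
#include <vector>

int main() {
  // Non-empty entry counts produced by four hypothetical threads.
  const std::vector<size_t> non_empty_per_thread = {3, 0, 5, 2};

  // Exclusive scan: shift the partial sums right by one so that
  // global_offsets[t] is the sum of all counts before thread t.
  std::vector<size_t> global_offsets(non_empty_per_thread.size() + 1, 0);
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   std::next(global_offsets.begin()));

  for (auto off : global_offsets) {
    std::cout << off << ' ';  // 0 3 3 8 10
  }
  std::cout << '\n';
  // Thread t writes its rows densely starting at global_offsets[t];
  // global_offsets.back() is the total number of non-empty rows.
}
```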

void ColumnarResults::compactAndCopyEntriesWithTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<bool>& targets_to_skip,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);

  auto compact_buffer_func = [this,
                              &rows,
                              &bitmap,
                              &global_offsets,
                              &non_empty_per_thread,
                              &num_columns,
                              &targets_to_skip,
                              &slot_idx_per_target_idx,
                              &write_functions = write_functions,
                              &read_functions = read_functions](const size_t start_index,
                                                                const size_t end_index,
                                                                const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    for (size_t entry_idx = start_index; entry_idx < end_index;
         entry_idx++, local_idx++) {
      if (non_empty_idx >= total_non_empty) {
        // all non-empty entries have been written back
        break;
      }
      const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
      if (bitmap.get(local_idx, thread_idx)) {
        // targets that are recovered from the result set iterators:
        const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
        for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
          if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
            writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
          }
        }
        // targets that are copied directly without any translation/decoding from
        // the result set
        for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
          if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
            continue;
          }
          write_functions[column_idx](rows,
                                      entry_idx,
                                      output_buffer_row_idx,
                                      column_idx,
                                      slot_idx_per_target_idx[column_idx],
                                      read_functions[column_idx]);
        }
        non_empty_idx++;
      } else {
        continue;
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  for (auto& child : compaction_threads) {
    child.wait();
  }
}

void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);

  auto compact_buffer_func = [&rows,
                              &bitmap,
                              &global_offsets,
                              &non_empty_per_thread,
                              &num_columns,
                              &slot_idx_per_target_idx,
                              &write_functions = write_functions,
                              &read_functions = read_functions](const size_t start_index,
                                                                const size_t end_index,
                                                                const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    for (size_t entry_idx = start_index; entry_idx < end_index;
         entry_idx++, local_idx++) {
      if (non_empty_idx >= total_non_empty) {
        // all non-empty entries have been written back
        break;
      }
      const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
      if (bitmap.get(local_idx, thread_idx)) {
        for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
          write_functions[column_idx](rows,
                                      entry_idx,
                                      output_buffer_row_idx,
                                      column_idx,
                                      slot_idx_per_target_idx[column_idx],
                                      read_functions[column_idx]);
        }
        non_empty_idx++;
      } else {
        continue;
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  for (auto& child : compaction_threads) {
    child.wait();
  }
}

std::vector<ColumnarResults::WriteFunction> ColumnarResults::initWriteFunctions(
    const ResultSet& rows,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  std::vector<WriteFunction> result;
  result.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      result.emplace_back([](const ResultSet& rows,
                             const size_t input_buffer_entry_idx,
                             const size_t output_buffer_entry_idx,
                             const size_t target_idx,
                             const size_t slot_idx,
                             const ReadFunction& read_function) {
        UNREACHABLE() << "Invalid write back function used.";
      });
      continue;
    }

    if (target_types_[target_idx].is_fp()) {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    } else {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 2:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 1:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    }
  }
  return result;
}
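
The `std::bind` calls above instantiate `writeBackCellDirect` once per physical width and type-erase the instantiations into a uniform `std::function` signature. A cut-down standalone sketch of the same pattern (`Writer`, `write`, and `init_write_functions` are invented for illustration):

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

// Bind a member-function template instantiation per column width and store
// the results in a uniform std::function vector, as initWriteFunctions does.
struct Writer {
  template <typename T>
  void write(int64_t val) {
    std::cout << "writing " << sizeof(T) << "-byte value " << val << '\n';
  }

  using WriteFn = std::function<void(int64_t)>;

  std::vector<WriteFn> init_write_functions(const std::vector<size_t>& widths) {
    std::vector<WriteFn> fns;
    for (auto w : widths) {
      switch (w) {
        case 8:
          fns.emplace_back(
              std::bind(&Writer::write<int64_t>, this, std::placeholders::_1));
          break;
        case 4:
          fns.emplace_back(
              std::bind(&Writer::write<int32_t>, this, std::placeholders::_1));
          break;
        case 2:
          fns.emplace_back(
              std::bind(&Writer::write<int16_t>, this, std::placeholders::_1));
          break;
        default:
          fns.emplace_back(
              std::bind(&Writer::write<int8_t>, this, std::placeholders::_1));
      }
    }
    return fns;
  }
};

int main() {
  Writer w;
  auto fns = w.init_write_functions({8, 2});
  fns[0](1000);  // writing 8-byte value 1000
  fns[1](42);    // writing 2-byte value 42
}
```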

namespace {

int64_t invalid_read_func(const ResultSet& rows,
                          const size_t input_buffer_entry_idx,
                          const size_t target_idx,
                          const size_t slot_idx) {
  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
  return static_cast<int64_t>(0);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_key_baseline(const ResultSet& rows,
                                const size_t input_buffer_entry_idx,
                                const size_t target_idx,
                                const size_t slot_idx) {
  // float keys in baseline hash are written as doubles in the buffer, so
  // the result should be properly cast before being written to the output
  // columns
  auto fval = static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx));
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int64_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int32_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int16_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int8_func(const ResultSet& rows,
                       const size_t input_buffer_entry_idx,
                       const size_t target_idx,
                       const size_t slot_idx) {
  return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_double_func(const ResultSet& rows,
                         const size_t input_buffer_entry_idx,
                         const size_t target_idx,
                         const size_t slot_idx) {
  auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
}

}  // namespace

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector<ColumnarResults::ReadFunction> ColumnarResults::initReadFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());

  std::vector<ReadFunction> read_functions;
  read_functions.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      // for targets that should be skipped, we use a placeholder function that should
      // never be called. The CHECKs inside it make sure that never happens.
      read_functions.emplace_back(invalid_read_func);
      continue;
    }

    if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
      if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
        // for key columns only
        CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
        if (target_types_[target_idx].is_fp()) {
          CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
          switch (target_types_[target_idx].get_type()) {
            case kFLOAT:
              read_functions.emplace_back(
                  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case kDOUBLE:
              read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, floating point key).";
              break;
          }
        } else {
          switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
            case 8:
              read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case 4:
              read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, integer key).";
          }
        }
        continue;
      }
    }
    if (target_types_[target_idx].is_fp()) {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
          break;
      }
    } else {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 2:
          read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 1:
          read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (integer agg column).";
          break;
      }
    }
  }
  return read_functions;
}

std::tuple<std::vector<ColumnarResults::WriteFunction>,
           std::vector<ColumnarResults::ReadFunction>>
ColumnarResults::initAllConversionFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
         rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));

  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  } else {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  }
}
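
This function converts two runtime facts (the query description type and whether output is columnar) into template arguments by branching once and selecting the matching instantiation of `initReadFunctions`. A self-contained sketch of that runtime-to-compile-time dispatch (`QueryKind`, `read_entry`, and `dispatch` are illustrative names):

```cpp
#include <iostream>

enum class QueryKind { PerfectHash, BaselineHash };

// Compile-time specialized worker: the enum and bool are template parameters,
// so each combination compiles to its own function.
template <QueryKind KIND, bool COLUMNAR>
int read_entry(int idx) {
  return idx * (KIND == QueryKind::PerfectHash ? 10 : 100) + (COLUMNAR ? 1 : 0);
}

// Runtime dispatch: branch once on the runtime flags to pick the right
// instantiation, mirroring initAllConversionFunctions.
int dispatch(QueryKind kind, bool columnar, int idx) {
  if (kind == QueryKind::PerfectHash) {
    return columnar ? read_entry<QueryKind::PerfectHash, true>(idx)
                    : read_entry<QueryKind::PerfectHash, false>(idx);
  } else {
    return columnar ? read_entry<QueryKind::BaselineHash, true>(idx)
                    : read_entry<QueryKind::BaselineHash, false>(idx);
  }
}

int main() {
  std::cout << dispatch(QueryKind::PerfectHash, true, 3) << '\n';    // 31
  std::cout << dispatch(QueryKind::BaselineHash, false, 3) << '\n';  // 300
}
```

Paying for the branch once up front lets the per-entry read path stay free of runtime checks inside the hot loop.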