OmniSciDB  1dac507f6e
ColumnarResults.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ColumnarResults.h"

#include "../Shared/thread_count.h"

#include <atomic>
#include <future>
#include <numeric>

namespace {

inline int64_t fixed_encoding_nullable_val(const int64_t val,
                                           const SQLTypeInfo& type_info) {
  if (type_info.get_compression() != kENCODING_NONE) {
    CHECK(type_info.get_compression() == kENCODING_FIXED ||
          type_info.get_compression() == kENCODING_DICT);
    auto logical_ti = get_logical_type_info(type_info);
    if (val == inline_int_null_val(logical_ti)) {
      return inline_fixed_encoding_null_val(type_info);
    }
  }
  return val;
}
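// Illustration (not part of the original source): consider a SMALLINT column
// stored with FIXED(8) encoding. At the ResultSet level a NULL shows up as
// inline_int_null_val() of the logical 16-bit type; the helper above remaps
// that sentinel to inline_fixed_encoding_null_val(), i.e. the null value that
// fits the 1-byte physical encoding written into the columnar buffer.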

}  // namespace

ColumnarResults::ColumnarResults(
    const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const ResultSet& rows,
    const size_t num_columns,
    const std::vector<SQLTypeInfo>& target_types,
    const bool is_parallel_execution_enforced)
    : column_buffers_(num_columns)
    , num_rows_(use_parallel_algorithms(rows) || rows.isDirectColumnarConversionPossible()
                    ? rows.entryCount()
                    : rows.rowCount())
    , target_types_(target_types)
    , parallel_conversion_(is_parallel_execution_enforced
                               ? true
                               : use_parallel_algorithms(rows))
    , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible()) {
  auto timer = DEBUG_TIMER(__func__);
  column_buffers_.resize(num_columns);
  for (size_t i = 0; i < num_columns; ++i) {
    const bool is_varlen = target_types[i].is_array() ||
                           (target_types[i].is_string() &&
                            target_types[i].get_compression() == kENCODING_NONE) ||
                           target_types[i].is_geometry();
    if (is_varlen) {
      throw ColumnarConversionNotSupported();
    }
    column_buffers_[i] = reinterpret_cast<int8_t*>(
        checked_malloc(num_rows_ * target_types[i].get_size()));
    row_set_mem_owner->addColBuffer(column_buffers_[i]);
  }

  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
    materializeAllColumnsDirectly(rows, num_columns);
  } else {
    materializeAllColumnsThroughIteration(rows, num_columns);
  }
}

ColumnarResults::ColumnarResults(
    const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const int8_t* one_col_buffer,
    const size_t num_rows,
    const SQLTypeInfo& target_type)
    : column_buffers_(1)
    , num_rows_(num_rows)
    , target_types_{target_type}
    , parallel_conversion_(false)
    , direct_columnar_conversion_(false) {
  auto timer = DEBUG_TIMER(__func__);
  const bool is_varlen =
      target_type.is_array() ||
      (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
      target_type.is_geometry();
  if (is_varlen) {
    throw ColumnarConversionNotSupported();
  }
  const auto buf_size = num_rows * target_type.get_size();
  column_buffers_[0] = reinterpret_cast<int8_t*>(checked_malloc(buf_size));
  memcpy(column_buffers_[0], one_col_buffer, buf_size);
  row_set_mem_owner->addColBuffer(column_buffers_[0]);
}
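// Hypothetical usage sketch (assumption, not in the original source): this
// constructor wraps an already-columnar buffer, e.g. a single INT column,
// so it can be consumed through the ColumnarResults interface:
//
//   std::vector<int32_t> vals = {1, 2, 3, 4};
//   ColumnarResults one_col(row_set_mem_owner,
//                           reinterpret_cast<const int8_t*>(vals.data()),
//                           vals.size(),
//                           SQLTypeInfo(kINT, /*notnull=*/false));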

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
    const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
  if (sub_results.empty()) {
    return nullptr;
  }
  const auto total_row_count = std::accumulate(
      sub_results.begin(),
      sub_results.end(),
      size_t(0),
      [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
        return init + result->size();
      });
  std::unique_ptr<ColumnarResults> merged_results(
      new ColumnarResults(total_row_count, sub_results[0]->target_types_));
  const auto col_count = sub_results[0]->column_buffers_.size();
  const auto nonempty_it = std::find_if(
      sub_results.begin(),
      sub_results.end(),
      [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
  if (nonempty_it == sub_results.end()) {
    return nullptr;
  }
  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    const auto byte_width = (*nonempty_it)->getColumnType(col_idx).get_size();
    auto write_ptr =
        reinterpret_cast<int8_t*>(checked_malloc(byte_width * total_row_count));
    merged_results->column_buffers_.push_back(write_ptr);
    row_set_mem_owner->addColBuffer(write_ptr);
    for (auto& rs : sub_results) {
      CHECK_EQ(col_count, rs->column_buffers_.size());
      if (!rs->size()) {
        continue;
      }
      CHECK_EQ(byte_width, rs->getColumnType(col_idx).get_size());
      memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
      write_ptr += rs->size() * byte_width;
    }
  }
  return merged_results;
}
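// Usage sketch (assumption, not in the original source): per-fragment or
// per-device sub-results can be stitched into one contiguous columnar result:
//
//   std::vector<std::unique_ptr<ColumnarResults>> sub_results;
//   // ... one ColumnarResults per fragment ...
//   auto merged = ColumnarResults::mergeResults(row_set_mem_owner, sub_results);
//   // merged->size() equals the sum of the sub-result sizes; nullptr is
//   // returned if sub_results is empty or all sub-results are empty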

void ColumnarResults::materializeAllColumnsThroughIteration(const ResultSet& rows,
                                                            const size_t num_columns) {
  std::atomic<size_t> row_idx{0};
  const auto do_work = [num_columns, this](const std::vector<TargetValue>& crt_row,
                                           const size_t row_idx) {
    for (size_t i = 0; i < num_columns; ++i) {
      writeBackCell(crt_row[i], row_idx, i);
    }
  };
  if (isParallelConversion()) {
    const size_t worker_count = cpu_threads();
    std::vector<std::future<void>> conversion_threads;
    const auto entry_count = rows.entryCount();
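    // Each worker thread handles a contiguous chunk of entries whose size is
    // the ceiling of entry_count / worker_count. For example (illustration
    // only): entry_count = 10 with worker_count = 4 yields stride = 3 and
    // chunks [0,3), [3,6), [6,9), [9,10). Note that output positions come
    // from row_idx.fetch_add(1), so rows are written in completion order
    // rather than entry order.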
    for (size_t i = 0,
                start_entry = 0,
                stride = (entry_count + worker_count - 1) / worker_count;
         i < worker_count && start_entry < entry_count;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(start_entry + stride, entry_count);
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&rows, &do_work, &row_idx](const size_t start, const size_t end) {
            for (size_t i = start; i < end; ++i) {
              const auto crt_row = rows.getRowAtNoTranslations(i);
              if (!crt_row.empty()) {
                do_work(crt_row, row_idx.fetch_add(1));
              }
            }
          },
          start_entry,
          end_entry));
    }
    for (auto& child : conversion_threads) {
      child.wait();
    }

    num_rows_ = row_idx;
    rows.setCachedRowCount(num_rows_);
    return;
  }
  while (true) {
    const auto crt_row = rows.getNextRow(false, false);
    if (crt_row.empty()) {
      break;
    }
    do_work(crt_row, row_idx);
    ++row_idx;
  }
  rows.moveToBegin();
}

/*
 * This function processes and decodes its input TargetValue
 * and writes it into the corresponding column buffer's cell (at the given
 * row and column indices).
 *
 * NOTE: this function is not supposed to process varlen types; those should
 * be handled differently outside this function.
 */
inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
                                           const size_t row_idx,
                                           const size_t column_idx) {
  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
  CHECK(scalar_col_val);
  auto i64_p = boost::get<int64_t>(scalar_col_val);
  const auto& type_info = target_types_[column_idx];
  if (i64_p) {
    const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
    switch (target_types_[column_idx].get_size()) {
      case 1:
        ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
        break;
      case 2:
        ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
        break;
      case 4:
        ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
        break;
      case 8:
        ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
        break;
      default:
        CHECK(false);
    }
  } else {
    CHECK(target_types_[column_idx].is_fp());
    switch (target_types_[column_idx].get_type()) {
      case kFLOAT: {
        auto float_p = boost::get<float>(scalar_col_val);
        ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
        break;
      }
      case kDOUBLE: {
        auto double_p = boost::get<double>(scalar_col_val);
        ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
        break;
      }
      default:
        CHECK(false);
    }
  }
}
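// Worked example (illustration only): for a SMALLINT target, the value
// arrives through the int64_t alternative of ScalarTargetValue, is remapped
// by fixed_encoding_nullable_val(), and lands in the 2-byte case above; for
// a FLOAT target, boost::get<int64_t> yields nullptr and the value is written
// through the kFLOAT branch instead.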

template <typename DATA_TYPE>
void ColumnarResults::writeBackCellDirect(const ResultSet& rows,
                                          const size_t input_buffer_entry_idx,
                                          const size_t output_buffer_entry_idx,
                                          const size_t target_idx,
                                          const size_t slot_idx,
                                          const ReadFunction& read_from_function) {
  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
      target_types_[target_idx]));
  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
      val;
}

template <>
void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
                                                 const size_t input_buffer_entry_idx,
                                                 const size_t output_buffer_entry_idx,
                                                 const size_t target_idx,
                                                 const size_t slot_idx,
                                                 const ReadFunction& read_from_function) {
  const int32_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
}

template <>
void ColumnarResults::writeBackCellDirect<double>(
    const ResultSet& rows,
    const size_t input_buffer_entry_idx,
    const size_t output_buffer_entry_idx,
    const size_t target_idx,
    const size_t slot_idx,
    const ReadFunction& read_from_function) {
  const int64_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
}
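// The ReadFunctions used with these specializations (defined further below)
// always return int64_t, so floating point values travel as raw bit patterns
// and are reinterpreted rather than numerically cast. A minimal sketch of the
// convention (illustration only, not original code):
//
//   float f = 1.5f;
//   int32_t bits = *reinterpret_cast<int32_t*>(may_alias_ptr(&f));  // encode
//   float back = *reinterpret_cast<float*>(may_alias_ptr(&bits));   // decode
//   // back == 1.5f; no float-to-int numeric conversion takes place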

void ColumnarResults::materializeAllColumnsDirectly(const ResultSet& rows,
                                                    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  switch (rows.getQueryDescriptionType()) {
    case QueryDescriptionType::Projection: {
      materializeAllColumnsProjection(rows, num_columns);
      break;
    }
    case QueryDescriptionType::GroupByPerfectHash:
    case QueryDescriptionType::GroupByBaselineHash: {
      materializeAllColumnsGroupBy(rows, num_columns);
      break;
    }
    default:
      UNREACHABLE()
          << "Direct columnar conversion for this query type is not supported yet.";
  }
}

void ColumnarResults::materializeAllColumnsProjection(const ResultSet& rows,
                                                      const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        rows.query_mem_desc_.getQueryDescriptionType() ==
            QueryDescriptionType::Projection);

  const auto& lazy_fetch_info = rows.getLazyFetchInfo();

  // We can directly copy each non-lazy column's content
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);

  // Only lazy columns are iterated through first and then materialized
  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
}

/*
 * For all non-lazy columns, we can directly copy back the results of each column's
 * contents from different storages and put them into the corresponding output buffer.
 *
 * This function is parallelized through assigning each column to a CPU thread.
 */
void ColumnarResults::copyAllNonLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(rows.isDirectColumnarConversionPossible());
  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
    // an empty lazy_fetch_info means no column is lazily fetched
    if (lazy_fetch_info.empty()) {
      return true;
    } else {
      return !lazy_fetch_info[col_idx].is_lazily_fetched;
    }
  };

  // parallelized by assigning each column to a thread
  std::vector<std::future<void>> direct_copy_threads;
  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
    if (is_column_non_lazily_fetched(col_idx)) {
      direct_copy_threads.push_back(std::async(
          std::launch::async,
          [&rows, this](const size_t column_index) {
            const size_t column_size = num_rows_ * target_types_[column_index].get_size();
            rows.copyColumnIntoBuffer(
                column_index, column_buffers_[column_index], column_size);
          },
          col_idx));
    }
  }

  for (auto& child : direct_copy_threads) {
    child.wait();
  }
}

void ColumnarResults::materializeAllLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(rows.isDirectColumnarConversionPossible());
  const auto do_work_just_lazy_columns = [num_columns, this](
                                             const std::vector<TargetValue>& crt_row,
                                             const size_t row_idx,
                                             const std::vector<bool>& targets_to_skip) {
    for (size_t i = 0; i < num_columns; ++i) {
      if (!targets_to_skip.empty() && !targets_to_skip[i]) {
        writeBackCell(crt_row[i], row_idx, i);
      }
    }
  };

  const auto contains_lazy_fetched_column =
      [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
        for (auto& col_info : lazy_fetch_info) {
          if (col_info.is_lazily_fetched) {
            return true;
          }
        }
        return false;
      };

  // parallelized by assigning a chunk of rows to each thread
  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
  if (contains_lazy_fetched_column(lazy_fetch_info)) {
    const size_t worker_count = use_parallel_algorithms(rows) ? cpu_threads() : 1;
    std::vector<std::future<void>> conversion_threads;
    const auto entry_count = rows.entryCount();
    std::vector<bool> targets_to_skip;
    if (skip_non_lazy_columns) {
      CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
      targets_to_skip.reserve(num_columns);
      for (size_t i = 0; i < num_columns; i++) {
        // we process lazy columns (i.e., skip non-lazy columns)
        targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
      }
    }
    for (size_t i = 0,
                start_entry = 0,
                stride = (entry_count + worker_count - 1) / worker_count;
         i < worker_count && start_entry < entry_count;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(start_entry + stride, entry_count);
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&rows, &do_work_just_lazy_columns, &targets_to_skip](const size_t start,
                                                                const size_t end) {
            for (size_t i = start; i < end; ++i) {
              const auto crt_row = rows.getRowAtNoTranslations(i, targets_to_skip);
              do_work_just_lazy_columns(crt_row, i, targets_to_skip);
            }
          },
          start_entry,
          end_entry));
    }

    for (auto& child : conversion_threads) {
      child.wait();
    }
  }
}

void ColumnarResults::materializeAllColumnsGroupBy(const ResultSet& rows,
                                                   const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
  const size_t entry_count = rows.entryCount();
  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;

  // step 1: compute total non-empty elements and store a bitmap per thread
  std::vector<size_t> non_empty_per_thread(num_threads,
                                           0);  // number of non-empty entries per thread

  ColumnBitmap bitmap(size_per_thread, num_threads);
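  // ColumnBitmap (declared in ColumnarResults.h) is used here as one bit per
  // (local entry, thread) pair: thread t sets bit (local_idx, t) for each
  // non-empty entry in its chunk, so step 2 can locate exactly the entries to
  // copy without probing the result set again. (The exact bitmap layout is an
  // implementation detail of ColumnBitmap, not shown in this file.)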

  locateAndCountEntries(
      rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);

  // step 2: go through the generated bitmap and copy/decode corresponding entries
  // into the output buffer
  compactAndCopyEntries(rows,
                        bitmap,
                        non_empty_per_thread,
                        num_columns,
                        entry_count,
                        num_threads,
                        size_per_thread);
}

void ColumnarResults::locateAndCountEntries(const ResultSet& rows,
                                            ColumnBitmap& bitmap,
                                            std::vector<size_t>& non_empty_per_thread,
                                            const size_t entry_count,
                                            const size_t num_threads,
                                            const size_t size_per_thread) const {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());
  auto locate_and_count_func =
      [&rows, &bitmap, &non_empty_per_thread](
          size_t start_index, size_t end_index, size_t thread_idx) {
        size_t total_non_empty = 0;
        size_t local_idx = 0;
        for (size_t entry_idx = start_index; entry_idx < end_index;
             entry_idx++, local_idx++) {
          if (!rows.isRowAtEmpty(entry_idx)) {
            total_non_empty++;
            bitmap.set(local_idx, thread_idx, true);
          }
        }
        non_empty_per_thread[thread_idx] = total_non_empty;
      };

  std::vector<std::future<void>> conversion_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    conversion_threads.push_back(std::async(
        std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
  }

  for (auto& child : conversion_threads) {
    child.wait();
  }
}

void ColumnarResults::compactAndCopyEntries(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
  CHECK_EQ(num_threads, non_empty_per_thread.size());

  // compute the exclusive scan over all non-empty totals
  std::vector<size_t> global_offsets(num_threads + 1, 0);
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   std::next(global_offsets.begin()));
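  // For example (illustration only): three threads finding {3, 0, 5}
  // non-empty entries produce global_offsets = {0, 3, 3, 8}, so thread t
  // writes its compacted entries starting at output row global_offsets[t].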

  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
      rows.getSupportedSingleSlotTargetBitmap();

  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
  // differently and accessed through the result set's iterator
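  // (AVG is the canonical multi-slot case: it keeps a running sum and a count
  // in two separate slots, so it cannot be copied as one fixed-width value
  // and is instead recovered through getRowAtNoTranslations() below.)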
  if (num_single_slot_targets < num_columns) {
    compactAndCopyEntriesWithTargetSkipping(rows,
                                            bitmap,
                                            non_empty_per_thread,
                                            global_offsets,
                                            single_slot_targets_to_skip,
                                            slot_idx_per_target_idx,
                                            num_columns,
                                            entry_count,
                                            num_threads,
                                            size_per_thread);
  } else {
    compactAndCopyEntriesWithoutTargetSkipping(rows,
                                               bitmap,
                                               non_empty_per_thread,
                                               global_offsets,
                                               slot_idx_per_target_idx,
                                               num_columns,
                                               entry_count,
                                               num_threads,
                                               size_per_thread);
  }
}

void ColumnarResults::compactAndCopyEntriesWithTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<bool>& targets_to_skip,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);

  auto compact_buffer_func = [this,
                              &rows,
                              &bitmap,
                              &global_offsets,
                              &non_empty_per_thread,
                              &num_columns,
                              &targets_to_skip,
                              &slot_idx_per_target_idx,
                              &write_functions = write_functions,
                              &read_functions = read_functions](const size_t start_index,
                                                                const size_t end_index,
                                                                const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    for (size_t entry_idx = start_index; entry_idx < end_index;
         entry_idx++, local_idx++) {
      if (non_empty_idx >= total_non_empty) {
        // all non-empty entries have been written back
        break;
      }
      const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
      if (bitmap.get(local_idx, thread_idx)) {
        // targets that are recovered from the result set iterators:
        const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
        for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
          if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
            writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
          }
        }
        // targets that are copied directly without any translation/decoding
        // from the result set
        for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
          if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
            continue;
          }
          write_functions[column_idx](rows,
                                      entry_idx,
                                      output_buffer_row_idx,
                                      column_idx,
                                      slot_idx_per_target_idx[column_idx],
                                      read_functions[column_idx]);
        }
        non_empty_idx++;
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  for (auto& child : compaction_threads) {
    child.wait();
  }
}

void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);

  auto compact_buffer_func = [&rows,
                              &bitmap,
                              &global_offsets,
                              &non_empty_per_thread,
                              &num_columns,
                              &slot_idx_per_target_idx,
                              &write_functions = write_functions,
                              &read_functions = read_functions](const size_t start_index,
                                                                const size_t end_index,
                                                                const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    for (size_t entry_idx = start_index; entry_idx < end_index;
         entry_idx++, local_idx++) {
      if (non_empty_idx >= total_non_empty) {
        // all non-empty entries have been written back
        break;
      }
      const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
      if (bitmap.get(local_idx, thread_idx)) {
        for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
          write_functions[column_idx](rows,
                                      entry_idx,
                                      output_buffer_row_idx,
                                      column_idx,
                                      slot_idx_per_target_idx[column_idx],
                                      read_functions[column_idx]);
        }
        non_empty_idx++;
      }
    }
  };

  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }

  for (auto& child : compaction_threads) {
    child.wait();
  }
}

std::vector<ColumnarResults::WriteFunction> ColumnarResults::initWriteFunctions(
    const ResultSet& rows,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
        rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);

  std::vector<WriteFunction> result;
  result.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      result.emplace_back([](const ResultSet& rows,
                             const size_t input_buffer_entry_idx,
                             const size_t output_buffer_entry_idx,
                             const size_t target_idx,
                             const size_t slot_idx,
                             const ReadFunction& read_function) {
        UNREACHABLE() << "Invalid write back function used.";
      });
      continue;
    }

    if (target_types_[target_idx].is_fp()) {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    } else {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 2:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 1:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    }
  }
  return result;
}

namespace {

int64_t invalid_read_func(const ResultSet& rows,
                          const size_t input_buffer_entry_idx,
                          const size_t target_idx,
                          const size_t slot_idx) {
  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
  return static_cast<int64_t>(0);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_key_baseline(const ResultSet& rows,
                                const size_t input_buffer_entry_idx,
                                const size_t target_idx,
                                const size_t slot_idx) {
  // float keys in baseline hash are written as doubles in the buffer, so the
  // value must be properly cast to float before being written to the output
  // columns
  auto fval = static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx));
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int64_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int32_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int16_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int8_func(const ResultSet& rows,
                       const size_t input_buffer_entry_idx,
                       const size_t target_idx,
                       const size_t slot_idx) {
  return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_double_func(const ResultSet& rows,
                         const size_t input_buffer_entry_idx,
                         const size_t target_idx,
                         const size_t slot_idx) {
  auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
}

}  // namespace
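// How the pieces compose (illustrative sketch, not original code): for each
// non-empty entry, a read function selected in initReadFunctions() pulls the
// raw value out of the result set buffer, and a matching write function
// stores it into the output column, e.g.
//
//   write_functions[col](rows,
//                        entry_idx,
//                        output_row_idx,
//                        col,
//                        slot_idx_per_target_idx[col],
//                        read_functions[col]);
//
// For integer targets the returned int64_t carries the value itself; for
// float/double it carries raw bits that writeBackCellDirect<float/double>
// reinterprets.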

template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector<ColumnarResults::ReadFunction> ColumnarResults::initReadFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());

  std::vector<ReadFunction> read_functions;
  read_functions.reserve(target_types_.size());

  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      // for targets that should be skipped, we use a placeholder function that
      // should never be called; the UNREACHABLE() inside it makes sure of that.
      read_functions.emplace_back(invalid_read_func);
      continue;
    }

    if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
      if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
        // for key columns only
        CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
        if (target_types_[target_idx].is_fp()) {
          CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
          switch (target_types_[target_idx].get_type()) {
            case kFLOAT:
              read_functions.emplace_back(
                  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case kDOUBLE:
              read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, floating point key).";
              break;
          }
        } else {
          switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
            case 8:
              read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            case 4:
              read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
              break;
            default:
              UNREACHABLE()
                  << "Invalid data type encountered (BaselineHash, integer key).";
          }
        }
        continue;
      }
    }
    if (target_types_[target_idx].is_fp()) {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
          break;
      }
    } else {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 2:
          read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 1:
          read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (integer agg column).";
          break;
      }
    }
  }
  return read_functions;
}

std::tuple<std::vector<ColumnarResults::WriteFunction>,
           std::vector<ColumnarResults::ReadFunction>>
ColumnarResults::initAllConversionFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
         rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));

  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  } else {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  }
}