OmniSciDB  04ee39c94c
ColumnarResults.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ColumnarResults.h"
19 
20 #include "../Shared/thread_count.h"
21 
22 #include <atomic>
23 #include <future>
24 #include <numeric>
25 
26 namespace {
27 
28 inline int64_t fixed_encoding_nullable_val(const int64_t val,
29  const SQLTypeInfo& type_info) {
30  if (type_info.get_compression() != kENCODING_NONE) {
31  CHECK(type_info.get_compression() == kENCODING_FIXED ||
32  type_info.get_compression() == kENCODING_DICT);
33  auto logical_ti = get_logical_type_info(type_info);
34  if (val == inline_int_null_val(logical_ti)) {
35  return inline_fixed_encoding_null_val(type_info);
36  }
37  }
38  return val;
39 }
40 
41 } // namespace
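// Editor's note (illustrative example, not part of the original source): consider a
// BIGINT column stored with ENCODING FIXED(32). Row-wise iteration yields NULLs as the
// logical type's sentinel, inline_int_null_val(BIGINT); fixed_encoding_nullable_val()
// above remaps that to inline_fixed_encoding_null_val() of the 4-byte physical type, so
// the narrow NULL sentinel lands in the columnar buffer. Non-null values pass through as-is.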
42 
43 ColumnarResults::ColumnarResults(
44  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
45  const ResultSet& rows,
46  const size_t num_columns,
47  const std::vector<SQLTypeInfo>& target_types)
48  : column_buffers_(num_columns)
49  , num_rows_(use_parallel_algorithms(rows) || rows.isDirectColumnarConversionPossible()
50  ? rows.entryCount()
51  : rows.rowCount())
52  , target_types_(target_types)
53  , parallel_conversion_(use_parallel_algorithms(rows))
54  , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible()) {
55  column_buffers_.resize(num_columns);
56  for (size_t i = 0; i < num_columns; ++i) {
57  const bool is_varlen = target_types[i].is_array() ||
58  (target_types[i].is_string() &&
59  target_types[i].get_compression() == kENCODING_NONE) ||
60  target_types[i].is_geometry();
61  if (is_varlen) {
62  throw ColumnarConversionNotSupported();
63  }
64  column_buffers_[i] =
65  reinterpret_cast<int8_t*>(checked_malloc(num_rows_ * target_types[i].get_size()));
66  row_set_mem_owner->addColBuffer(column_buffers_[i]);
67  }
68  std::atomic<size_t> row_idx{0};
69  const auto do_work = [num_columns, this](const std::vector<TargetValue>& crt_row,
70  const size_t row_idx) {
71  for (size_t i = 0; i < num_columns; ++i) {
72  writeBackCell(crt_row[i], row_idx, i);
73  }
74  };
75 
76  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
77  materializeAllColumns(rows, num_columns);
78  } else {
79  if (isParallelConversion()) {
80  const size_t worker_count = cpu_threads();
81  std::vector<std::future<void>> conversion_threads;
82  const auto entry_count = rows.entryCount();
83  for (size_t i = 0,
84  start_entry = 0,
85  stride = (entry_count + worker_count - 1) / worker_count;
86  i < worker_count && start_entry < entry_count;
87  ++i, start_entry += stride) {
88  const auto end_entry = std::min(start_entry + stride, entry_count);
89  conversion_threads.push_back(std::async(
90  std::launch::async,
91  [&rows, &do_work, &row_idx](const size_t start, const size_t end) {
92  for (size_t i = start; i < end; ++i) {
93  const auto crt_row = rows.getRowAtNoTranslations(i);
94  if (!crt_row.empty()) {
95  do_work(crt_row, row_idx.fetch_add(1));
96  }
97  }
98  },
99  start_entry,
100  end_entry));
101  }
102  for (auto& child : conversion_threads) {
103  child.wait();
104  }
105  for (auto& child : conversion_threads) {
106  child.get();
107  }
108  num_rows_ = row_idx;
109  rows.setCachedRowCount(num_rows_);
110  return;
111  }
112  while (true) {
113  const auto crt_row = rows.getNextRow(false, false);
114  if (crt_row.empty()) {
115  break;
116  }
117  do_work(crt_row, row_idx);
118  ++row_idx;
119  }
120  rows.moveToBegin();
121  }
122 }
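// Worked example (editorial sketch): in the parallel path above, with entry_count = 10
// and worker_count = 4, stride = (10 + 4 - 1) / 4 = 3, so the async workers scan entry
// ranges [0,3), [3,6), [6,9) and [9,10). Each non-empty row claims its output position
// via row_idx.fetch_add(1), so the final num_rows_ equals the number of non-empty rows.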
123 
124 ColumnarResults::ColumnarResults(
125  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
126  const int8_t* one_col_buffer,
127  const size_t num_rows,
128  const SQLTypeInfo& target_type)
129  : column_buffers_(1)
130  , num_rows_(num_rows)
131  , target_types_{target_type}
132  , parallel_conversion_(false)
133  , direct_columnar_conversion_(false) {
134  const bool is_varlen =
135  target_type.is_array() ||
136  (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
137  target_type.is_geometry();
138  if (is_varlen) {
139  throw ColumnarConversionNotSupported();
140  }
141  const auto buf_size = num_rows * target_type.get_size();
142  column_buffers_[0] = reinterpret_cast<int8_t*>(checked_malloc(buf_size));
143  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
144  row_set_mem_owner->addColBuffer(column_buffers_[0]);
145 }
146 
147 std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
148  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
149  const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
150  if (sub_results.empty()) {
151  return nullptr;
152  }
153  const auto total_row_count = std::accumulate(
154  sub_results.begin(),
155  sub_results.end(),
156  size_t(0),
157  [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
158  return init + result->size();
159  });
160  std::unique_ptr<ColumnarResults> merged_results(
161  new ColumnarResults(total_row_count, sub_results[0]->target_types_));
162  const auto col_count = sub_results[0]->column_buffers_.size();
163  const auto nonempty_it = std::find_if(
164  sub_results.begin(),
165  sub_results.end(),
166  [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
167  if (nonempty_it == sub_results.end()) {
168  return nullptr;
169  }
170  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
171  const auto byte_width = (*nonempty_it)->getColumnType(col_idx).get_size();
172  auto write_ptr =
173  reinterpret_cast<int8_t*>(checked_malloc(byte_width * total_row_count));
174  merged_results->column_buffers_.push_back(write_ptr);
175  row_set_mem_owner->addColBuffer(write_ptr);
176  for (auto& rs : sub_results) {
177  CHECK_EQ(col_count, rs->column_buffers_.size());
178  if (!rs->size()) {
179  continue;
180  }
181  CHECK_EQ(byte_width, rs->getColumnType(col_idx).get_size());
182  memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
183  write_ptr += rs->size() * byte_width;
184  }
185  }
186  return merged_results;
187 }
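// Example (editorial sketch): merging two sub-results with 3 and 5 rows yields
// total_row_count = 8; for each column a single 8-row buffer is allocated and the
// sub-results' column buffers are memcpy'd into it back-to-back, in sub_results order.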
188 
189 /*
190  * This function processes and decodes its input TargetValue
191  * and writes it into the corresponding column buffer's cell (at the given
192  * row and column indices).
193  *
194  * NOTE: this function is not supposed to process varlen types; those should be
195  * handled differently, outside this function.
196  */
197 inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
198  const size_t row_idx,
199  const size_t column_idx) {
200  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
201  CHECK(scalar_col_val);
202  auto i64_p = boost::get<int64_t>(scalar_col_val);
203  const auto& type_info = target_types_[column_idx];
204  if (i64_p) {
205  const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
206  switch (target_types_[column_idx].get_size()) {
207  case 1:
208  ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
209  break;
210  case 2:
211  ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
212  break;
213  case 4:
214  ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
215  break;
216  case 8:
217  ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
218  break;
219  default:
220  CHECK(false);
221  }
222  } else {
223  CHECK(target_types_[column_idx].is_fp());
224  switch (target_types_[column_idx].get_type()) {
225  case kFLOAT: {
226  auto float_p = boost::get<float>(scalar_col_val);
227  ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
228  break;
229  }
230  case kDOUBLE: {
231  auto double_p = boost::get<double>(scalar_col_val);
232  ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
233  break;
234  }
235  default:
236  CHECK(false);
237  }
238  }
239 }
240 
246 template <typename DATA_TYPE>
247 void ColumnarResults::writeBackCellDirect(
248  const ResultSet& rows,
249  const size_t input_buffer_entry_idx,
250  const size_t output_buffer_entry_idx,
251  const size_t target_idx,
252  const size_t slot_idx,
253  const ReadFunctionPerfectHash& read_from_function) {
254  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
255  read_from_function(rows, input_buffer_entry_idx, slot_idx),
256  target_types_[target_idx]));
257  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
258  val;
259 }
260 
261 template <>
262 void ColumnarResults::writeBackCellDirect<float>(
263  const ResultSet& rows,
264  const size_t input_buffer_entry_idx,
265  const size_t output_buffer_entry_idx,
266  const size_t target_idx,
267  const size_t slot_idx,
268  const ReadFunctionPerfectHash& read_from_function) {
269  const int32_t ival = read_from_function(rows, input_buffer_entry_idx, slot_idx);
270  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
271  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
272 }
273 
274 template <>
275 void ColumnarResults::writeBackCellDirect<double>(
276  const ResultSet& rows,
277  const size_t input_buffer_entry_idx,
278  const size_t output_buffer_entry_idx,
279  const size_t target_idx,
280  const size_t slot_idx,
281  const ReadFunctionPerfectHash& read_from_function) {
282  const int64_t ival = read_from_function(rows, input_buffer_entry_idx, slot_idx);
283  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
284  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
285 }
286 
287 /*
288  * This function materializes all columns from the main storage and all appended storages
289  * to form a single contiguous column for each output column. Depending on whether a
290  * column is lazily fetched or not, it is treated differently.
291  *
292  * NOTE: this function should
293  * only be used when the result set is columnar and completely compacted (e.g., in
294  * columnar projections).
295  */
296 void ColumnarResults::materializeAllColumns(const ResultSet& rows,
297  const size_t num_columns) {
298  CHECK(isDirectColumnarConversionPossible());
299  switch (rows.getQueryDescriptionType()) {
300  case QueryDescriptionType::Projection: {
301  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
302 
303  // We can directly copy each non-lazy column's content
304  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
305 
306  // Only the lazy columns are then iterated through and materialized
307  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
308  } break;
309  case QueryDescriptionType::GroupByPerfectHash: {
310  materializeAllColumnsPerfectHash(rows, num_columns);
311  } break;
312  default:
313  UNREACHABLE()
314  << "Direct columnar conversion for this query type not supported yet.";
315  }
316 }
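// Editor's note: the two supported direct-conversion paths are (1) Projection, where
// non-lazy columns are memcpy'd per column and lazy columns are materialized through the
// row iterator, and (2) GroupByPerfectHash, where entries are compacted with the
// bitmap-and-offset scheme implemented further below.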
317 
318 /*
319  * For all non-lazy columns, we can directly copy back the results of each column's
320  * contents from different storages and put them into the corresponding output buffer.
321  *
322  * This function is parallelized through assigning each column to a CPU thread.
323  */
324 void ColumnarResults::copyAllNonLazyColumns(
325  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
326  const ResultSet& rows,
327  const size_t num_columns) {
328  CHECK(rows.isDirectColumnarConversionPossible());
329  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
330  // when lazy_fetch_info is empty, no column is lazily fetched
331  if (lazy_fetch_info.empty()) {
332  return true;
333  } else {
334  return !lazy_fetch_info[col_idx].is_lazily_fetched;
335  }
336  };
337 
338  // parallelized by assigning each column to a thread
339  std::vector<std::future<void>> direct_copy_threads;
340  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
341  if (is_column_non_lazily_fetched(col_idx)) {
342  direct_copy_threads.push_back(std::async(
343  std::launch::async,
344  [&rows, this](const size_t column_index) {
345  const size_t column_size = num_rows_ * target_types_[column_index].get_size();
346  rows.copyColumnIntoBuffer(
347  column_index, column_buffers_[column_index], column_size);
348  },
349  col_idx));
350  }
351  }
352 
353  for (auto& child : direct_copy_threads) {
354  child.wait();
355  }
356  for (auto& child : direct_copy_threads) {
357  child.get();
358  }
359 }
360 
361 /*
362  * For all lazily fetched columns, we iterate through the columns' contents and
363  * properly materialize them.
364  *
365  * This function is parallelized by dividing the total rows among all available threads.
366  * Since there are no invalid elements in the result set (e.g., columnar projections), the
367  * output buffer will have as many rows as there are in the result set, removing the need
368  * for atomically incrementing the output buffer position.
369  */
370 void ColumnarResults::materializeAllLazyColumns(
371  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
372  const ResultSet& rows,
373  const size_t num_columns) {
374  CHECK(rows.isDirectColumnarConversionPossible());
375  const auto do_work_just_lazy_columns = [num_columns, this](
376  const std::vector<TargetValue>& crt_row,
377  const size_t row_idx,
378  const std::vector<bool>& targets_to_skip) {
379  for (size_t i = 0; i < num_columns; ++i) {
380  if (!targets_to_skip.empty() && !targets_to_skip[i]) {
381  writeBackCell(crt_row[i], row_idx, i);
382  }
383  }
384  };
385 
386  const auto contains_lazy_fetched_column =
387  [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
388  for (auto& col_info : lazy_fetch_info) {
389  if (col_info.is_lazily_fetched) {
390  return true;
391  }
392  }
393  return false;
394  };
395 
396  // parallelized by assigning a chunk of rows to each thread
397  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
398  if (contains_lazy_fetched_column(lazy_fetch_info)) {
399  const size_t worker_count = use_parallel_algorithms(rows) ? cpu_threads() : 1;
400  std::vector<std::future<void>> conversion_threads;
401  const auto entry_count = rows.entryCount();
402  std::vector<bool> targets_to_skip;
403  if (skip_non_lazy_columns) {
404  CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
405  targets_to_skip.reserve(num_columns);
406  for (size_t i = 0; i < num_columns; i++) {
407  // we process lazy columns (i.e., skip non-lazy columns)
408  targets_to_skip.push_back(lazy_fetch_info[i].is_lazily_fetched ? false : true);
409  }
410  }
411  for (size_t i = 0,
412  start_entry = 0,
413  stride = (entry_count + worker_count - 1) / worker_count;
414  i < worker_count && start_entry < entry_count;
415  ++i, start_entry += stride) {
416  const auto end_entry = std::min(start_entry + stride, entry_count);
417  conversion_threads.push_back(std::async(
418  std::launch::async,
419  [&rows, &do_work_just_lazy_columns, &targets_to_skip](const size_t start,
420  const size_t end) {
421  for (size_t i = start; i < end; ++i) {
422  const auto crt_row = rows.getRowAtNoTranslations(i, targets_to_skip);
423  do_work_just_lazy_columns(crt_row, i, targets_to_skip);
424  }
425  },
426  start_entry,
427  end_entry));
428  }
429 
430  for (auto& child : conversion_threads) {
431  child.wait();
432  }
433  for (auto& child : conversion_threads) {
434  child.get();
435  }
436  }
437 }
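// Editor's note: in the targets_to_skip vector built above, "true" means "skip this
// target here": non-lazy columns are marked true because copyAllNonLazyColumns() has
// already copied them, while lazily fetched columns are marked false and are written
// back cell-by-cell by do_work_just_lazy_columns().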
438 
439 /*
440  * Returns the corresponding entry in the generated column buffers.
441  * These getter functions are meant for unit tests, and should not be used
442  * where performance matters.
443  */
444 template <typename ENTRY_TYPE>
445 ENTRY_TYPE ColumnarResults::getEntryAt(const size_t row_idx,
446  const size_t column_idx) const {
447  CHECK_LT(column_idx, column_buffers_.size());
448  CHECK_LT(row_idx, num_rows_);
449  return reinterpret_cast<ENTRY_TYPE*>(column_buffers_[column_idx])[row_idx];
450 }
451 template int64_t ColumnarResults::getEntryAt<int64_t>(const size_t row_idx,
452  const size_t column_idx) const;
453 template int32_t ColumnarResults::getEntryAt<int32_t>(const size_t row_idx,
454  const size_t column_idx) const;
455 template int16_t ColumnarResults::getEntryAt<int16_t>(const size_t row_idx,
456  const size_t column_idx) const;
457 template int8_t ColumnarResults::getEntryAt<int8_t>(const size_t row_idx,
458  const size_t column_idx) const;
459 
460 template <>
461 float ColumnarResults::getEntryAt<float>(const size_t row_idx,
462  const size_t column_idx) const {
463  CHECK_LT(column_idx, column_buffers_.size());
464  CHECK_LT(row_idx, num_rows_);
465  return reinterpret_cast<float*>(column_buffers_[column_idx])[row_idx];
466 }
467 
468 template <>
469 double ColumnarResults::getEntryAt<double>(const size_t row_idx,
470  const size_t column_idx) const {
471  CHECK_LT(column_idx, column_buffers_.size());
472  CHECK_LT(row_idx, num_rows_);
473  return reinterpret_cast<double*>(column_buffers_[column_idx])[row_idx];
474 }
475 
481 void ColumnarResults::materializeAllColumnsPerfectHash(const ResultSet& rows,
482  const size_t num_columns) {
483  CHECK(rows.isDirectColumnarConversionPossible() &&
484  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash);
485  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
486  const size_t entry_count = rows.entryCount();
487  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;
488 
489  // step 1: compute total non-empty elements and store a bitmap per thread
490  std::vector<size_t> non_empty_per_thread(num_threads,
491  0); // number of non-empty entries per thread
492 
493  ColumnBitmap bitmap(num_threads * size_per_thread);
494 
495  locateAndCountEntriesPerfectHash(
496  rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);
497 
498  // step 2: go through the generated bitmap and copy/decode corresponding entries
499  // into the output buffer
500  compactAndCopyEntriesPerfectHash(rows,
501  bitmap,
502  non_empty_per_thread,
503  num_columns,
504  entry_count,
505  num_threads,
506  size_per_thread);
507 }
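// Worked example (editorial sketch): with entry_count = 12 and num_threads = 3, each
// thread scans size_per_thread = 4 entries and sets a bit for every non-empty one. If
// the per-thread counts come out as {2, 0, 3}, the exclusive scan in
// compactAndCopyEntriesPerfectHash() below yields global_offsets = {0, 2, 2, 5}: thread 0
// writes output rows 0-1, thread 2 writes rows 2-4, and the output buffers stay dense.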
508 
509 void ColumnarResults::locateAndCountEntriesPerfectHash(
510  const ResultSet& rows,
511  ColumnBitmap& bitmap,
512  std::vector<size_t>& non_empty_per_thread,
513  const size_t entry_count,
514  const size_t num_threads,
515  const size_t size_per_thread) const {
516  CHECK(rows.isDirectColumnarConversionPossible() &&
517  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash);
518  CHECK_EQ(num_threads, non_empty_per_thread.size());
519  auto locate_and_count_func =
520  [&rows, &bitmap, &non_empty_per_thread](
521  size_t start_index, size_t end_index, size_t thread_idx) {
522  size_t total_non_empty = 0;
523  size_t local_idx = 0;
524  for (size_t entry_idx = start_index; entry_idx < end_index;
525  entry_idx++, local_idx++) {
526  if (!rows.isRowAtEmpty(entry_idx)) {
527  total_non_empty++;
528  bitmap.set(entry_idx, true);
529  }
530  }
531  non_empty_per_thread[thread_idx] = total_non_empty;
532  };
533 
534  std::vector<std::future<void>> conversion_threads;
535  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
536  const size_t start_entry = thread_idx * size_per_thread;
537  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
538  conversion_threads.push_back(std::async(
539  std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
540  }
541 
542  for (auto& child : conversion_threads) {
543  child.wait();
544  }
545  for (auto& child : conversion_threads) {
546  child.get();
547  }
548 }
549 
550 // TODO(Saman): if necessary, we can look into the distribution of non-empty entries
551 // and choose a different load-balanced strategy (assigning an equal number of non-empties
552 // to each thread) as opposed to equal partitioning of the bitmap.
553 void ColumnarResults::compactAndCopyEntriesPerfectHash(
554  const ResultSet& rows,
555  const ColumnBitmap& bitmap,
556  const std::vector<size_t>& non_empty_per_thread,
557  const size_t num_columns,
558  const size_t entry_count,
559  const size_t num_threads,
560  const size_t size_per_thread) {
561  CHECK(rows.isDirectColumnarConversionPossible() &&
562  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash);
563  CHECK_EQ(num_threads, non_empty_per_thread.size());
564 
565  // compute the exclusive scan over all non-empty totals
566  std::vector<size_t> global_offsets(num_threads + 1, 0);
567  std::partial_sum(non_empty_per_thread.begin(),
568  non_empty_per_thread.end(),
569  std::next(global_offsets.begin()));
570 
571  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
572  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
573  rows.getSingleSlotTargetBitmap();
574 
575  // We skip multi-slot targets (e.g., AVG), and single-slot targets where the logical
576  // sized slot is not used (e.g., COUNT hidden within STDDEV). Those skipped targets are
577  // treated differently and accessed through the result set's iterator.
578  if (num_single_slot_targets < num_columns) {
579  compactAndCopyEntriesPHWithTargetSkipping(rows,
580  bitmap,
581  non_empty_per_thread,
582  global_offsets,
583  single_slot_targets_to_skip,
584  slot_idx_per_target_idx,
585  num_columns,
586  entry_count,
587  num_threads,
588  size_per_thread);
589  } else {
590  compactAndCopyEntriesPHWithoutTargetSkipping(rows,
591  bitmap,
592  non_empty_per_thread,
593  global_offsets,
594  slot_idx_per_target_idx,
595  num_columns,
596  entry_count,
597  num_threads,
598  size_per_thread);
599  }
600 }
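// Example (editorial sketch, assuming getSingleSlotTargetBitmap() marks single-slot
// targets as true): for targets {SUM(x), AVG(y), COUNT(*)}, AVG(y) spans two slots, so
// num_single_slot_targets (2) < num_columns (3) and the target-skipping variant runs:
// AVG(y) is materialized through the result set iterator via writeBackCell(), while
// SUM(x) and COUNT(*) go through the generated direct read/write functions.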
601 
608 void ColumnarResults::compactAndCopyEntriesPHWithTargetSkipping(
609  const ResultSet& rows,
610  const ColumnBitmap& bitmap,
611  const std::vector<size_t>& non_empty_per_thread,
612  const std::vector<size_t>& global_offsets,
613  const std::vector<bool>& targets_to_skip,
614  const std::vector<size_t>& slot_idx_per_target_idx,
615  const size_t num_columns,
616  const size_t entry_count,
617  const size_t num_threads,
618  const size_t size_per_thread) {
619  CHECK(rows.isDirectColumnarConversionPossible() &&
620  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash);
621 
622  const auto [write_functions, read_functions] = initAllConversionFunctionsPerfectHash(
623  rows, slot_idx_per_target_idx, targets_to_skip);
624  CHECK_EQ(write_functions.size(), num_columns);
625  CHECK_EQ(read_functions.size(), num_columns);
626 
627  auto compact_buffer_func = [this,
628  &rows,
629  &bitmap,
630  &global_offsets,
631  &non_empty_per_thread,
632  &num_columns,
633  &targets_to_skip,
634  &slot_idx_per_target_idx,
635  &write_functions,
636  &read_functions](const size_t start_index,
637  const size_t end_index,
638  const size_t thread_idx) {
639  const size_t total_non_empty = non_empty_per_thread[thread_idx];
640  size_t non_empty_idx = 0;
641  size_t local_idx = 0;
642  for (size_t entry_idx = start_index; entry_idx < end_index;
643  entry_idx++, local_idx++) {
644  if (non_empty_idx >= total_non_empty) {
645  // all non-empty entries have been written back
646  break;
647  }
648  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
649  if (bitmap.get(entry_idx)) {
650  // targets that are recovered from the result set iterators:
651  const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
652  for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
653  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
654  writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
655  }
656  }
657  // targets that are copied directly, without any translation/decoding, from
658  // the result set
659  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
660  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
661  continue;
662  }
663  write_functions[column_idx](rows,
664  entry_idx,
665  output_buffer_row_idx,
666  column_idx,
667  slot_idx_per_target_idx[column_idx],
668  read_functions[column_idx]);
669  }
670  non_empty_idx++;
671  } else {
672  continue;
673  }
674  }
675  };
676 
677  std::vector<std::future<void>> compaction_threads;
678  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
679  const size_t start_entry = thread_idx * size_per_thread;
680  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
681  compaction_threads.push_back(std::async(
682  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
683  }
684 
685  for (auto& child : compaction_threads) {
686  child.wait();
687  }
688  for (auto& child : compaction_threads) {
689  child.get();
690  }
691 }
692 
699 void ColumnarResults::compactAndCopyEntriesPHWithoutTargetSkipping(
700  const ResultSet& rows,
701  const ColumnBitmap& bitmap,
702  const std::vector<size_t>& non_empty_per_thread,
703  const std::vector<size_t>& global_offsets,
704  const std::vector<size_t>& slot_idx_per_target_idx,
705  const size_t num_columns,
706  const size_t entry_count,
707  const size_t num_threads,
708  const size_t size_per_thread) {
709  CHECK(rows.isDirectColumnarConversionPossible() &&
710  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash);
711 
712  const auto [write_functions, read_functions] =
713  initAllConversionFunctionsPerfectHash(rows, slot_idx_per_target_idx);
714  CHECK_EQ(write_functions.size(), num_columns);
715  CHECK_EQ(read_functions.size(), num_columns);
716 
717  auto compact_buffer_func = [this,
718  &rows,
719  &bitmap,
720  &global_offsets,
721  &non_empty_per_thread,
722  &num_columns,
723  &slot_idx_per_target_idx,
724  &write_functions,
725  &read_functions](const size_t start_index,
726  const size_t end_index,
727  const size_t thread_idx) {
728  const size_t total_non_empty = non_empty_per_thread[thread_idx];
729 
730  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
731  size_t non_empty_idx = 0;
732  size_t local_idx = 0;
733  for (size_t entry_idx = start_index; entry_idx < end_index;
734  entry_idx++, local_idx++) {
735  if (non_empty_idx >= total_non_empty) {
736  // all non-empty entries have been written back
737  break;
738  }
739  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
740  if (bitmap.get(entry_idx)) {
741  write_functions[column_idx](rows,
742  entry_idx,
743  output_buffer_row_idx,
744  column_idx,
745  slot_idx_per_target_idx[column_idx],
746  read_functions[column_idx]);
747  non_empty_idx++;
748  }
749  }
750  }
751  };
752 
753  std::vector<std::future<void>> compaction_threads;
754  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
755  const size_t start_entry = thread_idx * size_per_thread;
756  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
757  compaction_threads.push_back(std::async(
758  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
759  }
760 
761  for (auto& child : compaction_threads) {
762  child.wait();
763  }
764  for (auto& child : compaction_threads) {
765  child.get();
766  }
767 }
768 
774 std::vector<ColumnarResults::WriteFunctionPerfectHash>
775 ColumnarResults::initWriteFunctionsPerfectHash(const ResultSet& rows,
776  const std::vector<bool>& targets_to_skip) {
777  CHECK(rows.isDirectColumnarConversionPossible() &&
778  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash);
779  std::vector<WriteFunctionPerfectHash> result;
780  result.reserve(target_types_.size());
781 
782  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
783  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
784  result.emplace_back([](const ResultSet& rows,
785  const size_t input_buffer_entry_idx,
786  const size_t output_buffer_entry_idx,
787  const size_t target_idx,
788  const size_t slot_idx,
789  const ReadFunctionPerfectHash& read_function) {
790  UNREACHABLE() << "Invalid write back function used.";
791  });
792  continue;
793  }
794 
795  if (target_types_[target_idx].is_fp()) {
796  switch (target_types_[target_idx].get_size()) {
797  case 8:
798  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
799  this,
800  std::placeholders::_1,
801  std::placeholders::_2,
802  std::placeholders::_3,
803  std::placeholders::_4,
804  std::placeholders::_5,
805  std::placeholders::_6));
806  break;
807  case 4:
808  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
809  this,
810  std::placeholders::_1,
811  std::placeholders::_2,
812  std::placeholders::_3,
813  std::placeholders::_4,
814  std::placeholders::_5,
815  std::placeholders::_6));
816  break;
817  default:
818  UNREACHABLE() << "Invalid target type encountered.";
819  break;
820  }
821  } else {
822  switch (target_types_[target_idx].get_size()) {
823  case 8:
824  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
825  this,
826  std::placeholders::_1,
827  std::placeholders::_2,
828  std::placeholders::_3,
829  std::placeholders::_4,
830  std::placeholders::_5,
831  std::placeholders::_6));
832  break;
833  case 4:
834  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
835  this,
836  std::placeholders::_1,
837  std::placeholders::_2,
838  std::placeholders::_3,
839  std::placeholders::_4,
840  std::placeholders::_5,
841  std::placeholders::_6));
842  break;
843  case 2:
844  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
845  this,
846  std::placeholders::_1,
847  std::placeholders::_2,
848  std::placeholders::_3,
849  std::placeholders::_4,
850  std::placeholders::_5,
851  std::placeholders::_6));
852  break;
853  case 1:
854  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
855  this,
856  std::placeholders::_1,
857  std::placeholders::_2,
858  std::placeholders::_3,
859  std::placeholders::_4,
860  std::placeholders::_5,
861  std::placeholders::_6));
862  break;
863  default:
864  UNREACHABLE() << "Invalid target type encountered.";
865  break;
866  }
867  }
868  }
869  return result;
870 }
871 
876 std::vector<ColumnarResults::ReadFunctionPerfectHash>
877 ColumnarResults::initReadFunctionsPerfectHash(
878  const ResultSet& rows,
879  const std::vector<size_t>& slot_idx_per_target_idx,
880  const std::vector<bool>& targets_to_skip) {
881  CHECK(rows.isDirectColumnarConversionPossible() &&
882  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash);
883  std::vector<ReadFunctionPerfectHash> read_functions;
884  read_functions.reserve(target_types_.size());
885 
886  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
887  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
888  read_functions.emplace_back([](const ResultSet& rows,
889  const size_t input_buffer_entry_idx,
890  const size_t column_idx) {
891  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
892  return static_cast<int64_t>(0);
893  });
894  continue;
895  }
896  if (target_types_[target_idx].is_fp()) {
897  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
898  case 8:
899  read_functions.emplace_back([](const ResultSet& rows,
900  const size_t input_buffer_entry_idx,
901  const size_t column_idx) {
902  auto dval = rows.getEntryAt<double>(input_buffer_entry_idx, column_idx);
903  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
904  });
905  break;
906  case 4:
907  read_functions.emplace_back([](const ResultSet& rows,
908  const size_t input_buffer_entry_idx,
909  const size_t column_idx) {
910  auto fval = rows.getEntryAt<float>(input_buffer_entry_idx, column_idx);
911  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
912  });
913  break;
914  default:
915  UNREACHABLE() << "Invalid target type encountered.";
916  break;
917  }
918  } else {
919  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
920  case 8:
921  read_functions.emplace_back([](const ResultSet& rows,
922  const size_t input_buffer_entry_idx,
923  const size_t column_idx) {
924  return rows.getEntryAt<int64_t>(input_buffer_entry_idx, column_idx);
925  });
926  break;
927  case 4:
928  read_functions.emplace_back([](const ResultSet& rows,
929  const size_t input_buffer_entry_idx,
930  const size_t column_idx) {
931  return rows.getEntryAt<int32_t>(input_buffer_entry_idx, column_idx);
932  });
933  break;
934  case 2:
935  read_functions.emplace_back([](const ResultSet& rows,
936  const size_t input_buffer_entry_idx,
937  const size_t column_idx) {
938  return rows.getEntryAt<int16_t>(input_buffer_entry_idx, column_idx);
939  });
940  break;
941  case 1:
942  read_functions.emplace_back([](const ResultSet& rows,
943  const size_t input_buffer_entry_idx,
944  const size_t column_idx) {
945  return rows.getEntryAt<int8_t>(input_buffer_entry_idx, column_idx);
946  });
947  break;
948  default:
949  UNREACHABLE() << "Invalid slot size encountered.";
950  break;
951  }
952  }
953  }
954  return read_functions;
955 }
956 
957 std::tuple<std::vector<ColumnarResults::WriteFunctionPerfectHash>,
958  std::vector<ColumnarResults::ReadFunctionPerfectHash>>
959 ColumnarResults::initAllConversionFunctionsPerfectHash(
960  const ResultSet& rows,
961  const std::vector<size_t>& slot_idx_per_target_idx,
962  const std::vector<bool>& targets_to_skip) {
963  CHECK(rows.isDirectColumnarConversionPossible() &&
964  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash);
965  return std::make_tuple(std::move(initWriteFunctionsPerfectHash(rows, targets_to_skip)),
966  std::move(initReadFunctionsPerfectHash(
967  rows, slot_idx_per_target_idx, targets_to_skip)));
968 }
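// Usage sketch (editorial): each target ends up with a matched read/write pair; the
// compaction loops invoke them as
//   write_functions[col](rows, entry_idx, output_row_idx, col,
//                        slot_idx_per_target_idx[col], read_functions[col]);
// where the read function pulls the raw slot value out of the ResultSet and the write
// function (a bound writeBackCellDirect<T>) casts/NULL-translates it into column_buffers_.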