OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ColumnarResults Class Reference

#include <ColumnarResults.h>

Public Types

using ReadFunction = std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)>
 
using WriteFunction = std::function< void(const ResultSet &, const size_t, const size_t, const size_t, const size_t, const ReadFunction &)>
 

Public Member Functions

 ColumnarResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const bool is_parallel_execution_enforced=false)
 
 ColumnarResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const int8_t *one_col_buffer, const size_t num_rows, const SQLTypeInfo &target_type)
 
const std::vector< int8_t * > & getColumnBuffers () const
 
const size_t size () const
 
const SQLTypeInfo & getColumnType (const int col_id) const
 
bool isParallelConversion () const
 
bool isDirectColumnarConversionPossible () const
 

Static Public Member Functions

static std::unique_ptr
< ColumnarResults > 
mergeResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
 

Protected Attributes

std::vector< int8_t * > column_buffers_
 
size_t num_rows_
 

Private Member Functions

 ColumnarResults (const size_t num_rows, const std::vector< SQLTypeInfo > &target_types)
 
void writeBackCell (const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
 
void materializeAllColumnsDirectly (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsThroughIteration (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsGroupBy (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsProjection (const ResultSet &rows, const size_t num_columns)
 
void copyAllNonLazyColumns (const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
 
void materializeAllLazyColumns (const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
 
void locateAndCountEntries (const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
 
void compactAndCopyEntries (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
void compactAndCopyEntriesWithTargetSkipping (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
void compactAndCopyEntriesWithoutTargetSkipping (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
template<typename DATA_TYPE >
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_function)
 
std::vector< WriteFunction > initWriteFunctions (const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
 
template<QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector< ReadFunction > initReadFunctions (const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
 
std::tuple< std::vector
< WriteFunction >, std::vector
< ReadFunction > > 
initAllConversionFunctions (const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
 
template<>
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_from_function)
 
template<>
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_from_function)
 

Private Attributes

const std::vector< SQLTypeInfotarget_types_
 
bool parallel_conversion_
 
bool direct_columnar_conversion_
 

Detailed Description

Definition at line 61 of file ColumnarResults.h.

Member Typedef Documentation

using ColumnarResults::ReadFunction = std::function<int64_t(const ResultSet&, const size_t, const size_t, const size_t)>

Definition at line 94 of file ColumnarResults.h.

using ColumnarResults::WriteFunction = std::function<void(const ResultSet&, const size_t, const size_t, const size_t, const size_t, const ReadFunction&)>

Definition at line 103 of file ColumnarResults.h.

Constructor & Destructor Documentation

ColumnarResults::ColumnarResults ( const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner,
const ResultSet & rows,
const size_t  num_columns,
const std::vector< SQLTypeInfo > &  target_types,
const bool  is_parallel_execution_enforced = false 
)

Definition at line 43 of file ColumnarResults.cpp.

References checked_malloc(), column_buffers_, DEBUG_TIMER, isDirectColumnarConversionPossible(), kENCODING_NONE, materializeAllColumnsDirectly(), materializeAllColumnsThroughIteration(), and num_rows_.

Referenced by mergeResults().

49  : column_buffers_(num_columns)
50  , num_rows_(use_parallel_algorithms(rows) || rows.isDirectColumnarConversionPossible()
51  ? rows.entryCount()
52  : rows.rowCount())
53  , target_types_(target_types)
54  , parallel_conversion_(is_parallel_execution_enforced ? true
56  , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible()) {
57  auto timer = DEBUG_TIMER(__func__);
58  column_buffers_.resize(num_columns);
59  for (size_t i = 0; i < num_columns; ++i) {
60  const bool is_varlen = target_types[i].is_array() ||
61  (target_types[i].is_string() &&
62  target_types[i].get_compression() == kENCODING_NONE) ||
63  target_types[i].is_geometry();
64  if (is_varlen) {
66  }
67  column_buffers_[i] =
68  reinterpret_cast<int8_t*>(checked_malloc(num_rows_ * target_types[i].get_size()));
69  row_set_mem_owner->addColBuffer(column_buffers_[i]);
70  }
71 
72  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
73  materializeAllColumnsDirectly(rows, num_columns);
74  } else {
75  materializeAllColumnsThroughIteration(rows, num_columns);
76  }
77 }
std::vector< int8_t * > column_buffers_
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:873
bool direct_columnar_conversion_
void materializeAllColumnsThroughIteration(const ResultSet &rows, const size_t num_columns)
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:40
bool isDirectColumnarConversionPossible() const
void materializeAllColumnsDirectly(const ResultSet &rows, const size_t num_columns)
#define DEBUG_TIMER(name)
Definition: Logger.h:296
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ColumnarResults::ColumnarResults ( const std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const int8_t *  one_col_buffer,
const size_t  num_rows,
const SQLTypeInfo target_type 
)

Definition at line 79 of file ColumnarResults.cpp.

84  : column_buffers_(1)
86  , target_types_{target_type}
87  , parallel_conversion_(false)
89  auto timer = DEBUG_TIMER(__func__);
90  const bool is_varlen =
91  target_type.is_array() ||
92  (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
93  target_type.is_geometry();
94  if (is_varlen) {
96  }
97  const auto buf_size = num_rows * target_type.get_size();
98  column_buffers_[0] = reinterpret_cast<int8_t*>(checked_malloc(buf_size));
99  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
100  row_set_mem_owner->addColBuffer(column_buffers_[0]);
101 }
std::vector< int8_t * > column_buffers_
const int8_t const int64_t * num_rows
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:334
HOST DEVICE int get_size() const
Definition: sqltypes.h:336
bool direct_columnar_conversion_
bool is_array() const
Definition: sqltypes.h:485
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:40
bool is_geometry() const
Definition: sqltypes.h:489
bool is_string() const
Definition: sqltypes.h:477
#define DEBUG_TIMER(name)
Definition: Logger.h:296
const std::vector< SQLTypeInfo > target_types_
ColumnarResults::ColumnarResults ( const size_t  num_rows,
const std::vector< SQLTypeInfo > &  target_types 
)
inlineprivate

Definition at line 110 of file ColumnarResults.h.

111  : num_rows_(num_rows), target_types_(target_types) {}
const int8_t const int64_t * num_rows
const std::vector< SQLTypeInfo > target_types_

Member Function Documentation

void ColumnarResults::compactAndCopyEntries ( const ResultSet rows,
const ColumnBitmap bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function goes through all non-empty elements marked in the bitmap data structure, and stores them back into the output column buffers. The output column buffers are compacted without any holes in them.

TODO(Saman): if necessary, we can look into the distribution of non-empty entries and choose a different load-balanced strategy (assigning equal number of non-empties to each thread) as opposed to equal partitioning of the bitmap

Definition at line 552 of file ColumnarResults.cpp.

References CHECK(), CHECK_EQ, compactAndCopyEntriesWithoutTargetSkipping(), compactAndCopyEntriesWithTargetSkipping(), GroupByBaselineHash, GroupByPerfectHash, and isDirectColumnarConversionPossible().

Referenced by materializeAllColumnsGroupBy().

559  {
561  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
562  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
563  CHECK_EQ(num_threads, non_empty_per_thread.size());
564 
565  // compute the exclusive scan over all non-empty totals
566  std::vector<size_t> global_offsets(num_threads + 1, 0);
567  std::partial_sum(non_empty_per_thread.begin(),
568  non_empty_per_thread.end(),
569  std::next(global_offsets.begin()));
570 
571  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
572  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
573  rows.getSupportedSingleSlotTargetBitmap();
574 
575  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
576  // differently and accessed through result set's iterator
577  if (num_single_slot_targets < num_columns) {
579  bitmap,
580  non_empty_per_thread,
581  global_offsets,
582  single_slot_targets_to_skip,
583  slot_idx_per_target_idx,
584  num_columns,
585  entry_count,
586  num_threads,
587  size_per_thread);
588  } else {
590  bitmap,
591  non_empty_per_thread,
592  global_offsets,
593  slot_idx_per_target_idx,
594  num_columns,
595  entry_count,
596  num_threads,
597  size_per_thread);
598  }
599 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
CHECK(cgen_state)
bool isDirectColumnarConversionPossible() const
void compactAndCopyEntriesWithTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
void compactAndCopyEntriesWithoutTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping ( const ResultSet rows,
const ColumnBitmap bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const std::vector< size_t > &  global_offsets,
const std::vector< size_t > &  slot_idx_per_target_idx,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function takes a bitmap of non-empty entries within the result set's storage, and compacts and copies those contents back into the output column_buffers_. In this variation, all targets are assumed to be single-slot and thus can be directly columnarized.

Definition at line 696 of file ColumnarResults.cpp.

References CHECK(), CHECK_EQ, GroupByBaselineHash, GroupByPerfectHash, initAllConversionFunctions(), and isDirectColumnarConversionPossible().

Referenced by compactAndCopyEntries().

705  {
707  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
708  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
709 
710  const auto [write_functions, read_functions] =
711  initAllConversionFunctions(rows, slot_idx_per_target_idx);
712  CHECK_EQ(write_functions.size(), num_columns);
713  CHECK_EQ(read_functions.size(), num_columns);
714 
715  auto compact_buffer_func = [&rows,
716  &bitmap,
717  &global_offsets,
718  &non_empty_per_thread,
719  &num_columns,
720  &slot_idx_per_target_idx,
721  &write_functions = write_functions,
722  &read_functions = read_functions](const size_t start_index,
723  const size_t end_index,
724  const size_t thread_idx) {
725  const size_t total_non_empty = non_empty_per_thread[thread_idx];
726  size_t non_empty_idx = 0;
727  size_t local_idx = 0;
728  for (size_t entry_idx = start_index; entry_idx < end_index;
729  entry_idx++, local_idx++) {
730  if (non_empty_idx >= total_non_empty) {
731  // all non-empty entries has been written back
732  break;
733  }
734  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
735  if (bitmap.get(local_idx, thread_idx)) {
736  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
737  write_functions[column_idx](rows,
738  entry_idx,
739  output_buffer_row_idx,
740  column_idx,
741  slot_idx_per_target_idx[column_idx],
742  read_functions[column_idx]);
743  }
744  non_empty_idx++;
745  } else {
746  continue;
747  }
748  }
749  };
750 
751  std::vector<std::future<void>> compaction_threads;
752  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
753  const size_t start_entry = thread_idx * size_per_thread;
754  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
755  compaction_threads.push_back(std::async(
756  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
757  }
758 
759  for (auto& child : compaction_threads) {
760  child.wait();
761  }
762 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
CHECK(cgen_state)
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::compactAndCopyEntriesWithTargetSkipping ( const ResultSet rows,
const ColumnBitmap bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const std::vector< size_t > &  global_offsets,
const std::vector< bool > &  targets_to_skip,
const std::vector< size_t > &  slot_idx_per_target_idx,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function takes a bitmap of non-empty entries within the result set's storage, and compacts and copies those contents back into the output column_buffers_. In this variation, multi-slot targets (e.g., AVG) are handled through the existing result set's iteration, but everything else is directly columnarized.

Definition at line 607 of file ColumnarResults.cpp.

References CHECK(), CHECK_EQ, GroupByBaselineHash, GroupByPerfectHash, initAllConversionFunctions(), isDirectColumnarConversionPossible(), and writeBackCell().

Referenced by compactAndCopyEntries().

617  {
619  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
620  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
621 
622  const auto [write_functions, read_functions] =
623  initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
624  CHECK_EQ(write_functions.size(), num_columns);
625  CHECK_EQ(read_functions.size(), num_columns);
626 
627  auto compact_buffer_func = [this,
628  &rows,
629  &bitmap,
630  &global_offsets,
631  &non_empty_per_thread,
632  &num_columns,
633  &targets_to_skip,
634  &slot_idx_per_target_idx,
635  &write_functions = write_functions,
636  &read_functions = read_functions](const size_t start_index,
637  const size_t end_index,
638  const size_t thread_idx) {
639  const size_t total_non_empty = non_empty_per_thread[thread_idx];
640  size_t non_empty_idx = 0;
641  size_t local_idx = 0;
642  for (size_t entry_idx = start_index; entry_idx < end_index;
643  entry_idx++, local_idx++) {
644  if (non_empty_idx >= total_non_empty) {
645  // all non-empty entries has been written back
646  break;
647  }
648  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
649  if (bitmap.get(local_idx, thread_idx)) {
650  // targets that are recovered from the result set iterators:
651  const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
652  for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
653  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
654  writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
655  }
656  }
657  // targets that are copied directly without any translation/decoding from
658  // result set
659  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
660  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
661  continue;
662  }
663  write_functions[column_idx](rows,
664  entry_idx,
665  output_buffer_row_idx,
666  column_idx,
667  slot_idx_per_target_idx[column_idx],
668  read_functions[column_idx]);
669  }
670  non_empty_idx++;
671  } else {
672  continue;
673  }
674  }
675  };
676 
677  std::vector<std::future<void>> compaction_threads;
678  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
679  const size_t start_entry = thread_idx * size_per_thread;
680  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
681  compaction_threads.push_back(std::async(
682  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
683  }
684 
685  for (auto& child : compaction_threads) {
686  child.wait();
687  }
688 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
CHECK(cgen_state)
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::copyAllNonLazyColumns ( const std::vector< ColumnLazyFetchInfo > &  lazy_fetch_info,
const ResultSet rows,
const size_t  num_columns 
)
private

Definition at line 355 of file ColumnarResults.cpp.

References CHECK(), column_buffers_, isDirectColumnarConversionPossible(), num_rows_, and target_types_.

Referenced by materializeAllColumnsProjection().

358  {
360  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
361  // Saman: make sure when this lazy_fetch_info is empty
362  if (lazy_fetch_info.empty()) {
363  return true;
364  } else {
365  return !lazy_fetch_info[col_idx].is_lazily_fetched;
366  }
367  };
368 
369  // parallelized by assigning each column to a thread
370  std::vector<std::future<void>> direct_copy_threads;
371  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
372  if (is_column_non_lazily_fetched(col_idx)) {
373  direct_copy_threads.push_back(std::async(
374  std::launch::async,
375  [&rows, this](const size_t column_index) {
376  const size_t column_size = num_rows_ * target_types_[column_index].get_size();
377  rows.copyColumnIntoBuffer(
378  column_index, column_buffers_[column_index], column_size);
379  },
380  col_idx));
381  }
382  }
383 
384  for (auto& child : direct_copy_threads) {
385  child.wait();
386  }
387 }
std::vector< int8_t * > column_buffers_
CHECK(cgen_state)
bool isDirectColumnarConversionPossible() const
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::vector<int8_t*>& ColumnarResults::getColumnBuffers ( ) const
inline

Definition at line 78 of file ColumnarResults.h.

References column_buffers_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

78 { return column_buffers_; }
std::vector< int8_t * > column_buffers_

+ Here is the caller graph for this function:

const SQLTypeInfo& ColumnarResults::getColumnType ( const int  col_id) const
inline

Definition at line 82 of file ColumnarResults.h.

References CHECK_GE, CHECK_LT, and target_types_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

82  {
83  CHECK_GE(col_id, 0);
84  CHECK_LT(static_cast<size_t>(col_id), target_types_.size());
85  return target_types_[col_id];
86  }
#define CHECK_GE(x, y)
Definition: Logger.h:203
#define CHECK_LT(x, y)
Definition: Logger.h:200
const std::vector< SQLTypeInfo > target_types_

+ Here is the caller graph for this function:

std::tuple< std::vector< ColumnarResults::WriteFunction >, std::vector< ColumnarResults::ReadFunction > > ColumnarResults::initAllConversionFunctions ( const ResultSet rows,
const std::vector< size_t > &  slot_idx_per_target_idx,
const std::vector< bool > &  targets_to_skip = {} 
)
private

This function goes through all target types in the output, and chooses appropriate write and read functions per target. The goal is then to simply use these functions for each row and per target. Read functions are used to read each cell's data content (particular target in a row), and write functions are used to properly write back the cell's content into the output column buffers.

Definition at line 1055 of file ColumnarResults.cpp.

References CHECK(), GroupByBaselineHash, GroupByPerfectHash, initWriteFunctions(), and isDirectColumnarConversionPossible().

Referenced by compactAndCopyEntriesWithoutTargetSkipping(), and compactAndCopyEntriesWithTargetSkipping().

1058  {
1060  (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1061  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));
1062 
1063  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
1064  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
1065  if (rows.didOutputColumnar()) {
1066  return std::make_tuple(
1067  std::move(write_functions),
1068  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
1069  rows, slot_idx_per_target_idx, targets_to_skip));
1070  } else {
1071  return std::make_tuple(
1072  std::move(write_functions),
1073  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
1074  rows, slot_idx_per_target_idx, targets_to_skip));
1075  }
1076  } else {
1077  if (rows.didOutputColumnar()) {
1078  return std::make_tuple(
1079  std::move(write_functions),
1080  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
1081  rows, slot_idx_per_target_idx, targets_to_skip));
1082  } else {
1083  return std::make_tuple(
1084  std::move(write_functions),
1085  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
1086  rows, slot_idx_per_target_idx, targets_to_skip));
1087  }
1088  }
1089 }
CHECK(cgen_state)
std::vector< WriteFunction > initWriteFunctions(const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector< ColumnarResults::ReadFunction > ColumnarResults::initReadFunctions ( const ResultSet rows,
const std::vector< size_t > &  slot_idx_per_target_idx,
const std::vector< bool > &  targets_to_skip = {} 
)
private

Initializes a set of read functions to properly access the contents of the result set's storage buffer. Each particular read function is chosen based on the data type and data size used to store that target in the result set's storage buffer. These functions are then used for each row in the result set.

Definition at line 957 of file ColumnarResults.cpp.

References CHECK(), CHECK_EQ, GroupByBaselineHash, anonymous_namespace{ColumnarResults.cpp}::invalid_read_func(), isDirectColumnarConversionPossible(), kDOUBLE, kFLOAT, target_types_, and UNREACHABLE.

960  {
962  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
963  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());
964 
965  std::vector<ReadFunction> read_functions;
966  read_functions.reserve(target_types_.size());
967 
968  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
969  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
970  // for targets that should be skipped, we use a placeholder function that should
971  // never be called. The CHECKs inside it make sure that never happens.
972  read_functions.emplace_back(invalid_read_func);
973  continue;
974  }
975 
976  if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
977  if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
978  // for key columns only
979  CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
980  if (target_types_[target_idx].is_fp()) {
981  CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
982  switch (target_types_[target_idx].get_type()) {
983  case kFLOAT:
984  read_functions.emplace_back(
985  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
986  break;
987  case kDOUBLE:
988  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
989  break;
990  default:
991  UNREACHABLE()
992  << "Invalid data type encountered (BaselineHash, floating point key).";
993  break;
994  }
995  } else {
996  switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
997  case 8:
998  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
999  break;
1000  case 4:
1001  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1002  break;
1003  default:
1004  UNREACHABLE()
1005  << "Invalid data type encountered (BaselineHash, integer key).";
1006  }
1007  }
1008  continue;
1009  }
1010  }
1011  if (target_types_[target_idx].is_fp()) {
1012  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1013  case 8:
1014  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1015  break;
1016  case 4:
1017  read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1018  break;
1019  default:
1020  UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
1021  break;
1022  }
1023  } else {
1024  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1025  case 8:
1026  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1027  break;
1028  case 4:
1029  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1030  break;
1031  case 2:
1032  read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1033  break;
1034  case 1:
1035  read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1036  break;
1037  default:
1038  UNREACHABLE() << "Invalid data type encountered (integer agg column).";
1039  break;
1040  }
1041  }
1042  }
1043  return read_functions;
1044 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
#define UNREACHABLE()
Definition: Logger.h:234
CHECK(cgen_state)
bool isDirectColumnarConversionPossible() const
int64_t invalid_read_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

std::vector< ColumnarResults::WriteFunction > ColumnarResults::initWriteFunctions ( const ResultSet rows,
const std::vector< bool > &  targets_to_skip = {} 
)
private

Initializes a set of write functions per target (i.e., column). Target types' logical sizes are used to categorize the correct write function per target. These functions are then used for every row in the result set.

Definition at line 769 of file ColumnarResults.cpp.

References CHECK(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), run_benchmark_import::result, target_types_, and UNREACHABLE.

Referenced by initAllConversionFunctions().

771  {
773  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
774  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
775 
776  std::vector<WriteFunction> result;
777  result.reserve(target_types_.size());
778 
779  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
780  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
781  result.emplace_back([](const ResultSet& rows,
782  const size_t input_buffer_entry_idx,
783  const size_t output_buffer_entry_idx,
784  const size_t target_idx,
785  const size_t slot_idx,
786  const ReadFunction& read_function) {
787  UNREACHABLE() << "Invalid write back function used.";
788  });
789  continue;
790  }
791 
792  if (target_types_[target_idx].is_fp()) {
793  switch (target_types_[target_idx].get_size()) {
794  case 8:
795  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
796  this,
797  std::placeholders::_1,
798  std::placeholders::_2,
799  std::placeholders::_3,
800  std::placeholders::_4,
801  std::placeholders::_5,
802  std::placeholders::_6));
803  break;
804  case 4:
805  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
806  this,
807  std::placeholders::_1,
808  std::placeholders::_2,
809  std::placeholders::_3,
810  std::placeholders::_4,
811  std::placeholders::_5,
812  std::placeholders::_6));
813  break;
814  default:
815  UNREACHABLE() << "Invalid target type encountered.";
816  break;
817  }
818  } else {
819  switch (target_types_[target_idx].get_size()) {
820  case 8:
821  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
822  this,
823  std::placeholders::_1,
824  std::placeholders::_2,
825  std::placeholders::_3,
826  std::placeholders::_4,
827  std::placeholders::_5,
828  std::placeholders::_6));
829  break;
830  case 4:
831  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
832  this,
833  std::placeholders::_1,
834  std::placeholders::_2,
835  std::placeholders::_3,
836  std::placeholders::_4,
837  std::placeholders::_5,
838  std::placeholders::_6));
839  break;
840  case 2:
841  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
842  this,
843  std::placeholders::_1,
844  std::placeholders::_2,
845  std::placeholders::_3,
846  std::placeholders::_4,
847  std::placeholders::_5,
848  std::placeholders::_6));
849  break;
850  case 1:
851  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
852  this,
853  std::placeholders::_1,
854  std::placeholders::_2,
855  std::placeholders::_3,
856  std::placeholders::_4,
857  std::placeholders::_5,
858  std::placeholders::_6));
859  break;
860  default:
861  UNREACHABLE() << "Invalid target type encountered.";
862  break;
863  }
864  }
865  }
866  return result;
867 }
#define UNREACHABLE()
Definition: Logger.h:234
CHECK(cgen_state)
std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)> ReadFunction
bool isDirectColumnarConversionPossible() const
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool ColumnarResults::isDirectColumnarConversionPossible ( ) const
inline
bool ColumnarResults::isParallelConversion ( ) const
inline

Definition at line 88 of file ColumnarResults.h.

References parallel_conversion_.

Referenced by materializeAllColumnsGroupBy(), and materializeAllColumnsThroughIteration().

88 { return parallel_conversion_; }

+ Here is the caller graph for this function:

void ColumnarResults::locateAndCountEntries ( const ResultSet rows,
ColumnBitmap bitmap,
std::vector< size_t > &  non_empty_per_thread,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
) const
private

This function goes through all the keys in the result set and counts the total number of non-empty keys. It also stores the locations of non-empty keys in a bitmap data structure for faster access later.

Definition at line 505 of file ColumnarResults.cpp.

References CHECK(), CHECK_EQ, GroupByBaselineHash, GroupByPerfectHash, and isDirectColumnarConversionPossible().

Referenced by materializeAllColumnsGroupBy().

510  {
512  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
513  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
514  CHECK_EQ(num_threads, non_empty_per_thread.size());
515  auto locate_and_count_func =
516  [&rows, &bitmap, &non_empty_per_thread](
517  size_t start_index, size_t end_index, size_t thread_idx) {
518  size_t total_non_empty = 0;
519  size_t local_idx = 0;
520  for (size_t entry_idx = start_index; entry_idx < end_index;
521  entry_idx++, local_idx++) {
522  if (!rows.isRowAtEmpty(entry_idx)) {
523  total_non_empty++;
524  bitmap.set(local_idx, thread_idx, true);
525  }
526  }
527  non_empty_per_thread[thread_idx] = total_non_empty;
528  };
529 
530  std::vector<std::future<void>> conversion_threads;
531  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
532  const size_t start_entry = thread_idx * size_per_thread;
533  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
534  conversion_threads.push_back(std::async(
535  std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
536  }
537 
538  for (auto& child : conversion_threads) {
539  child.wait();
540  }
541 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
CHECK(cgen_state)
bool isDirectColumnarConversionPossible() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsDirectly ( const ResultSet rows,
const size_t  num_columns 
)
private

This function materializes all columns from the main storage and all appended storages and forms a single contiguous column for each output column. Depending on whether the column is lazily fetched or not, it will treat them differently.

NOTE: this function should only be used when the result set is columnar and completely compacted (e.g., in columnar projections).

Definition at line 307 of file ColumnarResults.cpp.

References CHECK(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), materializeAllColumnsGroupBy(), materializeAllColumnsProjection(), Projection, and UNREACHABLE.

Referenced by ColumnarResults().

308  {
310  switch (rows.getQueryDescriptionType()) {
312  materializeAllColumnsProjection(rows, num_columns);
313  break;
314  }
317  materializeAllColumnsGroupBy(rows, num_columns);
318  break;
319  }
320  default:
321  UNREACHABLE()
322  << "Direct columnar conversion for this query type is not supported yet.";
323  }
324 }
#define UNREACHABLE()
Definition: Logger.h:234
CHECK(cgen_state)
void materializeAllColumnsGroupBy(const ResultSet &rows, const size_t num_columns)
bool isDirectColumnarConversionPossible() const
void materializeAllColumnsProjection(const ResultSet &rows, const size_t num_columns)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsGroupBy ( const ResultSet rows,
const size_t  num_columns 
)
private

This function is to directly columnarize a result set for group by queries. Its main difference from the traditional alternative is that it directly reads non-empty entries from the result set, and then writes them into output column buffers, rather than using the result set's iterators.

Definition at line 470 of file ColumnarResults.cpp.

References CHECK(), compactAndCopyEntries(), cpu_threads(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), isParallelConversion(), and locateAndCountEntries().

Referenced by materializeAllColumnsDirectly().

471  {
473  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
474  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
475 
476  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
477  const size_t entry_count = rows.entryCount();
478  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;
479 
480  // step 1: compute total non-empty elements and store a bitmap per thread
481  std::vector<size_t> non_empty_per_thread(num_threads,
482  0); // number of non-empty entries per thread
483 
484  ColumnBitmap bitmap(size_per_thread, num_threads);
485 
487  rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);
488 
489  // step 2: go through the generated bitmap and copy/decode corresponding entries
490  // into the output buffer
492  bitmap,
493  non_empty_per_thread,
494  num_columns,
495  entry_count,
496  num_threads,
497  size_per_thread);
498 }
bool isParallelConversion() const
void locateAndCountEntries(const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
void compactAndCopyEntries(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
CHECK(cgen_state)
bool isDirectColumnarConversionPossible() const
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsProjection ( const ResultSet rows,
const size_t  num_columns 
)
private

This function handles materialization for two types of columns in columnar projections:

  1. for all non-lazy columns, it directly copies the results from the result set's storage into the output column buffers
  2. for all lazy fetched columns, it uses result set's iterators to decode the proper values before storing them into the output column buffers

Definition at line 333 of file ColumnarResults.cpp.

References CHECK(), copyAllNonLazyColumns(), isDirectColumnarConversionPossible(), materializeAllLazyColumns(), and Projection.

Referenced by materializeAllColumnsDirectly().

334  {
335  CHECK(rows.query_mem_desc_.didOutputColumnar());
337  rows.query_mem_desc_.getQueryDescriptionType() ==
339 
340  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
341 
342  // We can directly copy each non-lazy column's content
343  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
344 
345  // Only lazy columns are iterated through first and then materialized
346  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
347 }
CHECK(cgen_state)
bool isDirectColumnarConversionPossible() const
void copyAllNonLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
void materializeAllLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsThroughIteration ( const ResultSet rows,
const size_t  num_columns 
)
private

This function iterates through the result set (using the getRowAtNoTranslation and getNextRow family of functions) and writes back the results into output column buffers.

Definition at line 149 of file ColumnarResults.cpp.

References cpu_threads(), isParallelConversion(), num_rows_, and writeBackCell().

Referenced by ColumnarResults().

150  {
151  std::atomic<size_t> row_idx{0};
152  const auto do_work = [num_columns, this](const std::vector<TargetValue>& crt_row,
153  const size_t row_idx) {
154  for (size_t i = 0; i < num_columns; ++i) {
155  writeBackCell(crt_row[i], row_idx, i);
156  }
157  };
158  if (isParallelConversion()) {
159  const size_t worker_count = cpu_threads();
160  std::vector<std::future<void>> conversion_threads;
161  const auto entry_count = rows.entryCount();
162  for (size_t i = 0,
163  start_entry = 0,
164  stride = (entry_count + worker_count - 1) / worker_count;
165  i < worker_count && start_entry < entry_count;
166  ++i, start_entry += stride) {
167  const auto end_entry = std::min(start_entry + stride, entry_count);
168  conversion_threads.push_back(std::async(
169  std::launch::async,
170  [&rows, &do_work, &row_idx](const size_t start, const size_t end) {
171  for (size_t i = start; i < end; ++i) {
172  const auto crt_row = rows.getRowAtNoTranslations(i);
173  if (!crt_row.empty()) {
174  do_work(crt_row, row_idx.fetch_add(1));
175  }
176  }
177  },
178  start_entry,
179  end_entry));
180  }
181  for (auto& child : conversion_threads) {
182  child.wait();
183  }
184 
185  num_rows_ = row_idx;
186  rows.setCachedRowCount(num_rows_);
187  return;
188  }
189  while (true) {
190  const auto crt_row = rows.getNextRow(false, false);
191  if (crt_row.empty()) {
192  break;
193  }
194  do_work(crt_row, row_idx);
195  ++row_idx;
196  }
197  rows.moveToBegin();
198 }
bool isParallelConversion() const
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllLazyColumns ( const std::vector< ColumnLazyFetchInfo > &  lazy_fetch_info,
const ResultSet rows,
const size_t  num_columns 
)
private

For all lazy fetched columns, we should iterate through the column's content and properly materialize it.

This function is parallelized through dividing total rows among all existing threads. Since there's no invalid element in the result set (e.g., columnar projections), the output buffer will have as many rows as there are in the result set, removing the need for atomically incrementing the output buffer position.

Definition at line 398 of file ColumnarResults.cpp.

References CHECK(), CHECK_EQ, cpu_threads(), isDirectColumnarConversionPossible(), use_parallel_algorithms(), and writeBackCell().

Referenced by materializeAllColumnsProjection().

401  {
403  const auto do_work_just_lazy_columns = [num_columns, this](
404  const std::vector<TargetValue>& crt_row,
405  const size_t row_idx,
406  const std::vector<bool>& targets_to_skip) {
407  for (size_t i = 0; i < num_columns; ++i) {
408  if (!targets_to_skip.empty() && !targets_to_skip[i]) {
409  writeBackCell(crt_row[i], row_idx, i);
410  }
411  }
412  };
413 
414  const auto contains_lazy_fetched_column =
415  [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
416  for (auto& col_info : lazy_fetch_info) {
417  if (col_info.is_lazily_fetched) {
418  return true;
419  }
420  }
421  return false;
422  };
423 
424  // parallelized by assigning a chunk of rows to each thread
425  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty() ? true : false;
426  if (contains_lazy_fetched_column(lazy_fetch_info)) {
427  const size_t worker_count = use_parallel_algorithms(rows) ? cpu_threads() : 1;
428  std::vector<std::future<void>> conversion_threads;
429  const auto entry_count = rows.entryCount();
430  std::vector<bool> targets_to_skip;
431  if (skip_non_lazy_columns) {
432  CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
433  targets_to_skip.reserve(num_columns);
434  for (size_t i = 0; i < num_columns; i++) {
435  // we process lazy columns (i.e., skip non-lazy columns)
436  targets_to_skip.push_back(lazy_fetch_info[i].is_lazily_fetched ? false : true);
437  }
438  }
439  for (size_t i = 0,
440  start_entry = 0,
441  stride = (entry_count + worker_count - 1) / worker_count;
442  i < worker_count && start_entry < entry_count;
443  ++i, start_entry += stride) {
444  const auto end_entry = std::min(start_entry + stride, entry_count);
445  conversion_threads.push_back(std::async(
446  std::launch::async,
447  [&rows, &do_work_just_lazy_columns, &targets_to_skip](const size_t start,
448  const size_t end) {
449  for (size_t i = start; i < end; ++i) {
450  const auto crt_row = rows.getRowAtNoTranslations(i, targets_to_skip);
451  do_work_just_lazy_columns(crt_row, i, targets_to_skip);
452  }
453  },
454  start_entry,
455  end_entry));
456  }
457 
458  for (auto& child : conversion_threads) {
459  child.wait();
460  }
461  }
462 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:873
CHECK(cgen_state)
bool isDirectColumnarConversionPossible() const
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::unique_ptr< ColumnarResults > ColumnarResults::mergeResults ( const std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const std::vector< std::unique_ptr< ColumnarResults >> &  sub_results 
)
static

Definition at line 103 of file ColumnarResults.cpp.

References CHECK_EQ, checked_malloc(), ColumnarResults(), logger::init(), run_benchmark_import::result, and target_types_.

Referenced by ColumnFetcher::getAllTableColumnFragments().

105  {
106  if (sub_results.empty()) {
107  return nullptr;
108  }
109  const auto total_row_count = std::accumulate(
110  sub_results.begin(),
111  sub_results.end(),
112  size_t(0),
113  [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
114  return init + result->size();
115  });
116  std::unique_ptr<ColumnarResults> merged_results(
117  new ColumnarResults(total_row_count, sub_results[0]->target_types_));
118  const auto col_count = sub_results[0]->column_buffers_.size();
119  const auto nonempty_it = std::find_if(
120  sub_results.begin(),
121  sub_results.end(),
122  [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
123  if (nonempty_it == sub_results.end()) {
124  return nullptr;
125  }
126  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
127  const auto byte_width = (*nonempty_it)->getColumnType(col_idx).get_size();
128  auto write_ptr =
129  reinterpret_cast<int8_t*>(checked_malloc(byte_width * total_row_count));
130  merged_results->column_buffers_.push_back(write_ptr);
131  row_set_mem_owner->addColBuffer(write_ptr);
132  for (auto& rs : sub_results) {
133  CHECK_EQ(col_count, rs->column_buffers_.size());
134  if (!rs->size()) {
135  continue;
136  }
137  CHECK_EQ(byte_width, rs->getColumnType(col_idx).get_size());
138  memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
139  write_ptr += rs->size() * byte_width;
140  }
141  }
142  return merged_results;
143 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
void init(LogOptions const &log_opts)
Definition: Logger.cpp:265
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:40
ColumnarResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const bool is_parallel_execution_enforced=false)
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const size_t ColumnarResults::size ( ) const
inline

Definition at line 80 of file ColumnarResults.h.

References num_rows_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

80 { return num_rows_; }

+ Here is the caller graph for this function:

void ColumnarResults::writeBackCell ( const TargetValue col_val,
const size_t  row_idx,
const size_t  column_idx 
)
inlineprivate

Definition at line 208 of file ColumnarResults.cpp.

References CHECK(), column_buffers_, anonymous_namespace{ColumnarResults.cpp}::fixed_encoding_nullable_val(), kDOUBLE, kFLOAT, and target_types_.

Referenced by compactAndCopyEntriesWithTargetSkipping(), materializeAllColumnsThroughIteration(), and materializeAllLazyColumns().

210  {
211  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
212  CHECK(scalar_col_val);
213  auto i64_p = boost::get<int64_t>(scalar_col_val);
214  const auto& type_info = target_types_[column_idx];
215  if (i64_p) {
216  const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
217  switch (target_types_[column_idx].get_size()) {
218  case 1:
219  ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
220  break;
221  case 2:
222  ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
223  break;
224  case 4:
225  ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
226  break;
227  case 8:
228  ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
229  break;
230  default:
231  CHECK(false);
232  }
233  } else {
234  CHECK(target_types_[column_idx].is_fp());
235  switch (target_types_[column_idx].get_type()) {
236  case kFLOAT: {
237  auto float_p = boost::get<float>(scalar_col_val);
238  ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
239  break;
240  }
241  case kDOUBLE: {
242  auto double_p = boost::get<double>(scalar_col_val);
243  ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
244  break;
245  }
246  default:
247  CHECK(false);
248  }
249  }
250 }
std::vector< int8_t * > column_buffers_
CHECK(cgen_state)
int64_t fixed_encoding_nullable_val(const int64_t val, const SQLTypeInfo &type_info)
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<typename DATA_TYPE >
void ColumnarResults::writeBackCellDirect ( const ResultSet rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction read_from_function 
)
private

A set of write functions to be used to directly write into final column_buffers_. The read_from_function is used to read from the input result set's storage NOTE: currently only used for direct columnarizations

Definition at line 258 of file ColumnarResults.cpp.

References column_buffers_, anonymous_namespace{ColumnarResults.cpp}::fixed_encoding_nullable_val(), and target_types_.

263  {
264  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
265  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
266  target_types_[target_idx]));
267  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
268  val;
269 }
std::vector< int8_t * > column_buffers_
int64_t fixed_encoding_nullable_val(const int64_t val, const SQLTypeInfo &type_info)
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

template<>
void ColumnarResults::writeBackCellDirect ( const ResultSet rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction read_from_function 
)
private

Definition at line 272 of file ColumnarResults.cpp.

277  {
278  const int32_t ival =
279  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
280  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
281  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
282 }
std::vector< int8_t * > column_buffers_
template<>
void ColumnarResults::writeBackCellDirect ( const ResultSet rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction read_from_function 
)
private

Definition at line 285 of file ColumnarResults.cpp.

291  {
292  const int64_t ival =
293  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
294  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
295  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
296 }
std::vector< int8_t * > column_buffers_

Member Data Documentation

std::vector<int8_t*> ColumnarResults::column_buffers_
protected
bool ColumnarResults::direct_columnar_conversion_
private

Definition at line 193 of file ColumnarResults.h.

Referenced by isDirectColumnarConversionPossible().

size_t ColumnarResults::num_rows_
protected
bool ColumnarResults::parallel_conversion_
private

Definition at line 191 of file ColumnarResults.h.

Referenced by isParallelConversion().

const std::vector<SQLTypeInfo> ColumnarResults::target_types_
private

The documentation for this class was generated from the following files: