OmniSciDB  ba1bac9284
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ColumnarResults Class Reference

#include <ColumnarResults.h>

Public Types

using ReadFunction = std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)>
 
using WriteFunction = std::function< void(const ResultSet &, const size_t, const size_t, const size_t, const size_t, const ReadFunction &)>
 

Public Member Functions

 ColumnarResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const size_t thread_idx, const bool is_parallel_execution_enforced=false)
 
 ColumnarResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const int8_t *one_col_buffer, const size_t num_rows, const SQLTypeInfo &target_type, const size_t thread_idx)
 
const std::vector< int8_t * > & getColumnBuffers () const
 
const size_t size () const
 
const SQLTypeInfogetColumnType (const int col_id) const
 
bool isParallelConversion () const
 
bool isDirectColumnarConversionPossible () const
 

Static Public Member Functions

static std::unique_ptr
< ColumnarResults
mergeResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
 

Protected Attributes

std::vector< int8_t * > column_buffers_
 
size_t num_rows_
 

Private Member Functions

 ColumnarResults (const size_t num_rows, const std::vector< SQLTypeInfo > &target_types)
 
void writeBackCell (const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
 
void materializeAllColumnsDirectly (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsThroughIteration (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsGroupBy (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsProjection (const ResultSet &rows, const size_t num_columns)
 
void copyAllNonLazyColumns (const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
 
void materializeAllLazyColumns (const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
 
void locateAndCountEntries (const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
 
void compactAndCopyEntries (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
void compactAndCopyEntriesWithTargetSkipping (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
void compactAndCopyEntriesWithoutTargetSkipping (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
template<typename DATA_TYPE >
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_function)
 
std::vector< WriteFunctioninitWriteFunctions (const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
 
template<QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector< ReadFunctioninitReadFunctions (const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
 
std::tuple< std::vector
< WriteFunction >, std::vector
< ReadFunction > > 
initAllConversionFunctions (const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
 
template<>
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_from_function)
 
template<>
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_from_function)
 

Private Attributes

const std::vector< SQLTypeInfotarget_types_
 
bool parallel_conversion_
 
bool direct_columnar_conversion_
 
size_t thread_idx_
 

Detailed Description

Definition at line 61 of file ColumnarResults.h.

Member Typedef Documentation

using ColumnarResults::ReadFunction = std::function<int64_t(const ResultSet&, const size_t, const size_t, const size_t)>

Definition at line 96 of file ColumnarResults.h.

using ColumnarResults::WriteFunction = std::function<void(const ResultSet&, const size_t, const size_t, const size_t, const size_t, const ReadFunction&)>

Definition at line 105 of file ColumnarResults.h.

Constructor & Destructor Documentation

ColumnarResults::ColumnarResults ( const std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const ResultSet rows,
const size_t  num_columns,
const std::vector< SQLTypeInfo > &  target_types,
const size_t  thread_idx,
const bool  is_parallel_execution_enforced = false 
)

Definition at line 46 of file ColumnarResults.cpp.

References column_buffers_, DEBUG_TIMER, i, isDirectColumnarConversionPossible(), kENCODING_NONE, materializeAllColumnsDirectly(), materializeAllColumnsThroughIteration(), num_rows_, and thread_idx_.

Referenced by mergeResults().

52  : column_buffers_(num_columns)
54  rows.isDirectColumnarConversionPossible()
55  ? rows.entryCount()
56  : rows.rowCount())
57  , target_types_(target_types)
58  , parallel_conversion_(is_parallel_execution_enforced
59  ? true
61  , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
62  , thread_idx_(thread_idx) {
63  auto timer = DEBUG_TIMER(__func__);
64  column_buffers_.resize(num_columns);
65  for (size_t i = 0; i < num_columns; ++i) {
66  const bool is_varlen = target_types[i].is_array() ||
67  (target_types[i].is_string() &&
68  target_types[i].get_compression() == kENCODING_NONE) ||
69  target_types[i].is_geometry();
70  if (is_varlen) {
72  }
74  !rows.isZeroCopyColumnarConversionPossible(i)) {
75  column_buffers_[i] = row_set_mem_owner->allocate(
76  num_rows_ * target_types[i].get_size(), thread_idx_);
77  }
78  }
79 
80  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
81  materializeAllColumnsDirectly(rows, num_columns);
82  } else {
83  materializeAllColumnsThroughIteration(rows, num_columns);
84  }
85 }
std::vector< int8_t * > column_buffers_
bool direct_columnar_conversion_
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:1122
void materializeAllColumnsThroughIteration(const ResultSet &rows, const size_t num_columns)
bool isDirectColumnarConversionPossible() const
void materializeAllColumnsDirectly(const ResultSet &rows, const size_t num_columns)
#define DEBUG_TIMER(name)
Definition: Logger.h:322
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ColumnarResults::ColumnarResults ( const std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const int8_t *  one_col_buffer,
const size_t  num_rows,
const SQLTypeInfo target_type,
const size_t  thread_idx 
)

Definition at line 87 of file ColumnarResults.cpp.

92  : column_buffers_(1)
93  , num_rows_(num_rows)
94  , target_types_{target_type}
95  , parallel_conversion_(false)
97  , thread_idx_(thread_idx) {
98  auto timer = DEBUG_TIMER(__func__);
99  const bool is_varlen =
100  target_type.is_array() ||
101  (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
102  target_type.is_geometry();
103  if (is_varlen) {
105  }
106  const auto buf_size = num_rows * target_type.get_size();
107  column_buffers_[0] =
108  reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
109  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
110 }
std::vector< int8_t * > column_buffers_
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
bool direct_columnar_conversion_
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:322
bool is_geometry() const
Definition: sqltypes.h:501
#define DEBUG_TIMER(name)
Definition: Logger.h:322
bool is_string() const
Definition: sqltypes.h:489
const std::vector< SQLTypeInfo > target_types_
bool is_array() const
Definition: sqltypes.h:497
ColumnarResults::ColumnarResults ( const size_t  num_rows,
const std::vector< SQLTypeInfo > &  target_types 
)
inlineprivate

Definition at line 112 of file ColumnarResults.h.

113  : num_rows_(num_rows), target_types_(target_types) {}
const std::vector< SQLTypeInfo > target_types_

Member Function Documentation

void ColumnarResults::compactAndCopyEntries ( const ResultSet rows,
const ColumnBitmap bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function goes through all non-empty elements marked in the bitmap data structure, and stores them back into output column buffers. The output column buffers are compacted without any holes in them.

TODO(Saman): if necessary, we can look into the distribution of non-empty entries and choose a different load-balanced strategy (assigning equal number of non-empties to each thread) as opposed to equal partitioning of the bitmap

Definition at line 631 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, compactAndCopyEntriesWithoutTargetSkipping(), compactAndCopyEntriesWithTargetSkipping(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), and gpu_enabled::partial_sum().

Referenced by materializeAllColumnsGroupBy().

638  {
640  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
641  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
642  CHECK_EQ(num_threads, non_empty_per_thread.size());
643 
644  // compute the exclusive scan over all non-empty totals
645  std::vector<size_t> global_offsets(num_threads + 1, 0);
646  std::partial_sum(non_empty_per_thread.begin(),
647  non_empty_per_thread.end(),
648  std::next(global_offsets.begin()));
649 
650  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
651  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
652  rows.getSupportedSingleSlotTargetBitmap();
653 
654  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
655  // differently and accessed through result set's iterator
656  if (num_single_slot_targets < num_columns) {
658  bitmap,
659  non_empty_per_thread,
660  global_offsets,
661  single_slot_targets_to_skip,
662  slot_idx_per_target_idx,
663  num_columns,
664  entry_count,
665  num_threads,
666  size_per_thread);
667  } else {
669  bitmap,
670  non_empty_per_thread,
671  global_offsets,
672  slot_idx_per_target_idx,
673  num_columns,
674  entry_count,
675  num_threads,
676  size_per_thread);
677  }
678 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:206
void compactAndCopyEntriesWithTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
void compactAndCopyEntriesWithoutTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping ( const ResultSet rows,
const ColumnBitmap bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const std::vector< size_t > &  global_offsets,
const std::vector< size_t > &  slot_idx_per_target_idx,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function takes a bitmap of non-empty entries within the result set's storage, and compacts and copies those contents back into the output column_buffers_. In this variation, all targets are assumed to be single-slot and thus can be directly columnarized.

Definition at line 801 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, check_interrupt(), Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), GroupByBaselineHash, GroupByPerfectHash, initAllConversionFunctions(), isDirectColumnarConversionPossible(), and UNLIKELY.

Referenced by compactAndCopyEntries().

810  {
812  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
813  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
814 
815  const auto [write_functions, read_functions] =
816  initAllConversionFunctions(rows, slot_idx_per_target_idx);
817  CHECK_EQ(write_functions.size(), num_columns);
818  CHECK_EQ(read_functions.size(), num_columns);
819  auto do_work = [&rows,
820  &bitmap,
821  &global_offsets,
822  &num_columns,
823  &slot_idx_per_target_idx,
824  &write_functions = write_functions,
825  &read_functions = read_functions](size_t& entry_idx,
826  size_t& non_empty_idx,
827  const size_t total_non_empty,
828  const size_t local_idx,
829  const size_t thread_idx,
830  const size_t end_idx) {
831  if (non_empty_idx >= total_non_empty) {
832  // all non-empty entries has been written back
833  entry_idx = end_idx;
834  return;
835  }
836  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
837  if (bitmap.get(local_idx, thread_idx)) {
838  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
839  write_functions[column_idx](rows,
840  entry_idx,
841  output_buffer_row_idx,
842  column_idx,
843  slot_idx_per_target_idx[column_idx],
844  read_functions[column_idx]);
845  }
846  non_empty_idx++;
847  }
848  };
849  auto compact_buffer_func = [&non_empty_per_thread, &do_work](const size_t start_index,
850  const size_t end_index,
851  const size_t thread_idx) {
852  const size_t total_non_empty = non_empty_per_thread[thread_idx];
853  size_t non_empty_idx = 0;
854  size_t local_idx = 0;
856  for (size_t entry_idx = start_index; entry_idx < end_index;
857  entry_idx++, local_idx++) {
858  if (UNLIKELY((local_idx & 0xFFFF) == 0 && check_interrupt())) {
860  }
861  do_work(
862  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
863  }
864  } else {
865  for (size_t entry_idx = start_index; entry_idx < end_index;
866  entry_idx++, local_idx++) {
867  do_work(
868  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
869  }
870  }
871  };
872 
873  std::vector<std::future<void>> compaction_threads;
874  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
875  const size_t start_entry = thread_idx * size_per_thread;
876  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
877  compaction_threads.push_back(std::async(
878  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
879  }
880 
881  try {
882  for (auto& child : compaction_threads) {
883  child.wait();
884  }
885  } catch (QueryExecutionError& e) {
888  }
889  throw e;
890  } catch (...) {
891  throw;
892  }
893 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1121
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:116
__device__ bool check_interrupt()
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
#define CHECK(condition)
Definition: Logger.h:206

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::compactAndCopyEntriesWithTargetSkipping ( const ResultSet rows,
const ColumnBitmap bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const std::vector< size_t > &  global_offsets,
const std::vector< bool > &  targets_to_skip,
const std::vector< size_t > &  slot_idx_per_target_idx,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function takes a bitmap of non-empty entries within the result set's storage, and compacts and copies those contents back into the output column_buffers_. In this variation, multi-slot targets (e.g., AVG) are treated with the existing result set's iterators, but everything else is directly columnarized.

Definition at line 686 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, check_interrupt(), Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, ColumnBitmap::get(), QueryExecutionError::getErrorCode(), GroupByBaselineHash, GroupByPerfectHash, initAllConversionFunctions(), isDirectColumnarConversionPossible(), UNLIKELY, and writeBackCell().

Referenced by compactAndCopyEntries().

696  {
698  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
699  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
700 
701  const auto [write_functions, read_functions] =
702  initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
703  CHECK_EQ(write_functions.size(), num_columns);
704  CHECK_EQ(read_functions.size(), num_columns);
705  auto do_work = [this,
706  &bitmap,
707  &rows,
708  &slot_idx_per_target_idx,
709  &global_offsets,
710  &targets_to_skip,
711  &num_columns,
712  &write_functions = write_functions,
713  &read_functions = read_functions](size_t& non_empty_idx,
714  const size_t total_non_empty,
715  const size_t local_idx,
716  size_t& entry_idx,
717  const size_t thread_idx,
718  const size_t end_idx) {
719  if (non_empty_idx >= total_non_empty) {
720  // all non-empty entries has been written back
721  entry_idx = end_idx;
722  }
723  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
724  if (bitmap.get(local_idx, thread_idx)) {
725  // targets that are recovered from the result set iterators:
726  const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
727  for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
728  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
729  writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
730  }
731  }
732  // targets that are copied directly without any translation/decoding from
733  // result set
734  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
735  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
736  continue;
737  }
738  write_functions[column_idx](rows,
739  entry_idx,
740  output_buffer_row_idx,
741  column_idx,
742  slot_idx_per_target_idx[column_idx],
743  read_functions[column_idx]);
744  }
745  non_empty_idx++;
746  }
747  };
748 
749  auto compact_buffer_func = [&non_empty_per_thread, &do_work](const size_t start_index,
750  const size_t end_index,
751  const size_t thread_idx) {
752  const size_t total_non_empty = non_empty_per_thread[thread_idx];
753  size_t non_empty_idx = 0;
754  size_t local_idx = 0;
756  for (size_t entry_idx = start_index; entry_idx < end_index;
757  entry_idx++, local_idx++) {
758  if (UNLIKELY((local_idx & 0xFFFF) == 0 && check_interrupt())) {
760  }
761  do_work(
762  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
763  }
764  } else {
765  for (size_t entry_idx = start_index; entry_idx < end_index;
766  entry_idx++, local_idx++) {
767  do_work(
768  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
769  }
770  }
771  };
772 
773  std::vector<std::future<void>> compaction_threads;
774  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
775  const size_t start_entry = thread_idx * size_per_thread;
776  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
777  compaction_threads.push_back(std::async(
778  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
779  }
780 
781  try {
782  for (auto& child : compaction_threads) {
783  child.wait();
784  }
785  } catch (QueryExecutionError& e) {
788  }
789  throw e;
790  } catch (...) {
791  throw;
792  }
793 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1121
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:116
__device__ bool check_interrupt()
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
bool get(const size_t index, const size_t bank_index) const
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
#define CHECK(condition)
Definition: Logger.h:206

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::copyAllNonLazyColumns ( const std::vector< ColumnLazyFetchInfo > &  lazy_fetch_info,
const ResultSet rows,
const size_t  num_columns 
)
private

Definition at line 393 of file ColumnarResults.cpp.

References CHECK, column_buffers_, isDirectColumnarConversionPossible(), num_rows_, and target_types_.

Referenced by materializeAllColumnsProjection().

396  {
398  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
399  // Saman: make sure when this lazy_fetch_info is empty
400  if (lazy_fetch_info.empty()) {
401  return true;
402  } else {
403  return !lazy_fetch_info[col_idx].is_lazily_fetched;
404  }
405  };
406 
407  // parallelized by assigning each column to a thread
408  std::vector<std::future<void>> direct_copy_threads;
409  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
410  if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
411  CHECK(!column_buffers_[col_idx]);
412  column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
413  } else if (is_column_non_lazily_fetched(col_idx)) {
414  direct_copy_threads.push_back(std::async(
415  std::launch::async,
416  [&rows, this](const size_t column_index) {
417  const size_t column_size = num_rows_ * target_types_[column_index].get_size();
418  rows.copyColumnIntoBuffer(
419  column_index, column_buffers_[column_index], column_size);
420  },
421  col_idx));
422  }
423  }
424 
425  for (auto& child : direct_copy_threads) {
426  child.wait();
427  }
428 }
std::vector< int8_t * > column_buffers_
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:206
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::vector<int8_t*>& ColumnarResults::getColumnBuffers ( ) const
inline

Definition at line 80 of file ColumnarResults.h.

References column_buffers_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

80 { return column_buffers_; }
std::vector< int8_t * > column_buffers_

+ Here is the caller graph for this function:

const SQLTypeInfo& ColumnarResults::getColumnType ( const int  col_id) const
inline

Definition at line 84 of file ColumnarResults.h.

References CHECK_GE, CHECK_LT, and target_types_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

84  {
85  CHECK_GE(col_id, 0);
86  CHECK_LT(static_cast<size_t>(col_id), target_types_.size());
87  return target_types_[col_id];
88  }
#define CHECK_GE(x, y)
Definition: Logger.h:219
#define CHECK_LT(x, y)
Definition: Logger.h:216
const std::vector< SQLTypeInfo > target_types_

+ Here is the caller graph for this function:

std::tuple< std::vector< ColumnarResults::WriteFunction >, std::vector< ColumnarResults::ReadFunction > > ColumnarResults::initAllConversionFunctions ( const ResultSet rows,
const std::vector< size_t > &  slot_idx_per_target_idx,
const std::vector< bool > &  targets_to_skip = {} 
)
private

This function goes through all target types in the output, and chooses appropriate write and read functions per target. The goal is then to simply use these functions for each row and per target. Read functions are used to read each cell's data content (particular target in a row), and write functions are used to properly write back the cell's content into the output column buffers.

Definition at line 1186 of file ColumnarResults.cpp.

References CHECK, GroupByBaselineHash, GroupByPerfectHash, initWriteFunctions(), and isDirectColumnarConversionPossible().

Referenced by compactAndCopyEntriesWithoutTargetSkipping(), and compactAndCopyEntriesWithTargetSkipping().

1189  {
1191  (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1192  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));
1193 
1194  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
1195  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
1196  if (rows.didOutputColumnar()) {
1197  return std::make_tuple(
1198  std::move(write_functions),
1199  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
1200  rows, slot_idx_per_target_idx, targets_to_skip));
1201  } else {
1202  return std::make_tuple(
1203  std::move(write_functions),
1204  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
1205  rows, slot_idx_per_target_idx, targets_to_skip));
1206  }
1207  } else {
1208  if (rows.didOutputColumnar()) {
1209  return std::make_tuple(
1210  std::move(write_functions),
1211  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
1212  rows, slot_idx_per_target_idx, targets_to_skip));
1213  } else {
1214  return std::make_tuple(
1215  std::move(write_functions),
1216  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
1217  rows, slot_idx_per_target_idx, targets_to_skip));
1218  }
1219  }
1220 }
std::vector< WriteFunction > initWriteFunctions(const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:206

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector< ColumnarResults::ReadFunction > ColumnarResults::initReadFunctions ( const ResultSet rows,
const std::vector< size_t > &  slot_idx_per_target_idx,
const std::vector< bool > &  targets_to_skip = {} 
)
private

Initializes a set of read functions to properly access the contents of the result set's storage buffer. Each particular read function is chosen based on the data type and data size used to store that target in the result set's storage buffer. These functions are then used for each row in the result set.

Definition at line 1088 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, GroupByBaselineHash, anonymous_namespace{ColumnarResults.cpp}::invalid_read_func(), isDirectColumnarConversionPossible(), kDOUBLE, kFLOAT, target_types_, and UNREACHABLE.

1091  {
1093  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
1094  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());
1095 
1096  std::vector<ReadFunction> read_functions;
1097  read_functions.reserve(target_types_.size());
1098 
1099  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
1100  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1101  // for targets that should be skipped, we use a placeholder function that should
1102  // never be called. The CHECKs inside it make sure that never happens.
1103  read_functions.emplace_back(invalid_read_func);
1104  continue;
1105  }
1106 
1107  if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
1108  if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
1109  // for key columns only
1110  CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
1111  if (target_types_[target_idx].is_fp()) {
1112  CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
1113  switch (target_types_[target_idx].get_type()) {
1114  case kFLOAT:
1115  read_functions.emplace_back(
1116  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
1117  break;
1118  case kDOUBLE:
1119  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1120  break;
1121  default:
1122  UNREACHABLE()
1123  << "Invalid data type encountered (BaselineHash, floating point key).";
1124  break;
1125  }
1126  } else {
1127  switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
1128  case 8:
1129  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1130  break;
1131  case 4:
1132  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1133  break;
1134  default:
1135  UNREACHABLE()
1136  << "Invalid data type encountered (BaselineHash, integer key).";
1137  }
1138  }
1139  continue;
1140  }
1141  }
1142  if (target_types_[target_idx].is_fp()) {
1143  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1144  case 8:
1145  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1146  break;
1147  case 4:
1148  read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1149  break;
1150  default:
1151  UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
1152  break;
1153  }
1154  } else {
1155  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1156  case 8:
1157  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1158  break;
1159  case 4:
1160  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1161  break;
1162  case 2:
1163  read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1164  break;
1165  case 1:
1166  read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1167  break;
1168  default:
1169  UNREACHABLE() << "Invalid data type encountered (integer agg column).";
1170  break;
1171  }
1172  }
1173  }
1174  return read_functions;
1175 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
#define UNREACHABLE()
Definition: Logger.h:250
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:206
int64_t invalid_read_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

std::vector< ColumnarResults::WriteFunction > ColumnarResults::initWriteFunctions ( const ResultSet rows,
const std::vector< bool > &  targets_to_skip = {} 
)
private

Initialize a set of write functions per target (i.e., column). Target types' logical sizes are used to categorize the correct write function per target. These functions are then used for every row in the result set.

Definition at line 900 of file ColumnarResults.cpp.

References CHECK, GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), run_benchmark_import::result, target_types_, and UNREACHABLE.

Referenced by initAllConversionFunctions().

902  {
904  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
905  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
906 
907  std::vector<WriteFunction> result;
908  result.reserve(target_types_.size());
909 
910  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
911  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
912  result.emplace_back([](const ResultSet& rows,
913  const size_t input_buffer_entry_idx,
914  const size_t output_buffer_entry_idx,
915  const size_t target_idx,
916  const size_t slot_idx,
917  const ReadFunction& read_function) {
918  UNREACHABLE() << "Invalid write back function used.";
919  });
920  continue;
921  }
922 
923  if (target_types_[target_idx].is_fp()) {
924  switch (target_types_[target_idx].get_size()) {
925  case 8:
926  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
927  this,
928  std::placeholders::_1,
929  std::placeholders::_2,
930  std::placeholders::_3,
931  std::placeholders::_4,
932  std::placeholders::_5,
933  std::placeholders::_6));
934  break;
935  case 4:
936  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
937  this,
938  std::placeholders::_1,
939  std::placeholders::_2,
940  std::placeholders::_3,
941  std::placeholders::_4,
942  std::placeholders::_5,
943  std::placeholders::_6));
944  break;
945  default:
946  UNREACHABLE() << "Invalid target type encountered.";
947  break;
948  }
949  } else {
950  switch (target_types_[target_idx].get_size()) {
951  case 8:
952  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
953  this,
954  std::placeholders::_1,
955  std::placeholders::_2,
956  std::placeholders::_3,
957  std::placeholders::_4,
958  std::placeholders::_5,
959  std::placeholders::_6));
960  break;
961  case 4:
962  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
963  this,
964  std::placeholders::_1,
965  std::placeholders::_2,
966  std::placeholders::_3,
967  std::placeholders::_4,
968  std::placeholders::_5,
969  std::placeholders::_6));
970  break;
971  case 2:
972  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
973  this,
974  std::placeholders::_1,
975  std::placeholders::_2,
976  std::placeholders::_3,
977  std::placeholders::_4,
978  std::placeholders::_5,
979  std::placeholders::_6));
980  break;
981  case 1:
982  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
983  this,
984  std::placeholders::_1,
985  std::placeholders::_2,
986  std::placeholders::_3,
987  std::placeholders::_4,
988  std::placeholders::_5,
989  std::placeholders::_6));
990  break;
991  default:
992  UNREACHABLE() << "Invalid target type encountered.";
993  break;
994  }
995  }
996  }
997  return result;
998 }
#define UNREACHABLE()
Definition: Logger.h:250
std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)> ReadFunction
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:206
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool ColumnarResults::isDirectColumnarConversionPossible ( ) const
inline
bool ColumnarResults::isParallelConversion ( ) const
inline

Definition at line 90 of file ColumnarResults.h.

References parallel_conversion_.

Referenced by materializeAllColumnsGroupBy(), and materializeAllColumnsThroughIteration().

90 { return parallel_conversion_; }

+ Here is the caller graph for this function:

void ColumnarResults::locateAndCountEntries ( const ResultSet rows,
ColumnBitmap bitmap,
std::vector< size_t > &  non_empty_per_thread,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
) const
private

This function goes through all the keys in the result set and counts the total number of non-empty keys. It also stores the locations of non-empty keys in a bitmap data structure for faster access later.

Definition at line 559 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, check_interrupt(), Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), ColumnBitmap::set(), and UNLIKELY.

Referenced by materializeAllColumnsGroupBy().

564  {
566  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
567  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
568  CHECK_EQ(num_threads, non_empty_per_thread.size());
569  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
570  const size_t local_idx,
571  const size_t entry_idx,
572  const size_t thread_idx) {
573  if (!rows.isRowAtEmpty(entry_idx)) {
574  total_non_empty++;
575  bitmap.set(local_idx, thread_idx, true);
576  }
577  };
578  auto locate_and_count_func = [&do_work, &non_empty_per_thread](size_t start_index,
579  size_t end_index,
580  size_t thread_idx) {
581  size_t total_non_empty = 0;
582  size_t local_idx = 0;
584  for (size_t entry_idx = start_index; entry_idx < end_index;
585  entry_idx++, local_idx++) {
586  if (UNLIKELY((local_idx & 0xFFFF) == 0 && check_interrupt())) {
588  }
589  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
590  }
591  } else {
592  for (size_t entry_idx = start_index; entry_idx < end_index;
593  entry_idx++, local_idx++) {
594  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
595  }
596  }
597  non_empty_per_thread[thread_idx] = total_non_empty;
598  };
599 
600  std::vector<std::future<void>> conversion_threads;
601  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
602  const size_t start_entry = thread_idx * size_per_thread;
603  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
604  conversion_threads.push_back(std::async(
605  std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
606  }
607 
608  try {
609  for (auto& child : conversion_threads) {
610  child.wait();
611  }
612  } catch (QueryExecutionError& e) {
615  }
616  throw e;
617  } catch (...) {
618  throw;
619  }
620 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1121
void set(const size_t index, const size_t bank_index, const bool val)
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:116
__device__ bool check_interrupt()
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
#define CHECK(condition)
Definition: Logger.h:206

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsDirectly ( const ResultSet rows,
const size_t  num_columns 
)
private

This function materializes all columns from the main storage and all appended storages and forms a single contiguous column for each output column. Depending on whether the column is lazily fetched or not, it will treat them differently.

NOTE: this function should only be used when the result set is columnar and completely compacted (e.g., in columnar projections).

Definition at line 345 of file ColumnarResults.cpp.

References CHECK, GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), materializeAllColumnsGroupBy(), materializeAllColumnsProjection(), Projection, and UNREACHABLE.

Referenced by ColumnarResults().

346  {
348  switch (rows.getQueryDescriptionType()) {
350  materializeAllColumnsProjection(rows, num_columns);
351  break;
352  }
355  materializeAllColumnsGroupBy(rows, num_columns);
356  break;
357  }
358  default:
359  UNREACHABLE()
360  << "Direct columnar conversion for this query type is not supported yet.";
361  }
362 }
#define UNREACHABLE()
Definition: Logger.h:250
void materializeAllColumnsGroupBy(const ResultSet &rows, const size_t num_columns)
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:206
void materializeAllColumnsProjection(const ResultSet &rows, const size_t num_columns)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsGroupBy ( const ResultSet rows,
const size_t  num_columns 
)
private

This function is used to directly columnarize a result set for group by queries. Its main difference from the traditional alternative is that it directly reads non-empty entries from the result set, and then writes them into output column buffers, rather than using the result set's iterators.

Definition at line 524 of file ColumnarResults.cpp.

References CHECK, compactAndCopyEntries(), cpu_threads(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), isParallelConversion(), and locateAndCountEntries().

Referenced by materializeAllColumnsDirectly().

525  {
527  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
528  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
529 
530  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
531  const size_t entry_count = rows.entryCount();
532  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;
533 
534  // step 1: compute total non-empty elements and store a bitmap per thread
535  std::vector<size_t> non_empty_per_thread(num_threads,
536  0); // number of non-empty entries per thread
537 
538  ColumnBitmap bitmap(size_per_thread, num_threads);
539 
541  rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);
542 
543  // step 2: go through the generated bitmap and copy/decode corresponding entries
544  // into the output buffer
546  bitmap,
547  non_empty_per_thread,
548  num_columns,
549  entry_count,
550  num_threads,
551  size_per_thread);
552 }
bool isParallelConversion() const
void locateAndCountEntries(const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
void compactAndCopyEntries(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:206
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsProjection ( const ResultSet rows,
const size_t  num_columns 
)
private

This function handles materialization for two types of columns in columnar projections:

  1. for all non-lazy columns, it directly copies the results from the result set's storage into the output column buffers
  2. for all lazy fetched columns, it uses result set's iterators to decode the proper values before storing them into the output column buffers

Definition at line 371 of file ColumnarResults.cpp.

References CHECK, copyAllNonLazyColumns(), isDirectColumnarConversionPossible(), materializeAllLazyColumns(), and Projection.

Referenced by materializeAllColumnsDirectly().

372  {
373  CHECK(rows.query_mem_desc_.didOutputColumnar());
375  rows.query_mem_desc_.getQueryDescriptionType() ==
377 
378  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
379 
380  // We can directly copy each non-lazy column's content
381  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
382 
383  // Only lazy columns are iterated through first and then materialized
384  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
385 }
bool isDirectColumnarConversionPossible() const
void copyAllNonLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
#define CHECK(condition)
Definition: Logger.h:206
void materializeAllLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsThroughIteration ( const ResultSet rows,
const size_t  num_columns 
)
private

This function iterates through the result set (using the getRowAtNoTranslations and getNextRow family of functions) and writes back the results into output column buffers.

Definition at line 156 of file ColumnarResults.cpp.

References check_interrupt(), cpu_threads(), Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), i, isParallelConversion(), makeIntervals(), num_rows_, UNLIKELY, and writeBackCell().

Referenced by ColumnarResults().

157  {
158  std::atomic<size_t> row_idx{0};
159  if (isParallelConversion()) {
160  const size_t worker_count = cpu_threads();
161  std::vector<std::future<void>> conversion_threads;
162  const auto do_work = [num_columns, &rows, &row_idx, this](const size_t i) {
163  const auto crt_row = rows.getRowAtNoTranslations(i);
164  if (!crt_row.empty()) {
165  auto cur_row_idx = row_idx.fetch_add(1);
166  for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
167  writeBackCell(crt_row[col_idx], cur_row_idx, col_idx);
168  }
169  }
170  };
171  for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
172  conversion_threads.push_back(std::async(
173  std::launch::async,
174  [&do_work](const size_t start, const size_t end) {
176  size_t local_idx = 0;
177  for (size_t i = start; i < end; ++i, ++local_idx) {
178  if (UNLIKELY((local_idx & 0xFFFF) == 0 && check_interrupt())) {
180  }
181  do_work(i);
182  }
183  } else {
184  for (size_t i = start; i < end; ++i) {
185  do_work(i);
186  }
187  }
188  },
189  interval.begin,
190  interval.end));
191  }
192 
193  try {
194  for (auto& child : conversion_threads) {
195  child.wait();
196  }
197  } catch (QueryExecutionError& e) {
200  }
201  throw e;
202  } catch (...) {
203  throw;
204  }
205 
206  num_rows_ = row_idx;
207  rows.setCachedRowCount(num_rows_);
208  return;
209  }
210  bool done = false;
211  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
212  const auto crt_row = rows.getNextRow(false, false);
213  if (crt_row.empty()) {
214  done = true;
215  return;
216  }
217  for (size_t i = 0; i < num_columns; ++i) {
218  writeBackCell(crt_row[i], row_idx, i);
219  }
220  ++row_idx;
221  };
223  while (!done) {
224  if (UNLIKELY((row_idx & 0xFFFF) == 0 && check_interrupt())) {
226  }
227  do_work();
228  }
229  } else {
230  while (!done) {
231  do_work();
232  }
233  }
234 
235  rows.moveToBegin();
236 }
bool isParallelConversion() const
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1121
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
Definition: Intervals.h:116
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:116
__device__ bool check_interrupt()
#define UNLIKELY(x)
Definition: likely.h:25
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllLazyColumns ( const std::vector< ColumnLazyFetchInfo > &  lazy_fetch_info,
const ResultSet rows,
const size_t  num_columns 
)
private

For all lazy fetched columns, we should iterate through the column's content and properly materialize it.

This function is parallelized by dividing the total rows among all existing threads. Since there's no invalid element in the result set (e.g., columnar projections), the output buffer will have as many rows as there are in the result set, removing the need for atomically incrementing the output buffer position.

Definition at line 439 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, check_interrupt(), cpu_threads(), Executor::ERR_INTERRUPTED, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), i, isDirectColumnarConversionPossible(), makeIntervals(), UNLIKELY, result_set::use_parallel_algorithms(), and writeBackCell().

Referenced by materializeAllColumnsProjection().

442  {
444  const auto do_work_just_lazy_columns = [num_columns, &rows, this](
445  const size_t row_idx,
446  const std::vector<bool>& targets_to_skip) {
447  const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
448  for (size_t i = 0; i < num_columns; ++i) {
449  if (!targets_to_skip.empty() && !targets_to_skip[i]) {
450  writeBackCell(crt_row[i], row_idx, i);
451  }
452  }
453  };
454 
455  const auto contains_lazy_fetched_column =
456  [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
457  for (auto& col_info : lazy_fetch_info) {
458  if (col_info.is_lazily_fetched) {
459  return true;
460  }
461  }
462  return false;
463  };
464 
 465  // parallelized by assigning a chunk of rows to each thread
466  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
467  if (contains_lazy_fetched_column(lazy_fetch_info)) {
468  const size_t worker_count =
470  std::vector<std::future<void>> conversion_threads;
471  std::vector<bool> targets_to_skip;
472  if (skip_non_lazy_columns) {
473  CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
474  targets_to_skip.reserve(num_columns);
475  for (size_t i = 0; i < num_columns; i++) {
476  // we process lazy columns (i.e., skip non-lazy columns)
477  targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
478  }
479  }
480  for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
481  conversion_threads.push_back(std::async(
482  std::launch::async,
483  [&do_work_just_lazy_columns, &targets_to_skip](const size_t start,
484  const size_t end) {
486  size_t local_idx = 0;
487  for (size_t i = start; i < end; ++i, ++local_idx) {
488  if (UNLIKELY((local_idx & 0xFFFF) == 0 && check_interrupt())) {
490  }
491  do_work_just_lazy_columns(i, targets_to_skip);
492  }
493  } else {
494  for (size_t i = start; i < end; ++i) {
495  do_work_just_lazy_columns(i, targets_to_skip);
496  }
497  }
498  },
499  interval.begin,
500  interval.end));
501  }
502 
503  try {
504  for (auto& child : conversion_threads) {
505  child.wait();
506  }
507  } catch (QueryExecutionError& e) {
510  }
511  throw e;
512  } catch (...) {
513  throw;
514  }
515  }
516 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1121
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
Definition: Intervals.h:116
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:116
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:1122
__device__ bool check_interrupt()
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
#define CHECK(condition)
Definition: Logger.h:206
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::unique_ptr< ColumnarResults > ColumnarResults::mergeResults ( const std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const std::vector< std::unique_ptr< ColumnarResults >> &  sub_results 
)
static

Definition at line 112 of file ColumnarResults.cpp.

References gpu_enabled::accumulate(), CHECK_EQ, ColumnarResults(), logger::init(), run_benchmark_import::result, and target_types_.

Referenced by ColumnFetcher::getAllTableColumnFragments().

114  {
115  if (sub_results.empty()) {
116  return nullptr;
117  }
118  const auto total_row_count = std::accumulate(
119  sub_results.begin(),
120  sub_results.end(),
121  size_t(0),
122  [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
123  return init + result->size();
124  });
125  std::unique_ptr<ColumnarResults> merged_results(
126  new ColumnarResults(total_row_count, sub_results[0]->target_types_));
127  const auto col_count = sub_results[0]->column_buffers_.size();
128  const auto nonempty_it = std::find_if(
129  sub_results.begin(),
130  sub_results.end(),
131  [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
132  if (nonempty_it == sub_results.end()) {
133  return nullptr;
134  }
135  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
136  const auto byte_width = (*nonempty_it)->getColumnType(col_idx).get_size();
137  auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
138  merged_results->column_buffers_.push_back(write_ptr);
139  for (auto& rs : sub_results) {
140  CHECK_EQ(col_count, rs->column_buffers_.size());
141  if (!rs->size()) {
142  continue;
143  }
144  CHECK_EQ(byte_width, rs->getColumnType(col_idx).get_size());
145  memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
146  write_ptr += rs->size() * byte_width;
147  }
148  }
149  return merged_results;
150 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
ColumnarResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const size_t thread_idx, const bool is_parallel_execution_enforced=false)
void init(LogOptions const &log_opts)
Definition: Logger.cpp:280
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const size_t ColumnarResults::size ( ) const
inline

Definition at line 82 of file ColumnarResults.h.

References num_rows_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

82 { return num_rows_; }

+ Here is the caller graph for this function:

void ColumnarResults::writeBackCell ( const TargetValue col_val,
const size_t  row_idx,
const size_t  column_idx 
)
inlineprivate

Definition at line 246 of file ColumnarResults.cpp.

References CHECK, column_buffers_, anonymous_namespace{ColumnarResults.cpp}::fixed_encoding_nullable_val(), kDOUBLE, kFLOAT, and target_types_.

Referenced by compactAndCopyEntriesWithTargetSkipping(), materializeAllColumnsThroughIteration(), and materializeAllLazyColumns().

248  {
249  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
250  CHECK(scalar_col_val);
251  auto i64_p = boost::get<int64_t>(scalar_col_val);
252  const auto& type_info = target_types_[column_idx];
253  if (i64_p) {
254  const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
255  switch (target_types_[column_idx].get_size()) {
256  case 1:
257  ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
258  break;
259  case 2:
260  ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
261  break;
262  case 4:
263  ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
264  break;
265  case 8:
266  ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
267  break;
268  default:
269  CHECK(false);
270  }
271  } else {
272  CHECK(target_types_[column_idx].is_fp());
273  switch (target_types_[column_idx].get_type()) {
274  case kFLOAT: {
275  auto float_p = boost::get<float>(scalar_col_val);
276  ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
277  break;
278  }
279  case kDOUBLE: {
280  auto double_p = boost::get<double>(scalar_col_val);
281  ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
282  break;
283  }
284  default:
285  CHECK(false);
286  }
287  }
288 }
std::vector< int8_t * > column_buffers_
int64_t fixed_encoding_nullable_val(const int64_t val, const SQLTypeInfo &type_info)
#define CHECK(condition)
Definition: Logger.h:206
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<typename DATA_TYPE >
void ColumnarResults::writeBackCellDirect ( const ResultSet rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction read_from_function 
)
private

A set of write functions to be used to directly write into final column_buffers_. The read_from_function is used to read from the input result set's storage. NOTE: currently only used for direct columnarizations.

Definition at line 296 of file ColumnarResults.cpp.

References column_buffers_, anonymous_namespace{ColumnarResults.cpp}::fixed_encoding_nullable_val(), and target_types_.

301  {
302  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
303  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
304  target_types_[target_idx]));
305  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
306  val;
307 }
std::vector< int8_t * > column_buffers_
int64_t fixed_encoding_nullable_val(const int64_t val, const SQLTypeInfo &type_info)
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

template<>
void ColumnarResults::writeBackCellDirect ( const ResultSet rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction read_from_function 
)
private

Definition at line 310 of file ColumnarResults.cpp.

315  {
316  const int32_t ival =
317  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
318  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
319  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
320 }
std::vector< int8_t * > column_buffers_
template<>
void ColumnarResults::writeBackCellDirect ( const ResultSet rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction read_from_function 
)
private

Definition at line 323 of file ColumnarResults.cpp.

329  {
330  const int64_t ival =
331  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
332  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
333  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
334 }
std::vector< int8_t * > column_buffers_

Member Data Documentation

std::vector<int8_t*> ColumnarResults::column_buffers_
protected
bool ColumnarResults::direct_columnar_conversion_
private

Definition at line 194 of file ColumnarResults.h.

Referenced by isDirectColumnarConversionPossible().

size_t ColumnarResults::num_rows_
protected
bool ColumnarResults::parallel_conversion_
private

Definition at line 193 of file ColumnarResults.h.

Referenced by isParallelConversion().

const std::vector<SQLTypeInfo> ColumnarResults::target_types_
private
size_t ColumnarResults::thread_idx_
private

Definition at line 196 of file ColumnarResults.h.

Referenced by ColumnarResults().


The documentation for this class was generated from the following files: