OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ColumnarResults Class Reference

#include <ColumnarResults.h>

Public Types

using ReadFunction = std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)>
 
using WriteFunction = std::function< void(const ResultSet &, const size_t, const size_t, const size_t, const size_t, const ReadFunction &)>
 

Public Member Functions

 ColumnarResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const size_t executor_id, const size_t thread_idx, const bool is_parallel_execution_enforced=false)
 
 ColumnarResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const int8_t *one_col_buffer, const size_t num_rows, const SQLTypeInfo &target_type, const size_t executor_id, const size_t thread_idx)
 
const std::vector< int8_t * > & getColumnBuffers () const
 
const size_t size () const
 
const SQLTypeInfo & getColumnType (const int col_id) const
 
bool isParallelConversion () const
 
bool isDirectColumnarConversionPossible () const
 

Static Public Member Functions

static std::unique_ptr
< ColumnarResults > 
mergeResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
 

Protected Attributes

std::vector< int8_t * > column_buffers_
 
size_t num_rows_
 

Private Member Functions

 ColumnarResults (const size_t num_rows, const std::vector< SQLTypeInfo > &target_types)
 
void writeBackCell (const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
 
void materializeAllColumnsDirectly (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsThroughIteration (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsGroupBy (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsProjection (const ResultSet &rows, const size_t num_columns)
 
void copyAllNonLazyColumns (const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
 
void materializeAllLazyColumns (const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
 
void locateAndCountEntries (const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
 
void compactAndCopyEntries (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
void compactAndCopyEntriesWithTargetSkipping (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
void compactAndCopyEntriesWithoutTargetSkipping (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
template<typename DATA_TYPE >
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_function)
 
std::vector< WriteFunction > initWriteFunctions (const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
 
template<QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector< ReadFunction > initReadFunctions (const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
 
std::tuple< std::vector
< WriteFunction >, std::vector
< ReadFunction > > 
initAllConversionFunctions (const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
 
template<>
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_from_function)
 
template<>
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_from_function)
 

Private Attributes

const std::vector< SQLTypeInfo > target_types_
 
bool parallel_conversion_
 
bool direct_columnar_conversion_
 
size_t thread_idx_
 
std::shared_ptr< Executor > executor_
 

Detailed Description

Definition at line 61 of file ColumnarResults.h.

Member Typedef Documentation

using ColumnarResults::ReadFunction = std::function<int64_t(const ResultSet&, const size_t, const size_t, const size_t)>

Definition at line 98 of file ColumnarResults.h.

using ColumnarResults::WriteFunction = std::function<void(const ResultSet&, const size_t, const size_t, const size_t, const size_t, const ReadFunction&)>

Definition at line 107 of file ColumnarResults.h.

Constructor & Destructor Documentation

ColumnarResults::ColumnarResults ( const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner,
const ResultSet &  rows,
const size_t  num_columns,
const std::vector< SQLTypeInfo > &  target_types,
const size_t  executor_id,
const size_t  thread_idx,
const bool  is_parallel_execution_enforced = false 
)

Definition at line 46 of file ColumnarResults.cpp.

References CHECK, column_buffers_, DEBUG_TIMER, executor_, Executor::getExecutor(), i, isDirectColumnarConversionPossible(), kENCODING_NONE, materializeAllColumnsDirectly(), materializeAllColumnsThroughIteration(), num_rows_, and thread_idx_.

Referenced by mergeResults().

53  : column_buffers_(num_columns)
55  rows.isDirectColumnarConversionPossible()
56  ? rows.entryCount()
57  : rows.rowCount())
58  , target_types_(target_types)
59  , parallel_conversion_(is_parallel_execution_enforced
60  ? true
62  , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
63  , thread_idx_(thread_idx) {
64  auto timer = DEBUG_TIMER(__func__);
65  column_buffers_.resize(num_columns);
66  executor_ = Executor::getExecutor(executor_id);
68  for (size_t i = 0; i < num_columns; ++i) {
69  const bool is_varlen = target_types[i].is_array() ||
70  (target_types[i].is_string() &&
71  target_types[i].get_compression() == kENCODING_NONE) ||
72  target_types[i].is_geometry();
73  if (is_varlen) {
75  }
77  !rows.isZeroCopyColumnarConversionPossible(i)) {
78  column_buffers_[i] = row_set_mem_owner->allocate(
79  num_rows_ * target_types[i].get_size(), thread_idx_);
80  }
81  }
82 
83  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
84  materializeAllColumnsDirectly(rows, num_columns);
85  } else {
86  materializeAllColumnsThroughIteration(rows, num_columns);
87  }
88 }
std::vector< int8_t * > column_buffers_
bool direct_columnar_conversion_
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:176
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:1139
void materializeAllColumnsThroughIteration(const ResultSet &rows, const size_t num_columns)
bool isDirectColumnarConversionPossible() const
void materializeAllColumnsDirectly(const ResultSet &rows, const size_t num_columns)
std::shared_ptr< Executor > executor_
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ColumnarResults::ColumnarResults ( const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner,
const int8_t *  one_col_buffer,
const size_t  num_rows,
const SQLTypeInfo &  target_type,
const size_t  executor_id,
const size_t  thread_idx 
)

Definition at line 90 of file ColumnarResults.cpp.

96  : column_buffers_(1)
97  , num_rows_(num_rows)
98  , target_types_{target_type}
99  , parallel_conversion_(false)
101  , thread_idx_(thread_idx) {
102  auto timer = DEBUG_TIMER(__func__);
103  const bool is_varlen =
104  target_type.is_array() ||
105  (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
106  target_type.is_geometry();
107  if (is_varlen) {
109  }
110  executor_ = Executor::getExecutor(executor_id);
111  CHECK(executor_);
112  const auto buf_size = num_rows * target_type.get_size();
113  column_buffers_[0] =
114  reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
115  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
116 }
std::vector< int8_t * > column_buffers_
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
bool direct_columnar_conversion_
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:176
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
std::shared_ptr< Executor > executor_
#define CHECK(condition)
Definition: Logger.h:209
bool is_geometry() const
Definition: sqltypes.h:521
#define DEBUG_TIMER(name)
Definition: Logger.h:352
bool is_string() const
Definition: sqltypes.h:509
const std::vector< SQLTypeInfo > target_types_
bool is_array() const
Definition: sqltypes.h:517
ColumnarResults::ColumnarResults ( const size_t  num_rows,
const std::vector< SQLTypeInfo > &  target_types 
)
inlineprivate

Definition at line 114 of file ColumnarResults.h.

115  : num_rows_(num_rows), target_types_(target_types) {}
const std::vector< SQLTypeInfo > target_types_

Member Function Documentation

void ColumnarResults::compactAndCopyEntries ( const ResultSet &  rows,
const ColumnBitmap &  bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function goes through all non-empty elements marked in the bitmap data structure, and store them back into output column buffers. The output column buffers are compacted without any holes in it.

TODO(Saman): if necessary, we can look into the distribution of non-empty entries and choose a different load-balanced strategy (assigning equal number of non-empties to each thread) as opposed to equal partitioning of the bitmap

Definition at line 641 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, compactAndCopyEntriesWithoutTargetSkipping(), compactAndCopyEntriesWithTargetSkipping(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), and gpu_enabled::partial_sum().

Referenced by materializeAllColumnsGroupBy().

648  {
650  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
651  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
652  CHECK_EQ(num_threads, non_empty_per_thread.size());
653 
654  // compute the exclusive scan over all non-empty totals
655  std::vector<size_t> global_offsets(num_threads + 1, 0);
656  std::partial_sum(non_empty_per_thread.begin(),
657  non_empty_per_thread.end(),
658  std::next(global_offsets.begin()));
659 
660  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
661  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
662  rows.getSupportedSingleSlotTargetBitmap();
663 
664  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
665  // differently and accessed through result set's iterator
666  if (num_single_slot_targets < num_columns) {
668  bitmap,
669  non_empty_per_thread,
670  global_offsets,
671  single_slot_targets_to_skip,
672  slot_idx_per_target_idx,
673  num_columns,
674  entry_count,
675  num_threads,
676  size_per_thread);
677  } else {
679  bitmap,
680  non_empty_per_thread,
681  global_offsets,
682  slot_idx_per_target_idx,
683  num_columns,
684  entry_count,
685  num_threads,
686  size_per_thread);
687  }
688 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:209
void compactAndCopyEntriesWithTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
void compactAndCopyEntriesWithoutTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping ( const ResultSet &  rows,
const ColumnBitmap &  bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const std::vector< size_t > &  global_offsets,
const std::vector< size_t > &  slot_idx_per_target_idx,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This functions takes a bitmap of non-empty entries within the result set's storage and compact and copy those contents back into the output column_buffers_. In this variation, all targets are assumed to be single-slot and thus can be directly columnarized.

Definition at line 813 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), GroupByBaselineHash, GroupByPerfectHash, initAllConversionFunctions(), isDirectColumnarConversionPossible(), and UNLIKELY.

Referenced by compactAndCopyEntries().

822  {
824  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
825  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
826 
827  const auto [write_functions, read_functions] =
828  initAllConversionFunctions(rows, slot_idx_per_target_idx);
829  CHECK_EQ(write_functions.size(), num_columns);
830  CHECK_EQ(read_functions.size(), num_columns);
831  auto do_work = [&rows,
832  &bitmap,
833  &global_offsets,
834  &num_columns,
835  &slot_idx_per_target_idx,
836  &write_functions = write_functions,
837  &read_functions = read_functions](size_t& entry_idx,
838  size_t& non_empty_idx,
839  const size_t total_non_empty,
840  const size_t local_idx,
841  const size_t thread_idx,
842  const size_t end_idx) {
843  if (non_empty_idx >= total_non_empty) {
844  // all non-empty entries has been written back
845  entry_idx = end_idx;
846  return;
847  }
848  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
849  if (bitmap.get(local_idx, thread_idx)) {
850  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
851  write_functions[column_idx](rows,
852  entry_idx,
853  output_buffer_row_idx,
854  column_idx,
855  slot_idx_per_target_idx[column_idx],
856  read_functions[column_idx]);
857  }
858  non_empty_idx++;
859  }
860  };
861  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
862  const size_t start_index,
863  const size_t end_index,
864  const size_t thread_idx) {
865  const size_t total_non_empty = non_empty_per_thread[thread_idx];
866  size_t non_empty_idx = 0;
867  size_t local_idx = 0;
869  for (size_t entry_idx = start_index; entry_idx < end_index;
870  entry_idx++, local_idx++) {
871  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
872  executor_->checkNonKernelTimeInterrupted())) {
874  }
875  do_work(
876  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
877  }
878  } else {
879  for (size_t entry_idx = start_index; entry_idx < end_index;
880  entry_idx++, local_idx++) {
881  do_work(
882  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
883  }
884  }
885  };
886 
887  std::vector<std::future<void>> compaction_threads;
888  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
889  const size_t start_entry = thread_idx * size_per_thread;
890  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
891  compaction_threads.push_back(std::async(
892  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
893  }
894 
895  try {
896  for (auto& child : compaction_threads) {
897  child.wait();
898  }
899  } catch (QueryExecutionError& e) {
902  }
903  throw e;
904  } catch (...) {
905  throw;
906  }
907 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1163
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:119
future< Result > async(Fn &&fn, Args &&...args)
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
std::shared_ptr< Executor > executor_
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::compactAndCopyEntriesWithTargetSkipping ( const ResultSet &  rows,
const ColumnBitmap &  bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const std::vector< size_t > &  global_offsets,
const std::vector< bool > &  targets_to_skip,
const std::vector< size_t > &  slot_idx_per_target_idx,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This functions takes a bitmap of non-empty entries within the result set's storage and compact and copy those contents back into the output column_buffers_. In this variation, multi-slot targets (e.g., AVG) are treated with the existing result set's iterations, but everything else is directly columnarized.

Definition at line 696 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, ColumnBitmap::get(), QueryExecutionError::getErrorCode(), GroupByBaselineHash, GroupByPerfectHash, initAllConversionFunctions(), isDirectColumnarConversionPossible(), UNLIKELY, and writeBackCell().

Referenced by compactAndCopyEntries().

706  {
708  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
709  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
710 
711  const auto [write_functions, read_functions] =
712  initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
713  CHECK_EQ(write_functions.size(), num_columns);
714  CHECK_EQ(read_functions.size(), num_columns);
715  auto do_work = [this,
716  &bitmap,
717  &rows,
718  &slot_idx_per_target_idx,
719  &global_offsets,
720  &targets_to_skip,
721  &num_columns,
722  &write_functions = write_functions,
723  &read_functions = read_functions](size_t& non_empty_idx,
724  const size_t total_non_empty,
725  const size_t local_idx,
726  size_t& entry_idx,
727  const size_t thread_idx,
728  const size_t end_idx) {
729  if (non_empty_idx >= total_non_empty) {
730  // all non-empty entries has been written back
731  entry_idx = end_idx;
732  }
733  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
734  if (bitmap.get(local_idx, thread_idx)) {
735  // targets that are recovered from the result set iterators:
736  const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
737  for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
738  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
739  writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
740  }
741  }
742  // targets that are copied directly without any translation/decoding from
743  // result set
744  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
745  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
746  continue;
747  }
748  write_functions[column_idx](rows,
749  entry_idx,
750  output_buffer_row_idx,
751  column_idx,
752  slot_idx_per_target_idx[column_idx],
753  read_functions[column_idx]);
754  }
755  non_empty_idx++;
756  }
757  };
758 
759  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
760  const size_t start_index,
761  const size_t end_index,
762  const size_t thread_idx) {
763  const size_t total_non_empty = non_empty_per_thread[thread_idx];
764  size_t non_empty_idx = 0;
765  size_t local_idx = 0;
767  for (size_t entry_idx = start_index; entry_idx < end_index;
768  entry_idx++, local_idx++) {
769  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
770  executor_->checkNonKernelTimeInterrupted())) {
772  }
773  do_work(
774  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
775  }
776  } else {
777  for (size_t entry_idx = start_index; entry_idx < end_index;
778  entry_idx++, local_idx++) {
779  do_work(
780  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
781  }
782  }
783  };
784 
785  std::vector<std::future<void>> compaction_threads;
786  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
787  const size_t start_entry = thread_idx * size_per_thread;
788  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
789  compaction_threads.push_back(std::async(
790  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
791  }
792 
793  try {
794  for (auto& child : compaction_threads) {
795  child.wait();
796  }
797  } catch (QueryExecutionError& e) {
800  }
801  throw e;
802  } catch (...) {
803  throw;
804  }
805 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1163
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:119
future< Result > async(Fn &&fn, Args &&...args)
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
bool get(const size_t index, const size_t bank_index) const
std::shared_ptr< Executor > executor_
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::copyAllNonLazyColumns ( const std::vector< ColumnLazyFetchInfo > &  lazy_fetch_info,
const ResultSet &  rows,
const size_t  num_columns 
)
private

Definition at line 401 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, column_buffers_, isDirectColumnarConversionPossible(), num_rows_, and target_types_.

Referenced by materializeAllColumnsProjection().

404  {
406  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
407  // Saman: make sure when this lazy_fetch_info is empty
408  if (lazy_fetch_info.empty()) {
409  return true;
410  } else {
411  return !lazy_fetch_info[col_idx].is_lazily_fetched;
412  }
413  };
414 
415  // parallelized by assigning each column to a thread
416  std::vector<std::future<void>> direct_copy_threads;
417  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
418  if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
419  CHECK(!column_buffers_[col_idx]);
420  column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
421  } else if (is_column_non_lazily_fetched(col_idx)) {
422  direct_copy_threads.push_back(std::async(
424  [&rows, this](const size_t column_index) {
425  const size_t column_size = num_rows_ * target_types_[column_index].get_size();
426  rows.copyColumnIntoBuffer(
427  column_index, column_buffers_[column_index], column_size);
428  },
429  col_idx));
430  }
431  }
432 
433  for (auto& child : direct_copy_threads) {
434  child.wait();
435  }
436 }
std::vector< int8_t * > column_buffers_
future< Result > async(Fn &&fn, Args &&...args)
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:209
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::vector<int8_t*>& ColumnarResults::getColumnBuffers ( ) const
inline

Definition at line 82 of file ColumnarResults.h.

References column_buffers_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

82 { return column_buffers_; }
std::vector< int8_t * > column_buffers_

+ Here is the caller graph for this function:

const SQLTypeInfo& ColumnarResults::getColumnType ( const int  col_id) const
inline

Definition at line 86 of file ColumnarResults.h.

References CHECK_GE, CHECK_LT, and target_types_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

86  {
87  CHECK_GE(col_id, 0);
88  CHECK_LT(static_cast<size_t>(col_id), target_types_.size());
89  return target_types_[col_id];
90  }
#define CHECK_GE(x, y)
Definition: Logger.h:222
#define CHECK_LT(x, y)
Definition: Logger.h:219
const std::vector< SQLTypeInfo > target_types_

+ Here is the caller graph for this function:

std::tuple< std::vector< ColumnarResults::WriteFunction >, std::vector< ColumnarResults::ReadFunction > > ColumnarResults::initAllConversionFunctions ( const ResultSet &  rows,
const std::vector< size_t > &  slot_idx_per_target_idx,
const std::vector< bool > &  targets_to_skip = {} 
)
private

This function goes through all target types in the output, and chooses appropriate write and read functions per target. The goal is then to simply use these functions for each row and per target. Read functions are used to read each cell's data content (particular target in a row), and write functions are used to properly write back the cell's content into the output column buffers.

Definition at line 1200 of file ColumnarResults.cpp.

References CHECK, GroupByBaselineHash, GroupByPerfectHash, initWriteFunctions(), and isDirectColumnarConversionPossible().

Referenced by compactAndCopyEntriesWithoutTargetSkipping(), and compactAndCopyEntriesWithTargetSkipping().

1203  {
1205  (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1206  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));
1207 
1208  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
1209  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
1210  if (rows.didOutputColumnar()) {
1211  return std::make_tuple(
1212  std::move(write_functions),
1213  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
1214  rows, slot_idx_per_target_idx, targets_to_skip));
1215  } else {
1216  return std::make_tuple(
1217  std::move(write_functions),
1218  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
1219  rows, slot_idx_per_target_idx, targets_to_skip));
1220  }
1221  } else {
1222  if (rows.didOutputColumnar()) {
1223  return std::make_tuple(
1224  std::move(write_functions),
1225  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
1226  rows, slot_idx_per_target_idx, targets_to_skip));
1227  } else {
1228  return std::make_tuple(
1229  std::move(write_functions),
1230  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
1231  rows, slot_idx_per_target_idx, targets_to_skip));
1232  }
1233  }
1234 }
std::vector< WriteFunction > initWriteFunctions(const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector< ColumnarResults::ReadFunction > ColumnarResults::initReadFunctions ( const ResultSet &  rows,
const std::vector< size_t > &  slot_idx_per_target_idx,
const std::vector< bool > &  targets_to_skip = {} 
)
private

Initializes a set of read funtions to properly access the contents of the result set's storage buffer. Each particular read function is chosen based on the data type and data size used to store that target in the result set's storage buffer. These functions are then used for each row in the result set.

Definition at line 1102 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, GroupByBaselineHash, anonymous_namespace{ColumnarResults.cpp}::invalid_read_func(), isDirectColumnarConversionPossible(), kDOUBLE, kFLOAT, target_types_, and UNREACHABLE.

1105  {
1107  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
1108  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());
1109 
1110  std::vector<ReadFunction> read_functions;
1111  read_functions.reserve(target_types_.size());
1112 
1113  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
1114  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1115  // for targets that should be skipped, we use a placeholder function that should
1116  // never be called. The CHECKs inside it make sure that never happens.
1117  read_functions.emplace_back(invalid_read_func);
1118  continue;
1119  }
1120 
1121  if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
1122  if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
1123  // for key columns only
1124  CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
1125  if (target_types_[target_idx].is_fp()) {
1126  CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
1127  switch (target_types_[target_idx].get_type()) {
1128  case kFLOAT:
1129  read_functions.emplace_back(
1130  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
1131  break;
1132  case kDOUBLE:
1133  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1134  break;
1135  default:
1136  UNREACHABLE()
1137  << "Invalid data type encountered (BaselineHash, floating point key).";
1138  break;
1139  }
1140  } else {
1141  switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
1142  case 8:
1143  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1144  break;
1145  case 4:
1146  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1147  break;
1148  default:
1149  UNREACHABLE()
1150  << "Invalid data type encountered (BaselineHash, integer key).";
1151  }
1152  }
1153  continue;
1154  }
1155  }
1156  if (target_types_[target_idx].is_fp()) {
1157  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1158  case 8:
1159  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1160  break;
1161  case 4:
1162  read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1163  break;
1164  default:
1165  UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
1166  break;
1167  }
1168  } else {
1169  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1170  case 8:
1171  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1172  break;
1173  case 4:
1174  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1175  break;
1176  case 2:
1177  read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1178  break;
1179  case 1:
1180  read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1181  break;
1182  default:
1183  UNREACHABLE() << "Invalid data type encountered (integer agg column).";
1184  break;
1185  }
1186  }
1187  }
1188  return read_functions;
1189 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
#define UNREACHABLE()
Definition: Logger.h:253
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:209
int64_t invalid_read_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

std::vector< ColumnarResults::WriteFunction > ColumnarResults::initWriteFunctions ( const ResultSet rows,
const std::vector< bool > &  targets_to_skip = {} 
)
private

Initializes a set of write functions, one per target (i.e., column). Each target type's logical size is used to select the correct write function for that target. These functions are then used for every row in the result set.

Definition at line 914 of file ColumnarResults.cpp.

References CHECK, GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), run_benchmark_import::result, target_types_, and UNREACHABLE.

Referenced by initAllConversionFunctions().

916  {
918  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
919  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
920 
921  std::vector<WriteFunction> result;
922  result.reserve(target_types_.size());
923 
924  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
925  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
926  result.emplace_back([](const ResultSet& rows,
927  const size_t input_buffer_entry_idx,
928  const size_t output_buffer_entry_idx,
929  const size_t target_idx,
930  const size_t slot_idx,
931  const ReadFunction& read_function) {
932  UNREACHABLE() << "Invalid write back function used.";
933  });
934  continue;
935  }
936 
937  if (target_types_[target_idx].is_fp()) {
938  switch (target_types_[target_idx].get_size()) {
939  case 8:
940  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
941  this,
942  std::placeholders::_1,
943  std::placeholders::_2,
944  std::placeholders::_3,
945  std::placeholders::_4,
946  std::placeholders::_5,
947  std::placeholders::_6));
948  break;
949  case 4:
950  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
951  this,
952  std::placeholders::_1,
953  std::placeholders::_2,
954  std::placeholders::_3,
955  std::placeholders::_4,
956  std::placeholders::_5,
957  std::placeholders::_6));
958  break;
959  default:
960  UNREACHABLE() << "Invalid target type encountered.";
961  break;
962  }
963  } else {
964  switch (target_types_[target_idx].get_size()) {
965  case 8:
966  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
967  this,
968  std::placeholders::_1,
969  std::placeholders::_2,
970  std::placeholders::_3,
971  std::placeholders::_4,
972  std::placeholders::_5,
973  std::placeholders::_6));
974  break;
975  case 4:
976  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
977  this,
978  std::placeholders::_1,
979  std::placeholders::_2,
980  std::placeholders::_3,
981  std::placeholders::_4,
982  std::placeholders::_5,
983  std::placeholders::_6));
984  break;
985  case 2:
986  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
987  this,
988  std::placeholders::_1,
989  std::placeholders::_2,
990  std::placeholders::_3,
991  std::placeholders::_4,
992  std::placeholders::_5,
993  std::placeholders::_6));
994  break;
995  case 1:
996  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
997  this,
998  std::placeholders::_1,
999  std::placeholders::_2,
1000  std::placeholders::_3,
1001  std::placeholders::_4,
1002  std::placeholders::_5,
1003  std::placeholders::_6));
1004  break;
1005  default:
1006  UNREACHABLE() << "Invalid target type encountered.";
1007  break;
1008  }
1009  }
1010  }
1011  return result;
1012 }
#define UNREACHABLE()
Definition: Logger.h:253
std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)> ReadFunction
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:209
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool ColumnarResults::isDirectColumnarConversionPossible ( ) const
inline
bool ColumnarResults::isParallelConversion ( ) const
inline

Definition at line 92 of file ColumnarResults.h.

References parallel_conversion_.

Referenced by materializeAllColumnsGroupBy(), and materializeAllColumnsThroughIteration().

92 { return parallel_conversion_; }

+ Here is the caller graph for this function:

void ColumnarResults::locateAndCountEntries ( const ResultSet rows,
ColumnBitmap bitmap,
std::vector< size_t > &  non_empty_per_thread,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
) const
private

This function goes through all the keys in the result set and counts the total number of non-empty keys. It also stores the locations of the non-empty keys in a bitmap data structure for faster access later.

Definition at line 568 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), ColumnBitmap::set(), and UNLIKELY.

Referenced by materializeAllColumnsGroupBy().

573  {
575  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
576  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
577  CHECK_EQ(num_threads, non_empty_per_thread.size());
578  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
579  const size_t local_idx,
580  const size_t entry_idx,
581  const size_t thread_idx) {
582  if (!rows.isRowAtEmpty(entry_idx)) {
583  total_non_empty++;
584  bitmap.set(local_idx, thread_idx, true);
585  }
586  };
587  auto locate_and_count_func =
588  [&do_work, &non_empty_per_thread, this](
589  size_t start_index, size_t end_index, size_t thread_idx) {
590  size_t total_non_empty = 0;
591  size_t local_idx = 0;
593  for (size_t entry_idx = start_index; entry_idx < end_index;
594  entry_idx++, local_idx++) {
595  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
596  executor_->checkNonKernelTimeInterrupted())) {
598  }
599  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
600  }
601  } else {
602  for (size_t entry_idx = start_index; entry_idx < end_index;
603  entry_idx++, local_idx++) {
604  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
605  }
606  }
607  non_empty_per_thread[thread_idx] = total_non_empty;
608  };
609 
610  std::vector<std::future<void>> conversion_threads;
611  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
612  const size_t start_entry = thread_idx * size_per_thread;
613  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
614  conversion_threads.push_back(std::async(
615  std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
616  }
617 
618  try {
619  for (auto& child : conversion_threads) {
620  child.wait();
621  }
622  } catch (QueryExecutionError& e) {
625  }
626  throw e;
627  } catch (...) {
628  throw;
629  }
630 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1163
void set(const size_t index, const size_t bank_index, const bool val)
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:119
future< Result > async(Fn &&fn, Args &&...args)
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
std::shared_ptr< Executor > executor_
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsDirectly ( const ResultSet rows,
const size_t  num_columns 
)
private

This function materializes all columns from the main storage and all appended storages, forming a single contiguous column for each output column. Depending on whether a column is lazily fetched or not, it is treated differently.

NOTE: this function should only be used when the result set is columnar and completely compacted (e.g., in columnar projections).

Definition at line 353 of file ColumnarResults.cpp.

References CHECK, GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), materializeAllColumnsGroupBy(), materializeAllColumnsProjection(), Projection, and UNREACHABLE.

Referenced by ColumnarResults().

354  {
356  switch (rows.getQueryDescriptionType()) {
358  materializeAllColumnsProjection(rows, num_columns);
359  break;
360  }
363  materializeAllColumnsGroupBy(rows, num_columns);
364  break;
365  }
366  default:
367  UNREACHABLE()
368  << "Direct columnar conversion for this query type is not supported yet.";
369  }
370 }
#define UNREACHABLE()
Definition: Logger.h:253
void materializeAllColumnsGroupBy(const ResultSet &rows, const size_t num_columns)
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:209
void materializeAllColumnsProjection(const ResultSet &rows, const size_t num_columns)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsGroupBy ( const ResultSet rows,
const size_t  num_columns 
)
private

This function directly columnarizes a result set for group-by queries. Its main difference from the traditional alternative is that it reads non-empty entries directly from the result set and then writes them into the output column buffers, rather than using the result set's iterators.

Definition at line 533 of file ColumnarResults.cpp.

References CHECK, compactAndCopyEntries(), cpu_threads(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), isParallelConversion(), and locateAndCountEntries().

Referenced by materializeAllColumnsDirectly().

534  {
536  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
537  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
538 
539  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
540  const size_t entry_count = rows.entryCount();
541  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;
542 
543  // step 1: compute total non-empty elements and store a bitmap per thread
544  std::vector<size_t> non_empty_per_thread(num_threads,
545  0); // number of non-empty entries per thread
546 
547  ColumnBitmap bitmap(size_per_thread, num_threads);
548 
550  rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);
551 
552  // step 2: go through the generated bitmap and copy/decode corresponding entries
553  // into the output buffer
555  bitmap,
556  non_empty_per_thread,
557  num_columns,
558  entry_count,
559  num_threads,
560  size_per_thread);
561 }
bool isParallelConversion() const
void locateAndCountEntries(const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
void compactAndCopyEntries(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:209
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsProjection ( const ResultSet rows,
const size_t  num_columns 
)
private

This function handles materialization for two types of columns in columnar projections:

  1. for all non-lazy columns, it directly copies the results from the result set's storage into the output column buffers
  2. for all lazy fetched columns, it uses result set's iterators to decode the proper values before storing them into the output column buffers

Definition at line 379 of file ColumnarResults.cpp.

References CHECK, copyAllNonLazyColumns(), isDirectColumnarConversionPossible(), materializeAllLazyColumns(), and Projection.

Referenced by materializeAllColumnsDirectly().

380  {
381  CHECK(rows.query_mem_desc_.didOutputColumnar());
383  rows.query_mem_desc_.getQueryDescriptionType() ==
385 
386  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
387 
388  // We can directly copy each non-lazy column's content
389  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
390 
391  // Only lazy columns are iterated through first and then materialized
392  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
393 }
bool isDirectColumnarConversionPossible() const
void copyAllNonLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
#define CHECK(condition)
Definition: Logger.h:209
void materializeAllLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsThroughIteration ( const ResultSet rows,
const size_t  num_columns 
)
private

This function iterates through the result set (using the getRowAtNoTranslations and getNextRow family of functions) and writes back the results into the output column buffers.

Definition at line 162 of file ColumnarResults.cpp.

References threading_serial::async(), cpu_threads(), Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), i, isParallelConversion(), makeIntervals(), num_rows_, UNLIKELY, and writeBackCell().

Referenced by ColumnarResults().

163  {
164  std::atomic<size_t> row_idx{0};
165  if (isParallelConversion()) {
166  const size_t worker_count = cpu_threads();
167  std::vector<std::future<void>> conversion_threads;
168  const auto do_work = [num_columns, &rows, &row_idx, this](const size_t i) {
169  const auto crt_row = rows.getRowAtNoTranslations(i);
170  if (!crt_row.empty()) {
171  auto cur_row_idx = row_idx.fetch_add(1);
172  for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
173  writeBackCell(crt_row[col_idx], cur_row_idx, col_idx);
174  }
175  }
176  };
177  for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
178  conversion_threads.push_back(std::async(
180  [&do_work, this](const size_t start, const size_t end) {
182  size_t local_idx = 0;
183  for (size_t i = start; i < end; ++i, ++local_idx) {
184  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
185  executor_->checkNonKernelTimeInterrupted())) {
187  }
188  do_work(i);
189  }
190  } else {
191  for (size_t i = start; i < end; ++i) {
192  do_work(i);
193  }
194  }
195  },
196  interval.begin,
197  interval.end));
198  }
199 
200  try {
201  for (auto& child : conversion_threads) {
202  child.wait();
203  }
204  } catch (QueryExecutionError& e) {
207  }
208  throw e;
209  } catch (...) {
210  throw;
211  }
212 
213  num_rows_ = row_idx;
214  rows.setCachedRowCount(num_rows_);
215  return;
216  }
217  bool done = false;
218  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
219  const auto crt_row = rows.getNextRow(false, false);
220  if (crt_row.empty()) {
221  done = true;
222  return;
223  }
224  for (size_t i = 0; i < num_columns; ++i) {
225  writeBackCell(crt_row[i], row_idx, i);
226  }
227  ++row_idx;
228  };
230  while (!done) {
231  if (UNLIKELY((row_idx & 0xFFFF) == 0 &&
232  executor_->checkNonKernelTimeInterrupted())) {
234  }
235  do_work();
236  }
237  } else {
238  while (!done) {
239  do_work();
240  }
241  }
242 
243  rows.moveToBegin();
244 }
bool isParallelConversion() const
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1163
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
Definition: Intervals.h:116
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:119
future< Result > async(Fn &&fn, Args &&...args)
#define UNLIKELY(x)
Definition: likely.h:25
std::shared_ptr< Executor > executor_
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllLazyColumns ( const std::vector< ColumnLazyFetchInfo > &  lazy_fetch_info,
const ResultSet rows,
const size_t  num_columns 
)
private

For all lazy fetched columns, we should iterate through the column's content and properly materialize it.

This function is parallelized by dividing the total number of rows among all existing threads. Since there is no invalid element in the result set (e.g., columnar projections), the output buffer will have as many rows as there are in the result set, removing the need for atomically incrementing the output buffer position.

Definition at line 447 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, cpu_threads(), Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), i, isDirectColumnarConversionPossible(), makeIntervals(), UNLIKELY, result_set::use_parallel_algorithms(), and writeBackCell().

Referenced by materializeAllColumnsProjection().

450  {
452  const auto do_work_just_lazy_columns = [num_columns, &rows, this](
453  const size_t row_idx,
454  const std::vector<bool>& targets_to_skip) {
455  const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
456  for (size_t i = 0; i < num_columns; ++i) {
457  if (!targets_to_skip.empty() && !targets_to_skip[i]) {
458  writeBackCell(crt_row[i], row_idx, i);
459  }
460  }
461  };
462 
463  const auto contains_lazy_fetched_column =
464  [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
465  for (auto& col_info : lazy_fetch_info) {
466  if (col_info.is_lazily_fetched) {
467  return true;
468  }
469  }
470  return false;
471  };
472 
473  // parallelized by assigning a chunk of rows to each thread
474  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
475  if (contains_lazy_fetched_column(lazy_fetch_info)) {
476  const size_t worker_count =
478  std::vector<std::future<void>> conversion_threads;
479  std::vector<bool> targets_to_skip;
480  if (skip_non_lazy_columns) {
481  CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
482  targets_to_skip.reserve(num_columns);
483  for (size_t i = 0; i < num_columns; i++) {
484  // we process lazy columns (i.e., skip non-lazy columns)
485  targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
486  }
487  }
488  for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
489  conversion_threads.push_back(std::async(
491  [&do_work_just_lazy_columns, &targets_to_skip, this](const size_t start,
492  const size_t end) {
494  size_t local_idx = 0;
495  for (size_t i = start; i < end; ++i, ++local_idx) {
496  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
497  executor_->checkNonKernelTimeInterrupted())) {
499  }
500  do_work_just_lazy_columns(i, targets_to_skip);
501  }
502  } else {
503  for (size_t i = start; i < end; ++i) {
504  do_work_just_lazy_columns(i, targets_to_skip);
505  }
506  }
507  },
508  interval.begin,
509  interval.end));
510  }
511 
512  try {
513  for (auto& child : conversion_threads) {
514  child.wait();
515  }
516  } catch (QueryExecutionError& e) {
519  }
520  throw e;
521  } catch (...) {
522  throw;
523  }
524  }
525 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1163
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
Definition: Intervals.h:116
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:119
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:1139
future< Result > async(Fn &&fn, Args &&...args)
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
std::shared_ptr< Executor > executor_
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
#define CHECK(condition)
Definition: Logger.h:209
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::unique_ptr< ColumnarResults > ColumnarResults::mergeResults ( const std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const std::vector< std::unique_ptr< ColumnarResults >> &  sub_results 
)
static

Definition at line 118 of file ColumnarResults.cpp.

References gpu_enabled::accumulate(), CHECK_EQ, ColumnarResults(), logger::init(), run_benchmark_import::result, and target_types_.

Referenced by ColumnFetcher::getAllTableColumnFragments().

120  {
121  if (sub_results.empty()) {
122  return nullptr;
123  }
124  const auto total_row_count = std::accumulate(
125  sub_results.begin(),
126  sub_results.end(),
127  size_t(0),
128  [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
129  return init + result->size();
130  });
131  std::unique_ptr<ColumnarResults> merged_results(
132  new ColumnarResults(total_row_count, sub_results[0]->target_types_));
133  const auto col_count = sub_results[0]->column_buffers_.size();
134  const auto nonempty_it = std::find_if(
135  sub_results.begin(),
136  sub_results.end(),
137  [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
138  if (nonempty_it == sub_results.end()) {
139  return nullptr;
140  }
141  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
142  const auto byte_width = (*nonempty_it)->getColumnType(col_idx).get_size();
143  auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
144  merged_results->column_buffers_.push_back(write_ptr);
145  for (auto& rs : sub_results) {
146  CHECK_EQ(col_count, rs->column_buffers_.size());
147  if (!rs->size()) {
148  continue;
149  }
150  CHECK_EQ(byte_width, rs->getColumnType(col_idx).get_size());
151  memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
152  write_ptr += rs->size() * byte_width;
153  }
154  }
155  return merged_results;
156 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
void init(LogOptions const &log_opts)
Definition: Logger.cpp:290
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
const std::vector< SQLTypeInfo > target_types_
ColumnarResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const size_t executor_id, const size_t thread_idx, const bool is_parallel_execution_enforced=false)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const size_t ColumnarResults::size ( ) const
inline

Definition at line 84 of file ColumnarResults.h.

References num_rows_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

84 { return num_rows_; }

+ Here is the caller graph for this function:

void ColumnarResults::writeBackCell ( const TargetValue col_val,
const size_t  row_idx,
const size_t  column_idx 
)
inlineprivate

Definition at line 254 of file ColumnarResults.cpp.

References CHECK, column_buffers_, anonymous_namespace{ColumnarResults.cpp}::fixed_encoding_nullable_val(), kDOUBLE, kFLOAT, and target_types_.

Referenced by compactAndCopyEntriesWithTargetSkipping(), materializeAllColumnsThroughIteration(), and materializeAllLazyColumns().

256  {
257  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
258  CHECK(scalar_col_val);
259  auto i64_p = boost::get<int64_t>(scalar_col_val);
260  const auto& type_info = target_types_[column_idx];
261  if (i64_p) {
262  const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
263  switch (target_types_[column_idx].get_size()) {
264  case 1:
265  ((int8_t*)column_buffers_[column_idx])[row_idx] = static_cast<int8_t>(val);
266  break;
267  case 2:
268  ((int16_t*)column_buffers_[column_idx])[row_idx] = static_cast<int16_t>(val);
269  break;
270  case 4:
271  ((int32_t*)column_buffers_[column_idx])[row_idx] = static_cast<int32_t>(val);
272  break;
273  case 8:
274  ((int64_t*)column_buffers_[column_idx])[row_idx] = val;
275  break;
276  default:
277  CHECK(false);
278  }
279  } else {
280  CHECK(target_types_[column_idx].is_fp());
281  switch (target_types_[column_idx].get_type()) {
282  case kFLOAT: {
283  auto float_p = boost::get<float>(scalar_col_val);
284  ((float*)column_buffers_[column_idx])[row_idx] = static_cast<float>(*float_p);
285  break;
286  }
287  case kDOUBLE: {
288  auto double_p = boost::get<double>(scalar_col_val);
289  ((double*)column_buffers_[column_idx])[row_idx] = static_cast<double>(*double_p);
290  break;
291  }
292  default:
293  CHECK(false);
294  }
295  }
296 }
std::vector< int8_t * > column_buffers_
int64_t fixed_encoding_nullable_val(const int64_t val, const SQLTypeInfo &type_info)
#define CHECK(condition)
Definition: Logger.h:209
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<typename DATA_TYPE >
void ColumnarResults::writeBackCellDirect ( const ResultSet rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction read_from_function 
)
private

A set of write functions to be used to directly write into the final column_buffers_. The read_from_function is used to read from the input result set's storage. NOTE: currently only used for direct columnarization.

Definition at line 304 of file ColumnarResults.cpp.

References column_buffers_, anonymous_namespace{ColumnarResults.cpp}::fixed_encoding_nullable_val(), and target_types_.

309  {
310  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
311  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
312  target_types_[target_idx]));
313  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
314  val;
315 }
std::vector< int8_t * > column_buffers_
int64_t fixed_encoding_nullable_val(const int64_t val, const SQLTypeInfo &type_info)
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

template<>
void ColumnarResults::writeBackCellDirect ( const ResultSet rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction read_from_function 
)
private

Definition at line 318 of file ColumnarResults.cpp.

323  {
324  const int32_t ival =
325  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
326  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
327  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
328 }
std::vector< int8_t * > column_buffers_
template<>
void ColumnarResults::writeBackCellDirect ( const ResultSet rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction read_from_function 
)
private

Definition at line 331 of file ColumnarResults.cpp.

337  {
338  const int64_t ival =
339  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
340  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
341  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
342 }
std::vector< int8_t * > column_buffers_

Member Data Documentation

std::vector<int8_t*> ColumnarResults::column_buffers_
protected
bool ColumnarResults::direct_columnar_conversion_
private

Definition at line 196 of file ColumnarResults.h.

Referenced by isDirectColumnarConversionPossible().

size_t ColumnarResults::num_rows_
protected
bool ColumnarResults::parallel_conversion_
private

Definition at line 195 of file ColumnarResults.h.

Referenced by isParallelConversion().

const std::vector<SQLTypeInfo> ColumnarResults::target_types_
private
size_t ColumnarResults::thread_idx_
private

Definition at line 198 of file ColumnarResults.h.

Referenced by ColumnarResults().


The documentation for this class was generated from the following files: