OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ColumnarResults Class Reference

#include <ColumnarResults.h>

Public Types

using ReadFunction = std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)>
 
using WriteFunction = std::function< void(const ResultSet &, const size_t, const size_t, const size_t, const size_t, const ReadFunction &)>
 

Public Member Functions

 ColumnarResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const size_t executor_id, const size_t thread_idx, const bool is_parallel_execution_enforced=false)
 
 ColumnarResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const int8_t *one_col_buffer, const size_t num_rows, const SQLTypeInfo &target_type, const size_t executor_id, const size_t thread_idx)
 
const std::vector< int8_t * > & getColumnBuffers () const
 
const size_t size () const
 
const SQLTypeInfo & getColumnType (const int col_id) const
 
bool isParallelConversion () const
 
bool isDirectColumnarConversionPossible () const
 

Static Public Member Functions

static std::unique_ptr
< ColumnarResults > 
mergeResults (const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
 

Protected Attributes

std::vector< int8_t * > column_buffers_
 
size_t num_rows_
 

Private Member Functions

 ColumnarResults (const size_t num_rows, const std::vector< SQLTypeInfo > &target_types, const std::vector< size_t > &padded_target_sizes)
 
void writeBackCell (const TargetValue &col_val, const size_t row_idx, const size_t column_idx, std::mutex *write_mutex=nullptr)
 
void materializeAllColumnsDirectly (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsThroughIteration (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsGroupBy (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsProjection (const ResultSet &rows, const size_t num_columns)
 
void materializeAllColumnsTableFunction (const ResultSet &rows, const size_t num_columns)
 
void copyAllNonLazyColumns (const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
 
void materializeAllLazyColumns (const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
 
void locateAndCountEntries (const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
 
void compactAndCopyEntries (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
void compactAndCopyEntriesWithTargetSkipping (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
void compactAndCopyEntriesWithoutTargetSkipping (const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
 
template<typename DATA_TYPE >
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_function)
 
std::vector< WriteFunction > initWriteFunctions (const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
 
template<QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector< ReadFunction > initReadFunctions (const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
 
std::tuple< std::vector
< WriteFunction >, std::vector
< ReadFunction > > 
initAllConversionFunctions (const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
 
template<>
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_from_function)
 
template<>
void writeBackCellDirect (const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_from_function)
 

Private Attributes

const std::vector< SQLTypeInfo > target_types_
 
bool parallel_conversion_
 
bool direct_columnar_conversion_
 
size_t thread_idx_
 
std::shared_ptr< Executor > executor_
 
std::vector< size_t > padded_target_sizes_
 

Detailed Description

Definition at line 61 of file ColumnarResults.h.

Member Typedef Documentation

using ColumnarResults::ReadFunction = std::function<int64_t(const ResultSet&, const size_t, const size_t, const size_t)>

Definition at line 98 of file ColumnarResults.h.

using ColumnarResults::WriteFunction = std::function<void(const ResultSet&, const size_t, const size_t, const size_t, const size_t, const ReadFunction&)>

Definition at line 107 of file ColumnarResults.h.

Constructor & Destructor Documentation

ColumnarResults::ColumnarResults ( const std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner,
const ResultSet &  rows,
const size_t  num_columns,
const std::vector< SQLTypeInfo > &  target_types,
const size_t  executor_id,
const size_t  thread_idx,
const bool  is_parallel_execution_enforced = false 
)

Definition at line 149 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, column_buffers_, anonymous_namespace{ColumnarResults.cpp}::countNumberOfValues(), DEBUG_TIMER, executor_, Executor::getExecutor(), getVarlenArrayBufferSize(), initializeVarlenArray(), isDirectColumnarConversionPossible(), FlatBufferManager::isFlatBuffer(), kENCODING_NONE, materializeAllColumnsDirectly(), materializeAllColumnsThroughIteration(), num_rows_, padded_target_sizes_, and thread_idx_.

Referenced by mergeResults().

156  : column_buffers_(num_columns)
158  rows.isDirectColumnarConversionPossible()
159  ? rows.entryCount()
160  : rows.rowCount())
161  , target_types_(target_types)
162  , parallel_conversion_(is_parallel_execution_enforced
163  ? true
165  , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
166  , thread_idx_(thread_idx)
167  , padded_target_sizes_(get_padded_target_sizes(rows, target_types)) {
168  auto timer = DEBUG_TIMER(__func__);
169  column_buffers_.resize(num_columns);
170  executor_ = Executor::getExecutor(executor_id);
171  CHECK(executor_);
172  CHECK_EQ(padded_target_sizes_.size(), target_types.size());
173  for (size_t i = 0; i < num_columns; ++i) {
174  const auto ti = target_types[i];
175  if (ti.is_array()) {
177  rows.isZeroCopyColumnarConversionPossible(i)) {
178  const int8_t* col_buf = rows.getColumnarBuffer(i);
180  } else {
181  int64_t values_count = countNumberOfValues(rows, i);
182  const int64_t flatbuffer_size =
183  getVarlenArrayBufferSize(num_rows_, values_count, ti);
184  column_buffers_[i] = row_set_mem_owner->allocate(flatbuffer_size, thread_idx_);
186  initializeVarlenArray(m, num_rows_, values_count, ti);
187  }
188  } else {
189  const bool is_varlen =
190  (ti.is_string() && ti.get_compression() == kENCODING_NONE) || ti.is_geometry();
191  if (is_varlen) {
193  }
195  !rows.isZeroCopyColumnarConversionPossible(i)) {
196  column_buffers_[i] =
197  row_set_mem_owner->allocate(num_rows_ * padded_target_sizes_[i], thread_idx_);
198  }
199  }
200  }
201 
202  if (isDirectColumnarConversionPossible() && rows.entryCount() > 0) {
203  materializeAllColumnsDirectly(rows, num_columns);
204  } else {
205  materializeAllColumnsThroughIteration(rows, num_columns);
206  }
207 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< int8_t * > column_buffers_
int64_t countNumberOfValues(const ResultSet &rows, const size_t column_idx)
std::vector< size_t > get_padded_target_sizes(const ResultSet &rows, const std::vector< SQLTypeInfo > &target_types)
bool direct_columnar_conversion_
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:477
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:1482
void materializeAllColumnsThroughIteration(const ResultSet &rows, const size_t num_columns)
static bool isFlatBuffer(const void *buffer)
Definition: FlatBuffer.h:184
bool isDirectColumnarConversionPossible() const
void initializeVarlenArray(FlatBufferManager &m, int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1460
void materializeAllColumnsDirectly(const ResultSet &rows, const size_t num_columns)
std::shared_ptr< Executor > executor_
std::vector< size_t > padded_target_sizes_
int64_t getVarlenArrayBufferSize(int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1440
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ColumnarResults::ColumnarResults ( const std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner,
const int8_t *  one_col_buffer,
const size_t  num_rows,
const SQLTypeInfo &  target_type,
const size_t  executor_id,
const size_t  thread_idx 
)

Definition at line 209 of file ColumnarResults.cpp.

215  : column_buffers_(1)
216  , num_rows_(num_rows)
217  , target_types_{target_type}
218  , parallel_conversion_(false)
220  , thread_idx_(thread_idx) {
221  auto timer = DEBUG_TIMER(__func__);
222  const bool is_varlen =
223  target_type.is_array() ||
224  (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
225  target_type.is_geometry();
226  if (is_varlen) {
228  }
229  executor_ = Executor::getExecutor(executor_id);
230  padded_target_sizes_.emplace_back(target_type.get_size());
231  CHECK(executor_);
232  const auto buf_size = num_rows * target_type.get_size();
233  column_buffers_[0] =
234  reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
235  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
236 }
std::vector< int8_t * > column_buffers_
HOST DEVICE int get_size() const
Definition: sqltypes.h:414
bool direct_columnar_conversion_
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:477
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:412
std::shared_ptr< Executor > executor_
std::vector< size_t > padded_target_sizes_
#define CHECK(condition)
Definition: Logger.h:222
bool is_geometry() const
Definition: sqltypes.h:612
#define DEBUG_TIMER(name)
Definition: Logger.h:371
bool is_string() const
Definition: sqltypes.h:600
const std::vector< SQLTypeInfo > target_types_
bool is_array() const
Definition: sqltypes.h:608
ColumnarResults::ColumnarResults ( const size_t  num_rows,
const std::vector< SQLTypeInfo > &  target_types,
const std::vector< size_t > &  padded_target_sizes 
)
inlineprivate

Definition at line 114 of file ColumnarResults.h.

117  : num_rows_(num_rows)
118  , target_types_(target_types)
119  , padded_target_sizes_(padded_target_sizes) {}
std::vector< size_t > padded_target_sizes_
const std::vector< SQLTypeInfo > target_types_

Member Function Documentation

void ColumnarResults::compactAndCopyEntries ( const ResultSet &  rows,
const ColumnBitmap &  bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function goes through all non-empty elements marked in the bitmap data structure and stores them back into the output column buffers. The output column buffers are compacted, without any holes in them.

TODO(Saman): if necessary, we can look into the distribution of non-empty entries and choose a different load-balanced strategy (assigning equal number of non-empties to each thread) as opposed to equal partitioning of the bitmap

Definition at line 791 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, compactAndCopyEntriesWithoutTargetSkipping(), compactAndCopyEntriesWithTargetSkipping(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), and gpu_enabled::partial_sum().

Referenced by materializeAllColumnsGroupBy().

798  {
800  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
801  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
802  CHECK_EQ(num_threads, non_empty_per_thread.size());
803 
804  // compute the exclusive scan over all non-empty totals
805  std::vector<size_t> global_offsets(num_threads + 1, 0);
806  std::partial_sum(non_empty_per_thread.begin(),
807  non_empty_per_thread.end(),
808  std::next(global_offsets.begin()));
809 
810  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
811  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
812  rows.getSupportedSingleSlotTargetBitmap();
813 
814  // We skip multi-slot targets (e.g., AVG). These skipped targets are treated
815  // differently and accessed through result set's iterator
816  if (num_single_slot_targets < num_columns) {
818  bitmap,
819  non_empty_per_thread,
820  global_offsets,
821  single_slot_targets_to_skip,
822  slot_idx_per_target_idx,
823  num_columns,
824  entry_count,
825  num_threads,
826  size_per_thread);
827  } else {
829  bitmap,
830  non_empty_per_thread,
831  global_offsets,
832  slot_idx_per_target_idx,
833  num_columns,
834  entry_count,
835  num_threads,
836  size_per_thread);
837  }
838 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:222
void compactAndCopyEntriesWithTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
void compactAndCopyEntriesWithoutTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping ( const ResultSet &  rows,
const ColumnBitmap &  bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const std::vector< size_t > &  global_offsets,
const std::vector< size_t > &  slot_idx_per_target_idx,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function takes a bitmap of non-empty entries within the result set's storage, then compacts and copies those contents back into the output column_buffers_. In this variation, all targets are assumed to be single-slot and thus can be directly columnarized.

Definition at line 966 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), GroupByBaselineHash, GroupByPerfectHash, initAllConversionFunctions(), isDirectColumnarConversionPossible(), and UNLIKELY.

Referenced by compactAndCopyEntries().

975  {
977  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
978  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
979 
980  const auto [write_functions, read_functions] =
981  initAllConversionFunctions(rows, slot_idx_per_target_idx);
982  CHECK_EQ(write_functions.size(), num_columns);
983  CHECK_EQ(read_functions.size(), num_columns);
984  auto do_work = [&rows,
985  &bitmap,
986  &global_offsets,
987  &num_columns,
988  &slot_idx_per_target_idx,
989  &write_functions = write_functions,
990  &read_functions = read_functions](size_t& entry_idx,
991  size_t& non_empty_idx,
992  const size_t total_non_empty,
993  const size_t local_idx,
994  const size_t thread_idx,
995  const size_t end_idx) {
996  if (non_empty_idx >= total_non_empty) {
997  // all non-empty entries has been written back
998  entry_idx = end_idx;
999  return;
1000  }
1001  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
1002  if (bitmap.get(local_idx, thread_idx)) {
1003  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
1004  write_functions[column_idx](rows,
1005  entry_idx,
1006  output_buffer_row_idx,
1007  column_idx,
1008  slot_idx_per_target_idx[column_idx],
1009  read_functions[column_idx]);
1010  }
1011  non_empty_idx++;
1012  }
1013  };
1014  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
1015  const size_t start_index,
1016  const size_t end_index,
1017  const size_t thread_idx) {
1018  const size_t total_non_empty = non_empty_per_thread[thread_idx];
1019  size_t non_empty_idx = 0;
1020  size_t local_idx = 0;
1022  for (size_t entry_idx = start_index; entry_idx < end_index;
1023  entry_idx++, local_idx++) {
1024  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
1025  executor_->checkNonKernelTimeInterrupted())) {
1027  }
1028  do_work(
1029  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
1030  }
1031  } else {
1032  for (size_t entry_idx = start_index; entry_idx < end_index;
1033  entry_idx++, local_idx++) {
1034  do_work(
1035  entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
1036  }
1037  }
1038  };
1039 
1040  std::vector<std::future<void>> compaction_threads;
1041  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
1042  const size_t start_entry = thread_idx * size_per_thread;
1043  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
1044  compaction_threads.push_back(std::async(
1045  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
1046  }
1047 
1048  try {
1049  for (auto& child : compaction_threads) {
1050  child.wait();
1051  }
1052  } catch (QueryExecutionError& e) {
1055  }
1056  throw e;
1057  } catch (...) {
1058  throw;
1059  }
1060 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1378
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
future< Result > async(Fn &&fn, Args &&...args)
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
std::shared_ptr< Executor > executor_
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::compactAndCopyEntriesWithTargetSkipping ( const ResultSet &  rows,
const ColumnBitmap &  bitmap,
const std::vector< size_t > &  non_empty_per_thread,
const std::vector< size_t > &  global_offsets,
const std::vector< bool > &  targets_to_skip,
const std::vector< size_t > &  slot_idx_per_target_idx,
const size_t  num_columns,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
)
private

This function takes a bitmap of non-empty entries within the result set's storage, then compacts and copies those contents back into the output column_buffers_. In this variation, multi-slot targets (e.g., AVG) are handled through the result set's existing iterators, but everything else is directly columnarized.

Definition at line 846 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, ColumnBitmap::get(), QueryExecutionError::getErrorCode(), GroupByBaselineHash, GroupByPerfectHash, initAllConversionFunctions(), isDirectColumnarConversionPossible(), UNLIKELY, and writeBackCell().

Referenced by compactAndCopyEntries().

856  {
858  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
859  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
860 
861  const auto [write_functions, read_functions] =
862  initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
863  CHECK_EQ(write_functions.size(), num_columns);
864  CHECK_EQ(read_functions.size(), num_columns);
865  std::mutex write_mutex;
866  auto do_work = [this,
867  &bitmap,
868  &rows,
869  &slot_idx_per_target_idx,
870  &global_offsets,
871  &targets_to_skip,
872  &num_columns,
873  &write_mutex,
874  &write_functions = write_functions,
875  &read_functions = read_functions](size_t& non_empty_idx,
876  const size_t total_non_empty,
877  const size_t local_idx,
878  size_t& entry_idx,
879  const size_t thread_idx,
880  const size_t end_idx) {
881  if (non_empty_idx >= total_non_empty) {
882  // all non-empty entries has been written back
883  entry_idx = end_idx;
884  }
885  const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
886  if (bitmap.get(local_idx, thread_idx)) {
887  // targets that are recovered from the result set iterators:
888  const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
889  for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
890  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
892  crt_row[column_idx], output_buffer_row_idx, column_idx, &write_mutex);
893  }
894  }
895  // targets that are copied directly without any translation/decoding from
896  // result set
897  for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
898  if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
899  continue;
900  }
901  write_functions[column_idx](rows,
902  entry_idx,
903  output_buffer_row_idx,
904  column_idx,
905  slot_idx_per_target_idx[column_idx],
906  read_functions[column_idx]);
907  }
908  non_empty_idx++;
909  }
910  };
911 
912  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
913  const size_t start_index,
914  const size_t end_index,
915  const size_t thread_idx) {
916  const size_t total_non_empty = non_empty_per_thread[thread_idx];
917  size_t non_empty_idx = 0;
918  size_t local_idx = 0;
920  for (size_t entry_idx = start_index; entry_idx < end_index;
921  entry_idx++, local_idx++) {
922  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
923  executor_->checkNonKernelTimeInterrupted())) {
925  }
926  do_work(
927  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
928  }
929  } else {
930  for (size_t entry_idx = start_index; entry_idx < end_index;
931  entry_idx++, local_idx++) {
932  do_work(
933  non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
934  }
935  }
936  };
937 
938  std::vector<std::future<void>> compaction_threads;
939  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
940  const size_t start_entry = thread_idx * size_per_thread;
941  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
942  compaction_threads.push_back(std::async(
943  std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
944  }
945 
946  try {
947  for (auto& child : compaction_threads) {
948  child.wait();
949  }
950  } catch (QueryExecutionError& e) {
953  }
954  throw e;
955  } catch (...) {
956  throw;
957  }
958 }
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx, std::mutex *write_mutex=nullptr)
#define CHECK_EQ(x, y)
Definition: Logger.h:230
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1378
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
future< Result > async(Fn &&fn, Args &&...args)
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
bool get(const size_t index, const size_t bank_index) const
std::shared_ptr< Executor > executor_
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::copyAllNonLazyColumns ( const std::vector< ColumnLazyFetchInfo > &  lazy_fetch_info,
const ResultSet &  rows,
const size_t  num_columns 
)
private

Definition at line 545 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, column_buffers_, isDirectColumnarConversionPossible(), num_rows_, padded_target_sizes_, and TableFunction.

Referenced by materializeAllColumnsProjection(), and materializeAllColumnsTableFunction().

548  {
550  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
551  // Saman: make sure when this lazy_fetch_info is empty
552  if (lazy_fetch_info.empty()) {
553  return true;
554  } else {
555  return !lazy_fetch_info[col_idx].is_lazily_fetched;
556  }
557  };
558 
559  // parallelized by assigning each column to a thread
560  std::vector<std::future<void>> direct_copy_threads;
561  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
562  if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
563  CHECK(!column_buffers_[col_idx]);
564  // The name of the method implies a copy but this is not a copy!!
565  column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
566  } else if (is_column_non_lazily_fetched(col_idx)) {
567  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
569  direct_copy_threads.push_back(std::async(
571  [&rows, this](const size_t column_index) {
572  const size_t column_size = num_rows_ * padded_target_sizes_[column_index];
573  rows.copyColumnIntoBuffer(
574  column_index, column_buffers_[column_index], column_size);
575  },
576  col_idx));
577  }
578  }
579 
580  for (auto& child : direct_copy_threads) {
581  child.wait();
582  }
583 }
std::vector< int8_t * > column_buffers_
future< Result > async(Fn &&fn, Args &&...args)
bool isDirectColumnarConversionPossible() const
std::vector< size_t > padded_target_sizes_
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const std::vector<int8_t*>& ColumnarResults::getColumnBuffers ( ) const
inline

Definition at line 82 of file ColumnarResults.h.

References column_buffers_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

82 { return column_buffers_; }
std::vector< int8_t * > column_buffers_

+ Here is the caller graph for this function:

const SQLTypeInfo& ColumnarResults::getColumnType ( const int  col_id) const
inline

Definition at line 86 of file ColumnarResults.h.

References CHECK_GE, CHECK_LT, and target_types_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

86  {
87  CHECK_GE(col_id, 0);
88  CHECK_LT(static_cast<size_t>(col_id), target_types_.size());
89  return target_types_[col_id];
90  }
#define CHECK_GE(x, y)
Definition: Logger.h:235
#define CHECK_LT(x, y)
Definition: Logger.h:232
const std::vector< SQLTypeInfo > target_types_

+ Here is the caller graph for this function:

std::tuple< std::vector< ColumnarResults::WriteFunction >, std::vector< ColumnarResults::ReadFunction > > ColumnarResults::initAllConversionFunctions ( const ResultSet &  rows,
const std::vector< size_t > &  slot_idx_per_target_idx,
const std::vector< bool > &  targets_to_skip = {} 
)
private

This function goes through all target types in the output, and chooses appropriate write and read functions per target. The goal is then to simply use these functions for each row and per target. Read functions are used to read each cell's data content (particular target in a row), and write functions are used to properly write back the cell's content into the output column buffers.

Definition at line 1353 of file ColumnarResults.cpp.

References CHECK, GroupByBaselineHash, GroupByPerfectHash, initWriteFunctions(), and isDirectColumnarConversionPossible().

Referenced by compactAndCopyEntriesWithoutTargetSkipping(), and compactAndCopyEntriesWithTargetSkipping().

1356  {
1358  (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1359  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));
1360 
1361  const auto write_functions = initWriteFunctions(rows, targets_to_skip);
1362  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
1363  if (rows.didOutputColumnar()) {
1364  return std::make_tuple(
1365  std::move(write_functions),
1366  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
1367  rows, slot_idx_per_target_idx, targets_to_skip));
1368  } else {
1369  return std::make_tuple(
1370  std::move(write_functions),
1371  initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
1372  rows, slot_idx_per_target_idx, targets_to_skip));
1373  }
1374  } else {
1375  if (rows.didOutputColumnar()) {
1376  return std::make_tuple(
1377  std::move(write_functions),
1378  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
1379  rows, slot_idx_per_target_idx, targets_to_skip));
1380  } else {
1381  return std::make_tuple(
1382  std::move(write_functions),
1383  initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
1384  rows, slot_idx_per_target_idx, targets_to_skip));
1385  }
1386  }
1387 }
std::vector< WriteFunction > initWriteFunctions(const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector< ColumnarResults::ReadFunction > ColumnarResults::initReadFunctions ( const ResultSet &  rows,
const std::vector< size_t > &  slot_idx_per_target_idx,
const std::vector< bool > &  targets_to_skip = {} 
)
private

Initializes a set of read functions to properly access the contents of the result set's storage buffer. Each particular read function is chosen based on the data type and data size used to store that target in the result set's storage buffer. These functions are then used for each row in the result set.

Definition at line 1255 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, GroupByBaselineHash, anonymous_namespace{ColumnarResults.cpp}::invalid_read_func(), isDirectColumnarConversionPossible(), kDOUBLE, kFLOAT, target_types_, and UNREACHABLE.

1258  {
1260  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
1261  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());
1262 
1263  std::vector<ReadFunction> read_functions;
1264  read_functions.reserve(target_types_.size());
1265 
1266  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
1267  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1268  // for targets that should be skipped, we use a placeholder function that should
1269  // never be called. The CHECKs inside it make sure that never happens.
1270  read_functions.emplace_back(invalid_read_func);
1271  continue;
1272  }
1273 
1274  if (QUERY_TYPE == QueryDescriptionType::GroupByBaselineHash) {
1275  if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
1276  // for key columns only
1277  CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
1278  if (target_types_[target_idx].is_fp()) {
1279  CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
1280  switch (target_types_[target_idx].get_type()) {
1281  case kFLOAT:
1282  read_functions.emplace_back(
1283  read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
1284  break;
1285  case kDOUBLE:
1286  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1287  break;
1288  default:
1289  UNREACHABLE()
1290  << "Invalid data type encountered (BaselineHash, floating point key).";
1291  break;
1292  }
1293  } else {
1294  switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
1295  case 8:
1296  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1297  break;
1298  case 4:
1299  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1300  break;
1301  default:
1302  UNREACHABLE()
1303  << "Invalid data type encountered (BaselineHash, integer key).";
1304  }
1305  }
1306  continue;
1307  }
1308  }
1309  if (target_types_[target_idx].is_fp()) {
1310  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1311  case 8:
1312  read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1313  break;
1314  case 4:
1315  read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1316  break;
1317  default:
1318  UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
1319  break;
1320  }
1321  } else {
1322  switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1323  case 8:
1324  read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1325  break;
1326  case 4:
1327  read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1328  break;
1329  case 2:
1330  read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1331  break;
1332  case 1:
1333  read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1334  break;
1335  default:
1336  UNREACHABLE() << "Invalid data type encountered (integer agg column).";
1337  break;
1338  }
1339  }
1340  }
1341  return read_functions;
1342 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
#define UNREACHABLE()
Definition: Logger.h:266
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:222
int64_t invalid_read_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

std::vector< ColumnarResults::WriteFunction > ColumnarResults::initWriteFunctions ( const ResultSet rows,
const std::vector< bool > &  targets_to_skip = {} 
)
private

Initializes a set of write functions per target (i.e., column). Each target type's logical size is used to select the correct write function for that target. These functions are then used for every row in the result set.

Definition at line 1067 of file ColumnarResults.cpp.

References CHECK, GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), run_benchmark_import::result, target_types_, and UNREACHABLE.

Referenced by initAllConversionFunctions().

1069  {
1071  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
1072  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
1073 
1074  std::vector<WriteFunction> result;
1075  result.reserve(target_types_.size());
1076 
1077  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
1078  if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1079  result.emplace_back([](const ResultSet& rows,
1080  const size_t input_buffer_entry_idx,
1081  const size_t output_buffer_entry_idx,
1082  const size_t target_idx,
1083  const size_t slot_idx,
1084  const ReadFunction& read_function) {
1085  UNREACHABLE() << "Invalid write back function used.";
1086  });
1087  continue;
1088  }
1089 
1090  if (target_types_[target_idx].is_fp()) {
1091  switch (target_types_[target_idx].get_size()) {
1092  case 8:
1093  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
1094  this,
1095  std::placeholders::_1,
1096  std::placeholders::_2,
1097  std::placeholders::_3,
1098  std::placeholders::_4,
1099  std::placeholders::_5,
1100  std::placeholders::_6));
1101  break;
1102  case 4:
1103  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
1104  this,
1105  std::placeholders::_1,
1106  std::placeholders::_2,
1107  std::placeholders::_3,
1108  std::placeholders::_4,
1109  std::placeholders::_5,
1110  std::placeholders::_6));
1111  break;
1112  default:
1113  UNREACHABLE() << "Invalid target type encountered.";
1114  break;
1115  }
1116  } else {
1117  switch (target_types_[target_idx].get_size()) {
1118  case 8:
1119  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
1120  this,
1121  std::placeholders::_1,
1122  std::placeholders::_2,
1123  std::placeholders::_3,
1124  std::placeholders::_4,
1125  std::placeholders::_5,
1126  std::placeholders::_6));
1127  break;
1128  case 4:
1129  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
1130  this,
1131  std::placeholders::_1,
1132  std::placeholders::_2,
1133  std::placeholders::_3,
1134  std::placeholders::_4,
1135  std::placeholders::_5,
1136  std::placeholders::_6));
1137  break;
1138  case 2:
1139  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
1140  this,
1141  std::placeholders::_1,
1142  std::placeholders::_2,
1143  std::placeholders::_3,
1144  std::placeholders::_4,
1145  std::placeholders::_5,
1146  std::placeholders::_6));
1147  break;
1148  case 1:
1149  result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
1150  this,
1151  std::placeholders::_1,
1152  std::placeholders::_2,
1153  std::placeholders::_3,
1154  std::placeholders::_4,
1155  std::placeholders::_5,
1156  std::placeholders::_6));
1157  break;
1158  default:
1159  UNREACHABLE() << "Invalid target type encountered.";
1160  break;
1161  }
1162  }
1163  }
1164  return result;
1165 }
#define UNREACHABLE()
Definition: Logger.h:266
std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)> ReadFunction
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:222
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool ColumnarResults::isDirectColumnarConversionPossible ( ) const
inline
bool ColumnarResults::isParallelConversion ( ) const
inline

Definition at line 92 of file ColumnarResults.h.

References parallel_conversion_.

Referenced by materializeAllColumnsGroupBy(), and materializeAllColumnsThroughIteration().

92 { return parallel_conversion_; }

+ Here is the caller graph for this function:

void ColumnarResults::locateAndCountEntries ( const ResultSet rows,
ColumnBitmap bitmap,
std::vector< size_t > &  non_empty_per_thread,
const size_t  entry_count,
const size_t  num_threads,
const size_t  size_per_thread 
) const
private

This function goes through all the keys in the result set and counts the total number of non-empty keys. It also stores the locations of the non-empty keys in a bitmap data structure for faster access later.

Definition at line 718 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), ColumnBitmap::set(), and UNLIKELY.

Referenced by materializeAllColumnsGroupBy().

723  {
725  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
726  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
727  CHECK_EQ(num_threads, non_empty_per_thread.size());
728  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
729  const size_t local_idx,
730  const size_t entry_idx,
731  const size_t thread_idx) {
732  if (!rows.isRowAtEmpty(entry_idx)) {
733  total_non_empty++;
734  bitmap.set(local_idx, thread_idx, true);
735  }
736  };
737  auto locate_and_count_func =
738  [&do_work, &non_empty_per_thread, this](
739  size_t start_index, size_t end_index, size_t thread_idx) {
740  size_t total_non_empty = 0;
741  size_t local_idx = 0;
743  for (size_t entry_idx = start_index; entry_idx < end_index;
744  entry_idx++, local_idx++) {
745  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
746  executor_->checkNonKernelTimeInterrupted())) {
748  }
749  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
750  }
751  } else {
752  for (size_t entry_idx = start_index; entry_idx < end_index;
753  entry_idx++, local_idx++) {
754  do_work(total_non_empty, local_idx, entry_idx, thread_idx);
755  }
756  }
757  non_empty_per_thread[thread_idx] = total_non_empty;
758  };
759 
760  std::vector<std::future<void>> conversion_threads;
761  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
762  const size_t start_entry = thread_idx * size_per_thread;
763  const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
764  conversion_threads.push_back(std::async(
765  std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
766  }
767 
768  try {
769  for (auto& child : conversion_threads) {
770  child.wait();
771  }
772  } catch (QueryExecutionError& e) {
775  }
776  throw e;
777  } catch (...) {
778  throw;
779  }
780 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1378
void set(const size_t index, const size_t bank_index, const bool val)
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
future< Result > async(Fn &&fn, Args &&...args)
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
std::shared_ptr< Executor > executor_
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsDirectly ( const ResultSet rows,
const size_t  num_columns 
)
private

This function materializes all columns from the main storage and all appended storages and forms a single contiguous column for each output column. Depending on whether a column is lazily fetched or not, it is treated differently.

NOTE: this function should only be used when the result set is columnar and completely compacted (e.g., in columnar projections).

Definition at line 477 of file ColumnarResults.cpp.

References CHECK, GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), materializeAllColumnsGroupBy(), materializeAllColumnsProjection(), materializeAllColumnsTableFunction(), Projection, TableFunction, and UNREACHABLE.

Referenced by ColumnarResults().

478  {
480  switch (rows.getQueryDescriptionType()) {
482  materializeAllColumnsProjection(rows, num_columns);
483  break;
484  }
486  materializeAllColumnsTableFunction(rows, num_columns);
487  break;
488  }
491  materializeAllColumnsGroupBy(rows, num_columns);
492  break;
493  }
494  default:
495  UNREACHABLE()
496  << "Direct columnar conversion for this query type is not supported yet.";
497  }
498 }
void materializeAllColumnsTableFunction(const ResultSet &rows, const size_t num_columns)
#define UNREACHABLE()
Definition: Logger.h:266
void materializeAllColumnsGroupBy(const ResultSet &rows, const size_t num_columns)
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:222
void materializeAllColumnsProjection(const ResultSet &rows, const size_t num_columns)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsGroupBy ( const ResultSet rows,
const size_t  num_columns 
)
private

This function is to directly columnarize a result set for group by queries. Its main difference with the traditional alternative is that it directly reads non-empty entries from the result set, and then writes them into output column buffers, rather than using the result set's iterators.

Definition at line 683 of file ColumnarResults.cpp.

References CHECK, compactAndCopyEntries(), cpu_threads(), GroupByBaselineHash, GroupByPerfectHash, isDirectColumnarConversionPossible(), isParallelConversion(), and locateAndCountEntries().

Referenced by materializeAllColumnsDirectly().

684  {
686  CHECK(rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
687  rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash);
688 
689  const size_t num_threads = isParallelConversion() ? cpu_threads() : 1;
690  const size_t entry_count = rows.entryCount();
691  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;
692 
693  // step 1: compute total non-empty elements and store a bitmap per thread
694  std::vector<size_t> non_empty_per_thread(num_threads,
695  0); // number of non-empty entries per thread
696 
697  ColumnBitmap bitmap(size_per_thread, num_threads);
698 
700  rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);
701 
702  // step 2: go through the generated bitmap and copy/decode corresponding entries
703  // into the output buffer
705  bitmap,
706  non_empty_per_thread,
707  num_columns,
708  entry_count,
709  num_threads,
710  size_per_thread);
711 }
bool isParallelConversion() const
void locateAndCountEntries(const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
void compactAndCopyEntries(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
bool isDirectColumnarConversionPossible() const
#define CHECK(condition)
Definition: Logger.h:222
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsProjection ( const ResultSet rows,
const size_t  num_columns 
)
private

This function handles materialization for two types of columns in columnar projections:

  1. for all non-lazy columns, it directly copies the results from the result set's storage into the output column buffers
  2. for all lazy fetched columns, it uses result set's iterators to decode the proper values before storing them into the output column buffers

Definition at line 507 of file ColumnarResults.cpp.

References CHECK, copyAllNonLazyColumns(), isDirectColumnarConversionPossible(), materializeAllLazyColumns(), and Projection.

Referenced by materializeAllColumnsDirectly().

508  {
509  CHECK(rows.query_mem_desc_.didOutputColumnar());
511  (rows.query_mem_desc_.getQueryDescriptionType() ==
513 
514  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
515 
516  // We can directly copy each non-lazy column's content
517  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
518 
519  // Only lazy columns are iterated through first and then materialized
520  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
521 }
bool isDirectColumnarConversionPossible() const
void copyAllNonLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
#define CHECK(condition)
Definition: Logger.h:222
void materializeAllLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsTableFunction ( const ResultSet rows,
const size_t  num_columns 
)
private

Definition at line 523 of file ColumnarResults.cpp.

References CHECK, copyAllNonLazyColumns(), isDirectColumnarConversionPossible(), and TableFunction.

Referenced by materializeAllColumnsDirectly().

524  {
525  CHECK(rows.query_mem_desc_.didOutputColumnar());
527  (rows.query_mem_desc_.getQueryDescriptionType() ==
529 
530  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
531  // Lazy fetching is not currently allowed for table function outputs
532  for (const auto& col_lazy_fetch_info : lazy_fetch_info) {
533  CHECK(!col_lazy_fetch_info.is_lazily_fetched);
534  }
535  // We can directly copy each non-lazy column's content
536  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
537 }
bool isDirectColumnarConversionPossible() const
void copyAllNonLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllColumnsThroughIteration ( const ResultSet rows,
const size_t  num_columns 
)
private

This function iterates through the result set (using the getRowAtNoTranslation and getNextRow family of functions) and writes back the results into output column buffers.

Definition at line 284 of file ColumnarResults.cpp.

References threading_serial::async(), cpu_threads(), Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), isParallelConversion(), makeIntervals(), num_rows_, UNLIKELY, and writeBackCell().

Referenced by ColumnarResults().

285  {
286  std::atomic<size_t> row_idx{0};
287  if (isParallelConversion()) {
288  const size_t worker_count = cpu_threads();
289  std::vector<std::future<void>> conversion_threads;
290  std::mutex write_mutex;
291  const auto do_work =
292  [num_columns, &rows, &row_idx, &write_mutex, this](const size_t i) {
293  const auto crt_row = rows.getRowAtNoTranslations(i);
294  if (!crt_row.empty()) {
295  auto cur_row_idx = row_idx.fetch_add(1);
296  for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
297  writeBackCell(crt_row[col_idx], cur_row_idx, col_idx, &write_mutex);
298  }
299  }
300  };
301  for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
302  conversion_threads.push_back(std::async(
304  [&do_work, this](const size_t start, const size_t end) {
306  size_t local_idx = 0;
307  for (size_t i = start; i < end; ++i, ++local_idx) {
308  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
309  executor_->checkNonKernelTimeInterrupted())) {
311  }
312  do_work(i);
313  }
314  } else {
315  for (size_t i = start; i < end; ++i) {
316  do_work(i);
317  }
318  }
319  },
320  interval.begin,
321  interval.end));
322  }
323 
324  try {
325  for (auto& child : conversion_threads) {
326  child.wait();
327  }
328  } catch (QueryExecutionError& e) {
331  }
332  throw e;
333  } catch (...) {
334  throw;
335  }
336 
337  num_rows_ = row_idx;
338  rows.setCachedRowCount(num_rows_);
339  return;
340  }
341  bool done = false;
342  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
343  const auto crt_row = rows.getNextRow(false, false);
344  if (crt_row.empty()) {
345  done = true;
346  return;
347  }
348  for (size_t i = 0; i < num_columns; ++i) {
349  writeBackCell(crt_row[i], row_idx, i);
350  }
351  ++row_idx;
352  };
354  while (!done) {
355  if (UNLIKELY((row_idx & 0xFFFF) == 0 &&
356  executor_->checkNonKernelTimeInterrupted())) {
358  }
359  do_work();
360  }
361  } else {
362  while (!done) {
363  do_work();
364  }
365  }
366 
367  rows.moveToBegin();
368 }
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx, std::mutex *write_mutex=nullptr)
bool isParallelConversion() const
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1378
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
Definition: Intervals.h:122
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
future< Result > async(Fn &&fn, Args &&...args)
#define UNLIKELY(x)
Definition: likely.h:25
std::shared_ptr< Executor > executor_
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ColumnarResults::materializeAllLazyColumns ( const std::vector< ColumnLazyFetchInfo > &  lazy_fetch_info,
const ResultSet rows,
const size_t  num_columns 
)
private

For all lazy fetched columns, we should iterate through the column's content and properly materialize it.

This function is parallelized through dividing total rows among all existing threads. Since there's no invalid element in the result set (e.g., columnar projections), the output buffer will have as many rows as there are in the result set, removing the need for atomically incrementing the output buffer position.

Definition at line 594 of file ColumnarResults.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, cpu_threads(), Executor::ERR_INTERRUPTED, executor_, g_enable_non_kernel_time_query_interrupt, QueryExecutionError::getErrorCode(), isDirectColumnarConversionPossible(), makeIntervals(), TableFunction, UNLIKELY, result_set::use_parallel_algorithms(), and writeBackCell().

Referenced by materializeAllColumnsProjection().

597  {
599  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
601  std::mutex write_mutex;
602  const auto do_work_just_lazy_columns = [num_columns, &rows, &write_mutex, this](
603  const size_t row_idx,
604  const std::vector<bool>& targets_to_skip) {
605  const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
606  for (size_t i = 0; i < num_columns; ++i) {
607  if (!targets_to_skip.empty() && !targets_to_skip[i]) {
608  writeBackCell(crt_row[i], row_idx, i, &write_mutex);
609  }
610  }
611  };
612 
613  const auto contains_lazy_fetched_column =
614  [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
615  for (auto& col_info : lazy_fetch_info) {
616  if (col_info.is_lazily_fetched) {
617  return true;
618  }
619  }
620  return false;
621  };
622 
623  // parallelized by assigning a chunk of rows to each thread)
624  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
625  if (contains_lazy_fetched_column(lazy_fetch_info)) {
626  const size_t worker_count =
628  std::vector<std::future<void>> conversion_threads;
629  std::vector<bool> targets_to_skip;
630  if (skip_non_lazy_columns) {
631  CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
632  targets_to_skip.reserve(num_columns);
633  for (size_t i = 0; i < num_columns; i++) {
634  // we process lazy columns (i.e., skip non-lazy columns)
635  targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
636  }
637  }
638  for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
639  conversion_threads.push_back(std::async(
641  [&do_work_just_lazy_columns, &targets_to_skip, this](const size_t start,
642  const size_t end) {
644  size_t local_idx = 0;
645  for (size_t i = start; i < end; ++i, ++local_idx) {
646  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
647  executor_->checkNonKernelTimeInterrupted())) {
649  }
650  do_work_just_lazy_columns(i, targets_to_skip);
651  }
652  } else {
653  for (size_t i = start; i < end; ++i) {
654  do_work_just_lazy_columns(i, targets_to_skip);
655  }
656  }
657  },
658  interval.begin,
659  interval.end));
660  }
661 
662  try {
663  for (auto& child : conversion_threads) {
664  child.wait();
665  }
666  } catch (QueryExecutionError& e) {
669  }
670  throw e;
671  } catch (...) {
672  throw;
673  }
674  }
675 }
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx, std::mutex *write_mutex=nullptr)
#define CHECK_EQ(x, y)
Definition: Logger.h:230
int32_t getErrorCode() const
Definition: ErrorHandling.h:55
static const int32_t ERR_INTERRUPTED
Definition: Execute.h:1378
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
Definition: Intervals.h:122
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:1482
future< Result > async(Fn &&fn, Args &&...args)
bool isDirectColumnarConversionPossible() const
#define UNLIKELY(x)
Definition: likely.h:25
std::shared_ptr< Executor > executor_
#define CHECK(condition)
Definition: Logger.h:222
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::unique_ptr< ColumnarResults > ColumnarResults::mergeResults ( const std::shared_ptr< RowSetMemoryOwner row_set_mem_owner,
const std::vector< std::unique_ptr< ColumnarResults >> &  sub_results 
)
static

Definition at line 238 of file ColumnarResults.cpp.

References gpu_enabled::accumulate(), CHECK_EQ, ColumnarResults(), logger::init(), padded_target_sizes_, run_benchmark_import::result, and target_types_.

Referenced by ColumnFetcher::getAllTableColumnFragments().

240  {
241  if (sub_results.empty()) {
242  return nullptr;
243  }
244  const auto total_row_count = std::accumulate(
245  sub_results.begin(),
246  sub_results.end(),
247  size_t(0),
248  [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
249  return init + result->size();
250  });
251  std::unique_ptr<ColumnarResults> merged_results(
252  new ColumnarResults(total_row_count,
253  sub_results[0]->target_types_,
254  sub_results[0]->padded_target_sizes_));
255  const auto col_count = sub_results[0]->column_buffers_.size();
256  const auto nonempty_it = std::find_if(
257  sub_results.begin(),
258  sub_results.end(),
259  [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
260  if (nonempty_it == sub_results.end()) {
261  return nullptr;
262  }
263  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
264  const auto byte_width = merged_results->padded_target_sizes_[col_idx];
265  auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
266  merged_results->column_buffers_.push_back(write_ptr);
267  for (auto& rs : sub_results) {
268  CHECK_EQ(col_count, rs->column_buffers_.size());
269  if (!rs->size()) {
270  continue;
271  }
272  CHECK_EQ(byte_width, rs->padded_target_sizes_[col_idx]);
273  memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
274  write_ptr += rs->size() * byte_width;
275  }
276  }
277  return merged_results;
278 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
void init(LogOptions const &log_opts)
Definition: Logger.cpp:308
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
std::vector< size_t > padded_target_sizes_
const std::vector< SQLTypeInfo > target_types_
ColumnarResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const size_t executor_id, const size_t thread_idx, const bool is_parallel_execution_enforced=false)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const size_t ColumnarResults::size ( ) const
inline

Definition at line 84 of file ColumnarResults.h.

References num_rows_.

Referenced by ColumnFetcher::transferColumnIfNeeded().

84 { return num_rows_; }

+ Here is the caller graph for this function:

void ColumnarResults::writeBackCell ( const TargetValue col_val,
const size_t  row_idx,
const size_t  column_idx,
std::mutex *  write_mutex = nullptr 
)
inlineprivate

Definition at line 379 of file ColumnarResults.cpp.

References CHECK, CHECK_EQ, column_buffers_, FlatBufferManager::isFlatBuffer(), target_types_, and anonymous_namespace{ColumnarResults.cpp}::toBuffer().

Referenced by compactAndCopyEntriesWithTargetSkipping(), materializeAllColumnsThroughIteration(), and materializeAllLazyColumns().

382  {
383  auto& type_info = target_types_[column_idx];
384  if (type_info.is_array()) {
386  FlatBufferManager m{column_buffers_[column_idx]};
387  const auto arr_tv = boost::get<ArrayTargetValue>(&col_val);
388  CHECK(arr_tv);
389  if (arr_tv->is_initialized()) {
390  const auto& vec = arr_tv->get();
391  auto array_item_size = type_info.get_elem_type().get_size();
392  // setEmptyItem reserves a buffer in FlatBuffer instance
393  // that corresponds to varlen array at row_idx row
394  // index. The pointer value to the corresponding buffer is
395  // stored in buf:
396  int8_t* buf = nullptr;
397  FlatBufferManager::Status status{};
398  {
399  auto lock_scope =
400  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
401  : std::unique_lock<std::mutex>(*write_mutex));
402  status = m.setEmptyItemNoValidation(row_idx, vec.size() * array_item_size, &buf);
403  }
404  CHECK_EQ(status, FlatBufferManager::Status::Success);
405  CHECK(buf);
406  // toBuffer initializes varlen array buffer buf using the
407  // result set row with row_idx row index:
408  toBuffer(col_val, type_info, buf);
409  } else {
410  auto lock_scope =
411  (write_mutex == nullptr ? std::unique_lock<std::mutex>()
412  : std::unique_lock<std::mutex>(*write_mutex));
413  m.setNullNoValidation(row_idx);
414  }
415 
416  } else {
417  int8_t* buf = column_buffers_[column_idx];
418  toBuffer(col_val, type_info, buf + type_info.get_size() * row_idx);
419  }
420 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< int8_t * > column_buffers_
static bool isFlatBuffer(const void *buffer)
Definition: FlatBuffer.h:184
int64_t toBuffer(const TargetValue &col_val, const SQLTypeInfo &type_info, int8_t *buf)
#define CHECK(condition)
Definition: Logger.h:222
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<typename DATA_TYPE >
void ColumnarResults::writeBackCellDirect ( const ResultSet rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction & read_from_function 
)
private

A set of write functions used to write directly into the final column_buffers_. The read_from_function is used to read values from the input result set's storage. NOTE: currently only used for direct columnarization.

Definition at line 428 of file ColumnarResults.cpp.

References column_buffers_, anonymous_namespace{ColumnarResults.cpp}::fixed_encoding_nullable_val(), and target_types_.

433  {
434  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
435  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
436  target_types_[target_idx]));
437  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
438  val;
439 }
std::vector< int8_t * > column_buffers_
int64_t fixed_encoding_nullable_val(const int64_t val, const SQLTypeInfo &type_info)
const std::vector< SQLTypeInfo > target_types_

+ Here is the call graph for this function:

template<>
void ColumnarResults::writeBackCellDirect ( const ResultSet & rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction & read_from_function 
)
private

Definition at line 442 of file ColumnarResults.cpp.

447  {
448  const int32_t ival =
449  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
450  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
451  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
452 }
std::vector< int8_t * > column_buffers_
template<>
void ColumnarResults::writeBackCellDirect ( const ResultSet & rows,
const size_t  input_buffer_entry_idx,
const size_t  output_buffer_entry_idx,
const size_t  target_idx,
const size_t  slot_idx,
const ReadFunction & read_from_function 
)
private

Definition at line 455 of file ColumnarResults.cpp.

461  {
462  const int64_t ival =
463  read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
464  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
465  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
466 }
std::vector< int8_t * > column_buffers_

Member Data Documentation

std::vector<int8_t*> ColumnarResults::column_buffers_
protected
bool ColumnarResults::direct_columnar_conversion_
private

Definition at line 204 of file ColumnarResults.h.

Referenced by isDirectColumnarConversionPossible().

size_t ColumnarResults::num_rows_
protected
std::vector<size_t> ColumnarResults::padded_target_sizes_
private

Definition at line 208 of file ColumnarResults.h.

Referenced by ColumnarResults(), copyAllNonLazyColumns(), and mergeResults().

bool ColumnarResults::parallel_conversion_
private

Definition at line 203 of file ColumnarResults.h.

Referenced by isParallelConversion().

const std::vector<SQLTypeInfo> ColumnarResults::target_types_
private
size_t ColumnarResults::thread_idx_
private

Definition at line 206 of file ColumnarResults.h.

Referenced by ColumnarResults().


The documentation for this class was generated from the following files: