OmniSciDB  91042dcc5b
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
WindowFunctionContext Class Reference

#include <WindowContext.h>

+ Collaboration diagram for WindowFunctionContext:

Classes

struct  AggregateState
 

Public Types

using Comparator = std::function< bool(const int64_t lhs, const int64_t rhs)>
 

Public Member Functions

 WindowFunctionContext (const Analyzer::WindowFunction *window_func, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
 WindowFunctionContext (const Analyzer::WindowFunction *window_func, const std::shared_ptr< HashJoin > &partitions, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
 
 WindowFunctionContext (const WindowFunctionContext &)=delete
 
WindowFunctionContext & operator= (const WindowFunctionContext &)=delete
 
 ~WindowFunctionContext ()
 
void addOrderColumn (const int8_t *column, const Analyzer::ColumnVar *col_var, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
 
void compute ()
 
const Analyzer::WindowFunction * getWindowFunction () const
 
const int8_t * output () const
 
const int64_t * aggregateState () const
 
const int64_t * aggregateStateCount () const
 
int64_t aggregateStatePendingOutputs () const
 
const int8_t * partitionStart () const
 
const int8_t * partitionEnd () const
 
size_t elementCount () const
 

Private Member Functions

void computePartition (const size_t partition_idx, int64_t *output_for_partition_buff)
 
void computePartitionBuffer (int64_t *output_for_partition_buff, const size_t partition_size, const size_t off, const Analyzer::WindowFunction *window_func, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
 
void fillPartitionStart ()
 
void fillPartitionEnd ()
 
const int32_t * payload () const
 
const int32_t * offsets () const
 
const int32_t * counts () const
 
size_t partitionCount () const
 

Static Private Member Functions

static Comparator makeComparator (const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool nulls_first)
 

Private Attributes

const Analyzer::WindowFunction * window_func_
 
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
 
std::vector< const int8_t * > order_columns_
 
std::shared_ptr< HashJoin > partitions_
 
size_t elem_count_
 
int8_t * output_
 
int8_t * partition_start_
 
int8_t * partition_end_
 
AggregateState aggregate_state_
 
const ExecutorDeviceType device_type_
 
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
 
const int32_t dummy_count_
 
const int32_t dummy_offset_
 
int32_t * dummy_payload_
 

Detailed Description

Definition at line 66 of file WindowContext.h.

Member Typedef Documentation

using WindowFunctionContext::Comparator = std::function<bool(const int64_t lhs, const int64_t rhs)>

Definition at line 119 of file WindowContext.h.

Constructor & Destructor Documentation

WindowFunctionContext::WindowFunctionContext ( const Analyzer::WindowFunction *  window_func,
const size_t  elem_count,
const ExecutorDeviceType  device_type,
std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner 
)

Definition at line 46 of file WindowContext.cpp.

References CHECK_LE, checked_malloc(), dummy_payload_, elem_count_, and gpu_enabled::iota().

51  : window_func_(window_func)
52  , partitions_(nullptr)
53  , elem_count_(elem_count)
54  , output_(nullptr)
55  , partition_start_(nullptr)
56  , partition_end_(nullptr)
57  , device_type_(device_type)
58  , row_set_mem_owner_(row_set_mem_owner)
59  , dummy_count_(elem_count)
60  , dummy_offset_(0)
61  , dummy_payload_(nullptr) {
62  CHECK_LE(elem_count_, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
63  if (elem_count_ > 0) {
64  dummy_payload_ =
65  reinterpret_cast<int32_t*>(checked_malloc(elem_count_ * sizeof(int32_t)));
66  std::iota(dummy_payload_, dummy_payload_ + elem_count_, int32_t(0));
67  }
68 }
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
const int32_t dummy_count_
const int32_t dummy_offset_
const Analyzer::WindowFunction * window_func_
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
#define CHECK_LE(x, y)
Definition: Logger.h:222
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
std::shared_ptr< HashJoin > partitions_
const ExecutorDeviceType device_type_

+ Here is the call graph for this function:

WindowFunctionContext::WindowFunctionContext ( const Analyzer::WindowFunction *  window_func,
const std::shared_ptr< HashJoin > &  partitions,
const size_t  elem_count,
const ExecutorDeviceType  device_type,
std::shared_ptr< RowSetMemoryOwner >  row_set_mem_owner 
)

Definition at line 71 of file WindowContext.cpp.

References CHECK, and partitions_.

77  : window_func_(window_func)
78  , partitions_(partitions)
79  , elem_count_(elem_count)
80  , output_(nullptr)
81  , partition_start_(nullptr)
82  , partition_end_(nullptr)
83  , device_type_(device_type)
84  , row_set_mem_owner_(row_set_mem_owner)
85  , dummy_count_(elem_count)
86  , dummy_offset_(0)
87  , dummy_payload_(nullptr) {
88  CHECK(partitions_); // This version should have hash table
89 }
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
const int32_t dummy_count_
const int32_t dummy_offset_
const Analyzer::WindowFunction * window_func_
std::shared_ptr< HashJoin > partitions_
#define CHECK(condition)
Definition: Logger.h:211
const ExecutorDeviceType device_type_
WindowFunctionContext::WindowFunctionContext ( const WindowFunctionContext & )
delete
WindowFunctionContext::~WindowFunctionContext ( )

Definition at line 91 of file WindowContext.cpp.

References dummy_payload_, partition_end_, and partition_start_.

91  {
92  free(partition_start_);
93  free(partition_end_);
94  if (dummy_payload_) {
95  free(dummy_payload_);
96  }
97 }

Member Function Documentation

void WindowFunctionContext::addOrderColumn ( const int8_t *  column,
const Analyzer::ColumnVar *  col_var,
const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &  chunks_owner 
)

Definition at line 99 of file WindowContext.cpp.

References order_columns_, and order_columns_owner_.

102  {
103  order_columns_owner_.push_back(chunks_owner);
104  order_columns_.push_back(column);
105 }
std::vector< const int8_t * > order_columns_
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
const int64_t * WindowFunctionContext::aggregateState ( ) const

Definition at line 604 of file WindowContext.cpp.

References aggregate_state_, CHECK, Analyzer::WindowFunction::getKind(), WindowFunctionContext::AggregateState::val, window_func_, and window_function_is_aggregate().

604  {
605  CHECK(window_function_is_aggregate(window_func_->getKind()));
606  return &aggregate_state_.val;
607 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1607
const Analyzer::WindowFunction * window_func_
AggregateState aggregate_state_
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:42
#define CHECK(condition)
Definition: Logger.h:211

+ Here is the call graph for this function:

const int64_t * WindowFunctionContext::aggregateStateCount ( ) const

Definition at line 609 of file WindowContext.cpp.

References aggregate_state_, CHECK, WindowFunctionContext::AggregateState::count, Analyzer::WindowFunction::getKind(), window_func_, and window_function_is_aggregate().

609  {
610  CHECK(window_function_is_aggregate(window_func_->getKind()));
611  return &aggregate_state_.count;
612 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1607
const Analyzer::WindowFunction * window_func_
AggregateState aggregate_state_
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:42
#define CHECK(condition)
Definition: Logger.h:211

+ Here is the call graph for this function:

int64_t WindowFunctionContext::aggregateStatePendingOutputs ( ) const

Definition at line 614 of file WindowContext.cpp.

References aggregate_state_, CHECK, Analyzer::WindowFunction::getKind(), WindowFunctionContext::AggregateState::outputs, window_func_, and window_function_is_aggregate().

614  {
615  CHECK(window_function_is_aggregate(window_func_->getKind()));
616  return reinterpret_cast<int64_t>(&aggregate_state_.outputs);
617 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1607
const Analyzer::WindowFunction * window_func_
AggregateState aggregate_state_
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:42
#define CHECK(condition)
Definition: Logger.h:211

+ Here is the call graph for this function:

void WindowFunctionContext::compute ( )

Definition at line 520 of file WindowContext.cpp.

References CHECK, computePartition(), cpu_threads(), DEBUG_TIMER, elem_count_, fillPartitionEnd(), fillPartitionStart(), g_enable_parallel_window_partition_compute, g_parallel_window_partition_compute_threshold, Analyzer::WindowFunction::getKind(), i, offsets(), output_, partitionCount(), payload(), row_set_mem_owner_, threading_std::task_group::run(), threading_std::task_group::wait(), window_func_, anonymous_namespace{WindowContext.cpp}::window_function_buffer_element_size(), window_function_is_aggregate(), and window_function_requires_peer_handling().

520  {
521  auto timer = DEBUG_TIMER(__func__);
522  CHECK(!output_);
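523  output_ = static_cast<int8_t*>(row_set_mem_owner_->allocate(
524  elem_count_ * window_function_buffer_element_size(window_func_->getKind()),
525  /*thread_idx=*/0));
526  const bool is_window_function_aggregate =
527  window_function_is_aggregate(window_func_->getKind());
528  if (is_window_function_aggregate) {
529  fillPartitionStart();
530  if (window_function_requires_peer_handling(window_func_)) {
531  fillPartitionEnd();
532  }
533  }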
534 
535  std::unique_ptr<int64_t[]> scratchpad;
536  int64_t* intermediate_output_buffer;
537  if (is_window_function_aggregate) {
538  intermediate_output_buffer = reinterpret_cast<int64_t*>(output_);
539  } else {
540  scratchpad.reset(new int64_t[elem_count_]);
541  intermediate_output_buffer = scratchpad.get();
542  }
543 
544  const size_t partition_count{partitionCount()};
545 
546  const auto compute_partitions = [&](const size_t start, const size_t end) {
547  for (size_t partition_idx = start; partition_idx != end; ++partition_idx) {
548  computePartition(partition_idx,
549  intermediate_output_buffer + offsets()[partition_idx]);
550  }
551  };
552 
553  const bool should_parallelize{g_enable_parallel_window_partition_compute &&
554  elem_count_ >=
555  g_parallel_window_partition_compute_threshold};
556  if (should_parallelize) {
557  auto timer = DEBUG_TIMER("Window Function Partition Compute");
558  threading::task_group thread_pool;
559  for (auto interval : makeIntervals<size_t>(0, partition_count, cpu_threads())) {
560  thread_pool.run([=] { compute_partitions(interval.begin, interval.end); });
561  }
562  thread_pool.wait();
563  } else {
564  auto timer = DEBUG_TIMER("Window Function Non-Parallelized Partition Compute");
565  compute_partitions(0, partition_count);
566  }
567 
568  if (is_window_function_aggregate) {
569  // If window function is aggregate we were able to write to the final output buffer
570  // directly in computePartition and we are done.
571  return;
572  }
573 
574  auto output_i64 = reinterpret_cast<int64_t*>(output_);
575 
576  const auto payload_copy = [&](const size_t start, const size_t end) {
577  for (size_t i = start; i < end; ++i) {
578  output_i64[payload()[i]] = intermediate_output_buffer[i];
579  }
580  };
581 
582  if (should_parallelize) {
583  auto timer = DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Parallelized");
584  threading::task_group thread_pool;
585  for (auto interval : makeIntervals<size_t>(0, elem_count_, cpu_threads())) {
586  thread_pool.run([=] { payload_copy(interval.begin, interval.end); });
587  }
588  thread_pool.wait();
589  } else {
590  auto timer =
591  DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Non-Parallelized");
592  payload_copy(0, elem_count_);
593  }
594 }
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1607
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
void computePartition(const size_t partition_idx, int64_t *output_for_partition_buff)
const Analyzer::WindowFunction * window_func_
const int32_t * offsets() const
size_t g_parallel_window_partition_compute_threshold
size_t partitionCount() const
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:42
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
size_t window_function_buffer_element_size(const SqlWindowFunctionKind)
bool g_enable_parallel_window_partition_compute
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
const int32_t * payload() const
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

void WindowFunctionContext::computePartition ( const size_t  partition_idx,
int64_t *  output_for_partition_buff 
)
private

Definition at line 456 of file WindowContext.cpp.

References CHECK, CHECK_EQ, computePartitionBuffer(), counts(), g_enable_parallel_window_partition_sort, g_parallel_window_partition_sort_threshold, Analyzer::WindowFunction::getCollation(), Analyzer::WindowFunction::getOrderKeys(), gpu_enabled::iota(), makeComparator(), offsets(), order_columns_, payload(), gpu_enabled::sort(), and window_func_.

Referenced by compute().

457  {
458  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
459  if (partition_size == 0) {
460  return;
461  }
462  const auto offset = offsets()[partition_idx];
463  std::iota(
464  output_for_partition_buff, output_for_partition_buff + partition_size, int64_t(0));
465  std::vector<Comparator> comparators;
466  const auto& order_keys = window_func_->getOrderKeys();
467  const auto& collation = window_func_->getCollation();
468  CHECK_EQ(order_keys.size(), collation.size());
469  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
470  ++order_column_idx) {
471  auto order_column_buffer = order_columns_[order_column_idx];
472  const auto order_col =
473  dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
474  CHECK(order_col);
475  const auto& order_col_collation = collation[order_column_idx];
476  const auto asc_comparator = makeComparator(order_col,
477  order_column_buffer,
478  payload() + offset,
479  order_col_collation.nulls_first);
480  auto comparator = asc_comparator;
481  if (order_col_collation.is_desc) {
482  comparator = [asc_comparator](const int64_t lhs, const int64_t rhs) {
483  return asc_comparator(rhs, lhs);
484  };
485  }
486  comparators.push_back(comparator);
487  }
488  const auto col_tuple_comparator = [&comparators](const int64_t lhs, const int64_t rhs) {
489  for (const auto& comparator : comparators) {
490  if (comparator(lhs, rhs)) {
491  return true;
492  }
493  }
494  return false;
495  };
496 
497  if (g_enable_parallel_window_partition_sort &&
498  partition_size >= g_parallel_window_partition_sort_threshold) {
499 #ifdef HAVE_TBB
500  tbb::parallel_sort(output_for_partition_buff,
501  output_for_partition_buff + partition_size,
502  col_tuple_comparator);
503 #else
504  thrust::sort(output_for_partition_buff,
505  output_for_partition_buff + partition_size,
506  col_tuple_comparator);
507 #endif
508  } else {
509  std::sort(output_for_partition_buff,
510  output_for_partition_buff + partition_size,
511  col_tuple_comparator);
512  }
513  computePartitionBuffer(output_for_partition_buff,
514  partition_size,
515  offset,
516  window_func_,
517  col_tuple_comparator);
518 }
bool g_enable_parallel_window_partition_sort
#define CHECK_EQ(x, y)
Definition: Logger.h:219
void computePartitionBuffer(int64_t *output_for_partition_buff, const size_t partition_size, const size_t off, const Analyzer::WindowFunction *window_func, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
const int32_t * offsets() const
size_t g_parallel_window_partition_sort_threshold
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1615
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:1619
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool nulls_first)
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
#define CHECK(condition)
Definition: Logger.h:211
std::vector< const int8_t * > order_columns_
const int32_t * payload() const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void WindowFunctionContext::computePartitionBuffer ( int64_t *  output_for_partition_buff,
const size_t  partition_size,
const size_t  off,
const Analyzer::WindowFunction *  window_func,
const std::function< bool(const int64_t lhs, const int64_t rhs)> &  comparator 
)
private

Definition at line 750 of file WindowContext.cpp.

References anonymous_namespace{WindowContext.cpp}::apply_first_value_to_partition(), anonymous_namespace{WindowContext.cpp}::apply_lag_to_partition(), anonymous_namespace{WindowContext.cpp}::apply_last_value_to_partition(), anonymous_namespace{WindowContext.cpp}::apply_permutation_to_partition(), run_benchmark_import::args, AVG, CHECK_EQ, gpu_enabled::copy(), COUNT, CUME_DIST, DENSE_RANK, FIRST_VALUE, anonymous_namespace{WindowContext.cpp}::get_int_constant_from_expr(), anonymous_namespace{WindowContext.cpp}::get_lag_or_lead_argument(), Analyzer::WindowFunction::getArgs(), Analyzer::WindowFunction::getKind(), anonymous_namespace{WindowContext.cpp}::index_to_cume_dist(), anonymous_namespace{WindowContext.cpp}::index_to_dense_rank(), anonymous_namespace{WindowContext.cpp}::index_to_ntile(), anonymous_namespace{WindowContext.cpp}::index_to_partition_end(), anonymous_namespace{WindowContext.cpp}::index_to_percent_rank(), anonymous_namespace{WindowContext.cpp}::index_to_rank(), anonymous_namespace{WindowContext.cpp}::index_to_row_number(), LAG, LAST_VALUE, LEAD, MAX, MIN, anonymous_namespace{Utm.h}::n, NTILE, partitionEnd(), payload(), PERCENT_RANK, RANK, ROW_NUMBER, SUM, toString(), and window_function_requires_peer_handling().

Referenced by computePartition().

755  {
756  switch (window_func->getKind()) {
757  case SqlWindowFunctionKind::ROW_NUMBER: {
758  const auto row_numbers =
759  index_to_row_number(output_for_partition_buff, partition_size);
760  std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
761  break;
762  }
763  case SqlWindowFunctionKind::RANK: {
764  const auto rank =
765  index_to_rank(output_for_partition_buff, partition_size, comparator);
766  std::copy(rank.begin(), rank.end(), output_for_partition_buff);
767  break;
768  }
769  case SqlWindowFunctionKind::DENSE_RANK: {
770  const auto dense_rank =
771  index_to_dense_rank(output_for_partition_buff, partition_size, comparator);
772  std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
773  break;
774  }
775  case SqlWindowFunctionKind::PERCENT_RANK: {
776  const auto percent_rank =
777  index_to_percent_rank(output_for_partition_buff, partition_size, comparator);
778  std::copy(percent_rank.begin(),
779  percent_rank.end(),
780  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
781  break;
782  }
783  case SqlWindowFunctionKind::CUME_DIST: {
784  const auto cume_dist =
785  index_to_cume_dist(output_for_partition_buff, partition_size, comparator);
786  std::copy(cume_dist.begin(),
787  cume_dist.end(),
788  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
789  break;
790  }
791  case SqlWindowFunctionKind::NTILE: {
792  const auto& args = window_func->getArgs();
793  CHECK_EQ(args.size(), size_t(1));
794  const auto n = get_int_constant_from_expr(args.front().get());
795  const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
796  std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
797  break;
798  }
799  case SqlWindowFunctionKind::LAG:
800  case SqlWindowFunctionKind::LEAD: {
801  const auto lag_or_lead = get_lag_or_lead_argument(window_func);
802  const auto partition_row_offsets = payload() + off;
803  apply_lag_to_partition(
804  lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
805  break;
806  }
807  case SqlWindowFunctionKind::FIRST_VALUE: {
808  const auto partition_row_offsets = payload() + off;
809  apply_first_value_to_partition(
810  partition_row_offsets, output_for_partition_buff, partition_size);
811  break;
812  }
813  case SqlWindowFunctionKind::LAST_VALUE: {
814  const auto partition_row_offsets = payload() + off;
815  apply_last_value_to_partition(
816  partition_row_offsets, output_for_partition_buff, partition_size);
817  break;
818  }
819  case SqlWindowFunctionKind::AVG:
820  case SqlWindowFunctionKind::MIN:
821  case SqlWindowFunctionKind::MAX:
822  case SqlWindowFunctionKind::SUM:
823  case SqlWindowFunctionKind::COUNT: {
824  const auto partition_row_offsets = payload() + off;
825  if (window_function_requires_peer_handling(window_func)) {
826  index_to_partition_end(
827  partitionEnd(), off, output_for_partition_buff, partition_size, comparator);
828  }
829  apply_permutation_to_partition(
830  output_for_partition_buff, partition_row_offsets, partition_size);
831  break;
832  }
833  default: {
834  throw std::runtime_error("Window function not supported yet: " +
835  ::toString(window_func->getKind()));
836  }
837  }
838 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1607
std::vector< int64_t > index_to_ntile(const int64_t *index, const size_t index_size, const size_t n)
std::vector< double > index_to_percent_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_permutation_to_partition(int64_t *output_for_partition_buff, const int32_t *original_indices, const size_t partition_size)
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction *window_func)
void index_to_partition_end(const int8_t *partition_end, const size_t off, const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< int64_t > index_to_row_number(const int64_t *index, const size_t index_size)
std::vector< int64_t > index_to_dense_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_last_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:1609
const int8_t * partitionEnd() const
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
void apply_first_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1379
void apply_lag_to_partition(const int64_t lag, const int32_t *original_indices, int64_t *sorted_indices, const size_t partition_size)
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
std::vector< double > index_to_cume_dist(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< int64_t > index_to_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
const int32_t * payload() const
size_t get_int_constant_from_expr(const Analyzer::Expr *expr)
constexpr double n
Definition: Utm.h:38

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int32_t * WindowFunctionContext::counts ( ) const
private

Definition at line 906 of file WindowContext.cpp.

References device_type_, dummy_count_, and partitions_.

Referenced by computePartition(), fillPartitionEnd(), fillPartitionStart(), and partitionCount().

906  {
907  if (partitions_) {
908  return reinterpret_cast<const int32_t*>(
909  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->countBufferOff());
910  }
911  return &dummy_count_;
912 }
const int32_t dummy_count_
std::shared_ptr< HashJoin > partitions_
const ExecutorDeviceType device_type_

+ Here is the caller graph for this function:

size_t WindowFunctionContext::elementCount ( ) const

Definition at line 627 of file WindowContext.cpp.

References elem_count_.

627  {
628  return elem_count_;
629 }
void WindowFunctionContext::fillPartitionEnd ( )
private

Definition at line 862 of file WindowContext.cpp.

References agg_count_distinct_bitmap(), Bitmap, checked_calloc(), counts(), CPU, elem_count_, i, gpu_enabled::partial_sum(), partition_end_, partitionCount(), and partitions_.

Referenced by compute().

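862  {
863  CountDistinctDescriptor partition_start_bitmap{CountDistinctImplType::Bitmap,
864  0,
865  static_cast<int64_t>(elem_count_),
866  false,
867  ExecutorDeviceType::CPU,
868  1};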
869  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
870  if (partitions_) {
871  bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
872  }
873  partition_end_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
874  int64_t partition_count = partitionCount();
875  std::vector<size_t> partition_offsets(partition_count);
876  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
877  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
878  for (int64_t i = 0; i < partition_count - 1; ++i) {
879  if (partition_offsets[i] == 0) {
880  continue;
881  }
882  agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0);
883  }
884  if (elem_count_) {
885  agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0);
886  }
887 }
RUNTIME_EXPORT ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val)
const int32_t * counts() const
size_t partitionCount() const
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:53
std::shared_ptr< HashJoin > partitions_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void WindowFunctionContext::fillPartitionStart ( )
private

Definition at line 840 of file WindowContext.cpp.

References agg_count_distinct_bitmap(), Bitmap, checked_calloc(), counts(), CPU, elem_count_, i, gpu_enabled::partial_sum(), partition_start_, partitionCount(), and partitions_.

Referenced by compute().

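840  {
841  CountDistinctDescriptor partition_start_bitmap{CountDistinctImplType::Bitmap,
842  0,
843  static_cast<int64_t>(elem_count_),
844  false,
845  ExecutorDeviceType::CPU,
846  1};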
847  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
848  if (partitions_) {
849  bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
850  }
851  partition_start_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
852  int64_t partition_count = partitionCount();
853  std::vector<size_t> partition_offsets(partition_count);
854  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
855  auto partition_start_handle = reinterpret_cast<int64_t>(partition_start_);
856  agg_count_distinct_bitmap(&partition_start_handle, 0, 0);
857  for (int64_t i = 0; i < partition_count - 1; ++i) {
858  agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0);
859  }
860 }
RUNTIME_EXPORT ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val)
const int32_t * counts() const
size_t partitionCount() const
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:53
std::shared_ptr< HashJoin > partitions_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const Analyzer::WindowFunction * WindowFunctionContext::getWindowFunction ( ) const

Definition at line 596 of file WindowContext.cpp.

References window_func_.

Referenced by Executor::codegenWindowFunction().

596  {
597  return window_func_;
598 }
const Analyzer::WindowFunction * window_func_

+ Here is the caller graph for this function:

std::function< bool(const int64_t lhs, const int64_t rhs)> WindowFunctionContext::makeComparator ( const Analyzer::ColumnVar *  col_var,
const int8_t *  partition_values,
const int32_t *  partition_indices,
const bool  nulls_first 
)
static private

Definition at line 686 of file WindowContext.cpp.

References logger::FATAL, Analyzer::Expr::get_type_info(), kDOUBLE, kFLOAT, and LOG.

Referenced by computePartition().

689  {
690  const auto& ti = col_var->get_type_info();
691  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
692  switch (ti.get_size()) {
693  case 8: {
694  return [order_column_buffer, nulls_first, partition_indices, &ti](
695  const int64_t lhs, const int64_t rhs) {
696  return integer_comparator<int64_t>(
697  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
698  };
699  }
700  case 4: {
701  return [order_column_buffer, nulls_first, partition_indices, &ti](
702  const int64_t lhs, const int64_t rhs) {
703  return integer_comparator<int32_t>(
704  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
705  };
706  }
707  case 2: {
708  return [order_column_buffer, nulls_first, partition_indices, &ti](
709  const int64_t lhs, const int64_t rhs) {
710  return integer_comparator<int16_t>(
711  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
712  };
713  }
714  case 1: {
715  return [order_column_buffer, nulls_first, partition_indices, &ti](
716  const int64_t lhs, const int64_t rhs) {
717  return integer_comparator<int8_t>(
718  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
719  };
720  }
721  default: {
722  LOG(FATAL) << "Invalid type size: " << ti.get_size();
723  }
724  }
725  }
726  if (ti.is_fp()) {
727  switch (ti.get_type()) {
728  case kFLOAT: {
729  return [order_column_buffer, nulls_first, partition_indices, &ti](
730  const int64_t lhs, const int64_t rhs) {
731  return fp_comparator<float, int32_t>(
732  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
733  };
734  }
735  case kDOUBLE: {
736  return [order_column_buffer, nulls_first, partition_indices, &ti](
737  const int64_t lhs, const int64_t rhs) {
738  return fp_comparator<double, int64_t>(
739  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
740  };
741  }
742  default: {
743  LOG(FATAL) << "Invalid float type";
744  }
745  }
746  }
747  throw std::runtime_error("Type not supported yet");
748 }
#define LOG(tag)
Definition: Logger.h:205
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:77

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int32_t * WindowFunctionContext::offsets ( ) const
private

Definition at line 898 of file WindowContext.cpp.

References device_type_, dummy_offset_, and partitions_.

Referenced by compute(), computePartition(), and partitionCount().

898  {
899  if (partitions_) {
900  return reinterpret_cast<const int32_t*>(
901  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->offsetBufferOff());
902  }
903  return &dummy_offset_;
904 }
const int32_t dummy_offset_
std::shared_ptr< HashJoin > partitions_
const ExecutorDeviceType device_type_

+ Here is the caller graph for this function:

WindowFunctionContext& WindowFunctionContext::operator= ( const WindowFunctionContext & )
delete
const int8_t * WindowFunctionContext::output ( ) const

Definition at line 600 of file WindowContext.cpp.

References output_.

Referenced by CodeGenerator::codegenWindowPosition().

600  {
601  return output_;
602 }

+ Here is the caller graph for this function:

size_t WindowFunctionContext::partitionCount ( ) const
private

Definition at line 914 of file WindowContext.cpp.

References CHECK_GE, counts(), offsets(), and partitions_.

Referenced by compute(), fillPartitionEnd(), and fillPartitionStart().

914  {
915  if (partitions_) {
916  const auto partition_count = counts() - offsets();
917  CHECK_GE(partition_count, 0);
918  return partition_count;
919  }
920  return 1; // non-partitioned window function
921 }
#define CHECK_GE(x, y)
Definition: Logger.h:224
const int32_t * counts() const
const int32_t * offsets() const
std::shared_ptr< HashJoin > partitions_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

const int8_t * WindowFunctionContext::partitionEnd ( ) const

Definition at line 623 of file WindowContext.cpp.

References partition_end_.

Referenced by computePartitionBuffer().

623  {
624  return partition_end_;
625 }

+ Here is the caller graph for this function:

const int8_t * WindowFunctionContext::partitionStart ( ) const

Definition at line 619 of file WindowContext.cpp.

References partition_start_.

619  {
620  return partition_start_;
621 }
const int32_t * WindowFunctionContext::payload ( ) const
private

Definition at line 889 of file WindowContext.cpp.

References device_type_, dummy_payload_, and partitions_.

Referenced by compute(), computePartition(), and computePartitionBuffer().

889  {
890  if (partitions_) {
891  return reinterpret_cast<const int32_t*>(
892  partitions_->getJoinHashBuffer(device_type_, 0) +
893  partitions_->payloadBufferOff());
894  }
895  return dummy_payload_; // non-partitioned window function
896 }
std::shared_ptr< HashJoin > partitions_
const ExecutorDeviceType device_type_

+ Here is the caller graph for this function:

Member Data Documentation

AggregateState WindowFunctionContext::aggregate_state_
private
const ExecutorDeviceType WindowFunctionContext::device_type_
private

Definition at line 175 of file WindowContext.h.

Referenced by counts(), offsets(), and payload().

const int32_t WindowFunctionContext::dummy_count_
private

Definition at line 179 of file WindowContext.h.

Referenced by counts().

const int32_t WindowFunctionContext::dummy_offset_
private

Definition at line 180 of file WindowContext.h.

Referenced by offsets().

int32_t* WindowFunctionContext::dummy_payload_
private

Definition at line 185 of file WindowContext.h.

Referenced by payload(), WindowFunctionContext(), and ~WindowFunctionContext().

size_t WindowFunctionContext::elem_count_
private
std::vector<const int8_t*> WindowFunctionContext::order_columns_
private

Definition at line 160 of file WindowContext.h.

Referenced by addOrderColumn(), and computePartition().

std::vector<std::vector<std::shared_ptr<Chunk_NS::Chunk> > > WindowFunctionContext::order_columns_owner_
private

Definition at line 158 of file WindowContext.h.

Referenced by addOrderColumn().

int8_t* WindowFunctionContext::output_
private

Definition at line 166 of file WindowContext.h.

Referenced by compute(), and output().

int8_t* WindowFunctionContext::partition_end_
private

Definition at line 172 of file WindowContext.h.

Referenced by fillPartitionEnd(), partitionEnd(), and ~WindowFunctionContext().

int8_t* WindowFunctionContext::partition_start_
private

Definition at line 169 of file WindowContext.h.

Referenced by fillPartitionStart(), partitionStart(), and ~WindowFunctionContext().

std::shared_ptr<HashJoin> WindowFunctionContext::partitions_
private
std::shared_ptr<RowSetMemoryOwner> WindowFunctionContext::row_set_mem_owner_
private

Definition at line 176 of file WindowContext.h.

Referenced by compute().

const Analyzer::WindowFunction* WindowFunctionContext::window_func_
private

The documentation for this class was generated from the following files: