OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionManager.h
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
20 
21 /*
22  The TableFunctionManager implements the following features:
23 
24  - Manage the memory of output column buffers.
25 
26  - Allow table functions to communicate error/exception messages up
27  to the execution context. Table functions can return with a call
28  to `table_function_error` with an error message. This will
29  indicate to the execution context that an error ocurred within the
30  table function, and the error will be propagated as an exception.
31 */
32 
33 // TableFunctionError encapsulates any runtime errors caused by table function execution.
34 class TableFunctionError : public std::runtime_error {
35  public:
36  TableFunctionError(const std::string& message) : std::runtime_error(message) {}
37 };
38 
39 // UserTableFunctionErrors represent errors thrown explicitly by user code within table
40 // functions, i.e. through calling table_function_error()
42  public:
43  UserTableFunctionError(const std::string& message) : TableFunctionError(message) {}
44 };
45 
46 // Use a set negative value to distinguish from already-existing
47 // negative return values
48 enum TableFunctionErrorCode : int32_t {
49  GenericError = -0x75BCD15,
50 };
51 
52 extern std::mutex TableFunctionManager_singleton_mutex;
53 
54 struct TableFunctionManager {
55  std::unique_ptr<QueryMemoryInitializer> query_buffers;
56 
58  Executor* executor,
59  std::vector<const int8_t*>& col_buf_ptrs,
60  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
61  bool is_singleton)
62  : exe_unit_(exe_unit)
63  , executor_(executor)
64  , col_buf_ptrs_(col_buf_ptrs)
65  , row_set_mem_owner_(row_set_mem_owner)
66  , output_num_rows_(-1)
67  , is_singleton_(is_singleton)
68  , thread_id_(std::this_thread::get_id()) {
69  if (isSingleton()) {
70  set_singleton(this); // start of singleton life
71  }
72  auto num_out_columns = get_ncols();
73  output_col_buf_ptrs.reserve(num_out_columns);
74  output_column_ptrs.reserve(num_out_columns);
75  for (size_t i = 0; i < num_out_columns; i++) {
76  output_col_buf_ptrs.emplace_back(nullptr);
77  output_column_ptrs.emplace_back(nullptr);
78  }
79  }
80 
81  // Return the number of output columns
82  size_t get_ncols() const { return exe_unit_.target_exprs.size(); }
83 
84  // Return the number of rows of output columns.
85  size_t get_nrows() const { return output_num_rows_; }
86 
87  void check_thread_id() const {
88  if (std::this_thread::get_id() != thread_id_) {
89  throw std::runtime_error(
90  "TableFunctionManager instance accessed from an alien thread!");
91  }
92  }
93 
94  // Store the pointer to output Column instance
95  void set_output_column(int32_t index, int8_t* ptr) {
97  CHECK(index >= 0 && index < static_cast<int32_t>(get_ncols()));
98  CHECK(ptr);
99  output_column_ptrs[index] = ptr;
100  }
101 
102  void allocate_output_buffers(int64_t output_num_rows) {
103  check_thread_id();
105  size_t(-1)); // re-allocation of output buffers is not supported
106  output_num_rows_ = output_num_rows;
107  auto num_out_columns = get_ncols();
109  output_num_rows, // divide by row multiplier???
111  /*is_table_function=*/true);
112  query_mem_desc.setOutputColumnar(true);
113 
114  for (size_t i = 0; i < num_out_columns; i++) {
115  // All outputs have padded width set to logical column width
116  const size_t col_width = exe_unit_.target_exprs[i]->get_type_info().get_size();
117  query_mem_desc.addColSlotInfo({std::make_tuple(col_width, col_width)});
118  }
119 
120  // The members layout of Column must match with Column defined in
121  // heavydbTypes.h
122  struct Column {
123  int8_t* ptr;
124  int64_t size;
125  // just for debugging:
126  std::string toString() const {
127  return "Column{" + ::toString(ptr) + ", " + ::toString(size) + "}";
128  }
129  };
130  // We do not init output buffers for CPU currently, so CPU
131  // table functions are expected to handle their own initialization
132  query_buffers = std::make_unique<QueryMemoryInitializer>(
133  exe_unit_,
135  /*device_id=*/0,
137  (output_num_rows_ == 0 ? 1 : output_num_rows_),
138  std::vector<std::vector<const int8_t*>>{col_buf_ptrs_},
139  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
141  nullptr,
142  executor_);
143  if (output_num_rows_ != 0) {
144  auto group_by_buffers_ptr = query_buffers->getGroupByBuffersPtr();
145  CHECK(group_by_buffers_ptr);
146  auto output_buffers_ptr = reinterpret_cast<int8_t*>(group_by_buffers_ptr[0]);
147  for (size_t i = 0; i < num_out_columns; i++) {
148  Column* col = reinterpret_cast<Column*>(output_column_ptrs[i]);
149  CHECK(col);
150  // set the members of output Column instances:
151  output_col_buf_ptrs[i] = reinterpret_cast<int64_t*>(output_buffers_ptr);
152  col->ptr = output_buffers_ptr;
153  col->size = output_num_rows_;
154 
155  const size_t col_width = exe_unit_.target_exprs[i]->get_type_info().get_size();
156  output_buffers_ptr =
157  align_to_int64(output_buffers_ptr + col_width * output_num_rows_);
158  }
159  }
160  }
161 
162  const char* get_error_message() const {
163  check_thread_id();
164  return error_message_.c_str();
165  }
166 
167  void set_error_message(const char* msg) {
168  check_thread_id();
169  error_message_ = std::string(msg);
170  }
171 
172  // Methods for managing singleton instance of TableFunctionManager:
173 
174  bool isSingleton() const { return is_singleton_; }
175 
177  if (isSingleton()) {
178  set_singleton(nullptr); // end of singleton life
179  }
180  }
181 
183  static TableFunctionManager* instance = nullptr;
184  return instance;
185  }
186 
187  private:
190 
191  static void set_singleton(TableFunctionManager* instance) {
192  auto& instance_ = get_singleton();
193  // ensure being singleton and lock/unlock
194  if (instance) {
195  instance->lock();
196  CHECK(instance_ == nullptr);
197  } else {
198  CHECK(instance_ != nullptr);
199  instance_->unlock();
200  }
201  instance_ = instance;
202  }
203 
205  Executor* executor_;
206  // Pointers to the buffers of input Columns
207  std::vector<const int8_t*>& col_buf_ptrs_;
208  //
209  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner_;
210  // Pointers to the buffers of output Columns
211  std::vector<int64_t*> output_col_buf_ptrs;
212  // Number of rows of output Columns
214  // Pointers to output Column instances
215  std::vector<int8_t*> output_column_ptrs;
216  // If TableFunctionManager is global
218  // Store thread id for sanity check
219  std::thread::id thread_id_;
220  // Error message
221  std::string error_message_;
222 };
std::unique_ptr< QueryMemoryInitializer > query_buffers
void set_output_column(int32_t index, int8_t *ptr)
#define CHECK_EQ(x, y)
Definition: Logger.h:231
std::vector< const int8_t * > & col_buf_ptrs_
TableFunctionManager(const TableFunctionExecutionUnit &exe_unit, Executor *executor, std::vector< const int8_t * > &col_buf_ptrs, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, bool is_singleton)
const TableFunctionExecutionUnit & exe_unit_
void set_error_message(const char *msg)
DEVICE int64_t size() const
Definition: heavydbTypes.h:445
void setOutputColumnar(const bool val)
std::vector< int8_t * > output_column_ptrs
std::vector< int64_t * > output_col_buf_ptrs
void allocate_output_buffers(int64_t output_num_rows)
std::mutex TableFunctionManager_singleton_mutex
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1453
UserTableFunctionError(const std::string &message)
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
#define CHECK(condition)
Definition: Logger.h:223
TableFunctionError(const std::string &message)
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
static TableFunctionManager * get_singleton()
Definition: heavydbTypes.h:651
const char * get_error_message() const
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
TableFunctionErrorCode
static TableFunctionManager *& get_singleton()
static void set_singleton(TableFunctionManager *instance)