OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionManager.h
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
20 
21 /*
22  The TableFunctionManager implements the following features:
23 
24  - Manage the memory of output column buffers.
25 
26  - Allow table functions to communicate error/exception messages up
27  to the execution context. Table functions can return with a call
28  to `table_function_error` with an error message. This will
29  indicate to the execution context that an error occurred within the
30  table function, and the error will be propagated as an exception.
31 */
32 
33 // TableFunctionError encapsulates any runtime errors caused by table function execution.
34 class TableFunctionError : public std::runtime_error {
35  public:
  // Forwards `message` unchanged to the std::runtime_error base class.
36  TableFunctionError(const std::string& message) : std::runtime_error(message) {}
37 };
38 
39 // UserTableFunctionErrors represent errors thrown explicitly by user code within table
40 // functions, i.e. through calling table_function_error()
// NOTE(review): the class-head line (listing line 41) is absent from this
// rendering; the constructor below delegates to TableFunctionError, so the
// class presumably derives from it -- confirm against the original header.
42  public:
  // Forwards `message` unchanged to the TableFunctionError constructor.
43  UserTableFunctionError(const std::string& message) : TableFunctionError(message) {}
44 };
45 
46 // Error codes for table functions. Use a set negative value to distinguish
47 // from already-existing negative return values.
48 enum TableFunctionErrorCode : int32_t {
  // -0x75BCD15 is -123456789 in decimal.
49  GenericError = -0x75BCD15,
50 };
51 
// Mutex declared here and defined in another translation unit.
// NOTE(review): presumably the mutex behind TableFunctionManager::lock() /
// unlock(); their bodies are not visible in this listing -- confirm.
52 extern std::mutex TableFunctionManager_singleton_mutex;
53 
// Manager object for table function execution. It records the output Column
// instances registered through set_output_column(), allocates their buffers
// through a QueryMemoryInitializer in allocate_output_buffers(), and stores an
// error message. An instance constructed with is_singleton == true registers
// itself through set_singleton().
// NOTE(review): several lines of this struct are absent from this listing
// (the gaps show up as jumps in the leading line numbers); the comments below
// describe only what is visible.
54 struct TableFunctionManager {
  // Owns the QueryMemoryInitializer created by allocate_output_buffers().
55  std::unique_ptr<QueryMemoryInitializer> query_buffers;
56 
  // Constructor.
  // NOTE(review): its first line (listing line 57, which must declare the
  // `exe_unit` parameter used in the initializer list) is missing here.
  // Records the constructing thread's id, sets the row count to the -1
  // sentinel, registers the instance as the singleton when requested, and
  // pre-fills both output pointer vectors with one nullptr per output column.
58  Executor* executor,
59  std::vector<const int8_t*>& col_buf_ptrs,
60  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
61  bool is_singleton)
62  : exe_unit_(exe_unit)
63  , executor_(executor)
64  , col_buf_ptrs_(col_buf_ptrs)
65  , row_set_mem_owner_(row_set_mem_owner)
66  , output_num_rows_(-1)
67  , is_singleton_(is_singleton)
68  , thread_id_(std::this_thread::get_id()) {
69  if (isSingleton()) {
70  set_singleton(this); // start of singleton life
71  }
72  auto num_out_columns = get_ncols();
73  output_col_buf_ptrs.reserve(num_out_columns);
74  output_column_ptrs.reserve(num_out_columns);
75  for (size_t i = 0; i < num_out_columns; i++) {
76  output_col_buf_ptrs.emplace_back(nullptr);
77  output_column_ptrs.emplace_back(nullptr);
78  }
79  }
80 
81  // Return the number of output columns (the size of exe_unit_.target_exprs)
82  size_t get_ncols() const { return exe_unit_.target_exprs.size(); }
83 
84  // Return the number of rows of output columns. Until
  // allocate_output_buffers() has stored a row count, this is the -1 sentinel
  // set by the constructor, converted to size_t.
85  size_t get_nrows() const { return output_num_rows_; }
86 
  // Throws std::runtime_error when called from a thread other than the one
  // that constructed this instance.
87  void check_thread_id() const {
88  if (std::this_thread::get_id() != thread_id_) {
89  throw std::runtime_error(
90  "TableFunctionManager instance accessed from an alien thread!");
91  }
92  }
93 
94  // Store the pointer to output Column instance. `index` must lie in
  // [0, get_ncols()) and `ptr` must be non-null; both are enforced with CHECK.
95  void set_output_column(int32_t index, int8_t* ptr) {
97  CHECK(index >= 0 && index < static_cast<int32_t>(get_ncols()));
98  CHECK(ptr);
99  output_column_ptrs[index] = ptr;
100  }
101 
  // Allocate the output buffers for `output_num_rows` rows and point every
  // registered output Column at its slice of the allocation. When
  // output_num_rows is non-zero, every output column must already have been
  // registered with set_output_column() (see CHECK(col) below).
102  void allocate_output_buffers(int64_t output_num_rows) {
103  check_thread_id();
105  size_t(-1)); // re-allocation of output buffers is not supported
106  output_num_rows_ = output_num_rows;
107  auto num_out_columns = get_ncols();
109  output_num_rows, // divide by row multiplier???
111  /*is_table_function=*/true);
112  query_mem_desc.setOutputColumnar(true);
113 
114  for (size_t i = 0; i < num_out_columns; i++) {
115  // All outputs padded to 8 bytes
116  query_mem_desc.addColSlotInfo({std::make_tuple(8, 8)});
117  }
118 
119  // The members layout of Column must match with Column defined in
120  // OmniSciTypes.h
121  struct Column {
122  int8_t* ptr;
123  int64_t size;
124  // just for debugging:
125  std::string toString() const {
126  return "Column{" + ::toString(ptr) + ", " + ::toString(size) + "}";
127  }
128  };
129 
  // A zero row count is passed on as 1 in the argument below; in that case
  // the output Column instances are left untouched (see the guard after this
  // call).
130  query_buffers = std::make_unique<QueryMemoryInitializer>(
131  exe_unit_,
133  /*device_id=*/0,
135  (output_num_rows_ == 0 ? 1 : output_num_rows_),
136  std::vector<std::vector<const int8_t*>>{col_buf_ptrs_},
137  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
139  nullptr,
140  executor_);
141  if (output_num_rows_ != 0) {
142  auto group_by_buffers_ptr = query_buffers->getGroupByBuffersPtr();
143  CHECK(group_by_buffers_ptr);
  // The first group-by buffer is viewed as int64_t slots; column i starts at
  // slot i * output_num_rows_.
144  auto output_buffers_ptr = reinterpret_cast<int64_t*>(group_by_buffers_ptr[0]);
145  for (size_t i = 0; i < num_out_columns; i++) {
146  Column* col = reinterpret_cast<Column*>(output_column_ptrs[i]);
147  CHECK(col);
148  // set the members of output Column instances:
149  output_col_buf_ptrs[i] = output_buffers_ptr + i * output_num_rows_;
150  col->ptr = reinterpret_cast<int8_t*>(output_col_buf_ptrs[i]);
151  col->size = output_num_rows_;
152  }
153  }
154  }
155 
  // Return the stored error message as a C string. The pointer refers to
  // error_message_'s internal storage. Checks the calling thread first.
156  const char* get_error_message() const {
157  check_thread_id();
158  return error_message_.c_str();
159  }
160 
  // Replace the stored error message with a copy of `msg`. Checks the calling
  // thread first.
  // NOTE(review): `msg` must be non-null; constructing std::string from a
  // null pointer is undefined behavior.
161  void set_error_message(const char* msg) {
162  check_thread_id();
163  error_message_ = std::string(msg);
164  }
165 
166  // Methods for managing singleton instance of TableFunctionManager:
167 
  // True when this instance was constructed with is_singleton == true.
168  bool isSingleton() const { return is_singleton_; }
169 
  // NOTE(review): the head of the following block (listing line 170) is
  // missing; given the "end of singleton life" comment it is presumably the
  // destructor -- confirm against the original header.
171  if (isSingleton()) {
172  set_singleton(nullptr); // end of singleton life
173  }
174  }
175 
  // NOTE(review): the head of the following block (listing line 176) is
  // missing. set_singleton() binds the result of get_singleton() to a
  // reference and assigns through it, so this body presumably belongs to
  // get_singleton() and returns the function-local static pointer by
  // reference -- confirm.
177  static TableFunctionManager* instance = nullptr;
178  return instance;
179  }
180 
181  private:
  // NOTE(review): listing lines 182-183 are missing here; lock() and unlock(),
  // which set_singleton() calls, are presumably defined on them.
184 
  // Register `instance` as the singleton (non-null) or clear the registration
  // (null). Registering calls lock() on the new instance and then CHECKs that
  // no singleton is currently stored; clearing CHECKs that one is stored and
  // calls unlock() on it.
185  static void set_singleton(TableFunctionManager* instance) {
186  auto& instance_ = get_singleton();
187  // ensure being singleton and lock/unlock
188  if (instance) {
189  instance->lock();
190  CHECK(instance_ == nullptr);
191  } else {
192  CHECK(instance_ != nullptr);
193  instance_->unlock();
194  }
195  instance_ = instance;
196  }
197 
  // NOTE(review): listing line 198 (presumably the exe_unit_ declaration) is
  // missing here.
  // Executor pointer passed to the constructor; forwarded to
  // QueryMemoryInitializer in allocate_output_buffers()
199  Executor* executor_;
200  // Pointers to the buffers of input Columns. Held by reference, not copied:
  // the vector passed to the constructor must outlive this object.
201  std::vector<const int8_t*>& col_buf_ptrs_;
202  // Shared handle to the RowSetMemoryOwner passed to the constructor
203  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner_;
204  // Pointers to the buffers of output Columns
205  std::vector<int64_t*> output_col_buf_ptrs;
206  // Number of rows of output Columns (declaration on listing line 207 is
  // missing from this rendering)
208  // Pointers to output Column instances
209  std::vector<int8_t*> output_column_ptrs;
210  // If TableFunctionManager is global (declaration on listing line 211 is
  // missing from this rendering)
212  // Store thread id for sanity check
213  std::thread::id thread_id_;
214  // Error message
215  std::string error_message_;
216 };
std::unique_ptr< QueryMemoryInitializer > query_buffers
void set_output_column(int32_t index, int8_t *ptr)
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::vector< const int8_t * > & col_buf_ptrs_
TableFunctionManager(const TableFunctionExecutionUnit &exe_unit, Executor *executor, std::vector< const int8_t * > &col_buf_ptrs, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, bool is_singleton)
std::string toString(const ExtArgumentType &sig_type)
const TableFunctionExecutionUnit & exe_unit_
void set_error_message(const char *msg)
DEVICE int64_t size() const
Definition: OmniSciTypes.h:218
void setOutputColumnar(const bool val)
std::vector< int8_t * > output_column_ptrs
std::vector< int64_t * > output_col_buf_ptrs
void allocate_output_buffers(int64_t output_num_rows)
std::mutex TableFunctionManager_singleton_mutex
UserTableFunctionError(const std::string &message)
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
#define CHECK(condition)
Definition: Logger.h:209
TableFunctionError(const std::string &message)
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
static TableFunctionManager * get_singleton()
Definition: OmniSciTypes.h:300
const char * get_error_message() const
TableFunctionErrorCode
static TableFunctionManager *& get_singleton()
static void set_singleton(TableFunctionManager *instance)