OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionManager.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "QueryEngine/Execute.h"
21 #include "Shared/sqltypes.h"
22 
23 /*
24  The TableFunctionManager implements the following features:
25 
26  - Manage the memory of output column buffers.
27 
28  - Allow table functions to communicate error/exception messages up
29  to the execution context. Table functions can return with a call
30  to `table_function_error` with an error message. This will
31  indicate to the execution context that an error occurred within the
32  table function, and the error will be propagated as an exception.
33 */
34 
35 // TableFunctionError encapsulates any runtime errors caused by table function execution.
class TableFunctionError : public std::runtime_error {
 public:
  // Builds the exception from `message`, which is forwarded unchanged to
  // std::runtime_error and can be read back through what().
  // `explicit` keeps a plain std::string from being converted into an
  // exception object implicitly.
  explicit TableFunctionError(const std::string& message)
      : std::runtime_error(message) {}
};
40 
41 // UserTableFunctionErrors represent errors thrown explicitly by user code within table
42 // functions, i.e. through calling table_function_error()
44  public:
45  UserTableFunctionError(const std::string& message) : TableFunctionError(message) {}
46 };
47 
48 // Use a set negative value to distinguish from already-existing
49 // negative return values
50 enum TableFunctionErrorCode : int32_t {
51  GenericError = -0x75BCD15,  // 0x75BCD15 == 123456789, i.e. the value is -123456789
52 };
53 
54 extern std::mutex TableFunctionManager_singleton_mutex;
55 
56 struct TableFunctionManager {
57  std::unique_ptr<QueryMemoryInitializer> query_buffers;
58 
60  Executor* executor,
61  std::vector<const int8_t*>& col_buf_ptrs,
62  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
63  bool is_singleton)
64  : exe_unit_(exe_unit)
65  , executor_(executor)
66  , col_buf_ptrs_(col_buf_ptrs)
67  , row_set_mem_owner_(row_set_mem_owner)
68  , output_num_rows_(-1)
69  , is_singleton_(is_singleton)
70  , thread_id_(std::this_thread::get_id()) {
71  if (isSingleton()) {
72  set_singleton(this); // start of singleton life
73  }
74  auto num_out_columns = get_ncols();
75  output_col_buf_ptrs.reserve(num_out_columns);
76  output_column_ptrs.reserve(num_out_columns);
77  output_array_values_total_number_.reserve(num_out_columns);
78  for (size_t i = 0; i < num_out_columns; i++) {
79  output_col_buf_ptrs.emplace_back(nullptr);
80  output_column_ptrs.emplace_back(nullptr);
81  output_array_values_total_number_.emplace_back(-1);
82  }
83  }
84 
85  // Return the number of output columns, i.e. the number of target
    // expressions held by the execution unit.
86  size_t get_ncols() const { return exe_unit_.target_exprs.size(); }
87 
88  // Return the number of rows of output columns, as recorded by
    // allocate_output_buffers(). Until that call the member still holds the
    // constructor's initial value of -1 (seen here through size_t).
89  size_t get_nrows() const { return output_num_rows_; }
90 
91  void check_thread_id() const {
    // Sanity check: compare the calling thread with the thread id captured
    // by the constructor and throw std::runtime_error on a mismatch.
92  if (std::this_thread::get_id() != thread_id_) {
93  throw std::runtime_error(
94  "TableFunctionManager instance accessed from an alien thread!");
95  }
96  }
97 
98  // Store the pointer to output Column instance
    //   index - position of the output column; CHECKed to lie in [0, get_ncols())
    //   ptr   - pointer to the Column instance; CHECKed to be non-null
    // Throws std::runtime_error (via check_thread_id) when called from a thread
    // other than the one that constructed the manager.
99  void set_output_column(int32_t index, int8_t* ptr) {
100  check_thread_id();
101  CHECK(index >= 0 && index < static_cast<int32_t>(get_ncols()));
102  CHECK(ptr);
     // allocate_output_buffers() later reinterprets this pointer as a Column*.
103  output_column_ptrs[index] = ptr;
104  }
105 
106  // Set the total number of array values in a column of arrays
108  int64_t output_array_values_total_number) {
110  size_t(-1)); // set_output_array_size must be called
111  // before set_output_row_size because
112  // set_output_row_size allocates the output
113  // buffers
114  int32_t num_out_columns = get_ncols();
115  CHECK_LE(0, index);
116  CHECK_LT(index, num_out_columns);
117  output_array_values_total_number_[index] = output_array_values_total_number;
118  }
119 
120  void allocate_output_buffers(int64_t output_num_rows) {
     // Records output_num_rows, registers one column slot per output column
     // on query_mem_desc, creates the QueryMemoryInitializer that is stored in
     // query_buffers, and - when the row count is non-zero - points every
     // registered output Column at its slice of the first group-by buffer.
     // NOTE(review): several statements of this function (including the
     // construction of query_mem_desc) are missing from this listing; the
     // comments below describe only the lines that are visible.
121  check_thread_id();
123  size_t(-1)); // re-allocation of output buffers is not supported
124 
125  output_num_rows_ = output_num_rows;
126  auto num_out_columns = get_ncols();
128  output_num_rows, // divide by row multiplier???
130  /*is_table_function=*/true);
131  query_mem_desc.setOutputColumnar(true);
132 
     // Pass 1: describe the slot of every output column. Array columns get a
     // flatbuffer-sized slot, all other columns a slot of ti.get_size() bytes.
133  for (size_t i = 0; i < num_out_columns; i++) {
134  // All outputs have padded width set to logical column width
135  auto ti = exe_unit_.target_exprs[i]->get_type_info();
136  if (ti.is_array()) {
138  -1); // set_output_array_values_total_number(i, ...) is not called
139  /*
140  Here we compute the byte size of flatbuffer and store it in
141  query memory descriptor's ColSlotContext instance. The
142  flatbuffer memory will be allocated in
143  QueryMemoryInitializer constructor and the memory will be
144  initialized below.
145  */
146  const int64_t flatbuffer_size = getVarlenArrayBufferSize(
148  query_mem_desc.addColSlotInfoFlatBuffer(
149  flatbuffer_size); // used by QueryMemoryInitializer
150  } else {
151  const size_t col_width = ti.get_size();
152  query_mem_desc.addColSlotInfo({std::make_tuple(col_width, col_width)});
153  }
154  }
155 
156  // The members layout of Column must match with Column defined in
157  // heavydbTypes.h
158  struct Column {
159  int8_t* ptr;
160  int64_t size;
161  // just for debugging:
162  std::string toString() const {
163  return "Column{" + ::toString(ptr) + ", " + ::toString(size) + "}";
164  }
165  };
166  // We do not init output buffers for CPU currently, so CPU
167  // table functions are expected to handle their own initialization
168  query_buffers = std::make_unique<QueryMemoryInitializer>(
169  exe_unit_,
171  /*device_id=*/0,
     // A row count of 0 is passed on as 1 here; presumably so the initializer
     // never sees a zero-row request - TODO confirm.
173  (output_num_rows_ == 0 ? 1 : output_num_rows_),
174  std::vector<std::vector<const int8_t*>>{col_buf_ptrs_},
175  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
177  nullptr,
178  executor_);
     // Pass 2 (skipped for zero rows): walk the first group-by buffer and hand
     // each output column its start address, advancing by the column's byte
     // size rounded with align_to_int64.
179  if (output_num_rows_ != 0) {
180  auto group_by_buffers_ptr = query_buffers->getGroupByBuffersPtr();
181  CHECK(group_by_buffers_ptr);
182  auto output_buffers_ptr = reinterpret_cast<int8_t*>(group_by_buffers_ptr[0]);
183  for (size_t i = 0; i < num_out_columns; i++) {
     // CHECK(col) fails for any column that was never registered through
     // set_output_column(), since the slots start out as nullptr.
184  Column* col = reinterpret_cast<Column*>(output_column_ptrs[i]);
185  CHECK(col);
186  // set the members of output Column instances:
187  output_col_buf_ptrs[i] = reinterpret_cast<int64_t*>(output_buffers_ptr);
188  col->ptr = output_buffers_ptr;
189  col->size = output_num_rows_;
190 
191  auto ti = exe_unit_.target_exprs[i]->get_type_info();
192  if (ti.is_array()) {
193  FlatBufferManager m{output_buffers_ptr};
195  m, output_num_rows_, output_array_values_total_number_[i], ti);
196  output_buffers_ptr = align_to_int64(output_buffers_ptr + m.flatbufferSize());
197  } else {
198  const size_t col_width = ti.get_size();
199  output_buffers_ptr =
200  align_to_int64(output_buffers_ptr + col_width * output_num_rows_);
201  }
202  }
203  }
204  }
205 
206  const char* get_error_message() const {
     // Returns error_message_.c_str(): the pointer refers to the string's own
     // storage, so it stays usable only while this manager is alive and the
     // message is not replaced. Throws via check_thread_id() on a foreign thread.
207  check_thread_id();
208  return error_message_.c_str();
209  }
210 
211  void set_error_message(const char* msg) {
     // Copies msg into error_message_. msg must be a non-null, NUL-terminated
     // string: constructing std::string from a null pointer is undefined
     // behavior. Throws via check_thread_id() on a foreign thread.
212  check_thread_id();
213  error_message_ = std::string(msg);
214  }
215 
216  void set_metadata(const char* key,
217  const uint8_t* raw_bytes,
218  const size_t num_bytes,
219  const TableFunctionMetadataType value_type) const {
     // Forwards the key, the raw value bytes, their byte count and the value
     // type unchanged to row_set_mem_owner_->setTableFunctionMetadata().
221  row_set_mem_owner_->setTableFunctionMetadata(key, raw_bytes, num_bytes, value_type);
222  }
223 
224  void get_metadata(const char* key,
225  const uint8_t*& raw_bytes,
226  size_t& num_bytes,
227  TableFunctionMetadataType& value_type) const {
     // Looks up `key` through row_set_mem_owner_->getTableFunctionMetadata().
     // raw_bytes, num_bytes and value_type are passed by reference and act as
     // output parameters of that call.
229  row_set_mem_owner_->getTableFunctionMetadata(key, raw_bytes, num_bytes, value_type);
230  }
231 
232  inline int32_t getNewDictId() {
     // Asks the executor for the string dictionary proxy of TRANSIENT_DICT_ID
     // and returns the dictionary id that proxy reports.
     // NOTE(review): the meaning of the trailing `true` argument is not visible
     // in this file - confirm against Executor::getStringDictionaryProxy.
233  const auto proxy =
234  executor_->getStringDictionaryProxy(TRANSIENT_DICT_ID, row_set_mem_owner_, true);
235  return proxy->getDictId();
236  }
237 
238  inline std::string getString(int32_t dict_id, int32_t string_id) {
     // Resolves string_id to its text using the string dictionary proxy that
     // the executor returns for dict_id; the result is returned by value.
239  const auto proxy =
240  executor_->getStringDictionaryProxy(dict_id, row_set_mem_owner_, true);
241  return proxy->getString(string_id);
242  }
243 
244  inline const char* getCString(int32_t dict_id, int32_t string_id) {
     // Same lookup as getString(), but returns the character pointer reported
     // by the proxy's getStringBytes(); the accompanying length is discarded.
     // NOTE(review): the lifetime of the returned pointer is determined by the
     // proxy and is not visible here - confirm before caching the result.
245  const auto proxy =
246  executor_->getStringDictionaryProxy(dict_id, row_set_mem_owner_, true);
247  auto [c_str, len] = proxy->getStringBytes(string_id);
248  return c_str;
249  }
250 
251  inline const int32_t getOrAddTransient(int32_t dict_id, const std::string& str) {
     // Returns the id that the dict_id proxy's getOrAddTransient() yields for
     // str. The `const` on the by-value return type has no effect for callers.
252  const auto proxy =
253  executor_->getStringDictionaryProxy(dict_id, row_set_mem_owner_, true);
254  return proxy->getOrAddTransient(str);
255  }
256 
257  // Methods for managing singleton instance of TableFunctionManager:
258 
259  bool isSingleton() const { return is_singleton_; }  // flag supplied to the constructor
260 
262  if (isSingleton()) {
263  set_singleton(nullptr); // end of singleton life
264  }
265  }
266 
268  static TableFunctionManager* instance = nullptr;
269  return instance;
270  }
271 
272  private:
275 
276  static void set_singleton(TableFunctionManager* instance) {
     // Installs `instance` as the singleton (non-null argument) or clears the
     // singleton (nullptr argument). Installing calls instance->lock() and then
     // CHECKs that no singleton is currently set; clearing CHECKs that one is
     // set and calls unlock() on it before the slot is overwritten.
277  auto& instance_ = get_singleton();
278  // ensure being singleton and lock/unlock
279  if (instance) {
280  instance->lock();
281  CHECK(instance_ == nullptr);
282  } else {
283  CHECK(instance_ != nullptr);
284  instance_->unlock();
285  }
286  instance_ = instance;
287  }
288 
290  Executor* executor_;
291  // Pointers to the buffers of input Columns
292  std::vector<const int8_t*>& col_buf_ptrs_;
293  //
294  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner_;
295  // Pointers to the buffers of output Columns
296  std::vector<int64_t*> output_col_buf_ptrs;
297  // Number of rows of output Columns
299  // Total number of array values in the output columns of arrays
300  std::vector<int64_t> output_array_values_total_number_;
301  // Pointers to output Column instances
302  std::vector<int8_t*> output_column_ptrs;
303  // If TableFunctionManager is global
305  // Store thread id for sanity check
306  std::thread::id thread_id_;
307  // Error message
308  std::string error_message_;
309 };
std::unique_ptr< QueryMemoryInitializer > query_buffers
void set_output_column(int32_t index, int8_t *ptr)
void addColSlotInfoFlatBuffer(const int64_t flatbuffer_size)
#define CHECK_EQ(x, y)
Definition: Logger.h:297
std::vector< const int8_t * > & col_buf_ptrs_
void set_output_array_values_total_number(int32_t index, int64_t output_array_values_total_number)
TableFunctionManager(const TableFunctionExecutionUnit &exe_unit, Executor *executor, std::vector< const int8_t * > &col_buf_ptrs, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, bool is_singleton)
const TableFunctionExecutionUnit & exe_unit_
void get_metadata(const char *key, const uint8_t *&raw_bytes, size_t &num_bytes, TableFunctionMetadataType &value_type) const
void set_error_message(const char *msg)
DEVICE int64_t size() const
Definition: heavydbTypes.h:726
std::string getString(int32_t dict_id, int32_t string_id)
void setOutputColumnar(const bool val)
std::vector< int8_t * > output_column_ptrs
Constants for Builtin SQL Types supported by HEAVY.AI.
const char * getCString(int32_t dict_id, int32_t string_id)
std::vector< int64_t * > output_col_buf_ptrs
void allocate_output_buffers(int64_t output_num_rows)
#define CHECK_NE(x, y)
Definition: Logger.h:298
const int32_t getOrAddTransient(int32_t dict_id, const std::string &str)
std::mutex TableFunctionManager_singleton_mutex
TableFunctionMetadataType
#define CHECK_LT(x, y)
Definition: Logger.h:299
void initializeVarlenArray(FlatBufferManager &m, int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1445
#define TRANSIENT_DICT_ID
Definition: sqltypes.h:310
#define CHECK_LE(x, y)
Definition: Logger.h:300
UserTableFunctionError(const std::string &message)
void set_metadata(const char *key, const uint8_t *raw_bytes, const size_t num_bytes, const TableFunctionMetadataType value_type) const
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
std::vector< int64_t > output_array_values_total_number_
int64_t getVarlenArrayBufferSize(int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1425
#define CHECK(condition)
Definition: Logger.h:289
TableFunctionError(const std::string &message)
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
static TableFunctionManager * get_singleton()
const char * get_error_message() const
std::string toString() const
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
TableFunctionErrorCode
static TableFunctionManager *& get_singleton()
static void set_singleton(TableFunctionManager *instance)