OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionManager.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "QueryEngine/Execute.h"
21 #include "Shared/sqltypes.h"
22 
23 /*
24  The TableFunctionManager implements the following features:
25 
26  - Manage the memory of output column buffers.
27 
28  - Allow table functions to communicate error/exception messages up
29  to the execution context. Table functions can return with a call
30  to `table_function_error` with an error message. This will
31  indicate to the execution context that an error occurred within the
32  table function, and the error will be propagated as an exception.
33 */
34 
35 // TableFunctionError encapsulates any runtime errors caused by table function execution.
// It carries no state of its own: the message is forwarded unchanged to
// std::runtime_error and is read back through what().
// NOTE(review): the constructor is not marked `explicit`, so a std::string
// converts implicitly to TableFunctionError; confirm that is intended.
36 class TableFunctionError : public std::runtime_error {
37  public:
38  TableFunctionError(const std::string& message) : std::runtime_error(message) {}
39 };
40 
41 // UserTableFunctionErrors represent errors thrown explicitly by user code within table
42 // functions, i.e. through calling table_function_error()
44  public:
45  UserTableFunctionError(const std::string& message) : TableFunctionError(message) {}
46 };
47 
48 // Use a set negative value to distinguish from already-existing
49 // negative return values
// The underlying type is fixed to int32_t. GenericError is -123456789 in
// decimal; 0x4F4B is the byte pair 'O','K' in ASCII.
50 enum TableFunctionErrorCode : int32_t {
51  GenericError = -0x75BCD15,
52  NotAnError = -0x4F4B, // used to indicate a successful UDTF
53  // execution without specifying the row
54  // size for output columns
55 };
56 
57 extern std::mutex TableFunctionManager_singleton_mutex;
58 
59 struct TableFunctionManager {
60  std::unique_ptr<QueryMemoryInitializer> query_buffers;
61 
63  Executor* executor,
64  std::vector<const int8_t*>& col_buf_ptrs,
65  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
66  bool is_singleton)
67  : exe_unit_(exe_unit)
68  , executor_(executor)
69  , col_buf_ptrs_(col_buf_ptrs)
70  , row_set_mem_owner_(row_set_mem_owner)
71  , output_num_rows_(-1)
72  , is_singleton_(is_singleton)
73  , thread_id_(std::this_thread::get_id()) {
74  if (isSingleton()) {
75  set_singleton(this); // start of singleton life
76  }
77  auto num_out_columns = get_ncols();
78  output_col_buf_ptrs.reserve(num_out_columns);
79  output_column_ptrs.reserve(num_out_columns);
80  output_item_values_total_number_.reserve(num_out_columns);
81  for (size_t i = 0; i < num_out_columns; i++) {
82  output_col_buf_ptrs.emplace_back(nullptr);
83  output_column_ptrs.emplace_back(nullptr);
84  output_item_values_total_number_.emplace_back(-1);
85  }
86  }
87 
88  // Return the number of output columns, i.e. the number of target
// expressions held by the execution unit (exe_unit_.target_exprs).
89  size_t get_ncols() const { return exe_unit_.target_exprs.size(); }
90 
91  // Return the number of rows of output columns.
// NOTE(review): output_num_rows_ is initialised to -1 by the constructor and
// assigned in allocate_output_buffers(); a call made before that assignment
// yields the -1 sentinel converted to size_t. Confirm callers expect this.
92  size_t get_nrows() const { return output_num_rows_; }
93 
// Sanity check: throws std::runtime_error when the calling thread is not the
// one whose id was recorded in thread_id_ at construction time.
94  void check_thread_id() const {
95  if (std::this_thread::get_id() != thread_id_) {
96  throw std::runtime_error(
97  "TableFunctionManager instance accessed from an alien thread!");
98  }
99  }
100 
101  // Store the pointer to output Column instance
// `index` must lie in [0, get_ncols()) and `ptr` must be non-null; both are
// enforced with CHECK after the calling-thread check. The stored pointer is
// later reinterpreted as a Column* by allocate_output_buffers(), which, when
// the row count is non-zero, fills in that column's ptr and size members.
102  void set_output_column(int32_t index, int8_t* ptr) {
103  check_thread_id();
104  CHECK(index >= 0 && index < static_cast<int32_t>(get_ncols()));
105  CHECK(ptr);
106  output_column_ptrs[index] = ptr;
107  }
108 
109  // Set the total number of item values in a column of
110  // non-scalars. For example, the total number of array values in a
111  // column of arrays, or the total number of points in a column of
112  // GeoLineString's, etc.
114  int64_t output_item_values_total_number) {
116  size_t(-1)); // set_output_item_values_total_number must
117  // be called before set_output_row_size
118  // because set_output_row_size allocates
119  // the output buffers
120  int32_t num_out_columns = get_ncols();
121  CHECK_LE(0, index);
122  CHECK_LT(index, num_out_columns);
123  output_item_values_total_number_[index] = output_item_values_total_number;
124  }
125 
126  // Set the total number of array values in a column of arrays.
128  int64_t output_array_values_total_number) {
129  set_output_item_values_total_number(index, output_array_values_total_number);
130  }
131 
132  void allocate_output_buffers(int64_t output_num_rows) {
133  check_thread_id();
135  size_t(-1)); // re-allocation of output buffers is not supported
136 
137  output_num_rows_ = output_num_rows;
138  auto num_out_columns = get_ncols();
140  output_num_rows, // divide by row multiplier???
142 
143  for (size_t i = 0; i < num_out_columns; i++) {
144  // All outputs have padded width set to logical column width
145  auto ti = exe_unit_.target_exprs[i]->get_type_info();
146  if (ti.usesFlatBuffer()) {
147  int64_t total_number = -1;
148  switch (ti.get_type()) {
149  case kTEXT:
150  if (ti.get_compression() != kENCODING_NONE) {
151  UNREACHABLE() << "allocate_output_buffers not implemented for "
152  << ti.toString();
153  }
154  case kARRAY:
155  case kLINESTRING:
156  case kPOLYGON:
157  case kMULTIPOINT:
158  case kMULTILINESTRING:
159  case kMULTIPOLYGON: {
160  if (output_item_values_total_number_[i] == -1) {
161  throw std::runtime_error("set_output_item_values_total_number(" +
162  std::to_string(i) +
163  ", <total_number>) must be called before "
164  "set_output_row_size(<size>) in " +
166  }
167  total_number = output_item_values_total_number_[i];
168  break;
169  }
170  case kPOINT:
171  break;
172  default:
173  UNREACHABLE() << "allocate_output_buffers not implemented for "
174  << ti.toString();
175  }
176  /*
177  Here we compute the byte size of flatbuffer and store it in
178  query memory descriptor's ColSlotContext instance. The
179  flatbuffer memory will be allocated in
180  QueryMemoryInitializer constructor and the memory will be
181  initialized below.
182  */
184  output_num_rows_, total_number, ti)); // used by QueryMemoryInitializer
185  } else {
186  const size_t col_width = ti.get_size();
187  query_mem_desc.addColSlotInfo({std::make_tuple(col_width, col_width)});
188  }
189  }
190 
191  // The member layout of Column must match that of the Column defined in
192  // heavydbTypes.h
// Later in this function each registered output column pointer is cast to
// this type, and its ptr and size members are written with the column's
// buffer address and the output row count.
193  struct Column {
194  int8_t* ptr;
195  int64_t size;
196  // just for debugging:
197  std::string toString() const {
198  return "Column{" + ::toString(ptr) + ", " + ::toString(size) + "}";
199  }
200  };
201  // We do not init output buffers for CPU currently, so CPU
202  // table functions are expected to handle their own initialization
203  query_buffers = std::make_unique<QueryMemoryInitializer>(
204  exe_unit_,
206  /*device_id=*/0,
208  (output_num_rows_ == 0 ? 1 : output_num_rows_),
209  std::vector<std::vector<const int8_t*>>{col_buf_ptrs_},
210  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
212  nullptr,
213  executor_);
214  if (output_num_rows_ != 0) {
215  auto group_by_buffers_ptr = query_buffers->getGroupByBuffersPtr();
216  CHECK(group_by_buffers_ptr);
217  auto output_buffers_ptr = reinterpret_cast<int8_t*>(group_by_buffers_ptr[0]);
218  for (size_t i = 0; i < num_out_columns; i++) {
219  Column* col = reinterpret_cast<Column*>(output_column_ptrs[i]);
220  CHECK(col);
221  // set the members of output Column instances:
222  output_col_buf_ptrs[i] = reinterpret_cast<int64_t*>(output_buffers_ptr);
223  col->ptr = output_buffers_ptr;
224  col->size = output_num_rows_;
225 
226  auto ti = exe_unit_.target_exprs[i]->get_type_info();
227  if (ti.usesFlatBuffer()) {
228  FlatBufferManager m{output_buffers_ptr};
229  int64_t total_number = -1;
230  switch (ti.get_type()) {
231  case kTEXT:
232  if (ti.get_compression() != kENCODING_NONE) {
233  UNREACHABLE() << "allocate_output_buffers not implemented for "
234  << ti.toString();
235  }
236  case kARRAY:
237  case kLINESTRING:
238  case kPOLYGON:
239  case kMULTIPOINT:
240  case kMULTILINESTRING:
241  case kMULTIPOLYGON: {
242  total_number = output_item_values_total_number_[i];
243  break;
244  }
245  case kPOINT:
246  break;
247  default:
248  UNREACHABLE() << "allocate_output_buffers not implemented for "
249  << ti.toString();
250  }
251  initializeFlatBuffer(m, output_num_rows_, total_number, ti);
252  CHECK(FlatBufferManager::isFlatBuffer(output_buffers_ptr));
253  // Checks if the implementations of getFlatBufferSize and
254  // initializeFlatBuffer in sqltypes.h are in sync:
255  CHECK_EQ(m.getBufferSize(), query_mem_desc.getFlatBufferSize(i));
256  output_buffers_ptr = align_to_int64(output_buffers_ptr + m.getBufferSize());
257  } else {
258  const size_t col_width = ti.get_size();
259  output_buffers_ptr =
260  align_to_int64(output_buffers_ptr + col_width * output_num_rows_);
261  }
262  }
263  }
264  }
265 
// Return the stored error message as a C string, after the calling-thread
// check. The pointer comes from error_message_.c_str(), so the usual
// std::string rules apply: it stays valid only until error_message_ is next
// modified, e.g. by set_error_message().
266  const char* get_error_message() const {
267  check_thread_id();
268  return error_message_.c_str();
269  }
270 
// Replace the stored error message with a copy of `msg`, after the
// calling-thread check.
// NOTE(review): `msg` is handed straight to the std::string constructor, which
// requires a non-null pointer; confirm no caller passes nullptr.
271  void set_error_message(const char* msg) {
272  check_thread_id();
273  error_message_ = std::string(msg);
274  }
275 
276  void set_metadata(const char* key,
277  const uint8_t* raw_bytes,
278  const size_t num_bytes,
279  const TableFunctionMetadataType value_type) const {
281  row_set_mem_owner_->setTableFunctionMetadata(key, raw_bytes, num_bytes, value_type);
282  }
283 
284  void get_metadata(const char* key,
285  const uint8_t*& raw_bytes,
286  size_t& num_bytes,
287  TableFunctionMetadataType& value_type) const {
289  row_set_mem_owner_->getTableFunctionMetadata(key, raw_bytes, num_bytes, value_type);
290  }
291 
292  inline int32_t getNewDictDbId() {
293  const auto proxy = executor_->getStringDictionaryProxy(
295  return proxy->getDictKey().db_id;
296  }
297 
298  inline int32_t getNewDictId() {
299  const auto proxy = executor_->getStringDictionaryProxy(
301  return proxy->getDictKey().dict_id;
302  }
303 
// Look up the string dictionary proxy for {db_id, dict_id} through the
// executor, passing row_set_mem_owner_, and return it type-erased as int8_t*.
// NOTE(review): the meaning of the trailing `true` argument is defined by
// Executor::getStringDictionaryProxy, which is not visible here.
304  inline int8_t* getStringDictionaryProxy(int32_t db_id, int32_t dict_id) {
305  return reinterpret_cast<int8_t*>(
306  executor_->getStringDictionaryProxy({db_id, dict_id}, row_set_mem_owner_, true));
307  }
308 
// Obtain the dictionary proxy for {db_id, dict_id} from the executor and
// return the result of its getString(string_id) call.
// NOTE(review): the proxy pointer is dereferenced without a null check;
// presumably the executor never returns null here — confirm.
309  inline std::string getString(int32_t db_id, int32_t dict_id, int32_t string_id) {
310  const auto proxy =
311  executor_->getStringDictionaryProxy({db_id, dict_id}, row_set_mem_owner_, true);
312  return proxy->getString(string_id);
313  }
314 
// Forward `str` to getOrAddTransient() on the dictionary proxy for
// {db_id, dict_id} and return its int32_t result.
// NOTE(review): the top-level `const` on the by-value int32_t return type has
// no effect for callers.
315  inline const int32_t getOrAddTransient(int32_t db_id,
316  int32_t dict_id,
317  const std::string& str) {
318  const auto proxy =
319  executor_->getStringDictionaryProxy({db_id, dict_id}, row_set_mem_owner_, true);
320  return proxy->getOrAddTransient(str);
321  }
322 
// Allocate (element_count + 1) * element_size bytes with checked_malloc,
// register the buffer with row_set_mem_owner_ via addVarlenBuffer(), and
// return it.
// NOTE(review): the reason for the extra element is not stated here, and the
// size is computed in int64_t with no visible guard against negative inputs
// or overflow — confirm callers validate their arguments.
// NOTE(review): presumably the memory owner releases the buffer; verify
// against RowSetMemoryOwner::addVarlenBuffer.
323  inline int8_t* makeBuffer(int64_t element_count, int64_t element_size) {
324  int8_t* buffer =
325  reinterpret_cast<int8_t*>(checked_malloc((element_count + 1) * element_size));
326  row_set_mem_owner_->addVarlenBuffer(buffer);
327  return buffer;
328  }
329 
330  // Methods for managing singleton instance of TableFunctionManager:
331 
// True when this manager was constructed with is_singleton == true.
332  bool isSingleton() const { return is_singleton_; }
333 
335  if (isSingleton()) {
336  set_singleton(nullptr); // end of singleton life
337  }
338  }
339 
341  static TableFunctionManager* instance = nullptr;
342  return instance;
343  }
344 
345  private:
348 
349  static void set_singleton(TableFunctionManager* instance) {
351  // ensure being singleton and lock/unlock
352  if (instance) {
353  instance->lock();
354  CHECK(instance_ == nullptr);
355  } else {
356  CHECK(instance_ != nullptr);
357  instance_->unlock();
358  }
359  instance_ = instance;
360  }
361 
363  Executor* executor_;
364  // Pointers to the buffers of input Columns
365  std::vector<const int8_t*>& col_buf_ptrs_;
366  //
367  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner_;
368  // Pointers to the buffers of output Columns
369  std::vector<int64_t*> output_col_buf_ptrs;
370  // Number of rows of output Columns
372  // Total number of item values (scalars, points, etc) in the output
373  // columns of non-scalars (arrays, linestrings, etc)
374  std::vector<int64_t> output_item_values_total_number_;
375  // Pointers to output Column instances
376  std::vector<int8_t*> output_column_ptrs;
377  // If TableFunctionManager is global
379  // Store thread id for sanity check
380  std::thread::id thread_id_;
381  // Error message
382  std::string error_message_;
383 };
std::vector< int64_t > output_item_values_total_number_
std::unique_ptr< QueryMemoryInitializer > query_buffers
void set_output_column(int32_t index, int8_t *ptr)
void addColSlotInfoFlatBuffer(const int64_t flatbuffer_size)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::string getString(int32_t db_id, int32_t dict_id, int32_t string_id)
std::vector< const int8_t * > & col_buf_ptrs_
void set_output_array_values_total_number(int32_t index, int64_t output_array_values_total_number)
TableFunctionManager(const TableFunctionExecutionUnit &exe_unit, Executor *executor, std::vector< const int8_t * > &col_buf_ptrs, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, bool is_singleton)
const TableFunctionExecutionUnit & exe_unit_
void get_metadata(const char *key, const uint8_t *&raw_bytes, size_t &num_bytes, TableFunctionMetadataType &value_type) const
const table_functions::TableFunction table_func
void set_error_message(const char *msg)
DEVICE int64_t size() const
#define UNREACHABLE()
Definition: Logger.h:338
std::vector< int8_t * > output_column_ptrs
void initializeFlatBuffer(FlatBufferManager &m, int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1991
Constants for Builtin SQL Types supported by HEAVY.AI.
int8_t * getStringDictionaryProxy(int32_t db_id, int32_t dict_id)
#define TRANSIENT_DICT_ID
Definition: DbObjectKeys.h:24
std::vector< int64_t * > output_col_buf_ptrs
static TableFunctionManager *& get_singleton_internal()
std::string to_string(char const *&&v)
void allocate_output_buffers(int64_t output_num_rows)
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
int64_t getFlatBufferSize(int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1839
std::mutex TableFunctionManager_singleton_mutex
std::string getName(const bool drop_suffix=false, const bool lower=false) const
TableFunctionMetadataType
#define CHECK_LT(x, y)
Definition: Logger.h:303
int8_t * makeBuffer(int64_t element_count, int64_t element_size)
Definition: sqltypes.h:79
#define CHECK_LE(x, y)
Definition: Logger.h:304
UserTableFunctionError(const std::string &message)
void set_metadata(const char *key, const uint8_t *raw_bytes, const size_t num_bytes, const TableFunctionMetadataType value_type) const
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
void set_output_item_values_total_number(int32_t index, int64_t output_item_values_total_number)
#define CHECK(condition)
Definition: Logger.h:291
TableFunctionError(const std::string &message)
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
HOST static DEVICE bool isFlatBuffer(const void *buffer)
Definition: FlatBuffer.h:528
const int32_t getOrAddTransient(int32_t db_id, int32_t dict_id, const std::string &str)
const char * get_error_message() const
std::string toString() const
Definition: heavydbTypes.h:436
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
TableFunctionErrorCode
static void set_singleton(TableFunctionManager *instance)