OmniSciDB  f17484ade4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionManager.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "QueryEngine/Execute.h"
21 #include "Shared/sqltypes.h"
22 
23 /*
24  The TableFunctionManager implements the following features:
25 
26  - Manage the memory of output column buffers.
27 
28  - Allow table functions to communicate error/exception messages up
29  to the execution context. Table functions can return with a call
30  to `table_function_error` with an error message. This will
31  indicate to the execution context that an error occurred within the
32  table function, and the error will be propagated as an exception.
33 */
34 
// TableFunctionError encapsulates any runtime errors caused by table function execution.
// It derives from std::runtime_error, so handlers for std::runtime_error (or
// std::exception) also catch it; what() returns the message given here.
class TableFunctionError : public std::runtime_error {
 public:
  TableFunctionError(const std::string& message) : std::runtime_error(message) {}
};

// UserTableFunctionErrors represent errors thrown explicitly by user code within table
// functions, i.e. through calling table_function_error(). Being a subclass of
// TableFunctionError, it is caught by any handler for TableFunctionError.
// (The class header line was missing from the listing and is restored here; it is
// implied by the TableFunctionError base initializer in the constructor.)
class UserTableFunctionError : public TableFunctionError {
 public:
  UserTableFunctionError(const std::string& message) : TableFunctionError(message) {}
};
47 
// Return code used by table functions to report a generic failure.
// It is a fixed, deliberately unusual negative number so that it can be
// distinguished from other negative return values that already exist.
// Note: -123456789 == -0x75BCD15.
enum TableFunctionErrorCode : int32_t {
  GenericError = -123456789,
};

// Mutex associated with the TableFunctionManager singleton. It is only declared
// here (extern); the definition lives in another translation unit. Presumably it
// backs TableFunctionManager::lock()/unlock(), whose bodies are not visible here.
extern std::mutex TableFunctionManager_singleton_mutex;
55 
56 struct TableFunctionManager {
57  std::unique_ptr<QueryMemoryInitializer> query_buffers;
58 
60  Executor* executor,
61  std::vector<const int8_t*>& col_buf_ptrs,
62  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
63  bool is_singleton)
64  : exe_unit_(exe_unit)
65  , executor_(executor)
66  , col_buf_ptrs_(col_buf_ptrs)
67  , row_set_mem_owner_(row_set_mem_owner)
68  , output_num_rows_(-1)
69  , is_singleton_(is_singleton)
70  , thread_id_(std::this_thread::get_id()) {
71  if (isSingleton()) {
72  set_singleton(this); // start of singleton life
73  }
74  auto num_out_columns = get_ncols();
75  output_col_buf_ptrs.reserve(num_out_columns);
76  output_column_ptrs.reserve(num_out_columns);
77  output_item_values_total_number_.reserve(num_out_columns);
78  for (size_t i = 0; i < num_out_columns; i++) {
79  output_col_buf_ptrs.emplace_back(nullptr);
80  output_column_ptrs.emplace_back(nullptr);
81  output_item_values_total_number_.emplace_back(-1);
82  }
83  }
84 
85  // Return the number of output columns
86  size_t get_ncols() const { return exe_unit_.target_exprs.size(); }
87 
88  // Return the number of rows of output columns.
89  size_t get_nrows() const { return output_num_rows_; }
90 
91  void check_thread_id() const {
92  if (std::this_thread::get_id() != thread_id_) {
93  throw std::runtime_error(
94  "TableFunctionManager instance accessed from an alien thread!");
95  }
96  }
97 
98  // Store the pointer to output Column instance
99  void set_output_column(int32_t index, int8_t* ptr) {
100  check_thread_id();
101  CHECK(index >= 0 && index < static_cast<int32_t>(get_ncols()));
102  CHECK(ptr);
103  output_column_ptrs[index] = ptr;
104  }
105 
106  // Set the total number of item values in a column of
107  // non-scalars. For example, the total number of array values in a
108  // column of arrays, or the total number of points in a column of
109  // GeoLineString's, etc.
111  int64_t output_item_values_total_number) {
113  size_t(-1)); // set_output_item_values_total_number must
114  // be called before set_output_row_size
115  // because set_output_row_size allocates
116  // the output buffers
117  int32_t num_out_columns = get_ncols();
118  CHECK_LE(0, index);
119  CHECK_LT(index, num_out_columns);
120  output_item_values_total_number_[index] = output_item_values_total_number;
121  }
122 
123  // Set the total number of array values in a column of arrays.
125  int64_t output_array_values_total_number) {
126  set_output_item_values_total_number(index, output_array_values_total_number);
127  }
128 
129  void allocate_output_buffers(int64_t output_num_rows) {
130  check_thread_id();
132  size_t(-1)); // re-allocation of output buffers is not supported
133 
134  output_num_rows_ = output_num_rows;
135  auto num_out_columns = get_ncols();
137  output_num_rows, // divide by row multiplier???
139 
140  for (size_t i = 0; i < num_out_columns; i++) {
141  // All outputs have padded width set to logical column width
142  auto ti = exe_unit_.target_exprs[i]->get_type_info();
143  if (ti.usesFlatBuffer()) {
144  int64_t total_number = -1;
145  switch (ti.get_type()) {
146  case kTEXT:
147  if (ti.get_compression() != kENCODING_NONE) {
148  UNREACHABLE() << "allocate_output_buffers not implemented for "
149  << ti.toString();
150  }
151  case kARRAY:
152  case kLINESTRING:
153  case kPOLYGON:
154  case kMULTIPOINT:
155  case kMULTILINESTRING:
156  case kMULTIPOLYGON: {
157  if (output_item_values_total_number_[i] == -1) {
158  throw std::runtime_error("set_output_item_values_total_number(" +
159  std::to_string(i) +
160  ", <total_number>) must be called before "
161  "set_output_row_size(<size>) in " +
163  }
164  total_number = output_item_values_total_number_[i];
165  break;
166  }
167  case kPOINT:
168  break;
169  default:
170  UNREACHABLE() << "allocate_output_buffers not implemented for "
171  << ti.toString();
172  }
173  /*
174  Here we compute the byte size of flatbuffer and store it in
175  query memory descriptor's ColSlotContext instance. The
176  flatbuffer memory will be allocated in
177  QueryMemoryInitializer constructor and the memory will be
178  initialized below.
179  */
181  output_num_rows_, total_number, ti)); // used by QueryMemoryInitializer
182  } else {
183  const size_t col_width = ti.get_size();
184  query_mem_desc.addColSlotInfo({std::make_tuple(col_width, col_width)});
185  }
186  }
187 
188  // The members layout of Column must match with Column defined in
189  // heavydbTypes.h
190  struct Column {
191  int8_t* ptr;
192  int64_t size;
193  // just for debugging:
194  std::string toString() const {
195  return "Column{" + ::toString(ptr) + ", " + ::toString(size) + "}";
196  }
197  };
198  // We do not init output buffers for CPU currently, so CPU
199  // table functions are expected to handle their own initialization
200  query_buffers = std::make_unique<QueryMemoryInitializer>(
201  exe_unit_,
203  /*device_id=*/0,
205  (output_num_rows_ == 0 ? 1 : output_num_rows_),
206  std::vector<std::vector<const int8_t*>>{col_buf_ptrs_},
207  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
209  nullptr,
210  executor_);
211  if (output_num_rows_ != 0) {
212  auto group_by_buffers_ptr = query_buffers->getGroupByBuffersPtr();
213  CHECK(group_by_buffers_ptr);
214  auto output_buffers_ptr = reinterpret_cast<int8_t*>(group_by_buffers_ptr[0]);
215  for (size_t i = 0; i < num_out_columns; i++) {
216  Column* col = reinterpret_cast<Column*>(output_column_ptrs[i]);
217  CHECK(col);
218  // set the members of output Column instances:
219  output_col_buf_ptrs[i] = reinterpret_cast<int64_t*>(output_buffers_ptr);
220  col->ptr = output_buffers_ptr;
221  col->size = output_num_rows_;
222 
223  auto ti = exe_unit_.target_exprs[i]->get_type_info();
224  if (ti.usesFlatBuffer()) {
225  FlatBufferManager m{output_buffers_ptr};
226  int64_t total_number = -1;
227  switch (ti.get_type()) {
228  case kTEXT:
229  if (ti.get_compression() != kENCODING_NONE) {
230  UNREACHABLE() << "allocate_output_buffers not implemented for "
231  << ti.toString();
232  }
233  case kARRAY:
234  case kLINESTRING:
235  case kPOLYGON:
236  case kMULTIPOINT:
237  case kMULTILINESTRING:
238  case kMULTIPOLYGON: {
239  total_number = output_item_values_total_number_[i];
240  break;
241  }
242  case kPOINT:
243  break;
244  default:
245  UNREACHABLE() << "allocate_output_buffers not implemented for "
246  << ti.toString();
247  }
248  initializeFlatBuffer(m, output_num_rows_, total_number, ti);
249  CHECK(FlatBufferManager::isFlatBuffer(output_buffers_ptr));
250  // Checks if the implementations of getFlatBufferSize and
251  // initializeFlatBuffer in sqltypes.h are in sync:
252  CHECK_EQ(m.getBufferSize(), query_mem_desc.getFlatBufferSize(i));
253  output_buffers_ptr = align_to_int64(output_buffers_ptr + m.getBufferSize());
254  } else {
255  const size_t col_width = ti.get_size();
256  output_buffers_ptr =
257  align_to_int64(output_buffers_ptr + col_width * output_num_rows_);
258  }
259  }
260  }
261  }
262 
263  const char* get_error_message() const {
264  check_thread_id();
265  return error_message_.c_str();
266  }
267 
268  void set_error_message(const char* msg) {
269  check_thread_id();
270  error_message_ = std::string(msg);
271  }
272 
273  void set_metadata(const char* key,
274  const uint8_t* raw_bytes,
275  const size_t num_bytes,
276  const TableFunctionMetadataType value_type) const {
278  row_set_mem_owner_->setTableFunctionMetadata(key, raw_bytes, num_bytes, value_type);
279  }
280 
281  void get_metadata(const char* key,
282  const uint8_t*& raw_bytes,
283  size_t& num_bytes,
284  TableFunctionMetadataType& value_type) const {
286  row_set_mem_owner_->getTableFunctionMetadata(key, raw_bytes, num_bytes, value_type);
287  }
288 
289  inline int32_t getNewDictDbId() {
290  const auto proxy = executor_->getStringDictionaryProxy(
292  return proxy->getDictKey().db_id;
293  }
294 
295  inline int32_t getNewDictId() {
296  const auto proxy = executor_->getStringDictionaryProxy(
298  return proxy->getDictKey().dict_id;
299  }
300 
301  inline int8_t* getStringDictionaryProxy(int32_t db_id, int32_t dict_id) {
302  return reinterpret_cast<int8_t*>(
303  executor_->getStringDictionaryProxy({db_id, dict_id}, row_set_mem_owner_, true));
304  }
305 
306  inline std::string getString(int32_t db_id, int32_t dict_id, int32_t string_id) {
307  const auto proxy =
308  executor_->getStringDictionaryProxy({db_id, dict_id}, row_set_mem_owner_, true);
309  return proxy->getString(string_id);
310  }
311 
312  inline const int32_t getOrAddTransient(int32_t db_id,
313  int32_t dict_id,
314  const std::string& str) {
315  const auto proxy =
316  executor_->getStringDictionaryProxy({db_id, dict_id}, row_set_mem_owner_, true);
317  return proxy->getOrAddTransient(str);
318  }
319 
320  inline int8_t* makeBuffer(int64_t element_count, int64_t element_size) {
321  int8_t* buffer =
322  reinterpret_cast<int8_t*>(checked_malloc((element_count + 1) * element_size));
323  row_set_mem_owner_->addVarlenBuffer(buffer);
324  return buffer;
325  }
326 
327  // Methods for managing singleton instance of TableFunctionManager:
328 
329  bool isSingleton() const { return is_singleton_; }
330 
332  if (isSingleton()) {
333  set_singleton(nullptr); // end of singleton life
334  }
335  }
336 
338  static TableFunctionManager* instance = nullptr;
339  return instance;
340  }
341 
342  private:
345 
346  static void set_singleton(TableFunctionManager* instance) {
348  // ensure being singleton and lock/unlock
349  if (instance) {
350  instance->lock();
351  CHECK(instance_ == nullptr);
352  } else {
353  CHECK(instance_ != nullptr);
354  instance_->unlock();
355  }
356  instance_ = instance;
357  }
358 
360  Executor* executor_;
361  // Pointers to the buffers of input Columns
362  std::vector<const int8_t*>& col_buf_ptrs_;
363  //
364  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner_;
365  // Pointers to the buffers of output Columns
366  std::vector<int64_t*> output_col_buf_ptrs;
367  // Number of rows of output Columns
369  // Total number of item values (scalars, points, etc) in the output
370  // columns of non-scalars (arrays, linestrings, etc)
371  std::vector<int64_t> output_item_values_total_number_;
372  // Pointers to output Column instances
373  std::vector<int8_t*> output_column_ptrs;
374  // If TableFunctionManager is global
376  // Store thread id for sanity check
377  std::thread::id thread_id_;
378  // Error message
379  std::string error_message_;
380 };
std::vector< int64_t > output_item_values_total_number_
std::unique_ptr< QueryMemoryInitializer > query_buffers
void set_output_column(int32_t index, int8_t *ptr)
void addColSlotInfoFlatBuffer(const int64_t flatbuffer_size)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::string getString(int32_t db_id, int32_t dict_id, int32_t string_id)
std::vector< const int8_t * > & col_buf_ptrs_
void set_output_array_values_total_number(int32_t index, int64_t output_array_values_total_number)
TableFunctionManager(const TableFunctionExecutionUnit &exe_unit, Executor *executor, std::vector< const int8_t * > &col_buf_ptrs, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, bool is_singleton)
const TableFunctionExecutionUnit & exe_unit_
void get_metadata(const char *key, const uint8_t *&raw_bytes, size_t &num_bytes, TableFunctionMetadataType &value_type) const
const table_functions::TableFunction table_func
void set_error_message(const char *msg)
DEVICE int64_t size() const
#define UNREACHABLE()
Definition: Logger.h:338
std::vector< int8_t * > output_column_ptrs
void initializeFlatBuffer(FlatBufferManager &m, int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1991
Constants for Builtin SQL Types supported by HEAVY.AI.
int8_t * getStringDictionaryProxy(int32_t db_id, int32_t dict_id)
#define TRANSIENT_DICT_ID
Definition: DbObjectKeys.h:24
std::vector< int64_t * > output_col_buf_ptrs
static TableFunctionManager *& get_singleton_internal()
std::string to_string(char const *&&v)
void allocate_output_buffers(int64_t output_num_rows)
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
int64_t getFlatBufferSize(int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1839
std::mutex TableFunctionManager_singleton_mutex
std::string getName(const bool drop_suffix=false, const bool lower=false) const
TableFunctionMetadataType
#define CHECK_LT(x, y)
Definition: Logger.h:303
int8_t * makeBuffer(int64_t element_count, int64_t element_size)
Definition: sqltypes.h:79
#define CHECK_LE(x, y)
Definition: Logger.h:304
UserTableFunctionError(const std::string &message)
void set_metadata(const char *key, const uint8_t *raw_bytes, const size_t num_bytes, const TableFunctionMetadataType value_type) const
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
void set_output_item_values_total_number(int32_t index, int64_t output_item_values_total_number)
#define CHECK(condition)
Definition: Logger.h:291
TableFunctionError(const std::string &message)
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
HOST static DEVICE bool isFlatBuffer(const void *buffer)
Definition: FlatBuffer.h:528
const int32_t getOrAddTransient(int32_t db_id, int32_t dict_id, const std::string &str)
const char * get_error_message() const
std::string toString() const
Definition: heavydbTypes.h:434
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
TableFunctionErrorCode
static void set_singleton(TableFunctionManager *instance)