OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TableFunctionManager.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "QueryEngine/Execute.h"
21 #include "Shared/sqltypes.h"
22 
23 /*
24  The TableFunctionManager implements the following features:
25 
26  - Manage the memory of output column buffers.
27 
28  - Allow table functions to communicate error/exception messages up
29  to the execution context. Table functions can return with a call
30  to `table_function_error` with an error message. This will
31  indicate to the execution context that an error occurred within the
32  table function, and the error will be propagated as an exception.
33 */
34 
35 // TableFunctionError encapsulates any runtime errors caused by table function execution.
   // It derives from std::runtime_error; the constructor forwards `message` unchanged
   // to the base class, so what() yields the same text.
36 class TableFunctionError : public std::runtime_error {
37  public:
38  TableFunctionError(const std::string& message) : std::runtime_error(message) {}
39 };
40 
41 // UserTableFunctionErrors represent errors thrown explicitly by user code within table
42 // functions, i.e. through calling table_function_error()
44  public:
45  UserTableFunctionError(const std::string& message) : TableFunctionError(message) {}
46 };
47 
48 // Use a fixed, distinctive negative value to distinguish it from already-existing
49 // negative return values.
   // GenericError is -0x75BCD15, which is -123456789 in decimal. The underlying type is
   // int32_t.
50 enum TableFunctionErrorCode : int32_t {
51  GenericError = -0x75BCD15,
52 };
53 
54 extern std::mutex TableFunctionManager_singleton_mutex;
55 
56 struct TableFunctionManager {
57  std::unique_ptr<QueryMemoryInitializer> query_buffers;
58 
60  Executor* executor,
61  std::vector<const int8_t*>& col_buf_ptrs,
62  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
63  bool is_singleton)
64  : exe_unit_(exe_unit)
65  , executor_(executor)
66  , col_buf_ptrs_(col_buf_ptrs)
67  , row_set_mem_owner_(row_set_mem_owner)
68  , output_num_rows_(-1)
69  , is_singleton_(is_singleton)
70  , thread_id_(std::this_thread::get_id()) {
71  if (isSingleton()) {
72  set_singleton(this); // start of singleton life
73  }
74  auto num_out_columns = get_ncols();
75  output_col_buf_ptrs.reserve(num_out_columns);
76  output_column_ptrs.reserve(num_out_columns);
77  output_array_values_total_number_.reserve(num_out_columns);
78  for (size_t i = 0; i < num_out_columns; i++) {
79  output_col_buf_ptrs.emplace_back(nullptr);
80  output_column_ptrs.emplace_back(nullptr);
81  output_array_values_total_number_.emplace_back(-1);
82  }
83  }
84 
85  // Return the number of output columns, i.e. the number of entries in
    // exe_unit_.target_exprs.
86  size_t get_ncols() const { return exe_unit_.target_exprs.size(); }
87 
88  // Return the number of rows of output columns.
    // This is the current value of output_num_rows_, which the constructor initializes
    // with -1 and allocate_output_buffers() overwrites with its argument.
89  size_t get_nrows() const { return output_num_rows_; }
90 
91  void check_thread_id() const {
    // Sanity check: compare the calling thread's id with thread_id_, which the
    // constructor captured via std::this_thread::get_id(). Throws std::runtime_error
    // when the ids differ.
92  if (std::this_thread::get_id() != thread_id_) {
93  throw std::runtime_error(
94  "TableFunctionManager instance accessed from an alien thread!");
95  }
96  }
97 
98  // Store the pointer to output Column instance
    // `index` must lie in [0, get_ncols()) and `ptr` must be non-null; both conditions
    // are enforced with CHECK. The stored pointer is later reinterpreted as a Column*
    // by allocate_output_buffers(), which fills in the Column's ptr and size members.
99  void set_output_column(int32_t index, int8_t* ptr) {
100  check_thread_id();
101  CHECK(index >= 0 && index < static_cast<int32_t>(get_ncols()));
102  CHECK(ptr);
103  output_column_ptrs[index] = ptr;
104  }
105 
106  // Set the total number of array values in a column of arrays
108  int64_t output_array_values_total_number) {
110  size_t(-1)); // set_output_array_size must be called
111  // before set_output_row_size because
112  // set_output_row_size allocates the output
113  // buffers
114  int32_t num_out_columns = get_ncols();
115  CHECK_LE(0, index);
116  CHECK_LT(index, num_out_columns);
117  output_array_values_total_number_[index] = output_array_values_total_number;
118  }
119 
120  void allocate_output_buffers(int64_t output_num_rows) {
121  check_thread_id();
123  size_t(-1)); // re-allocation of output buffers is not supported
124 
125  output_num_rows_ = output_num_rows;
126  auto num_out_columns = get_ncols();
128  output_num_rows, // divide by row multiplier???
130  /*is_table_function=*/true);
131  query_mem_desc.setOutputColumnar(true);
132 
133  for (size_t i = 0; i < num_out_columns; i++) {
134  // All outputs have padded width set to logical column width
135  auto ti = exe_unit_.target_exprs[i]->get_type_info();
136  if (ti.is_array()) {
138  -1); // set_output_array_values_total_number(i, ...) is not called
139  /*
140  Here we compute the byte size of flatbuffer and store it in
141  query memory descriptor's ColSlotContext instance. The
142  flatbuffer memory will be allocated in
143  QueryMemoryInitializer constructor and the memory will be
144  initialized below.
145  */
146  const int64_t flatbuffer_size = getVarlenArrayBufferSize(
148  query_mem_desc.addColSlotInfoFlatBuffer(
149  flatbuffer_size); // used by QueryMemoryInitializer
150  } else {
151  const size_t col_width = ti.get_size();
152  query_mem_desc.addColSlotInfo({std::make_tuple(col_width, col_width)});
153  }
154  }
155 
156  // The member layout of Column must match that of the Column defined in
157  // heavydbTypes.h
158  struct Column {
159  int8_t* ptr;
160  int64_t size;
161  // just for debugging:
162  std::string toString() const {
163  return "Column{" + ::toString(ptr) + ", " + ::toString(size) + "}";
164  }
165  };
166  // We do not init output buffers for CPU currently, so CPU
167  // table functions are expected to handle their own initialization
168  query_buffers = std::make_unique<QueryMemoryInitializer>(
169  exe_unit_,
171  /*device_id=*/0,
173  (output_num_rows_ == 0 ? 1 : output_num_rows_),
174  std::vector<std::vector<const int8_t*>>{col_buf_ptrs_},
175  std::vector<std::vector<uint64_t>>{{0}}, // frag offsets
177  nullptr,
178  executor_);
179  if (output_num_rows_ != 0) {
180  auto group_by_buffers_ptr = query_buffers->getGroupByBuffersPtr();
181  CHECK(group_by_buffers_ptr);
182  auto output_buffers_ptr = reinterpret_cast<int8_t*>(group_by_buffers_ptr[0]);
183  for (size_t i = 0; i < num_out_columns; i++) {
184  Column* col = reinterpret_cast<Column*>(output_column_ptrs[i]);
185  CHECK(col);
186  // set the members of output Column instances:
187  output_col_buf_ptrs[i] = reinterpret_cast<int64_t*>(output_buffers_ptr);
188  col->ptr = output_buffers_ptr;
189  col->size = output_num_rows_;
190 
191  auto ti = exe_unit_.target_exprs[i]->get_type_info();
192  if (ti.is_array()) {
193  FlatBufferManager m{output_buffers_ptr};
195  m, output_num_rows_, output_array_values_total_number_[i], ti);
196  output_buffers_ptr = align_to_int64(output_buffers_ptr + m.flatbufferSize());
197  } else {
198  const size_t col_width = ti.get_size();
199  output_buffers_ptr =
200  align_to_int64(output_buffers_ptr + col_width * output_num_rows_);
201  }
202  }
203  }
204  }
205 
205  const char* get_error_message() const {
     // Return the stored error message as a C string. The pointer comes from
     // error_message_.c_str(), so it refers to storage owned by this object and is
     // invalidated when error_message_ is modified (e.g. by set_error_message()).
     // Throws std::runtime_error (via check_thread_id) when called from another thread.
206  check_thread_id();
207  return error_message_.c_str();
208  }
210 
211  void set_error_message(const char* msg) {
     // Store a copy of `msg` as the current error message.
     // A null `msg` is stored as an empty message: constructing a std::string from a
     // null pointer is undefined behavior, so it must not reach the string constructor.
     // Throws std::runtime_error (via check_thread_id) when called from another thread.
212  check_thread_id();
213  error_message_ = (msg != nullptr) ? std::string(msg) : std::string();
214  }
215 
216  void set_metadata(const char* key,
217  const uint8_t* raw_bytes,
218  const size_t num_bytes,
219  const TableFunctionMetadataType value_type) const {
221  row_set_mem_owner_->setTableFunctionMetadata(key, raw_bytes, num_bytes, value_type);
222  }
223 
224  void get_metadata(const char* key,
225  const uint8_t*& raw_bytes,
226  size_t& num_bytes,
227  TableFunctionMetadataType& value_type) const {
229  row_set_mem_owner_->getTableFunctionMetadata(key, raw_bytes, num_bytes, value_type);
230  }
231 
232  inline int32_t getNewDictDbId() {
233  const auto proxy = executor_->getStringDictionaryProxy(
235  return proxy->getDictKey().db_id;
236  }
237 
238  inline int32_t getNewDictId() {
239  const auto proxy = executor_->getStringDictionaryProxy(
241  return proxy->getDictKey().dict_id;
242  }
243 
244  inline int8_t* getStringDictionaryProxy(int32_t db_id, int32_t dict_id) {
     // Ask the executor for the string dictionary proxy keyed by {db_id, dict_id},
     // passing row_set_mem_owner_, and return the result reinterpreted as int8_t*.
     // NOTE(review): the meaning of the trailing `true` argument is defined by
     // Executor::getStringDictionaryProxy and is not visible here - confirm.
245  return reinterpret_cast<int8_t*>(
246  executor_->getStringDictionaryProxy({db_id, dict_id}, row_set_mem_owner_, true));
247  }
248 
249  inline std::string getString(int32_t db_id, int32_t dict_id, int32_t string_id) {
     // Fetch the proxy for dictionary {db_id, dict_id} from the executor and return
     // the result of its getString(string_id) by value.
250  const auto proxy =
251  executor_->getStringDictionaryProxy({db_id, dict_id}, row_set_mem_owner_, true);
252  return proxy->getString(string_id);
253  }
254 
255  inline const char* getCString(int32_t db_id, int32_t dict_id, int32_t string_id) {
     // Fetch the proxy for dictionary {db_id, dict_id} and return only the character
     // pointer produced by getStringBytes(string_id); the accompanying length is
     // discarded.
     // NOTE(review): whether the returned bytes are null-terminated, and how long the
     // pointer stays valid, depends on the proxy's storage - confirm before relying
     // on either.
256  const auto proxy =
257  executor_->getStringDictionaryProxy({db_id, dict_id}, row_set_mem_owner_, true);
258  auto [c_str, len] = proxy->getStringBytes(string_id);
259  return c_str;
260  }
261 
262  inline const int32_t getOrAddTransient(int32_t db_id,
263  int32_t dict_id,
264  const std::string& str) {
     // Fetch the proxy for dictionary {db_id, dict_id} and forward `str` to its
     // getOrAddTransient(), returning the id it yields.
     // NOTE(review): presumably this returns the existing id for `str` or registers a
     // new transient entry - confirm against the proxy's implementation.
265  const auto proxy =
266  executor_->getStringDictionaryProxy({db_id, dict_id}, row_set_mem_owner_, true);
267  return proxy->getOrAddTransient(str);
268  }
269 
270  // Methods for managing singleton instance of TableFunctionManager:
271 
272  bool isSingleton() const { return is_singleton_; }  // value of the constructor's is_singleton argument
273 
275  if (isSingleton()) {
276  set_singleton(nullptr); // end of singleton life
277  }
278  }
279 
281  static TableFunctionManager* instance = nullptr;
282  return instance;
283  }
284 
285  private:
288 
289  static void set_singleton(TableFunctionManager* instance) {
     // Register `instance` as the singleton (non-null argument) or clear the current
     // singleton (nullptr argument). The slot itself is the function-local static
     // pointer exposed by reference through get_singleton().
     // NOTE(review): lock()/unlock() are not visible in this listing; presumably they
     // operate on TableFunctionManager_singleton_mutex - confirm.
290  auto& instance_ = get_singleton();
291  // ensure being singleton and lock/unlock
292  if (instance) {
     // Registering: take the lock first, then require that no singleton is set yet.
293  instance->lock();
294  CHECK(instance_ == nullptr);
295  } else {
     // Clearing: a singleton must currently be registered; unlock through it before
     // the slot is reset below.
296  CHECK(instance_ != nullptr);
297  instance_->unlock();
298  }
299  instance_ = instance;
300  }
301 
303  Executor* executor_;
304  // Pointers to the buffers of input Columns
305  std::vector<const int8_t*>& col_buf_ptrs_;
306  // Memory owner passed to string dictionary proxy lookups and used to store table function metadata
307  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner_;
308  // Pointers to the buffers of output Columns
309  std::vector<int64_t*> output_col_buf_ptrs;
310  // Number of rows of output Columns
312  // Total number of array values in the output columns of arrays
313  std::vector<int64_t> output_array_values_total_number_;
314  // Pointers to output Column instances
315  std::vector<int8_t*> output_column_ptrs;
316  // If TableFunctionManager is global
318  // Store thread id for sanity check
319  std::thread::id thread_id_;
320  // Error message
321  std::string error_message_;
322 };
std::unique_ptr< QueryMemoryInitializer > query_buffers
void set_output_column(int32_t index, int8_t *ptr)
void addColSlotInfoFlatBuffer(const int64_t flatbuffer_size)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::string getString(int32_t db_id, int32_t dict_id, int32_t string_id)
std::vector< const int8_t * > & col_buf_ptrs_
void set_output_array_values_total_number(int32_t index, int64_t output_array_values_total_number)
TableFunctionManager(const TableFunctionExecutionUnit &exe_unit, Executor *executor, std::vector< const int8_t * > &col_buf_ptrs, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, bool is_singleton)
const TableFunctionExecutionUnit & exe_unit_
void get_metadata(const char *key, const uint8_t *&raw_bytes, size_t &num_bytes, TableFunctionMetadataType &value_type) const
void set_error_message(const char *msg)
DEVICE int64_t size() const
Definition: heavydbTypes.h:751
void setOutputColumnar(const bool val)
std::vector< int8_t * > output_column_ptrs
Constants for Builtin SQL Types supported by HEAVY.AI.
int8_t * getStringDictionaryProxy(int32_t db_id, int32_t dict_id)
#define TRANSIENT_DICT_ID
Definition: DbObjectKeys.h:24
std::vector< int64_t * > output_col_buf_ptrs
void allocate_output_buffers(int64_t output_num_rows)
#define CHECK_NE(x, y)
Definition: Logger.h:302
const char * getCString(int32_t db_id, int32_t dict_id, int32_t string_id)
std::string toString(const ExecutorDeviceType &device_type)
std::mutex TableFunctionManager_singleton_mutex
TableFunctionMetadataType
#define CHECK_LT(x, y)
Definition: Logger.h:303
void initializeVarlenArray(FlatBufferManager &m, int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1481
#define CHECK_LE(x, y)
Definition: Logger.h:304
UserTableFunctionError(const std::string &message)
void set_metadata(const char *key, const uint8_t *raw_bytes, const size_t num_bytes, const TableFunctionMetadataType value_type) const
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
std::vector< int64_t > output_array_values_total_number_
int64_t getVarlenArrayBufferSize(int64_t items_count, int64_t max_nof_values, const SQLTypeInfo &ti)
Definition: sqltypes.h:1461
#define CHECK(condition)
Definition: Logger.h:291
TableFunctionError(const std::string &message)
std::vector< Analyzer::Expr * > target_exprs
void addColSlotInfo(const std::vector< std::tuple< int8_t, int8_t >> &slots_for_col)
static TableFunctionManager * get_singleton()
const int32_t getOrAddTransient(int32_t db_id, int32_t dict_id, const std::string &str)
const char * get_error_message() const
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
TableFunctionErrorCode
static TableFunctionManager *& get_singleton()
static void set_singleton(TableFunctionManager *instance)