OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ExternalExecutor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "Logger/Logger.h"
20 #include "QueryEngine/Execute.h"
23 
24 namespace {
25 
26 struct OmniSciVtab {
27  sqlite3_vtab base;
28  sqlite3* db;
30 };
31 
32 struct OmniSciCursor {
33  sqlite3_vtab_cursor base;
34  int count;
35  int eof;
36 };
37 
38 int vt_destructor(sqlite3_vtab* pVtab) {
39  OmniSciVtab* p = reinterpret_cast<OmniSciVtab*>(pVtab);
40  sqlite3_free(p);
41 
42  return 0;
43 }
44 
45 int vt_create(sqlite3* db,
46  void* p_aux,
47  int argc,
48  const char* const* argv,
49  sqlite3_vtab** pp_vt,
50  char** pzErr) {
51  // Allocate the sqlite3_vtab/OmniSciVtab structure itself.
52  auto p_vt = static_cast<OmniSciVtab*>(sqlite3_malloc(sizeof(OmniSciVtab)));
53  if (!p_vt) {
54  return SQLITE_NOMEM;
55  }
56 
57  p_vt->db = db;
58  p_vt->external_query_table = reinterpret_cast<const ExternalQueryTable*>(p_aux);
59 
60  std::vector<std::string> col_defs;
61  std::transform(p_vt->external_query_table->schema.begin(),
62  p_vt->external_query_table->schema.end(),
63  std::back_inserter(col_defs),
64  [](const TargetMetaInfo& target_metainfo) {
65  return target_metainfo.get_resname() + " " +
66  target_metainfo.get_type_info().get_type_name();
67  });
68  const auto col_defs_str = boost::algorithm::join(col_defs, ", ");
69  const auto create_statement =
70  "create table vtable (" + (col_defs_str.empty() ? "dummy int" : col_defs_str) + ")";
71 
72  // Declare the vtable's structure.
73  int rc = sqlite3_declare_vtab(db, create_statement.c_str());
74 
75  if (rc != SQLITE_OK) {
76  vt_destructor(reinterpret_cast<sqlite3_vtab*>(p_vt));
77  return SQLITE_ERROR;
78  }
79 
80  // Success. Set *pp_vt and return.
81  *pp_vt = &p_vt->base;
82 
83  return SQLITE_OK;
84 }
85 
86 int vt_connect(sqlite3* db,
87  void* p_aux,
88  int argc,
89  const char* const* argv,
90  sqlite3_vtab** pp_vt,
91  char** pzErr) {
92  return vt_create(db, p_aux, argc, argv, pp_vt, pzErr);
93 }
94 
95 int vt_disconnect(sqlite3_vtab* pVtab) {
96  return vt_destructor(pVtab);
97 }
98 
99 int vt_destroy(sqlite3_vtab* pVtab) {
100  int rc = SQLITE_OK;
101 
102  if (rc == SQLITE_OK) {
103  rc = vt_destructor(pVtab);
104  }
105 
106  return rc;
107 }
108 
109 int vt_open(sqlite3_vtab* pVTab, sqlite3_vtab_cursor** pp_cursor) {
110  auto p_cur = static_cast<OmniSciCursor*>(sqlite3_malloc(sizeof(OmniSciCursor)));
111  *pp_cursor = reinterpret_cast<sqlite3_vtab_cursor*>(p_cur);
112 
113  return (p_cur ? SQLITE_OK : SQLITE_NOMEM);
114 }
115 
116 int vt_close(sqlite3_vtab_cursor* cur) {
117  auto p_cur = reinterpret_cast<OmniSciCursor*>(cur);
118  sqlite3_free(p_cur);
119 
120  return SQLITE_OK;
121 }
122 
123 int vt_eof(sqlite3_vtab_cursor* cur) {
124  return reinterpret_cast<OmniSciCursor*>(cur)->eof;
125 }
126 
127 int64_t get_num_rows(OmniSciCursor* p_cur) {
128  auto p = reinterpret_cast<OmniSciVtab*>(p_cur->base.pVtab);
129  CHECK_EQ(p->external_query_table->fetch_result.num_rows.size(), size_t(1));
130  CHECK_EQ(p->external_query_table->fetch_result.num_rows.front().size(), size_t(1));
131  return p->external_query_table->fetch_result.num_rows.front().front();
132 }
133 
134 int vt_next(sqlite3_vtab_cursor* cur) {
135  auto p_cur = reinterpret_cast<OmniSciCursor*>(cur);
136  const auto num_rows = get_num_rows(p_cur);
137 
138  if (p_cur->count == num_rows) {
139  p_cur->eof = 1;
140  }
141 
142  // Increment the current row count.
143  ++p_cur->count;
144 
145  return SQLITE_OK;
146 }
147 
149  std::pair<const char*, size_t> payload;
150  bool is_null;
151 };
152 
153 template <class T>
154 DecodedString decode_string(const int8_t* column,
155  const size_t cursor,
156  StringDictionaryProxy* sdp) {
157  const auto ids_column = reinterpret_cast<const T*>(column);
158  const auto val = ids_column[cursor];
160  if (val == inline_int_null_value<T>()) {
161  result.is_null = true;
162  } else {
163  result.payload = sdp->getStringBytes(val);
164  }
165  return result;
166 }
167 
168 int vt_column(sqlite3_vtab_cursor* cur, sqlite3_context* ctx, int col_idx) {
169  auto p_cur = reinterpret_cast<OmniSciCursor*>(cur);
170  const auto num_rows = get_num_rows(p_cur);
171 
172  auto p = reinterpret_cast<OmniSciVtab*>(p_cur->base.pVtab);
173  const auto& external_query_table = *(p->external_query_table);
174  CHECK_LT(static_cast<size_t>(col_idx),
175  external_query_table.fetch_result.col_buffers[0].size());
176  const auto column = external_query_table.fetch_result.col_buffers[0][col_idx];
177  const auto& col_ti = external_query_table.schema[col_idx].get_type_info();
178  switch (col_ti.get_type()) {
179  case kTINYINT: {
180  const auto val = column[p_cur->count - 1];
181  if (val == inline_int_null_value<int8_t>()) {
182  sqlite3_result_null(ctx);
183  } else {
184  sqlite3_result_int(ctx, val);
185  }
186  break;
187  }
188  case kSMALLINT: {
189  const auto int_column = reinterpret_cast<const int16_t*>(column);
190  const auto val = int_column[p_cur->count - 1];
191  if (val == inline_int_null_value<int16_t>()) {
192  sqlite3_result_null(ctx);
193  } else {
194  sqlite3_result_int(ctx, val);
195  }
196  break;
197  }
198  case kINT: {
199  const auto int_column = reinterpret_cast<const int32_t*>(column);
200  const auto val = int_column[p_cur->count - 1];
201  if (val == inline_int_null_value<int32_t>()) {
202  sqlite3_result_null(ctx);
203  } else {
204  sqlite3_result_int(ctx, val);
205  }
206  break;
207  }
208  case kBIGINT: {
209  const auto int_column = reinterpret_cast<const int64_t*>(column);
210  const auto val = int_column[p_cur->count - 1];
211  if (val == inline_int_null_value<int64_t>()) {
212  sqlite3_result_null(ctx);
213  } else {
214  sqlite3_result_int(ctx, val);
215  }
216  break;
217  }
218  case kFLOAT: {
219  const auto float_column = reinterpret_cast<const float*>(column);
220  const auto val = float_column[p_cur->count - 1];
221  if (val == inline_fp_null_value<float>()) {
222  sqlite3_result_null(ctx);
223  } else {
224  sqlite3_result_double(ctx, val);
225  }
226  break;
227  }
228  case kDOUBLE: {
229  const auto double_column = reinterpret_cast<const double*>(column);
230  const auto val = double_column[p_cur->count - 1];
231  if (val == inline_fp_null_value<double>()) {
232  sqlite3_result_null(ctx);
233  } else {
234  sqlite3_result_double(ctx, val);
235  }
236  break;
237  }
238  case kTEXT: {
239  if (col_ti.get_compression() == kENCODING_DICT) {
240  const auto executor = external_query_table.executor;
241  const auto sdp = executor->getStringDictionaryProxy(
242  col_ti.getStringDictKey(), executor->getRowSetMemoryOwner(), true);
243  CHECK(sdp);
244  DecodedString decoded_string;
245  switch (col_ti.get_size()) {
246  case 1: {
247  decoded_string = decode_string<uint8_t>(column, p_cur->count - 1, sdp);
248  break;
249  }
250  case 2: {
251  decoded_string = decode_string<uint16_t>(column, p_cur->count - 1, sdp);
252  break;
253  }
254  case 4: {
255  decoded_string = decode_string<int32_t>(column, p_cur->count - 1, sdp);
256  break;
257  }
258  default: {
259  decoded_string = DecodedString{};
260  LOG(FATAL) << "Invalid encoding size: " << col_ti.get_size();
261  }
262  }
263  if (decoded_string.is_null) {
264  sqlite3_result_null(ctx);
265  } else {
266  sqlite3_result_text(
267  ctx, decoded_string.payload.first, decoded_string.payload.second, nullptr);
268  }
269  } else {
270  CHECK(col_ti.get_compression() == kENCODING_NONE);
271  const auto chunk_iter =
272  const_cast<ChunkIter*>(reinterpret_cast<const ChunkIter*>(column));
273  VarlenDatum vd;
274  bool is_end;
275  ChunkIter_get_nth(chunk_iter, p_cur->count - 1, false, &vd, &is_end);
276  if (vd.is_null) {
277  sqlite3_result_null(ctx);
278  } else {
279  sqlite3_result_text(
280  ctx, reinterpret_cast<const char*>(vd.pointer), vd.length, nullptr);
281  }
282  }
283  break;
284  }
285  default: {
286  LOG(FATAL) << "Unexpected type: " << col_ti.get_type_name();
287  break;
288  }
289  }
290  CHECK_LE(p_cur->count, num_rows);
291 
292  return SQLITE_OK;
293 }
294 
295 int vt_rowid(sqlite3_vtab_cursor* cur, sqlite_int64* p_rowid) {
296  auto p_cur = reinterpret_cast<OmniSciCursor*>(cur);
297  // Just use the current row count as the rowid.
298  *p_rowid = p_cur->count;
299  return SQLITE_OK;
300 }
301 
302 int vt_filter(sqlite3_vtab_cursor* p_vtc,
303  int idxNum,
304  const char* idxStr,
305  int argc,
306  sqlite3_value** argv) {
307  // Initialize the cursor structure.
308  auto p_cur = reinterpret_cast<OmniSciCursor*>(p_vtc);
309  // Zero rows returned thus far.
310  p_cur->count = 0;
311  // Have not reached end of set.
312  p_cur->eof = 0;
313  // Move cursor to first row.
314  return vt_next(p_vtc);
315 }
316 
317 // We don't implement indexing.
318 int vt_best_index(sqlite3_vtab* tab, sqlite3_index_info* pIdxInfo) {
319  return SQLITE_OK;
320 }
321 
322 sqlite3_module omnisci_module = {
323  0, // iVersion
324  vt_create, // xCreate - create a vtable
325  vt_connect, // xConnect - associate a vtable with a connection
326  vt_best_index, // xBestIndex - best index
327  vt_disconnect, // xDisconnect - disassociate a vtable with a connection
328  vt_destroy, // xDestroy - destroy a vtable
329  vt_open, // xOpen - open a cursor
330  vt_close, // xClose - close a cursor
331  vt_filter, // xFilter - configure scan constraints
332  vt_next, // xNext - advance a cursor
333  vt_eof, // xEof - inidicate end of result set
334  vt_column, // xColumn - read data
335  vt_rowid, // xRowid - read data
336  nullptr, // xUpdate - write data
337  nullptr, // xBegin - begin transaction
338  nullptr, // xSync - sync transaction
339  nullptr, // xCommit - commit transaction
340  nullptr, // xRollback - rollback transaction
341  nullptr, // xFindFunction - function overloading
342  nullptr, // xRename - function overloading
343  nullptr, // xSavepoint - function overloading
344  nullptr, // xRelease - function overloading
345  nullptr // xRollbackto - function overloading
346 };
347 
348 std::vector<TargetMetaInfo> create_table_schema(const PlanState* plan_state) {
349  std::map<size_t, TargetMetaInfo> schema_map;
350  for (const auto& kv : plan_state->global_to_local_col_ids_) {
351  const auto& table_key = kv.first.getScanDesc().getTableKey();
352  const auto catalog =
354  const int column_id = kv.first.getColId();
355  SQLTypeInfo column_type;
356  if (table_key.table_id < 0) {
357  const auto result_set = get_temporary_table(
358  plan_state->executor_->getTemporaryTables(), table_key.table_id);
359  column_type = result_set->getColType(column_id);
360  } else {
361  CHECK(catalog);
362  const auto cd = catalog->getMetadataForColumn(table_key.table_id, column_id);
363  column_type = cd->columnType;
364  }
365  if (!is_supported_type_for_extern_execution(column_type)) {
366  throw std::runtime_error("Type not supported yet for extern execution: " +
367  column_type.get_type_name());
368  }
369  const auto column_ref =
370  serialize_column_ref(table_key.table_id, column_id, catalog.get());
371  const auto it_ok =
372  schema_map.emplace(kv.second, TargetMetaInfo(column_ref, column_type));
373  CHECK(it_ok.second);
374  }
375  std::vector<TargetMetaInfo> schema;
376  for (const auto& kv : schema_map) {
377  schema.push_back(kv.second);
378  }
379  return schema;
380 }
381 
382 } // namespace
383 
385  : external_query_table_(external_query_table) {
386  int status = sqlite3_open(":memory:", &db_);
387  CHECK_EQ(status, SQLITE_OK);
388  status = sqlite3_create_module(db_, "omnisci", &omnisci_module, &external_query_table_);
389  CHECK_EQ(status, SQLITE_OK);
390 }
391 
393  std::lock_guard session_lock(session_mutex_);
394  int status = sqlite3_close(db_);
395  CHECK_EQ(status, SQLITE_OK);
396 }
397 
398 void SqliteMemDatabase::run(const std::string& sql) {
399  std::lock_guard session_lock(session_mutex_);
400  char* msg;
401  int status = sqlite3_exec(db_, sql.c_str(), nullptr, nullptr, &msg);
402  CHECK_EQ(status, SQLITE_OK);
403 }
404 
405 namespace {
406 
407 int64_t* get_scan_output_slot(int64_t* output_buffer,
408  const size_t output_buffer_entry_count,
409  const size_t pos,
410  const size_t row_size_quad) {
411  const auto off = pos * row_size_quad;
412  CHECK_LT(pos, output_buffer_entry_count);
413  output_buffer[off] = off;
414  return output_buffer + off + 1;
415 }
416 
417 } // namespace
418 
419 std::unique_ptr<ResultSet> SqliteMemDatabase::runSelect(
420  const std::string& sql,
421  const ExternalQueryOutputSpec& output_spec) {
422  SqliteConnector connector(db_);
423  connector.query(sql);
424  auto query_mem_desc = output_spec.query_mem_desc;
425  const auto num_rows = connector.getNumRows();
426  query_mem_desc.setEntryCount(num_rows);
427  auto rs = std::make_unique<ResultSet>(output_spec.target_infos,
430  output_spec.executor->getRowSetMemoryOwner(),
431  0,
432  0);
433  const auto storage = rs->allocateStorage();
434  auto output_buffer = storage->getUnderlyingBuffer();
435  CHECK(!num_rows || output_buffer);
436  for (size_t row_idx = 0; row_idx < num_rows; ++row_idx) {
437  auto row = get_scan_output_slot(reinterpret_cast<int64_t*>(output_buffer),
438  num_rows,
439  row_idx,
440  query_mem_desc.getRowSize() / sizeof(int64_t));
441  CHECK_EQ(output_spec.target_infos.size(), connector.getNumCols());
442  size_t slot_idx = 0;
443  for (size_t col_idx = 0; col_idx < connector.getNumCols(); ++col_idx, ++slot_idx) {
444  const auto& col_type = output_spec.target_infos[col_idx].sql_type;
445  const int sqlite_col_type = connector.columnTypes[col_idx];
446  switch (col_type.get_type()) {
447  case kBOOLEAN:
448  case kTINYINT:
449  case kSMALLINT:
450  case kINT:
451  case kBIGINT: {
452  static const std::string overflow_message{"Overflow or underflow"};
453  if (sqlite_col_type != SQLITE_INTEGER && sqlite_col_type != SQLITE_NULL) {
454  throw std::runtime_error(overflow_message);
455  }
456  if (!connector.isNull(row_idx, col_idx)) {
457  const auto limits = inline_int_max_min(col_type.get_logical_size());
458  const auto val = connector.getData<int64_t>(row_idx, col_idx);
459  if (val > limits.first || val < limits.second) {
460  throw std::runtime_error(overflow_message);
461  }
462  row[slot_idx] = val;
463  } else {
464  row[slot_idx] = inline_int_null_val(col_type);
465  }
466  break;
467  }
468  case kFLOAT: {
469  CHECK(sqlite_col_type == SQLITE_FLOAT || sqlite_col_type == SQLITE_NULL);
470  if (!connector.isNull(row_idx, col_idx)) {
471  reinterpret_cast<double*>(row)[slot_idx] =
472  connector.getData<double>(row_idx, col_idx);
473  } else {
474  reinterpret_cast<double*>(row)[slot_idx] = inline_fp_null_value<float>();
475  }
476  break;
477  }
478  case kDOUBLE: {
479  CHECK(sqlite_col_type == SQLITE_FLOAT || sqlite_col_type == SQLITE_NULL);
480  if (!connector.isNull(row_idx, col_idx)) {
481  reinterpret_cast<double*>(row)[slot_idx] =
482  connector.getData<double>(row_idx, col_idx);
483  } else {
484  reinterpret_cast<double*>(row)[slot_idx] = inline_fp_null_value<double>();
485  }
486  break;
487  }
488  case kCHAR:
489  case kTEXT:
490  case kVARCHAR: {
491  CHECK(sqlite_col_type == SQLITE_TEXT || sqlite_col_type == SQLITE_NULL);
492  if (!connector.isNull(row_idx, col_idx)) {
493  const auto str = connector.getData<std::string>(row_idx, col_idx);
494  const auto owned_str =
495  output_spec.executor->getRowSetMemoryOwner()->addString(str);
496  row[slot_idx] = reinterpret_cast<int64_t>(owned_str->c_str());
497  row[++slot_idx] = str.size();
498  } else {
499  row[slot_idx] = 0;
500  row[++slot_idx] = 0;
501  }
502  break;
503  }
504  default: {
505  LOG(FATAL) << "Unexpected type: " << col_type.get_type_name();
506  break;
507  }
508  }
509  }
510  }
511  return rs;
512 }
513 
515 
516 std::unique_ptr<ResultSet> run_query_external(
517  const ExecutionUnitSql& sql,
518  const FetchResult& fetch_result,
519  const PlanState* plan_state,
520  const ExternalQueryOutputSpec& output_spec) {
521  ExternalQueryTable external_query_table{fetch_result,
522  create_table_schema(plan_state),
523  sql.from_table,
524  output_spec.executor};
525  SqliteMemDatabase db(external_query_table);
526  const auto create_table = "create virtual table " + sql.from_table + " using omnisci";
527  db.run(create_table);
528  return db.runSelect(sql.query, output_spec);
529 }
530 
532  return ti.is_integer() || ti.is_fp() || ti.is_string();
533 }
std::lock_guard< T > lock_guard
bool is_supported_type_for_extern_execution(const SQLTypeInfo &ti)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::pair< const char *, size_t > getStringBytes(int32_t string_id) const noexcept
int vt_connect(sqlite3 *db, void *p_aux, int argc, const char *const *argv, sqlite3_vtab **pp_vt, char **pzErr)
T getData(const int row, const int col)
int vt_filter(sqlite3_vtab_cursor *p_vtc, int idxNum, const char *idxStr, int argc, sqlite3_value **argv)
bool is_null
Definition: Datum.h:57
int vt_rowid(sqlite3_vtab_cursor *cur, sqlite_int64 *p_rowid)
int vt_close(sqlite3_vtab_cursor *cur)
std::unordered_map< InputColDescriptor, size_t > global_to_local_col_ids_
Definition: PlanState.h:59
RUNTIME_EXPORT ALWAYS_INLINE DEVICE int64_t * get_scan_output_slot(int64_t *output_buffer, const uint32_t output_buffer_entry_count, const uint32_t pos, const int64_t offset_in_fragment, const uint32_t row_size_quad)
#define LOG(tag)
Definition: Logger.h:285
bool is_fp() const
Definition: sqltypes.h:571
void run(const std::string &sql)
tuple cur
Definition: report.py:42
DecodedString decode_string(const int8_t *column, const size_t cursor, StringDictionaryProxy *sdp)
std::string join(T const &container, std::string const &delim)
virtual void query(const std::string &queryString)
int64_t get_num_rows(OmniSciCursor *p_cur)
std::unique_ptr< ResultSet > run_query_external(const ExecutionUnitSql &sql, const FetchResult &fetch_result, const PlanState *plan_state, const ExternalQueryOutputSpec &output_spec)
DEVICE void ChunkIter_get_nth(ChunkIter *it, int n, bool uncompress, VarlenDatum *result, bool *is_end)
Definition: ChunkIter.cpp:182
const Executor * executor_
Definition: PlanState.h:68
int8_t * pointer
Definition: Datum.h:56
std::string serialize_column_ref(const int table_id, const int column_id, const Catalog_Namespace::Catalog *catalog)
const Executor * executor
std::unique_ptr< ResultSet > runSelect(const std::string &sql, const ExternalQueryOutputSpec &output_spec)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:246
static SysCatalog & instance()
Definition: SysCatalog.h:343
std::vector< TargetMetaInfo > create_table_schema(const PlanState *plan_state)
int vt_open(sqlite3_vtab *pVTab, sqlite3_vtab_cursor **pp_cursor)
bool is_integer() const
Definition: sqltypes.h:565
int vt_column(sqlite3_vtab_cursor *cur, sqlite3_context *ctx, int col_idx)
OUTPUT transform(INPUT const &input, FUNC const &func)
Definition: misc.h:320
std::vector< int > columnTypes
std::vector< TargetInfo > target_infos
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK_LT(x, y)
Definition: Logger.h:303
Definition: sqltypes.h:79
QueryMemoryDescriptor query_mem_desc
bool isNull(const int row, const int col) const
#define CHECK_LE(x, y)
Definition: Logger.h:304
constexpr float inline_fp_null_value< float >()
ExternalQueryTable external_query_table_
constexpr double inline_fp_null_value< double >()
std::string get_type_name() const
Definition: sqltypes.h:482
Definition: sqltypes.h:68
std::string from_table
static std::mutex session_mutex_
virtual size_t getNumCols() const
#define CHECK(condition)
Definition: Logger.h:291
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
std::pair< int64_t, int64_t > inline_int_max_min(const size_t byte_width)
Definition: sqltypes.h:72
std::string query
bool is_string() const
Definition: sqltypes.h:559
int vt_create(sqlite3 *db, void *p_aux, int argc, const char *const *argv, sqlite3_vtab **pp_vt, char **pzErr)
SqliteMemDatabase(const ExternalQueryTable &external_query_table)
int vt_best_index(sqlite3_vtab *tab, sqlite3_index_info *pIdxInfo)
virtual size_t getNumRows() const
size_t length
Definition: Datum.h:55