OmniSciDB  06b3bd477c
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ExternalExecutor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "QueryEngine/Execute.h"
21 #include "Shared/Logger.h"
23 
24 namespace {
25 
26 struct OmniSciVtab {
27  sqlite3_vtab base;
28  sqlite3* db;
30 };
31 
32 struct OmniSciCursor {
33  sqlite3_vtab_cursor base;
34  int count;
35  int eof;
36 };
37 
38 int vt_destructor(sqlite3_vtab* pVtab) {
39  OmniSciVtab* p = reinterpret_cast<OmniSciVtab*>(pVtab);
40  sqlite3_free(p);
41 
42  return 0;
43 }
44 
45 int vt_create(sqlite3* db,
46  void* p_aux,
47  int argc,
48  const char* const* argv,
49  sqlite3_vtab** pp_vt,
50  char** pzErr) {
51  // Allocate the sqlite3_vtab/OmniSciVtab structure itself.
52  auto p_vt = static_cast<OmniSciVtab*>(sqlite3_malloc(sizeof(OmniSciVtab)));
53  if (!p_vt) {
54  return SQLITE_NOMEM;
55  }
56 
57  p_vt->db = db;
58  p_vt->external_query_table = reinterpret_cast<const ExternalQueryTable*>(p_aux);
59 
60  std::vector<std::string> col_defs;
61  std::transform(p_vt->external_query_table->schema.begin(),
62  p_vt->external_query_table->schema.end(),
63  std::back_inserter(col_defs),
64  [](const TargetMetaInfo& target_metainfo) {
65  return target_metainfo.get_resname() + " " +
66  target_metainfo.get_type_info().get_type_name();
67  });
68  const auto col_defs_str = boost::algorithm::join(col_defs, ", ");
69  const auto create_statement =
70  "create table vtable (" + (col_defs_str.empty() ? "dummy int" : col_defs_str) + ")";
71 
72  // Declare the vtable's structure.
73  int rc = sqlite3_declare_vtab(db, create_statement.c_str());
74 
75  if (rc != SQLITE_OK) {
76  vt_destructor(reinterpret_cast<sqlite3_vtab*>(p_vt));
77  return SQLITE_ERROR;
78  }
79 
80  // Success. Set *pp_vt and return.
81  *pp_vt = &p_vt->base;
82 
83  return SQLITE_OK;
84 }
85 
86 int vt_connect(sqlite3* db,
87  void* p_aux,
88  int argc,
89  const char* const* argv,
90  sqlite3_vtab** pp_vt,
91  char** pzErr) {
92  return vt_create(db, p_aux, argc, argv, pp_vt, pzErr);
93 }
94 
95 int vt_disconnect(sqlite3_vtab* pVtab) {
96  return vt_destructor(pVtab);
97 }
98 
99 int vt_destroy(sqlite3_vtab* pVtab) {
100  int rc = SQLITE_OK;
101 
102  if (rc == SQLITE_OK) {
103  rc = vt_destructor(pVtab);
104  }
105 
106  return rc;
107 }
108 
109 int vt_open(sqlite3_vtab* pVTab, sqlite3_vtab_cursor** pp_cursor) {
110  auto p_cur = static_cast<OmniSciCursor*>(sqlite3_malloc(sizeof(OmniSciCursor)));
111  *pp_cursor = reinterpret_cast<sqlite3_vtab_cursor*>(p_cur);
112 
113  return (p_cur ? SQLITE_OK : SQLITE_NOMEM);
114 }
115 
116 int vt_close(sqlite3_vtab_cursor* cur) {
117  auto p_cur = reinterpret_cast<OmniSciCursor*>(cur);
118  sqlite3_free(p_cur);
119 
120  return SQLITE_OK;
121 }
122 
123 int vt_eof(sqlite3_vtab_cursor* cur) {
124  return reinterpret_cast<OmniSciCursor*>(cur)->eof;
125 }
126 
127 int64_t get_num_rows(OmniSciCursor* p_cur) {
128  auto p = reinterpret_cast<OmniSciVtab*>(p_cur->base.pVtab);
129  CHECK_EQ(p->external_query_table->fetch_result.num_rows.size(), size_t(1));
130  CHECK_EQ(p->external_query_table->fetch_result.num_rows.front().size(), size_t(1));
131  return p->external_query_table->fetch_result.num_rows.front().front();
132 }
133 
134 int vt_next(sqlite3_vtab_cursor* cur) {
135  auto p_cur = reinterpret_cast<OmniSciCursor*>(cur);
136  const auto num_rows = get_num_rows(p_cur);
137 
138  if (p_cur->count == num_rows) {
139  p_cur->eof = 1;
140  }
141 
142  // Increment the current row count.
143  ++p_cur->count;
144 
145  return SQLITE_OK;
146 }
147 
// Result of decoding one dictionary-encoded string cell (see decode_string).
struct DecodedString {
  // Pointer/length pair returned by the string dictionary; only set when
  // is_null is false.
  std::pair<const char*, size_t> payload;
  // True when the cell held the null sentinel for its id type.
  bool is_null;
};
152 
153 template <class T>
154 DecodedString decode_string(const int8_t* column,
155  const size_t cursor,
156  StringDictionaryProxy* sdp) {
157  const auto ids_column = reinterpret_cast<const T*>(column);
158  const auto val = ids_column[cursor];
160  if (val == inline_int_null_value<T>()) {
161  result.is_null = true;
162  } else {
163  result.payload = sdp->getStringBytes(val);
164  }
165  return result;
166 }
167 
168 int vt_column(sqlite3_vtab_cursor* cur, sqlite3_context* ctx, int col_idx) {
169  auto p_cur = reinterpret_cast<OmniSciCursor*>(cur);
170  const auto num_rows = get_num_rows(p_cur);
171 
172  auto p = reinterpret_cast<OmniSciVtab*>(p_cur->base.pVtab);
173  const auto& external_query_table = *(p->external_query_table);
174  CHECK_LT(static_cast<size_t>(col_idx),
175  external_query_table.fetch_result.col_buffers[0].size());
176  const auto column = external_query_table.fetch_result.col_buffers[0][col_idx];
177  const auto& col_ti = external_query_table.schema[col_idx].get_type_info();
178  switch (col_ti.get_type()) {
179  case kTINYINT: {
180  const auto val = column[p_cur->count - 1];
181  if (val == inline_int_null_value<int8_t>()) {
182  sqlite3_result_null(ctx);
183  } else {
184  sqlite3_result_int(ctx, val);
185  }
186  break;
187  }
188  case kSMALLINT: {
189  const auto int_column = reinterpret_cast<const int16_t*>(column);
190  const auto val = int_column[p_cur->count - 1];
191  if (val == inline_int_null_value<int16_t>()) {
192  sqlite3_result_null(ctx);
193  } else {
194  sqlite3_result_int(ctx, val);
195  }
196  break;
197  }
198  case kINT: {
199  const auto int_column = reinterpret_cast<const int32_t*>(column);
200  const auto val = int_column[p_cur->count - 1];
201  if (val == inline_int_null_value<int32_t>()) {
202  sqlite3_result_null(ctx);
203  } else {
204  sqlite3_result_int(ctx, val);
205  }
206  break;
207  }
208  case kBIGINT: {
209  const auto int_column = reinterpret_cast<const int64_t*>(column);
210  const auto val = int_column[p_cur->count - 1];
211  if (val == inline_int_null_value<int64_t>()) {
212  sqlite3_result_null(ctx);
213  } else {
214  sqlite3_result_int(ctx, val);
215  }
216  break;
217  }
218  case kFLOAT: {
219  const auto float_column = reinterpret_cast<const float*>(column);
220  const auto val = float_column[p_cur->count - 1];
221  if (val == inline_fp_null_value<float>()) {
222  sqlite3_result_null(ctx);
223  } else {
224  sqlite3_result_double(ctx, val);
225  }
226  break;
227  }
228  case kDOUBLE: {
229  const auto double_column = reinterpret_cast<const double*>(column);
230  const auto val = double_column[p_cur->count - 1];
231  if (val == inline_fp_null_value<double>()) {
232  sqlite3_result_null(ctx);
233  } else {
234  sqlite3_result_double(ctx, val);
235  }
236  break;
237  }
238  case kTEXT: {
239  if (col_ti.get_compression() == kENCODING_DICT) {
240  const auto executor = external_query_table.executor;
241  const auto sdp = executor->getStringDictionaryProxy(
242  col_ti.get_comp_param(), executor->getRowSetMemoryOwner(), true);
243  CHECK(sdp);
244  DecodedString decoded_string;
245  switch (col_ti.get_size()) {
246  case 1: {
247  decoded_string = decode_string<uint8_t>(column, p_cur->count - 1, sdp);
248  break;
249  }
250  case 2: {
251  decoded_string = decode_string<uint16_t>(column, p_cur->count - 1, sdp);
252  break;
253  }
254  case 4: {
255  decoded_string = decode_string<int32_t>(column, p_cur->count - 1, sdp);
256  break;
257  }
258  default: {
259  decoded_string = DecodedString{};
260  LOG(FATAL) << "Invalid encoding size: " << col_ti.get_size();
261  }
262  }
263  if (decoded_string.is_null) {
264  sqlite3_result_null(ctx);
265  } else {
266  sqlite3_result_text(
267  ctx, decoded_string.payload.first, decoded_string.payload.second, nullptr);
268  }
269  } else {
270  CHECK(col_ti.get_compression() == kENCODING_NONE);
271  const auto chunk_iter =
272  const_cast<ChunkIter*>(reinterpret_cast<const ChunkIter*>(column));
273  VarlenDatum vd;
274  bool is_end;
275  ChunkIter_get_nth(chunk_iter, p_cur->count - 1, false, &vd, &is_end);
276  if (vd.is_null) {
277  sqlite3_result_null(ctx);
278  } else {
279  sqlite3_result_text(
280  ctx, reinterpret_cast<const char*>(vd.pointer), vd.length, nullptr);
281  }
282  }
283  break;
284  }
285  default: {
286  LOG(FATAL) << "Unexpected type: " << col_ti.get_type_name();
287  break;
288  }
289  }
290  CHECK_LE(p_cur->count, num_rows);
291 
292  return SQLITE_OK;
293 }
294 
295 int vt_rowid(sqlite3_vtab_cursor* cur, sqlite_int64* p_rowid) {
296  auto p_cur = reinterpret_cast<OmniSciCursor*>(cur);
297  // Just use the current row count as the rowid.
298  *p_rowid = p_cur->count;
299  return SQLITE_OK;
300 }
301 
302 int vt_filter(sqlite3_vtab_cursor* p_vtc,
303  int idxNum,
304  const char* idxStr,
305  int argc,
306  sqlite3_value** argv) {
307  // Initialize the cursor structure.
308  auto p_cur = reinterpret_cast<OmniSciCursor*>(p_vtc);
309  // Zero rows returned thus far.
310  p_cur->count = 0;
311  // Have not reached end of set.
312  p_cur->eof = 0;
313  // Move cursor to first row.
314  return vt_next(p_vtc);
315 }
316 
317 // We don't implement indexing.
318 int vt_best_index(sqlite3_vtab* tab, sqlite3_index_info* pIdxInfo) {
319  return SQLITE_OK;
320 }
321 
322 sqlite3_module omnisci_module = {
323  0, // iVersion
324  vt_create, // xCreate - create a vtable
325  vt_connect, // xConnect - associate a vtable with a connection
326  vt_best_index, // xBestIndex - best index
327  vt_disconnect, // xDisconnect - disassociate a vtable with a connection
328  vt_destroy, // xDestroy - destroy a vtable
329  vt_open, // xOpen - open a cursor
330  vt_close, // xClose - close a cursor
331  vt_filter, // xFilter - configure scan constraints
332  vt_next, // xNext - advance a cursor
333  vt_eof, // xEof - inidicate end of result set
334  vt_column, // xColumn - read data
335  vt_rowid, // xRowid - read data
336  nullptr, // xUpdate - write data
337  nullptr, // xBegin - begin transaction
338  nullptr, // xSync - sync transaction
339  nullptr, // xCommit - commit transaction
340  nullptr, // xRollback - rollback transaction
341  nullptr, // xFindFunction - function overloading
342  nullptr, // xRename - function overloading
343  nullptr, // xSavepoint - function overloading
344  nullptr, // xRelease - function overloading
345  nullptr // xRollbackto - function overloading
346 };
347 
348 std::vector<TargetMetaInfo> create_table_schema(const PlanState* plan_state) {
349  std::map<size_t, TargetMetaInfo> schema_map;
350  const auto catalog = plan_state->executor_->getCatalog();
351  for (const auto& kv : plan_state->global_to_local_col_ids_) {
352  const int table_id = kv.first.getScanDesc().getTableId();
353  const int column_id = kv.first.getColId();
354  SQLTypeInfo column_type;
355  if (table_id < 0) {
356  const auto result_set =
357  get_temporary_table(plan_state->executor_->getTemporaryTables(), table_id);
358  column_type = result_set->getColType(column_id);
359  } else {
360  const auto cd = catalog->getMetadataForColumn(table_id, column_id);
361  column_type = cd->columnType;
362  }
363  if (!is_supported_type_for_extern_execution(column_type)) {
364  throw std::runtime_error("Type not supported yet for extern execution: " +
365  column_type.get_type_name());
366  }
367  const auto column_ref = serialize_column_ref(table_id, column_id, catalog);
368  const auto it_ok =
369  schema_map.emplace(kv.second, TargetMetaInfo(column_ref, column_type));
370  CHECK(it_ok.second);
371  }
372  std::vector<TargetMetaInfo> schema;
373  for (const auto& kv : schema_map) {
374  schema.push_back(kv.second);
375  }
376  return schema;
377 }
378 
379 } // namespace
380 
382  : external_query_table_(external_query_table) {
383  int status = sqlite3_open(":memory:", &db_);
384  CHECK_EQ(status, SQLITE_OK);
385  status = sqlite3_create_module(db_, "omnisci", &omnisci_module, &external_query_table_);
386  CHECK_EQ(status, SQLITE_OK);
387 }
388 
390  std::lock_guard session_lock(session_mutex_);
391  int status = sqlite3_close(db_);
392  CHECK_EQ(status, SQLITE_OK);
393 }
394 
395 void SqliteMemDatabase::run(const std::string& sql) {
396  std::lock_guard session_lock(session_mutex_);
397  char* msg;
398  int status = sqlite3_exec(db_, sql.c_str(), nullptr, nullptr, &msg);
399  CHECK_EQ(status, SQLITE_OK);
400 }
401 
402 namespace {
403 
404 int64_t* get_scan_output_slot(int64_t* output_buffer,
405  const size_t output_buffer_entry_count,
406  const size_t pos,
407  const size_t row_size_quad) {
408  const auto off = pos * row_size_quad;
409  CHECK_LT(pos, output_buffer_entry_count);
410  output_buffer[off] = off;
411  return output_buffer + off + 1;
412 }
413 
414 } // namespace
415 
416 std::unique_ptr<ResultSet> SqliteMemDatabase::runSelect(
417  const std::string& sql,
418  const ExternalQueryOutputSpec& output_spec) {
419  SqliteConnector connector(db_);
420  connector.query(sql);
421  auto query_mem_desc = output_spec.query_mem_desc;
422  const auto num_rows = connector.getNumRows();
423  query_mem_desc.setEntryCount(num_rows);
424  auto rs = std::make_unique<ResultSet>(output_spec.target_infos,
427  output_spec.executor->getRowSetMemoryOwner(),
428  nullptr);
429  const auto storage = rs->allocateStorage();
430  auto output_buffer = storage->getUnderlyingBuffer();
431  CHECK(!num_rows || output_buffer);
432  for (size_t row_idx = 0; row_idx < num_rows; ++row_idx) {
433  auto row = get_scan_output_slot(reinterpret_cast<int64_t*>(output_buffer),
434  num_rows,
435  row_idx,
436  query_mem_desc.getRowSize() / sizeof(int64_t));
437  CHECK_EQ(output_spec.target_infos.size(), connector.getNumCols());
438  size_t slot_idx = 0;
439  for (size_t col_idx = 0; col_idx < connector.getNumCols(); ++col_idx, ++slot_idx) {
440  const auto& col_type = output_spec.target_infos[col_idx].sql_type;
441  const int sqlite_col_type = connector.columnTypes[col_idx];
442  switch (col_type.get_type()) {
443  case kBOOLEAN:
444  case kTINYINT:
445  case kSMALLINT:
446  case kINT:
447  case kBIGINT: {
448  static const std::string overflow_message{"Overflow or underflow"};
449  if (sqlite_col_type != SQLITE_INTEGER && sqlite_col_type != SQLITE_NULL) {
450  throw std::runtime_error(overflow_message);
451  }
452  if (!connector.isNull(row_idx, col_idx)) {
453  const auto limits = inline_int_max_min(col_type.get_logical_size());
454  const auto val = connector.getData<int64_t>(row_idx, col_idx);
455  if (val > limits.first || val < limits.second) {
456  throw std::runtime_error(overflow_message);
457  }
458  row[slot_idx] = val;
459  } else {
460  row[slot_idx] = inline_int_null_val(col_type);
461  }
462  break;
463  }
464  case kFLOAT: {
465  CHECK(sqlite_col_type == SQLITE_FLOAT || sqlite_col_type == SQLITE_NULL);
466  if (!connector.isNull(row_idx, col_idx)) {
467  reinterpret_cast<double*>(row)[slot_idx] =
468  connector.getData<double>(row_idx, col_idx);
469  } else {
470  reinterpret_cast<double*>(row)[slot_idx] = inline_fp_null_value<float>();
471  }
472  break;
473  }
474  case kDOUBLE: {
475  CHECK(sqlite_col_type == SQLITE_FLOAT || sqlite_col_type == SQLITE_NULL);
476  if (!connector.isNull(row_idx, col_idx)) {
477  reinterpret_cast<double*>(row)[slot_idx] =
478  connector.getData<double>(row_idx, col_idx);
479  } else {
480  reinterpret_cast<double*>(row)[slot_idx] = inline_fp_null_value<double>();
481  }
482  break;
483  }
484  case kCHAR:
485  case kTEXT:
486  case kVARCHAR: {
487  CHECK(sqlite_col_type == SQLITE_TEXT || sqlite_col_type == SQLITE_NULL);
488  if (!connector.isNull(row_idx, col_idx)) {
489  const auto str = connector.getData<std::string>(row_idx, col_idx);
490  const auto owned_str =
491  output_spec.executor->getRowSetMemoryOwner()->addString(str);
492  row[slot_idx] = reinterpret_cast<int64_t>(owned_str->c_str());
493  row[++slot_idx] = str.size();
494  } else {
495  row[slot_idx] = 0;
496  row[++slot_idx] = 0;
497  }
498  break;
499  }
500  default: {
501  LOG(FATAL) << "Unexpected type: " << col_type.get_type_name();
502  break;
503  }
504  }
505  }
506  }
507  return rs;
508 }
509 
511 
512 std::unique_ptr<ResultSet> run_query_external(
513  const ExecutionUnitSql& sql,
514  const FetchResult& fetch_result,
515  const PlanState* plan_state,
516  const ExternalQueryOutputSpec& output_spec) {
517  ExternalQueryTable external_query_table{fetch_result,
518  create_table_schema(plan_state),
519  sql.from_table,
520  output_spec.executor};
521  SqliteMemDatabase db(external_query_table);
522  const auto create_table = "create virtual table " + sql.from_table + " using omnisci";
523  db.run(create_table);
524  return db.runSelect(sql.query, output_spec);
525 }
526 
528  return ti.is_integer() || ti.is_fp() || ti.is_string();
529 }
bool is_supported_type_for_extern_execution(const SQLTypeInfo &ti)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::pair< const char *, size_t > getStringBytes(int32_t string_id) const noexcept
int vt_connect(sqlite3 *db, void *p_aux, int argc, const char *const *argv, sqlite3_vtab **pp_vt, char **pzErr)
T getData(const int row, const int col)
const int8_t const int64_t * num_rows
ALWAYS_INLINE DEVICE int64_t * get_scan_output_slot(int64_t *output_buffer, const uint32_t output_buffer_entry_count, const uint32_t pos, const int64_t offset_in_fragment, const uint32_t row_size_quad)
int vt_filter(sqlite3_vtab_cursor *p_vtc, int idxNum, const char *idxStr, int argc, sqlite3_value **argv)
bool is_null
Definition: sqltypes.h:75
int vt_rowid(sqlite3_vtab_cursor *cur, sqlite_int64 *p_rowid)
int vt_close(sqlite3_vtab_cursor *cur)
std::unordered_map< InputColDescriptor, size_t > global_to_local_col_ids_
Definition: PlanState.h:52
#define LOG(tag)
Definition: Logger.h:188
bool is_fp() const
Definition: sqltypes.h:419
void run(const std::string &sql)
DecodedString decode_string(const int8_t *column, const size_t cursor, StringDictionaryProxy *sdp)
std::string join(T const &container, std::string const &delim)
virtual void query(const std::string &queryString)
int64_t get_num_rows(OmniSciCursor *p_cur)
std::unique_ptr< ResultSet > run_query_external(const ExecutionUnitSql &sql, const FetchResult &fetch_result, const PlanState *plan_state, const ExternalQueryOutputSpec &output_spec)
DEVICE void ChunkIter_get_nth(ChunkIter *it, int n, bool uncompress, VarlenDatum *result, bool *is_end)
Definition: ChunkIter.cpp:181
const Executor * executor_
Definition: PlanState.h:57
int8_t * pointer
Definition: sqltypes.h:74
std::string serialize_column_ref(const int table_id, const int column_id, const Catalog_Namespace::Catalog *catalog)
const Executor * executor
std::unique_ptr< ResultSet > runSelect(const std::string &sql, const ExternalQueryOutputSpec &output_spec)
const ResultSetPtr & get_temporary_table(const TemporaryTables *temporary_tables, const int table_id)
Definition: Execute.h:157
std::vector< TargetMetaInfo > create_table_schema(const PlanState *plan_state)
CHECK(cgen_state)
int vt_open(sqlite3_vtab *pVTab, sqlite3_vtab_cursor **pp_cursor)
bool is_integer() const
Definition: sqltypes.h:417
int vt_column(sqlite3_vtab_cursor *cur, sqlite3_context *ctx, int col_idx)
std::vector< int > columnTypes
std::vector< TargetInfo > target_infos
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqltypes.h:53
QueryMemoryDescriptor query_mem_desc
bool isNull(const int row, const int col) const
#define CHECK_LE(x, y)
Definition: Logger.h:208
constexpr float inline_fp_null_value< float >()
ExternalQueryTable external_query_table_
constexpr double inline_fp_null_value< double >()
std::string get_type_name() const
Definition: sqltypes.h:361
Definition: sqltypes.h:42
std::string from_table
static std::mutex session_mutex_
virtual size_t getNumCols() const
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
std::pair< int64_t, int64_t > inline_int_max_min(const size_t byte_width)
Definition: sqltypes.h:46
std::string query
bool is_string() const
Definition: sqltypes.h:415
int vt_create(sqlite3 *db, void *p_aux, int argc, const char *const *argv, sqlite3_vtab **pp_vt, char **pzErr)
SqliteMemDatabase(const ExternalQueryTable &external_query_table)
int vt_best_index(sqlite3_vtab *tab, sqlite3_index_info *pIdxInfo)
virtual size_t getNumRows() const
size_t length
Definition: sqltypes.h:73