OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParserNode.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "ParserNode.h"
24 #include "Shared/base64.h"
25 
26 #include <boost/algorithm/string.hpp>
27 #include <boost/core/null_deleter.hpp>
28 #include <boost/filesystem.hpp>
29 #include <boost/function.hpp>
30 
31 #include <rapidjson/document.h>
32 #include <rapidjson/stringbuffer.h>
33 #include <rapidjson/writer.h>
34 
35 #include <cassert>
36 #include <cmath>
37 #include <limits>
38 #include <random>
39 #include <regex>
40 #include <stdexcept>
41 #include <string>
42 #include <type_traits>
43 #include <typeinfo>
44 
46 #include "Catalog/Catalog.h"
53 #include "Geospatial/Compression.h"
55 #include "Geospatial/Types.h"
57 #include "ImportExport/Importer.h"
58 #include "LockMgr/LockMgr.h"
62 #include "QueryEngine/Execute.h"
68 #include "ReservedKeywords.h"
69 #include "Shared/DbObjectKeys.h"
70 #include "Shared/StringTransform.h"
71 #include "Shared/SysDefinitions.h"
72 #include "Shared/measure.h"
73 #include "Shared/shard_key.h"
75 #include "Utils/FsiUtils.h"
76 
77 #include "gen-cpp/CalciteServer.h"
78 
// File-scope state and aliases used by the parser nodes.
// NOTE(review): the embedded listing numbers in this region skip several values
// (80, 84, 88, 90, 94, 98, 103), so some declarations and alias parameters are
// not present in this copy -- compare against the upstream file before building.

// Defined in this translation unit and initialised to 0.
79 size_t g_leaf_count{0};
// Flags defined in other translation units.
81 extern bool g_enable_string_functions;
82 extern bool g_enable_fsi;
83 
// Only defined when the build enables ENABLE_IMPORT_PARQUET; defaults to false.
85 #ifdef ENABLE_IMPORT_PARQUET
86 bool g_enable_legacy_parquet_import{false};
87 #endif
89 
91 
92 extern bool g_enable_ml_functions;
93 
// Enables the ""s std::string literal suffix for the rest of the file.
95 using namespace std::string_literals;
96 
// Callback type that receives a TableDescriptor and the table's column list.
// NOTE(review): a parameter line (listing line 98) appears to be missing here.
97 using TableDefFuncPtr = boost::function<void(TableDescriptor&,
99  const std::list<ColumnDescriptor>& columns)>;
100 
// Callback type that receives a DataframeTableDescriptor and the column list.
// NOTE(review): a parameter line (listing line 103) appears to be missing here.
101 using DataframeDefFuncPtr =
102  boost::function<void(DataframeTableDescriptor&,
104  const std::list<ColumnDescriptor>& columns)>;
105 
106 namespace Parser {
// Asks `executor` whether the query session `query_session` has been
// interrupted; returns false on the fall-through path.
// NOTE(review): the embedded listing numbers skip 111-112, and the closing
// brace on listing line 115 has no visible opener, so the guarding condition
// and the declaration of `session_read_lock` are missing from this copy.
// Restore them from the upstream file before building.
107 bool check_session_interrupted(const QuerySessionId& query_session, Executor* executor) {
108  // This is called with the unitary executor, which is fine because the
109  // exact session info is available from a global session map object
110  // held by the executor.
113  executor->getSessionLock());
114  return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
115  }
116  return false;
117 }
118 
119 std::vector<int> getTableChunkKey(const TableDescriptor* td,
120  Catalog_Namespace::Catalog& catalog) {
121  std::vector<int> table_chunk_key_prefix;
122  if (td) {
123  if (td->fragmenter) {
124  table_chunk_key_prefix = td->fragmenter->getFragmentsForQuery().chunkKeyPrefix;
125  } else {
126  table_chunk_key_prefix.push_back(catalog.getCurrentDB().dbId);
127  table_chunk_key_prefix.push_back(td->tableId);
128  }
129  }
130  return table_chunk_key_prefix;
131 }
132 
133 std::shared_ptr<Analyzer::Expr> NullLiteral::analyze(
134  const Catalog_Namespace::Catalog& catalog,
135  Analyzer::Query& query,
136  TlistRefType allow_tlist_ref) const {
137  return makeExpr<Analyzer::Constant>(kNULLT, true);
138 }
139 
140 std::shared_ptr<Analyzer::Expr> StringLiteral::analyze(
141  const Catalog_Namespace::Catalog& catalog,
142  Analyzer::Query& query,
143  TlistRefType allow_tlist_ref) const {
144  return analyzeValue(*stringval_, false);
145 }
146 
147 std::shared_ptr<Analyzer::Expr> StringLiteral::analyzeValue(const std::string& stringval,
148  const bool is_null) {
149  if (!is_null) {
150  const SQLTypeInfo ti(kVARCHAR, stringval.length(), 0, true);
151  Datum d;
152  d.stringval = new std::string(stringval);
153  return makeExpr<Analyzer::Constant>(ti, false, d);
154  }
155  // Null value
156  return makeExpr<Analyzer::Constant>(kVARCHAR, true);
157 }
158 
159 std::shared_ptr<Analyzer::Expr> IntLiteral::analyze(
160  const Catalog_Namespace::Catalog& catalog,
161  Analyzer::Query& query,
162  TlistRefType allow_tlist_ref) const {
163  return analyzeValue(intval_);
164 }
165 
166 std::shared_ptr<Analyzer::Expr> IntLiteral::analyzeValue(const int64_t intval) {
167  SQLTypes t;
168  Datum d;
169  if (intval >= INT16_MIN && intval <= INT16_MAX) {
170  t = kSMALLINT;
171  d.smallintval = (int16_t)intval;
172  } else if (intval >= INT32_MIN && intval <= INT32_MAX) {
173  t = kINT;
174  d.intval = (int32_t)intval;
175  } else {
176  t = kBIGINT;
177  d.bigintval = intval;
178  }
179  return makeExpr<Analyzer::Constant>(t, false, d);
180 }
181 
182 std::shared_ptr<Analyzer::Expr> FixedPtLiteral::analyze(
183  const Catalog_Namespace::Catalog& catalog,
184  Analyzer::Query& query,
185  TlistRefType allow_tlist_ref) const {
186  SQLTypeInfo ti(kNUMERIC, 0, 0, false);
187  Datum d = StringToDatum(*fixedptval_, ti);
188  return makeExpr<Analyzer::Constant>(ti, false, d);
189 }
190 
191 std::shared_ptr<Analyzer::Expr> FixedPtLiteral::analyzeValue(const int64_t numericval,
192  const int scale,
193  const int precision) {
194  SQLTypeInfo ti(kNUMERIC, 0, 0, false);
195  ti.set_scale(scale);
196  ti.set_precision(precision);
197  Datum d;
198  d.bigintval = numericval;
199  return makeExpr<Analyzer::Constant>(ti, false, d);
200 }
201 
202 std::shared_ptr<Analyzer::Expr> FloatLiteral::analyze(
203  const Catalog_Namespace::Catalog& catalog,
204  Analyzer::Query& query,
205  TlistRefType allow_tlist_ref) const {
206  Datum d;
207  d.floatval = floatval_;
208  return makeExpr<Analyzer::Constant>(kFLOAT, false, d);
209 }
210 
211 std::shared_ptr<Analyzer::Expr> DoubleLiteral::analyze(
212  const Catalog_Namespace::Catalog& catalog,
213  Analyzer::Query& query,
214  TlistRefType allow_tlist_ref) const {
215  Datum d;
216  d.doubleval = doubleval_;
217  return makeExpr<Analyzer::Constant>(kDOUBLE, false, d);
218 }
219 
220 std::shared_ptr<Analyzer::Expr> TimestampLiteral::analyze(
221  const Catalog_Namespace::Catalog& catalog,
222  Analyzer::Query& query,
223  TlistRefType allow_tlist_ref) const {
224  return get(timestampval_);
225 }
226 
227 std::shared_ptr<Analyzer::Expr> TimestampLiteral::get(const int64_t timestampval) {
228  Datum d;
229  d.bigintval = timestampval;
230  return makeExpr<Analyzer::Constant>(kTIMESTAMP, false, d);
231 }
232 
233 std::shared_ptr<Analyzer::Expr> UserLiteral::analyze(
234  const Catalog_Namespace::Catalog& catalog,
235  Analyzer::Query& query,
236  TlistRefType allow_tlist_ref) const {
237  Datum d;
238  return makeExpr<Analyzer::Constant>(kTEXT, false, d);
239 }
240 
241 std::shared_ptr<Analyzer::Expr> UserLiteral::get(const std::string& user) {
242  Datum d;
243  d.stringval = new std::string(user);
244  return makeExpr<Analyzer::Constant>(kTEXT, false, d);
245 }
246 
247 std::shared_ptr<Analyzer::Expr> ArrayLiteral::analyze(
248  const Catalog_Namespace::Catalog& catalog,
249  Analyzer::Query& query,
250  TlistRefType allow_tlist_ref) const {
251  SQLTypeInfo ti = SQLTypeInfo(kARRAY, true);
252  bool set_subtype = true;
253  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
254  for (auto& p : value_list_) {
255  auto e = p->analyze(catalog, query, allow_tlist_ref);
256  CHECK(e);
257  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
258  if (c != nullptr && c->get_is_null()) {
259  value_exprs.push_back(c);
260  continue;
261  }
262  auto subtype = e->get_type_info().get_type();
263  if (subtype == kNULLT) {
264  // NULL element
265  } else if (set_subtype) {
266  ti.set_subtype(subtype);
267  set_subtype = false;
268  }
269  value_exprs.push_back(e);
270  }
271  std::shared_ptr<Analyzer::Expr> result =
272  makeExpr<Analyzer::Constant>(ti, false, value_exprs);
273  return result;
274 }
275 
276 std::string ArrayLiteral::to_string() const {
277  std::string str = "{";
278  bool notfirst = false;
279  for (auto& p : value_list_) {
280  if (notfirst) {
281  str += ", ";
282  } else {
283  notfirst = true;
284  }
285  str += p->to_string();
286  }
287  str += "}";
288  return str;
289 }
290 
291 std::shared_ptr<Analyzer::Expr> OperExpr::analyze(
292  const Catalog_Namespace::Catalog& catalog,
293  Analyzer::Query& query,
294  TlistRefType allow_tlist_ref) const {
295  auto left_expr = left_->analyze(catalog, query, allow_tlist_ref);
296  const auto& left_type = left_expr->get_type_info();
297  if (right_ == nullptr) {
298  return makeExpr<Analyzer::UOper>(
299  left_type, left_expr->get_contains_agg(), optype_, left_expr->decompress());
300  }
301  if (optype_ == kARRAY_AT) {
302  if (left_type.get_type() != kARRAY) {
303  throw std::runtime_error(left_->to_string() + " is not of array type.");
304  }
305  auto right_expr = right_->analyze(catalog, query, allow_tlist_ref);
306  const auto& right_type = right_expr->get_type_info();
307  if (!right_type.is_integer()) {
308  throw std::runtime_error(right_->to_string() + " is not of integer type.");
309  }
310  return makeExpr<Analyzer::BinOper>(
311  left_type.get_elem_type(), false, kARRAY_AT, kONE, left_expr, right_expr);
312  }
313  auto right_expr = right_->analyze(catalog, query, allow_tlist_ref);
314  return normalize(optype_, opqualifier_, left_expr, right_expr);
315 }
316 
317 bool should_translate_strings(const std::shared_ptr<Analyzer::Expr>& lhs_expr,
318  const std::shared_ptr<Analyzer::Expr>& rhs_expr) {
319  if (dynamic_cast<Analyzer::Constant*>(rhs_expr.get())) {
320  // we must translate rhs string literal against lhs str dictionary
321  return true;
322  }
323  std::set<int> lhs_rte_idx;
324  lhs_expr->collect_rte_idx(lhs_rte_idx);
325  CHECK(!lhs_rte_idx.empty());
326  std::set<int> rhs_rte_idx;
327  rhs_expr->collect_rte_idx(rhs_rte_idx);
328  CHECK(!rhs_rte_idx.empty());
329  return lhs_rte_idx.size() == 1UL && lhs_rte_idx == rhs_rte_idx;
330 }
331 
332 SQLTypeInfo const& get_str_dict_cast_type(const SQLTypeInfo& lhs_type_info,
333  const SQLTypeInfo& rhs_type_info,
334  const Executor* executor) {
335  CHECK(lhs_type_info.is_string());
336  CHECK(lhs_type_info.get_compression() == kENCODING_DICT);
337  CHECK(rhs_type_info.is_string());
338  CHECK(rhs_type_info.get_compression() == kENCODING_DICT);
339  const auto& lhs_dict_key = lhs_type_info.getStringDictKey();
340  const auto& rhs_dict_key = rhs_type_info.getStringDictKey();
341  CHECK_NE(lhs_dict_key, rhs_dict_key);
342  if (lhs_dict_key.isTransientDict()) {
343  return rhs_type_info;
344  }
345  if (rhs_dict_key.isTransientDict()) {
346  return lhs_type_info;
347  }
348  // If here then neither lhs or rhs type was transient, we should see which
349  // type has the largest dictionary and make that the destination type
350  const auto lhs_sdp = executor->getStringDictionaryProxy(lhs_dict_key, true);
351  const auto rhs_sdp = executor->getStringDictionaryProxy(rhs_dict_key, true);
352  return lhs_sdp->entryCount() >= rhs_sdp->entryCount() ? lhs_type_info : rhs_type_info;
353 }
354 
// Computes the string type two string operands should share.
// NOTE(review): the first line of this definition (return type, name and the
// `lhs_type_info` parameter; listing line 355) is missing from this copy --
// restore it from the upstream file before building.
//
// Both types must be strings (CHECKed). When both are dictionary encoded:
//  - if the dictionary keys are equal, or they share a db id and the lhs dict
//    id equals TRANSIENT_DICT(rhs dict id), the side with the smaller dict id
//    is returned (lhs on ties);
//  - otherwise the choice is delegated to get_str_dict_cast_type().
// When at least one side is none-encoded, the other side's type is returned;
// if that is none-encoded too, its dimension is widened to the larger of the
// two dimensions.
356  const SQLTypeInfo& rhs_type_info,
357  const Executor* executor) {
358  CHECK(lhs_type_info.is_string());
359  CHECK(rhs_type_info.is_string());
360  if (lhs_type_info.is_dict_encoded_string() && rhs_type_info.is_dict_encoded_string()) {
361  const auto& lhs_dict_key = lhs_type_info.getStringDictKey();
362  const auto& rhs_dict_key = rhs_type_info.getStringDictKey();
363  if (lhs_dict_key == rhs_dict_key ||
364  (lhs_dict_key.db_id == rhs_dict_key.db_id &&
365  lhs_dict_key.dict_id == TRANSIENT_DICT(rhs_dict_key.dict_id))) {
366  return lhs_dict_key.dict_id <= rhs_dict_key.dict_id ? lhs_type_info : rhs_type_info;
367  }
368  return get_str_dict_cast_type(lhs_type_info, rhs_type_info, executor);
369  }
370  CHECK(lhs_type_info.is_none_encoded_string() || rhs_type_info.is_none_encoded_string());
// Prefer the lhs type when the rhs is none-encoded, otherwise the rhs type.
371  SQLTypeInfo ret_ti =
372  rhs_type_info.is_none_encoded_string() ? lhs_type_info : rhs_type_info;
373  if (ret_ti.is_none_encoded_string()) {
374  ret_ti.set_dimension(
375  std::max(lhs_type_info.get_dimension(), rhs_type_info.get_dimension()));
376  }
377  return ret_ti;
378 }
379 
// Builds the BinOper for `optype`/`qual` over the two operands after bringing
// them to compatible types and encodings.
//
// Steps, in order:
//  1. If either operand is a date-in-days type, both are decompressed.
//  2. With a qualifier other than kONE the right operand must be an array
//     (its element type is then used); otherwise std::runtime_error is thrown.
//  3. Analyzer::BinOper::analyze_type_info() yields the result type and the
//     operand types to cast to. A time-interval result is returned at once,
//     without casts.
//  4. Casts are added where the operand types changed.
//  5. For comparison operators the dictionary encodings are reconciled (see the
//     branches below); for every other operator both sides are decompressed.
//
// `executor` is only required (CHECKed) when strings from two different
// dictionaries are translated for kEQ/kNE.
// Throws std::runtime_error for an invalid qualifier operand and for
// comparisons between two geometry operands (other than kBBOX_INTERSECT).
380 std::shared_ptr<Analyzer::Expr> OperExpr::normalize(
381  const SQLOps optype,
382  const SQLQualifier qual,
383  std::shared_ptr<Analyzer::Expr> left_expr,
384  std::shared_ptr<Analyzer::Expr> right_expr,
385  const Executor* executor) {
386  if (left_expr->get_type_info().is_date_in_days() ||
387  right_expr->get_type_info().is_date_in_days()) {
388  // Do not propagate encoding
389  left_expr = left_expr->decompress();
390  right_expr = right_expr->decompress();
391  }
392  const auto& left_type = left_expr->get_type_info();
393  auto right_type = right_expr->get_type_info();
394  if (qual != kONE) {
395  // subquery not supported yet.
396  CHECK(!std::dynamic_pointer_cast<Analyzer::Subquery>(right_expr));
397  if (right_type.get_type() != kARRAY) {
398  throw std::runtime_error(
399  "Existential or universal qualifiers can only be used in front of a subquery "
400  "or an "
401  "expression of array type.");
402  }
403  right_type = right_type.get_elem_type();
404  }
405  SQLTypeInfo new_left_type;
406  SQLTypeInfo new_right_type;
407  auto result_type = Analyzer::BinOper::analyze_type_info(
408  optype, left_type, right_type, &new_left_type, &new_right_type);
409  if (result_type.is_timeinterval()) {
410  return makeExpr<Analyzer::BinOper>(
411  result_type, false, optype, qual, left_expr, right_expr);
412  }
413  if (left_type != new_left_type) {
414  left_expr = left_expr->add_cast(new_left_type);
415  }
416  if (right_type != new_right_type) {
417  if (qual == kONE) {
418  right_expr = right_expr->add_cast(new_right_type);
419  } else {
// With a qualifier the right operand is an array, so cast to the array type.
420  right_expr = right_expr->add_cast(new_right_type.get_array_type());
421  }
422  }
423 
424  if (IS_COMPARISON(optype)) {
425  if (optype != kBBOX_INTERSECT && new_left_type.is_geometry() &&
426  new_right_type.is_geometry()) {
427  throw std::runtime_error(
428  "Comparison operators are not yet supported for geospatial types.");
429  }
430 
431  if (new_left_type.get_compression() == kENCODING_DICT &&
432  new_right_type.get_compression() == kENCODING_DICT) {
433  if (new_left_type.getStringDictKey() != new_right_type.getStringDictKey()) {
434  if (optype == kEQ || optype == kNE) {
435  // Join framework does its own string dictionary translation
436  // (at least partly since the rhs table projection does not use
437  // the normal runtime execution framework), so if we detect
438  // that the rte idxs of the two tables are different, bail
439  // on translating
440  if (should_translate_strings(left_expr, right_expr)) {
441  CHECK(executor);
442  // Make the type we're casting to the transient dictionary, if it exists,
443  // otherwise the largest dictionary in terms of number of entries
444  SQLTypeInfo ti(
445  get_str_dict_cast_type(new_left_type, new_right_type, executor));
446  auto& expr_to_cast = ti == new_left_type ? right_expr : left_expr;
447  ti.set_fixed_size();
// NOTE(review): listing line 448 is absent from this copy (the numbering jumps
// from 447 to 449); check the upstream file for a statement that belongs here.
449  expr_to_cast = expr_to_cast->add_cast(ti);
450  } else { // Translation is skipped for this kEQ/kNE pair
451  // should_translate_strings() declined, so even though the operator is
452  // kEQ or kNE both sides are decompressed and compared as plain
453  // strings
454  left_expr = left_expr->decompress();
455  right_expr = right_expr->decompress();
456  }
457  } else { // Ordered comparison operator
458  // We do not currently support ordered (i.e. >, <=) comparisons between
459  // dictionary-encoded columns, so both sides are decompressed
460  // before comparing
461  left_expr = left_expr->decompress();
462  right_expr = right_expr->decompress();
463  }
464  } else { // Strings shared comp param
465  if (!(optype == kEQ || optype == kNE)) {
466  // We do not currently support ordered (i.e. >, <=) comparisons between
467  // encoded columns, so try to decode (will only succeed with watchdog off)
468  left_expr = left_expr->decompress();
469  right_expr = right_expr->decompress();
470  } else {
471  // do nothing, can directly support equals/non-equals comparisons between two
472  // dictionary encoded columns sharing the same dictionary as these are
473  // effectively integer comparisons in the same dictionary space
474  }
475  }
476  } else if (new_left_type.get_compression() == kENCODING_DICT &&
477  new_right_type.get_compression() == kENCODING_NONE) {
// Only the left side is dictionary encoded: cast the right side into the
// left side's dictionary.
478  SQLTypeInfo ti(new_right_type);
479  ti.set_compression(new_left_type.get_compression());
480  ti.set_comp_param(new_left_type.get_comp_param());
481  ti.setStringDictKey(new_left_type.getStringDictKey());
482  ti.set_fixed_size();
483  right_expr = right_expr->add_cast(ti);
484  } else if (new_right_type.get_compression() == kENCODING_DICT &&
485  new_left_type.get_compression() == kENCODING_NONE) {
// Mirror case: cast the left side into the right side's dictionary.
486  SQLTypeInfo ti(new_left_type);
487  ti.set_compression(new_right_type.get_compression());
488  ti.set_comp_param(new_right_type.get_comp_param());
489  ti.setStringDictKey(new_right_type.getStringDictKey());
490  ti.set_fixed_size();
491  left_expr = left_expr->add_cast(ti);
492  } else {
493  left_expr = left_expr->decompress();
494  right_expr = right_expr->decompress();
495  }
496  } else {
497  // TODO(review): is this now a no-op just for pairs of none-encoded string columns?
498  left_expr = left_expr->decompress();
499  right_expr = right_expr->decompress();
500  }
501  bool has_agg = (left_expr->get_contains_agg() || right_expr->get_contains_agg());
502  return makeExpr<Analyzer::BinOper>(
503  result_type, has_agg, optype, qual, left_expr, right_expr);
504 }
505 
506 std::shared_ptr<Analyzer::Expr> SubqueryExpr::analyze(
507  const Catalog_Namespace::Catalog& catalog,
508  Analyzer::Query& query,
509  TlistRefType allow_tlist_ref) const {
510  throw std::runtime_error("Subqueries are not supported yet.");
511  return nullptr;
512 }
513 
514 std::shared_ptr<Analyzer::Expr> IsNullExpr::analyze(
515  const Catalog_Namespace::Catalog& catalog,
516  Analyzer::Query& query,
517  TlistRefType allow_tlist_ref) const {
518  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
519  auto result = makeExpr<Analyzer::UOper>(kBOOLEAN, kISNULL, arg_expr);
520  if (is_not_) {
521  result = makeExpr<Analyzer::UOper>(kBOOLEAN, kNOT, result);
522  }
523  return result;
524 }
525 
526 std::shared_ptr<Analyzer::Expr> InSubquery::analyze(
527  const Catalog_Namespace::Catalog& catalog,
528  Analyzer::Query& query,
529  TlistRefType allow_tlist_ref) const {
530  throw std::runtime_error("Subqueries are not supported yet.");
531  return nullptr;
532 }
533 
534 std::shared_ptr<Analyzer::Expr> InValues::analyze(
535  const Catalog_Namespace::Catalog& catalog,
536  Analyzer::Query& query,
537  TlistRefType allow_tlist_ref) const {
538  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
539  SQLTypeInfo ti = arg_expr->get_type_info();
540  bool dict_comp = ti.get_compression() == kENCODING_DICT;
541  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
542  for (auto& p : value_list_) {
543  auto e = p->analyze(catalog, query, allow_tlist_ref);
544  if (ti != e->get_type_info()) {
545  if (ti.is_string() && e->get_type_info().is_string()) {
546  // Todo(todd): Can we have this leverage the cast framework as well
547  ti = Analyzer::BinOper::common_string_type(ti, e->get_type_info());
548  } else if (ti.is_number() && e->get_type_info().is_number()) {
549  ti = Analyzer::BinOper::common_numeric_type(ti, e->get_type_info());
550  } else {
551  throw std::runtime_error("IN expressions must contain compatible types.");
552  }
553  }
554  if (dict_comp) {
555  value_exprs.push_back(e->add_cast(arg_expr->get_type_info()));
556  } else {
557  value_exprs.push_back(e);
558  }
559  }
560  if (!dict_comp) {
561  arg_expr = arg_expr->decompress();
562  arg_expr = arg_expr->add_cast(ti);
563  std::list<std::shared_ptr<Analyzer::Expr>> cast_vals;
564  for (auto p : value_exprs) {
565  cast_vals.push_back(p->add_cast(ti));
566  }
567  value_exprs.swap(cast_vals);
568  }
569  std::shared_ptr<Analyzer::Expr> result =
570  makeExpr<Analyzer::InValues>(arg_expr, value_exprs);
571  if (is_not_) {
572  result = makeExpr<Analyzer::UOper>(kBOOLEAN, kNOT, result);
573  }
574  return result;
575 }
576 
// Analyzes `arg [NOT] BETWEEN lower AND upper` by expanding it to
// `(arg >= lower) AND (arg <= upper)`, wrapped in kNOT when negated.
// Each comparison casts its operands to the types stored in
// new_left_type/new_right_type and then decompresses them.
// NOTE(review): listing lines 585 and 596 are absent from this copy (the
// numbering jumps 584 -> 586 and 595 -> 597). They hold the start of the two
// calls whose argument lists follow, which fill new_left_type/new_right_type.
// Restore them from the upstream file before building.
577 std::shared_ptr<Analyzer::Expr> BetweenExpr::analyze(
578  const Catalog_Namespace::Catalog& catalog,
579  Analyzer::Query& query,
580  TlistRefType allow_tlist_ref) const {
581  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
582  auto lower_expr = lower_->analyze(catalog, query, allow_tlist_ref);
583  auto upper_expr = upper_->analyze(catalog, query, allow_tlist_ref);
584  SQLTypeInfo new_left_type, new_right_type;
586  arg_expr->get_type_info(),
587  lower_expr->get_type_info(),
588  &new_left_type,
589  &new_right_type);
590  auto lower_pred =
591  makeExpr<Analyzer::BinOper>(kBOOLEAN,
592  kGE,
593  kONE,
594  arg_expr->add_cast(new_left_type)->decompress(),
595  lower_expr->add_cast(new_right_type)->decompress());
// NOTE(review): the type analysis for the upper-bound predicate is given
// lower_expr's type, while the predicate below is built from upper_expr. This
// looks like a copy/paste slip (expected upper_expr->get_type_info()) --
// confirm and fix upstream.
597  arg_expr->get_type_info(),
598  lower_expr->get_type_info(),
599  &new_left_type,
600  &new_right_type);
601  auto upper_pred = makeExpr<Analyzer::BinOper>(
602  kBOOLEAN,
603  kLE,
604  kONE,
// The argument is deep-copied so the two predicates do not share one node.
605  arg_expr->deep_copy()->add_cast(new_left_type)->decompress(),
606  upper_expr->add_cast(new_right_type)->decompress());
607  std::shared_ptr<Analyzer::Expr> result =
608  makeExpr<Analyzer::BinOper>(kBOOLEAN, kAND, kONE, lower_pred, upper_pred);
609  if (is_not_) {
610  result = makeExpr<Analyzer::UOper>(kBOOLEAN, kNOT, result);
611  }
612  return result;
613 }
614 
615 std::shared_ptr<Analyzer::Expr> CharLengthExpr::analyze(
616  const Catalog_Namespace::Catalog& catalog,
617  Analyzer::Query& query,
618  TlistRefType allow_tlist_ref) const {
619  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
620  if (!arg_expr->get_type_info().is_string()) {
621  throw std::runtime_error(
622  "expression in char_length clause must be of a string type.");
623  }
624  std::shared_ptr<Analyzer::Expr> result =
625  makeExpr<Analyzer::CharLengthExpr>(arg_expr->decompress(), calc_encoded_length_);
626  return result;
627 }
628 
629 std::shared_ptr<Analyzer::Expr> CardinalityExpr::analyze(
630  const Catalog_Namespace::Catalog& catalog,
631  Analyzer::Query& query,
632  TlistRefType allow_tlist_ref) const {
633  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
634  if (!arg_expr->get_type_info().is_array()) {
635  throw std::runtime_error(
636  "expression in cardinality clause must be of an array type.");
637  }
638  std::shared_ptr<Analyzer::Expr> result =
639  makeExpr<Analyzer::CardinalityExpr>(arg_expr->decompress());
640  return result;
641 }
642 
643 void LikeExpr::check_like_expr(const std::string& like_str, char escape_char) {
644  if (like_str.back() == escape_char) {
645  throw std::runtime_error("LIKE pattern must not end with escape character.");
646  }
647 }
648 
649 bool LikeExpr::test_is_simple_expr(const std::string& like_str, char escape_char) {
650  // if not bounded by '%' then not a simple string
651  if (like_str.size() < 2 || like_str[0] != '%' || like_str[like_str.size() - 1] != '%') {
652  return false;
653  }
654  // if the last '%' is escaped then not a simple string
655  if (like_str[like_str.size() - 2] == escape_char &&
656  like_str[like_str.size() - 3] != escape_char) {
657  return false;
658  }
659  for (size_t i = 1; i < like_str.size() - 1; i++) {
660  if (like_str[i] == '%' || like_str[i] == '_' || like_str[i] == '[' ||
661  like_str[i] == ']') {
662  if (like_str[i - 1] != escape_char) {
663  return false;
664  }
665  }
666  }
667  return true;
668 }
669 
670 void LikeExpr::erase_cntl_chars(std::string& like_str, char escape_char) {
671  char prev_char = '\0';
672  // easier to create new string of allowable chars
673  // rather than erase chars from
674  // existing string
675  std::string new_str;
676  for (char& cur_char : like_str) {
677  if (cur_char == '%' || cur_char == escape_char) {
678  if (prev_char != escape_char) {
679  prev_char = cur_char;
680  continue;
681  }
682  }
683  new_str.push_back(cur_char);
684  prev_char = cur_char;
685  }
686  like_str = new_str;
687 }
688 
689 std::shared_ptr<Analyzer::Expr> LikeExpr::analyze(
690  const Catalog_Namespace::Catalog& catalog,
691  Analyzer::Query& query,
692  TlistRefType allow_tlist_ref) const {
693  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
694  auto like_expr = like_string_->analyze(catalog, query, allow_tlist_ref);
695  auto escape_expr = escape_string_ == nullptr
696  ? nullptr
697  : escape_string_->analyze(catalog, query, allow_tlist_ref);
698  return LikeExpr::get(arg_expr, like_expr, escape_expr, is_ilike_, is_not_);
699 }
700 
701 std::shared_ptr<Analyzer::Expr> LikeExpr::get(std::shared_ptr<Analyzer::Expr> arg_expr,
702  std::shared_ptr<Analyzer::Expr> like_expr,
703  std::shared_ptr<Analyzer::Expr> escape_expr,
704  const bool is_ilike,
705  const bool is_not) {
706  if (!arg_expr->get_type_info().is_string()) {
707  throw std::runtime_error("expression before LIKE must be of a string type.");
708  }
709  if (!like_expr->get_type_info().is_string()) {
710  throw std::runtime_error("expression after LIKE must be of a string type.");
711  }
712  char escape_char = '\\';
713  if (escape_expr != nullptr) {
714  if (!escape_expr->get_type_info().is_string()) {
715  throw std::runtime_error("expression after ESCAPE must be of a string type.");
716  }
717  if (!escape_expr->get_type_info().is_string()) {
718  throw std::runtime_error("expression after ESCAPE must be of a string type.");
719  }
720  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(escape_expr);
721  if (c != nullptr && c->get_constval().stringval->length() > 1) {
722  throw std::runtime_error("String after ESCAPE must have a single character.");
723  }
724  escape_char = (*c->get_constval().stringval)[0];
725  }
726  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(like_expr);
727  bool is_simple = false;
728  if (c != nullptr) {
729  std::string& pattern = *c->get_constval().stringval;
730  if (is_ilike) {
731  std::transform(pattern.begin(), pattern.end(), pattern.begin(), ::tolower);
732  }
733  check_like_expr(pattern, escape_char);
734  is_simple = test_is_simple_expr(pattern, escape_char);
735  if (is_simple) {
736  erase_cntl_chars(pattern, escape_char);
737  }
738  }
739  std::shared_ptr<Analyzer::Expr> result = makeExpr<Analyzer::LikeExpr>(
740  arg_expr->decompress(), like_expr, escape_expr, is_ilike, is_simple);
741  if (is_not) {
742  result = makeExpr<Analyzer::UOper>(kBOOLEAN, kNOT, result);
743  }
744  return result;
745 }
746 
747 void RegexpExpr::check_pattern_expr(const std::string& pattern_str, char escape_char) {
748  if (pattern_str.back() == escape_char) {
749  throw std::runtime_error("REGEXP pattern must not end with escape character.");
750  }
751 }
752 
753 bool RegexpExpr::translate_to_like_pattern(std::string& pattern_str, char escape_char) {
754  char prev_char = '\0';
755  char prev_prev_char = '\0';
756  std::string like_str;
757  for (char& cur_char : pattern_str) {
758  if (prev_char == escape_char || isalnum(cur_char) || cur_char == ' ' ||
759  cur_char == '.') {
760  like_str.push_back((cur_char == '.') ? '_' : cur_char);
761  prev_prev_char = prev_char;
762  prev_char = cur_char;
763  continue;
764  }
765  if (prev_char == '.' && prev_prev_char != escape_char) {
766  if (cur_char == '*' || cur_char == '+') {
767  if (cur_char == '*') {
768  like_str.pop_back();
769  }
770  // .* --> %
771  // .+ --> _%
772  like_str.push_back('%');
773  prev_prev_char = prev_char;
774  prev_char = cur_char;
775  continue;
776  }
777  }
778  return false;
779  }
780  pattern_str = like_str;
781  return true;
782 }
783 
784 std::shared_ptr<Analyzer::Expr> RegexpExpr::analyze(
785  const Catalog_Namespace::Catalog& catalog,
786  Analyzer::Query& query,
787  TlistRefType allow_tlist_ref) const {
788  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
789  auto pattern_expr = pattern_string_->analyze(catalog, query, allow_tlist_ref);
790  auto escape_expr = escape_string_ == nullptr
791  ? nullptr
792  : escape_string_->analyze(catalog, query, allow_tlist_ref);
793  return RegexpExpr::get(arg_expr, pattern_expr, escape_expr, is_not_);
794 }
795 
796 std::shared_ptr<Analyzer::Expr> RegexpExpr::get(
797  std::shared_ptr<Analyzer::Expr> arg_expr,
798  std::shared_ptr<Analyzer::Expr> pattern_expr,
799  std::shared_ptr<Analyzer::Expr> escape_expr,
800  const bool is_not) {
801  if (!arg_expr->get_type_info().is_string()) {
802  throw std::runtime_error("expression before REGEXP must be of a string type.");
803  }
804  if (!pattern_expr->get_type_info().is_string()) {
805  throw std::runtime_error("expression after REGEXP must be of a string type.");
806  }
807  char escape_char = '\\';
808  if (escape_expr != nullptr) {
809  if (!escape_expr->get_type_info().is_string()) {
810  throw std::runtime_error("expression after ESCAPE must be of a string type.");
811  }
812  if (!escape_expr->get_type_info().is_string()) {
813  throw std::runtime_error("expression after ESCAPE must be of a string type.");
814  }
815  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(escape_expr);
816  if (c != nullptr && c->get_constval().stringval->length() > 1) {
817  throw std::runtime_error("String after ESCAPE must have a single character.");
818  }
819  escape_char = (*c->get_constval().stringval)[0];
820  if (escape_char != '\\') {
821  throw std::runtime_error("Only supporting '\\' escape character.");
822  }
823  }
824  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(pattern_expr);
825  if (c != nullptr) {
826  std::string& pattern = *c->get_constval().stringval;
827  if (translate_to_like_pattern(pattern, escape_char)) {
828  return LikeExpr::get(arg_expr, pattern_expr, escape_expr, false, is_not);
829  }
830  }
831  std::shared_ptr<Analyzer::Expr> result =
832  makeExpr<Analyzer::RegexpExpr>(arg_expr->decompress(), pattern_expr, escape_expr);
833  if (is_not) {
834  result = makeExpr<Analyzer::UOper>(kBOOLEAN, kNOT, result);
835  }
836  return result;
837 }
838 
839 std::shared_ptr<Analyzer::Expr> LikelihoodExpr::analyze(
840  const Catalog_Namespace::Catalog& catalog,
841  Analyzer::Query& query,
842  TlistRefType allow_tlist_ref) const {
843  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
844  return LikelihoodExpr::get(arg_expr, likelihood_, is_not_);
845 }
846 
847 std::shared_ptr<Analyzer::Expr> LikelihoodExpr::get(
848  std::shared_ptr<Analyzer::Expr> arg_expr,
849  float likelihood,
850  const bool is_not) {
851  if (!arg_expr->get_type_info().is_boolean()) {
852  throw std::runtime_error("likelihood expression expects boolean type.");
853  }
854  std::shared_ptr<Analyzer::Expr> result = makeExpr<Analyzer::LikelihoodExpr>(
855  arg_expr->decompress(), is_not ? 1 - likelihood : likelihood);
856  return result;
857 }
858 
859 std::shared_ptr<Analyzer::Expr> WidthBucketExpr::analyze(
860  const Catalog_Namespace::Catalog& catalog,
861  Analyzer::Query& query,
862  TlistRefType allow_tlist_ref) const {
863  auto target_value = target_value_->analyze(catalog, query, allow_tlist_ref);
864  auto lower_bound = lower_bound_->analyze(catalog, query, allow_tlist_ref);
865  auto upper_bound = upper_bound_->analyze(catalog, query, allow_tlist_ref);
866  auto partition_count = partition_count_->analyze(catalog, query, allow_tlist_ref);
867  return WidthBucketExpr::get(target_value, lower_bound, upper_bound, partition_count);
868 }
869 
870 std::shared_ptr<Analyzer::Expr> WidthBucketExpr::get(
871  std::shared_ptr<Analyzer::Expr> target_value,
872  std::shared_ptr<Analyzer::Expr> lower_bound,
873  std::shared_ptr<Analyzer::Expr> upper_bound,
874  std::shared_ptr<Analyzer::Expr> partition_count) {
875  std::shared_ptr<Analyzer::Expr> result = makeExpr<Analyzer::WidthBucketExpr>(
876  target_value, lower_bound, upper_bound, partition_count);
877  return result;
878 }
879 
880 std::shared_ptr<Analyzer::Expr> ExistsExpr::analyze(
881  const Catalog_Namespace::Catalog& catalog,
882  Analyzer::Query& query,
883  TlistRefType allow_tlist_ref) const {
884  throw std::runtime_error("Subqueries are not supported yet.");
885  return nullptr;
886 }
887 
// Resolves this column reference against the query and returns an analyzer expression.
//  - column_ == nullptr throws ("invalid column name *.").
//  - With table_ set: the name is looked up in the query's range table, then the column
//    is looked up in that entry; either lookup failing throws std::runtime_error.
//  - Without table_: every range table entry is asked for the column; a second match
//    throws "is ambiguous". If cd is null afterwards and allow_tlist_ref is not
//    TLIST_NONE, the name is compared against the target list's result names.
// Returns a ColumnVar built from the column type, a ColumnKey of (database id, table id,
// column id) and the range table index; for a target list match it returns a deep copy
// or an Analyzer::Var marked kOUTPUT instead.
// NOTE(review): the embedded listing numbers skip from 942 to 944 below, so one source
// line appears to be missing from this extract and the visible braces do not balance.
888 std::shared_ptr<Analyzer::Expr> ColumnRef::analyze(
889  const Catalog_Namespace::Catalog& catalog,
890  Analyzer::Query& query,
891  TlistRefType allow_tlist_ref) const {
892  int table_id{0};
893  int rte_idx{0};
894  const ColumnDescriptor* cd{nullptr};
895  if (column_ == nullptr) {
896  throw std::runtime_error("invalid column name *.");
897  }
898  if (table_ != nullptr) {
// Qualified reference: resolve the range table entry by name first.
899  rte_idx = query.get_rte_idx(*table_);
900  if (rte_idx < 0) {
901  throw std::runtime_error("range variable or table name " + *table_ +
902  " does not exist.");
903  }
904  Analyzer::RangeTableEntry* rte = query.get_rte(rte_idx);
905  cd = rte->get_column_desc(catalog, *column_);
906  if (cd == nullptr) {
907  throw std::runtime_error("Column name " + *column_ + " does not exist.");
908  }
909  table_id = rte->get_table_id();
910  } else {
// Unqualified reference: search all range table entries; i tracks the entry index.
911  bool found = false;
912  int i = 0;
913  for (auto rte : query.get_rangetable()) {
// NOTE(review): cd is reassigned on every iteration, so after the loop it reflects only
// the last range table entry; a column found in an earlier entry but absent from the
// last one leaves cd null here — confirm this is intended.
914  cd = rte->get_column_desc(catalog, *column_);
915  if (cd != nullptr && !found) {
916  found = true;
917  rte_idx = i;
918  table_id = rte->get_table_id();
919  } else if (cd != nullptr && found) {
920  throw std::runtime_error("Column name " + *column_ + " is ambiguous.");
921  }
922  i++;
923  }
924  if (cd == nullptr && allow_tlist_ref != TlistRefType::TLIST_NONE) {
925  // check if this is a reference to a targetlist entry
926  bool found = false;
927  int varno = -1;
// Target list positions are counted from 1 here (varno is passed to the Var below).
928  int i = 1;
929  std::shared_ptr<Analyzer::TargetEntry> tle;
930  for (auto p : query.get_targetlist()) {
931  if (*column_ == p->get_resname() && !found) {
932  found = true;
933  varno = i;
934  tle = p;
935  } else if (*column_ == p->get_resname() && found) {
936  throw std::runtime_error("Output alias " + *column_ + " is ambiguous.");
937  }
938  i++;
939  }
940  if (found) {
941  if (dynamic_cast<Analyzer::Var*>(tle->get_expr())) {
942  Analyzer::Var* v = static_cast<Analyzer::Var*>(tle->get_expr());
944  return v->deep_copy();
945  }
946  }
// TLIST_COPY duplicates the target expression; otherwise a Var that refers to the
// target list entry by position is returned.
947  if (allow_tlist_ref == TlistRefType::TLIST_COPY) {
948  return tle->get_expr()->deep_copy();
949  } else {
950  return makeExpr<Analyzer::Var>(
951  tle->get_expr()->get_type_info(), Analyzer::Var::kOUTPUT, varno);
952  }
953  }
954  }
955  if (cd == nullptr) {
956  throw std::runtime_error("Column name " + *column_ + " does not exist.");
957  }
958  }
959  return makeExpr<Analyzer::ColumnVar>(
960  cd->columnType,
961  shared::ColumnKey{catalog.getDatabaseId(), table_id, cd->columnId},
962  rte_idx);
963 }
964 
965 std::shared_ptr<Analyzer::Expr> FunctionRef::analyze(
966  const Catalog_Namespace::Catalog& catalog,
967  Analyzer::Query& query,
968  TlistRefType allow_tlist_ref) const {
969  SQLTypeInfo result_type;
970  SQLAgg agg_type;
971  std::shared_ptr<Analyzer::Expr> arg_expr;
972  bool is_distinct = false;
973  if (boost::iequals(*name_, "count")) {
974  result_type = SQLTypeInfo(kBIGINT, false);
975  agg_type = kCOUNT;
976  if (arg_) {
977  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
978  const SQLTypeInfo& ti = arg_expr->get_type_info();
979  if (ti.is_string() && (ti.get_compression() != kENCODING_DICT || !distinct_)) {
980  throw std::runtime_error(
981  "Strings must be dictionary-encoded in COUNT(DISTINCT).");
982  }
983  if (ti.get_type() == kARRAY && !distinct_) {
984  throw std::runtime_error("Only COUNT(DISTINCT) is supported on arrays.");
985  }
986  }
987  is_distinct = distinct_;
988  } else {
989  if (!arg_) {
990  throw std::runtime_error("Cannot compute " + *name_ + " with argument '*'.");
991  }
992  if (boost::iequals(*name_, "min")) {
993  agg_type = kMIN;
994  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
995  arg_expr = arg_expr->decompress();
996  result_type = arg_expr->get_type_info();
997  } else if (boost::iequals(*name_, "max")) {
998  agg_type = kMAX;
999  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
1000  arg_expr = arg_expr->decompress();
1001  result_type = arg_expr->get_type_info();
1002  } else if (boost::iequals(*name_, "avg")) {
1003  agg_type = kAVG;
1004  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
1005  if (!arg_expr->get_type_info().is_number()) {
1006  throw std::runtime_error("Cannot compute AVG on non-number-type arguments.");
1007  }
1008  arg_expr = arg_expr->decompress();
1009  result_type = SQLTypeInfo(kDOUBLE, false);
1010  } else if (boost::iequals(*name_, "sum")) {
1011  agg_type = kSUM;
1012  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
1013  if (!arg_expr->get_type_info().is_number()) {
1014  throw std::runtime_error("Cannot compute SUM on non-number-type arguments.");
1015  }
1016  arg_expr = arg_expr->decompress();
1017  result_type = arg_expr->get_type_info().is_integer() ? SQLTypeInfo(kBIGINT, false)
1018  : arg_expr->get_type_info();
1019  } else if (boost::iequals(*name_, "unnest")) {
1020  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
1021  const SQLTypeInfo& arg_ti = arg_expr->get_type_info();
1022  if (arg_ti.get_type() != kARRAY) {
1023  throw std::runtime_error(arg_->to_string() + " is not of array type.");
1024  }
1025  return makeExpr<Analyzer::UOper>(arg_ti.get_elem_type(), false, kUNNEST, arg_expr);
1026  } else {
1027  throw std::runtime_error("invalid function name: " + *name_);
1028  }
1029  if (arg_expr->get_type_info().is_string() ||
1030  arg_expr->get_type_info().get_type() == kARRAY) {
1031  throw std::runtime_error(
1032  "Only COUNT(DISTINCT ) aggregate is supported on strings and arrays.");
1033  }
1034  }
1035  int naggs = query.get_num_aggs();
1036  query.set_num_aggs(naggs + 1);
1037  return makeExpr<Analyzer::AggExpr>(
1038  result_type, agg_type, arg_expr, is_distinct, nullptr);
1039 }
1040 
1041 std::shared_ptr<Analyzer::Expr> CastExpr::analyze(
1042  const Catalog_Namespace::Catalog& catalog,
1043  Analyzer::Query& query,
1044  TlistRefType allow_tlist_ref) const {
1045  target_type_->check_type();
1046  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
1047  SQLTypeInfo ti(target_type_->get_type(),
1048  target_type_->get_param1(),
1049  target_type_->get_param2(),
1050  arg_expr->get_type_info().get_notnull());
1051  if (arg_expr->get_type_info().get_type() != target_type_->get_type() &&
1052  arg_expr->get_type_info().get_compression() != kENCODING_NONE) {
1053  arg_expr->decompress();
1054  }
1055  return arg_expr->add_cast(ti);
1056 }
1057 
1058 std::shared_ptr<Analyzer::Expr> CaseExpr::analyze(
1059  const Catalog_Namespace::Catalog& catalog,
1060  Analyzer::Query& query,
1061  TlistRefType allow_tlist_ref) const {
1062  SQLTypeInfo ti;
1063  std::list<std::pair<std::shared_ptr<Analyzer::Expr>, std::shared_ptr<Analyzer::Expr>>>
1064  expr_pair_list;
1065  for (auto& p : when_then_list_) {
1066  auto e1 = p->get_expr1()->analyze(catalog, query, allow_tlist_ref);
1067  if (e1->get_type_info().get_type() != kBOOLEAN) {
1068  throw std::runtime_error("Only boolean expressions can be used after WHEN.");
1069  }
1070  auto e2 = p->get_expr2()->analyze(catalog, query, allow_tlist_ref);
1071  expr_pair_list.emplace_back(e1, e2);
1072  }
1073  auto else_e =
1074  else_expr_ ? else_expr_->analyze(catalog, query, allow_tlist_ref) : nullptr;
1075  return normalize(expr_pair_list, else_e);
1076 }
1077 
1078 namespace {
1079 
// Body of a helper that maps a string literal to a bool: "t", "true", "T" and "True"
// yield true; "f", "false", "F" and "False" yield false; any other text throws
// std::runtime_error ("Invalid string for boolean ..."). The comparison is exact, so
// other spellings such as "TRUE" are rejected.
// NOTE(review): the line carrying the signature (listing line 1080) is absent from this
// extract; callers below invoke it as bool_from_string_literal(str_literal) — confirm
// the declaration against the real source.
1081  const std::string* s = str_literal->get_stringval();
1082  if (*s == "t" || *s == "true" || *s == "T" || *s == "True") {
1083  return true;
1084  } else if (*s == "f" || *s == "false" || *s == "F" || *s == "False") {
1085  return false;
1086  } else {
1087  throw std::runtime_error("Invalid string for boolean " + *s);
1088  }
1089 }
1090 
// Walks the name/value option list of a COPY statement and stores each recognized
// option into copy_params.
//  - options_: option assignments to process; names are matched case-insensitively
//    (boost::iequals).
//  - copy_params: receives the parsed values.
//  - warnings: receives deprecation warnings for the legacy 'parquet' (when compiled in)
//    and 'geo' options.
//  - deferred_copy_from_partitions_: set to the upper-cased value of the 'partitions'
//    option.
// Throws std::runtime_error when an option value has the wrong literal type, is out of
// range or unrecognized, or when the option name is unknown
// ("Invalid option for COPY: ...").
// NOTE(review): the embedded listing numbers skip in several places inside this function
// (e.g. 1149 -> 1152, 1164 -> 1166, 1301 -> 1303), so some statements appear to be
// missing from this extract; the comments below describe only what is visible.
1091 void parse_copy_params(const std::list<std::unique_ptr<NameValueAssign>>& options_,
1092  import_export::CopyParams& copy_params,
1093  std::vector<std::string>& warnings,
1094  std::string& deferred_copy_from_partitions_) {
1095  if (!options_.empty()) {
1096  for (auto& p : options_) {
1097  if (boost::iequals(*p->get_name(), "max_reject")) {
1098  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1099  if (int_literal == nullptr) {
1100  throw std::runtime_error("max_reject option must be an integer.");
1101  }
1102  copy_params.max_reject = int_literal->get_intval();
1103  } else if (boost::iequals(*p->get_name(), "max_import_batch_row_count")) {
1104  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1105  if (int_literal == nullptr) {
1106  throw std::runtime_error(
1107  "max_import_batch_row_count option must be an integer.");
1108  }
1109  if (int_literal->get_intval() <= 0) {
1110  throw std::runtime_error(
1111  "max_import_batch_row_count option must be a positive integer (greater "
1112  "than 0).");
1113  }
1114  copy_params.max_import_batch_row_count = int_literal->get_intval();
1115  } else if (boost::iequals(*p->get_name(), "buffer_size")) {
1116  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1117  if (int_literal == nullptr) {
1118  throw std::runtime_error("buffer_size option must be an integer.");
1119  }
1120  copy_params.buffer_size = int_literal->get_intval();
1121  } else if (boost::iequals(*p->get_name(), "threads")) {
1122  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1123  if (int_literal == nullptr) {
1124  throw std::runtime_error("Threads option must be an integer.");
1125  }
1126  copy_params.threads = int_literal->get_intval();
1127  } else if (boost::iequals(*p->get_name(), "delimiter")) {
1128  const StringLiteral* str_literal =
1129  dynamic_cast<const StringLiteral*>(p->get_value());
1130  if (str_literal == nullptr) {
1131  throw std::runtime_error("Delimiter option must be a string.");
1132  } else if (str_literal->get_stringval()->length() != 1) {
1133  throw std::runtime_error("Delimiter must be a single character string.");
1134  }
1135  copy_params.delimiter = (*str_literal->get_stringval())[0];
1136  } else if (boost::iequals(*p->get_name(), "nulls")) {
1137  const StringLiteral* str_literal =
1138  dynamic_cast<const StringLiteral*>(p->get_value());
1139  if (str_literal == nullptr) {
1140  throw std::runtime_error("Nulls option must be a string.");
1141  }
1142  copy_params.null_str = *str_literal->get_stringval();
1143  } else if (boost::iequals(*p->get_name(), "header")) {
1144  const StringLiteral* str_literal =
1145  dynamic_cast<const StringLiteral*>(p->get_value());
1146  if (str_literal == nullptr) {
1147  throw std::runtime_error("Header option must be a boolean.");
1148  }
1149  copy_params.has_header = bool_from_string_literal(str_literal)
1152 #ifdef ENABLE_IMPORT_PARQUET
1153  } else if (boost::iequals(*p->get_name(), "parquet")) {
1154  warnings.emplace_back(
1155  "Deprecation Warning: COPY FROM WITH (parquet='true') is deprecated. Use "
1156  "WITH (source_type='parquet_file') instead.");
1157  const StringLiteral* str_literal =
1158  dynamic_cast<const StringLiteral*>(p->get_value());
1159  if (str_literal == nullptr) {
1160  throw std::runtime_error("'parquet' option must be a boolean.");
1161  }
1162  if (bool_from_string_literal(str_literal)) {
1163  // not sure a parquet "table" type is proper, but to make code
1164  // look consistent in some places, let's set "table" type too
1166  }
1167 #endif // ENABLE_IMPORT_PARQUET
1168  } else if (boost::iequals(*p->get_name(), "s3_access_key")) {
1169  const StringLiteral* str_literal =
1170  dynamic_cast<const StringLiteral*>(p->get_value());
1171  if (str_literal == nullptr) {
1172  throw std::runtime_error("Option s3_access_key must be a string.");
1173  }
1174  copy_params.s3_access_key = *str_literal->get_stringval();
1175  } else if (boost::iequals(*p->get_name(), "s3_secret_key")) {
1176  const StringLiteral* str_literal =
1177  dynamic_cast<const StringLiteral*>(p->get_value());
1178  if (str_literal == nullptr) {
1179  throw std::runtime_error("Option s3_secret_key must be a string.");
1180  }
1181  copy_params.s3_secret_key = *str_literal->get_stringval();
1182  } else if (boost::iequals(*p->get_name(), "s3_session_token")) {
1183  const StringLiteral* str_literal =
1184  dynamic_cast<const StringLiteral*>(p->get_value());
1185  if (str_literal == nullptr) {
1186  throw std::runtime_error("Option s3_session_token must be a string.");
1187  }
1188  copy_params.s3_session_token = *str_literal->get_stringval();
1189  } else if (boost::iequals(*p->get_name(), "s3_region")) {
1190  const StringLiteral* str_literal =
1191  dynamic_cast<const StringLiteral*>(p->get_value());
1192  if (str_literal == nullptr) {
1193  throw std::runtime_error("Option s3_region must be a string.");
1194  }
1195  copy_params.s3_region = *str_literal->get_stringval();
1196  } else if (boost::iequals(*p->get_name(), "s3_endpoint")) {
1197  const StringLiteral* str_literal =
1198  dynamic_cast<const StringLiteral*>(p->get_value());
1199  if (str_literal == nullptr) {
1200  throw std::runtime_error("Option s3_endpoint must be a string.");
1201  }
1202  copy_params.s3_endpoint = *str_literal->get_stringval();
1203  } else if (boost::iequals(*p->get_name(), "s3_max_concurrent_downloads")) {
1204  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1205  if (int_literal == nullptr) {
1206  throw std::runtime_error(
1207  "'s3_max_concurrent_downloads' option must be an integer");
1208  }
1209  const int s3_max_concurrent_downloads = int_literal->get_intval();
1210  if (s3_max_concurrent_downloads > 0) {
1211  copy_params.s3_max_concurrent_downloads = s3_max_concurrent_downloads;
1212  } else {
1213  throw std::runtime_error(
1214  "Invalid value for 's3_max_concurrent_downloads' option (must be > 0): " +
1215  std::to_string(s3_max_concurrent_downloads));
1216  }
1217  } else if (boost::iequals(*p->get_name(), "quote")) {
1218  const StringLiteral* str_literal =
1219  dynamic_cast<const StringLiteral*>(p->get_value());
1220  if (str_literal == nullptr) {
1221  throw std::runtime_error("Quote option must be a string.");
1222  } else if (str_literal->get_stringval()->length() != 1) {
1223  throw std::runtime_error("Quote must be a single character string.");
1224  }
1225  copy_params.quote = (*str_literal->get_stringval())[0];
1226  } else if (boost::iequals(*p->get_name(), "escape")) {
1227  const StringLiteral* str_literal =
1228  dynamic_cast<const StringLiteral*>(p->get_value());
1229  if (str_literal == nullptr) {
1230  throw std::runtime_error("Escape option must be a string.");
1231  } else if (str_literal->get_stringval()->length() != 1) {
1232  throw std::runtime_error("Escape must be a single character string.");
1233  }
1234  copy_params.escape = (*str_literal->get_stringval())[0];
1235  } else if (boost::iequals(*p->get_name(), "line_delimiter")) {
1236  const StringLiteral* str_literal =
1237  dynamic_cast<const StringLiteral*>(p->get_value());
1238  if (str_literal == nullptr) {
1239  throw std::runtime_error("Line_delimiter option must be a string.");
1240  } else if (str_literal->get_stringval()->length() != 1) {
1241  throw std::runtime_error("Line_delimiter must be a single character string.");
1242  }
1243  copy_params.line_delim = (*str_literal->get_stringval())[0];
1244  } else if (boost::iequals(*p->get_name(), "quoted")) {
1245  const StringLiteral* str_literal =
1246  dynamic_cast<const StringLiteral*>(p->get_value());
1247  if (str_literal == nullptr) {
1248  throw std::runtime_error("Quoted option must be a boolean.");
1249  }
1250  copy_params.quoted = bool_from_string_literal(str_literal);
1251  } else if (boost::iequals(*p->get_name(), "plain_text")) {
1252  const StringLiteral* str_literal =
1253  dynamic_cast<const StringLiteral*>(p->get_value());
1254  if (str_literal == nullptr) {
1255  throw std::runtime_error("plain_text option must be a boolean.");
1256  }
1257  copy_params.plain_text = bool_from_string_literal(str_literal);
1258  } else if (boost::iequals(*p->get_name(), "trim_spaces")) {
1259  const StringLiteral* str_literal =
1260  dynamic_cast<const StringLiteral*>(p->get_value());
1261  if (str_literal == nullptr) {
1262  throw std::runtime_error("trim_spaces option must be a boolean.");
1263  }
1264  copy_params.trim_spaces = bool_from_string_literal(str_literal);
1265  } else if (boost::iequals(*p->get_name(), "array_marker")) {
// The two characters of the marker become the array begin and end characters.
1266  const StringLiteral* str_literal =
1267  dynamic_cast<const StringLiteral*>(p->get_value());
1268  if (str_literal == nullptr) {
1269  throw std::runtime_error("Array Marker option must be a string.");
1270  } else if (str_literal->get_stringval()->length() != 2) {
1271  throw std::runtime_error(
1272  "Array Marker option must be exactly two characters. Default is {}.");
1273  }
1274  copy_params.array_begin = (*str_literal->get_stringval())[0];
1275  copy_params.array_end = (*str_literal->get_stringval())[1];
1276  } else if (boost::iequals(*p->get_name(), "array_delimiter")) {
1277  const StringLiteral* str_literal =
1278  dynamic_cast<const StringLiteral*>(p->get_value());
1279  if (str_literal == nullptr) {
1280  throw std::runtime_error("Array Delimiter option must be a string.");
1281  } else if (str_literal->get_stringval()->length() != 1) {
1282  throw std::runtime_error("Array Delimiter must be a single character string.");
1283  }
1284  copy_params.array_delim = (*str_literal->get_stringval())[0];
1285  } else if (boost::iequals(*p->get_name(), "lonlat")) {
1286  const StringLiteral* str_literal =
1287  dynamic_cast<const StringLiteral*>(p->get_value());
1288  if (str_literal == nullptr) {
1289  throw std::runtime_error("Lonlat option must be a boolean.");
1290  }
1291  copy_params.lonlat = bool_from_string_literal(str_literal);
1292  } else if (boost::iequals(*p->get_name(), "geo")) {
1293  warnings.emplace_back(
1294  "Deprecation Warning: COPY FROM WITH (geo='true') is deprecated. Use WITH "
1295  "(source_type='geo_file') instead.");
1296  const StringLiteral* str_literal =
1297  dynamic_cast<const StringLiteral*>(p->get_value());
1298  if (str_literal == nullptr) {
1299  throw std::runtime_error("'geo' option must be a boolean.");
1300  }
1301  if (bool_from_string_literal(str_literal)) {
1303  }
1304  } else if (boost::iequals(*p->get_name(), "source_type")) {
1305  const StringLiteral* str_literal =
1306  dynamic_cast<const StringLiteral*>(p->get_value());
1307  if (str_literal == nullptr) {
1308  throw std::runtime_error("'source_type' option must be a string.");
1309  }
1310  const std::string* s = str_literal->get_stringval();
1311  if (boost::iequals(*s, "delimited_file")) {
1313  } else if (boost::iequals(*s, "geo_file")) {
// NOTE(review): this guard uses #if while the 'parquet' option above uses #ifdef for the
// same macro — confirm which form the build expects.
1315 #if ENABLE_IMPORT_PARQUET
1316  } else if (boost::iequals(*s, "parquet_file")) {
1318 #endif
1319  } else if (boost::iequals(*s, "raster_file")) {
1321  } else if (boost::iequals(*s, "regex_parsed_file")) {
1323  } else {
1324  throw std::runtime_error(
1325  "Invalid string for 'source_type' option (must be 'GEO_FILE', 'RASTER_FILE'"
1326 #if ENABLE_IMPORT_PARQUET
1327  ", 'PARQUET_FILE'"
1328 #endif
1329  ", 'REGEX_PARSED_FILE'"
1330  " or 'DELIMITED_FILE'): " +
1331  *s);
1332  }
1333  } else if (boost::iequals(*p->get_name(), "geo_coords_type")) {
1334  const StringLiteral* str_literal =
1335  dynamic_cast<const StringLiteral*>(p->get_value());
1336  if (str_literal == nullptr) {
1337  throw std::runtime_error("'geo_coords_type' option must be a string");
1338  }
1339  const std::string* s = str_literal->get_stringval();
1340  if (boost::iequals(*s, "geography")) {
1341  throw std::runtime_error(
1342  "GEOGRAPHY coords type not yet supported. Please use GEOMETRY.");
1343  // copy_params.geo_coords_type = kGEOGRAPHY;
1344  } else if (boost::iequals(*s, "geometry")) {
1345  copy_params.geo_coords_type = kGEOMETRY;
1346  } else {
1347  throw std::runtime_error(
1348  "Invalid string for 'geo_coords_type' option (must be 'GEOGRAPHY' or "
1349  "'GEOMETRY'): " +
1350  *s);
1351  }
1352  } else if (boost::iequals(*p->get_name(), "raster_point_type")) {
1353  const StringLiteral* str_literal =
1354  dynamic_cast<const StringLiteral*>(p->get_value());
1355  if (str_literal == nullptr) {
1356  throw std::runtime_error("'raster_point_type' option must be a string");
1357  }
1358  const std::string* s = str_literal->get_stringval();
1359  if (boost::iequals(*s, "none")) {
1361  } else if (boost::iequals(*s, "auto")) {
1363  } else if (boost::iequals(*s, "smallint")) {
1365  } else if (boost::iequals(*s, "int")) {
1367  } else if (boost::iequals(*s, "float")) {
1369  } else if (boost::iequals(*s, "double")) {
1371  } else if (boost::iequals(*s, "point")) {
1373  } else {
1374  throw std::runtime_error(
1375  "Invalid string for 'raster_point_type' option (must be 'NONE', 'AUTO', "
1376  "'SMALLINT', 'INT', 'FLOAT', 'DOUBLE' or 'POINT'): " +
1377  *s);
1378  }
1379  } else if (boost::iequals(*p->get_name(), "raster_point_transform")) {
1380  const StringLiteral* str_literal =
1381  dynamic_cast<const StringLiteral*>(p->get_value());
1382  if (str_literal == nullptr) {
1383  throw std::runtime_error("'raster_point_transform' option must be a string");
1384  }
1385  const std::string* s = str_literal->get_stringval();
1386  if (boost::iequals(*s, "none")) {
1388  } else if (boost::iequals(*s, "auto")) {
1390  } else if (boost::iequals(*s, "file")) {
1392  } else if (boost::iequals(*s, "world")) {
1393  copy_params.raster_point_transform =
1395  } else {
1396  throw std::runtime_error(
1397  "Invalid string for 'raster_point_transform' option (must be 'NONE', "
1398  "'AUTO', 'FILE' or 'WORLD'): " +
1399  *s);
1400  }
1401  } else if (boost::iequals(*p->get_name(), "raster_import_bands")) {
1402  const StringLiteral* str_literal =
1403  dynamic_cast<const StringLiteral*>(p->get_value());
1404  if (str_literal == nullptr) {
1405  throw std::runtime_error("'raster_import_bands' option must be a string");
1406  }
1407  const std::string* raster_import_bands = str_literal->get_stringval();
1408  if (raster_import_bands) {
1409  copy_params.raster_import_bands = *raster_import_bands;
1410  } else {
1411  throw std::runtime_error("Invalid value for 'raster_import_bands' option");
1412  }
1413  } else if (boost::iequals(*p->get_name(), "raster_import_dimensions")) {
1414  const StringLiteral* str_literal =
1415  dynamic_cast<const StringLiteral*>(p->get_value());
1416  if (str_literal == nullptr) {
1417  throw std::runtime_error("'raster_import_dimensions' option must be a string");
1418  }
1419  const std::string* raster_import_dimensions = str_literal->get_stringval();
1420  if (raster_import_dimensions) {
1421  copy_params.raster_import_dimensions = *raster_import_dimensions;
1422  } else {
1423  throw std::runtime_error("Invalid value for 'raster_import_dimensions' option");
1424  }
1425  } else if (boost::iequals(*p->get_name(), "geo_coords_encoding")) {
1426  const StringLiteral* str_literal =
1427  dynamic_cast<const StringLiteral*>(p->get_value());
1428  if (str_literal == nullptr) {
1429  throw std::runtime_error("'geo_coords_encoding' option must be a string");
1430  }
1431  const std::string* s = str_literal->get_stringval();
1432  if (boost::iequals(*s, "none")) {
1433  copy_params.geo_coords_encoding = kENCODING_NONE;
1434  copy_params.geo_coords_comp_param = 0;
1435  } else if (boost::iequals(*s, "compressed(32)")) {
1436  copy_params.geo_coords_encoding = kENCODING_GEOINT;
1437  copy_params.geo_coords_comp_param = 32;
1438  } else {
1439  throw std::runtime_error(
1440  "Invalid string for 'geo_coords_encoding' option (must be 'NONE' or "
1441  "'COMPRESSED(32)'): " +
1442  *s);
1443  }
1444  } else if (boost::iequals(*p->get_name(), "raster_scanlines_per_thread")) {
1445  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1446  if (int_literal == nullptr) {
1447  throw std::runtime_error(
1448  "'raster_scanlines_per_thread' option must be an integer");
1449  }
1450  const int raster_scanlines_per_thread = int_literal->get_intval();
1451  if (raster_scanlines_per_thread < 0) {
1452  throw std::runtime_error(
1453  "'raster_scanlines_per_thread' option must be >= 0, with 0 denoting auto "
1454  "sizing");
1455  }
1456  copy_params.raster_scanlines_per_thread = raster_scanlines_per_thread;
1457  } else if (boost::iequals(*p->get_name(), "geo_coords_srid")) {
1458  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1459  if (int_literal == nullptr) {
1460  throw std::runtime_error("'geo_coords_srid' option must be an integer");
1461  }
1462  const int srid = int_literal->get_intval();
1463  if (srid == 4326 || srid == 3857 || srid == 900913) {
1464  copy_params.geo_coords_srid = srid;
1465  } else {
1466  throw std::runtime_error(
1467  "Invalid value for 'geo_coords_srid' option (must be 4326, 3857, or "
1468  "900913): " +
1469  std::to_string(srid));
1470  }
1471  } else if (boost::iequals(*p->get_name(), "geo_layer_name")) {
1472  const StringLiteral* str_literal =
1473  dynamic_cast<const StringLiteral*>(p->get_value());
1474  if (str_literal == nullptr) {
1475  throw std::runtime_error("'geo_layer_name' option must be a string");
1476  }
1477  const std::string* layer_name = str_literal->get_stringval();
1478  if (layer_name) {
1479  copy_params.geo_layer_name = *layer_name;
1480  } else {
1481  throw std::runtime_error("Invalid value for 'geo_layer_name' option");
1482  }
1483  } else if (boost::iequals(*p->get_name(), "partitions")) {
// NOTE(review): unlike the other string options, this uses static_cast with no type
// check, so a non-string value is not rejected here — confirm that the grammar
// guarantees a StringLiteral for this option.
1484  const auto partitions =
1485  static_cast<const StringLiteral*>(p->get_value())->get_stringval();
1486  CHECK(partitions);
1487  const auto partitions_uc = boost::to_upper_copy<std::string>(*partitions);
1488  if (partitions_uc != "REPLICATED") {
1489  throw std::runtime_error(
1490  "Invalid value for 'partitions' option. Must be 'REPLICATED'.");
1491  }
1492  deferred_copy_from_partitions_ = partitions_uc;
1493  } else if (boost::iequals(*p->get_name(), "geo_explode_collections")) {
1494  const StringLiteral* str_literal =
1495  dynamic_cast<const StringLiteral*>(p->get_value());
1496  if (str_literal == nullptr) {
1497  throw std::runtime_error("geo_explode_collections option must be a boolean.");
1498  }
1499  copy_params.geo_explode_collections = bool_from_string_literal(str_literal);
1500  } else if (boost::iequals(*p->get_name(), "geo_validate_geometry")) {
1501  const StringLiteral* str_literal =
1502  dynamic_cast<const StringLiteral*>(p->get_value());
1503  if (str_literal == nullptr) {
1504  throw std::runtime_error("geo_validate_geometry option must be a boolean.");
1505  }
1506  copy_params.geo_validate_geometry = false;
1507  auto const value = bool_from_string_literal(str_literal);
1508  if (value) {
1510  copy_params.geo_validate_geometry = true;
1511  } else {
1512  throw std::runtime_error("GEOS geometry validation is not available.");
1513  }
1514  }
1515  } else if (boost::iequals(*p->get_name(), "source_srid")) {
1516  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1517  if (int_literal == nullptr) {
1518  throw std::runtime_error("'source_srid' option must be an integer");
1519  }
1520  const int srid = int_literal->get_intval();
1522  copy_params.source_srid = srid;
1523  } else {
1524  throw std::runtime_error(
1525  "'source_srid' option can only be used on csv/tsv files");
1526  }
1527  } else if (boost::iequals(*p->get_name(), "regex_path_filter")) {
// For this and the two file_sort_* options below, an empty string is stored as
// std::nullopt rather than as an empty value.
1528  const StringLiteral* str_literal =
1529  dynamic_cast<const StringLiteral*>(p->get_value());
1530  if (str_literal == nullptr) {
1531  throw std::runtime_error("Option regex_path_filter must be a string.");
1532  }
1533  const auto string_val = *str_literal->get_stringval();
1534  copy_params.regex_path_filter =
1535  string_val.empty() ? std::nullopt : std::optional<std::string>{string_val};
1536  } else if (boost::iequals(*p->get_name(), "file_sort_order_by")) {
1537  const StringLiteral* str_literal =
1538  dynamic_cast<const StringLiteral*>(p->get_value());
1539  if (str_literal == nullptr) {
1540  throw std::runtime_error("Option file_sort_order_by must be a string.");
1541  }
1542  const auto string_val = *str_literal->get_stringval();
1543  copy_params.file_sort_order_by =
1544  string_val.empty() ? std::nullopt : std::optional<std::string>{string_val};
1545  } else if (boost::iequals(*p->get_name(), "file_sort_regex")) {
1546  const StringLiteral* str_literal =
1547  dynamic_cast<const StringLiteral*>(p->get_value());
1548  if (str_literal == nullptr) {
1549  throw std::runtime_error("Option file_sort_regex must be a string.");
1550  }
1551  const auto string_val = *str_literal->get_stringval();
1552  copy_params.file_sort_regex =
1553  string_val.empty() ? std::nullopt : std::optional<std::string>{string_val};
1554  } else if (boost::iequals(*p->get_name(), "raster_point_compute_angle")) {
// Only a true value is written; a false value leaves copy_params unchanged. The same
// holds for raster_drop_if_all_null below.
1555  const StringLiteral* str_literal =
1556  dynamic_cast<const StringLiteral*>(p->get_value());
1557  if (str_literal == nullptr) {
1558  throw std::runtime_error(
1559  "'raster_point_compute_angle' option must be a boolean.");
1560  }
1561  if (bool_from_string_literal(str_literal)) {
1562  copy_params.raster_point_compute_angle = true;
1563  }
1564  } else if (boost::iequals(*p->get_name(), "raster_drop_if_all_null")) {
1565  const StringLiteral* str_literal =
1566  dynamic_cast<const StringLiteral*>(p->get_value());
1567  if (str_literal == nullptr) {
1568  throw std::runtime_error("'raster_drop_if_all_null' option must be a boolean.");
1569  }
1570  if (bool_from_string_literal(str_literal)) {
1571  copy_params.raster_drop_if_all_null = true;
1572  }
1573  } else if (boost::iequals(*p->get_name(), "sql_order_by")) {
1574  if (auto str_literal = dynamic_cast<const StringLiteral*>(p->get_value())) {
1575  copy_params.sql_order_by = *str_literal->get_stringval();
1576  } else {
1577  throw std::runtime_error("Option sql_order_by must be a string.");
1578  }
1579  } else if (boost::iequals(*p->get_name(), "username")) {
1580  const StringLiteral* str_literal =
1581  dynamic_cast<const StringLiteral*>(p->get_value());
1582  if (str_literal == nullptr) {
1583  throw std::runtime_error("Option username must be a string.");
1584  }
1585  const auto string_val = *str_literal->get_stringval();
1586  copy_params.username = string_val;
1587  } else if (boost::iequals(*p->get_name(), "password")) {
1588  const StringLiteral* str_literal =
1589  dynamic_cast<const StringLiteral*>(p->get_value());
1590  if (str_literal == nullptr) {
1591  throw std::runtime_error("Option password must be a string.");
1592  }
1593  const auto string_val = *str_literal->get_stringval();
1594  copy_params.password = string_val;
1595  } else if (boost::iequals(*p->get_name(), "credential_string")) {
1596  const StringLiteral* str_literal =
1597  dynamic_cast<const StringLiteral*>(p->get_value());
1598  if (str_literal == nullptr) {
1599  throw std::runtime_error("Option credential_string must be a string.");
1600  }
1601  const auto string_val = *str_literal->get_stringval();
1602  copy_params.credential_string = string_val;
1603  } else if (boost::iequals(*p->get_name(), "data_source_name")) {
1604  const StringLiteral* str_literal =
1605  dynamic_cast<const StringLiteral*>(p->get_value());
1606  if (str_literal == nullptr) {
1607  throw std::runtime_error("Option data_source_name must be a string.");
1608  }
1609  const auto string_val = *str_literal->get_stringval();
1610  copy_params.dsn = string_val;
1611  } else if (boost::iequals(*p->get_name(), "connection_string")) {
1612  const StringLiteral* str_literal =
1613  dynamic_cast<const StringLiteral*>(p->get_value());
1614  if (str_literal == nullptr) {
1615  throw std::runtime_error("Option connection_string must be a string.");
1616  }
1617  const auto string_val = *str_literal->get_stringval();
1618  copy_params.connection_string = string_val;
1619  } else if (boost::iequals(*p->get_name(), "line_start_regex")) {
1620  const StringLiteral* str_literal =
1621  dynamic_cast<const StringLiteral*>(p->get_value());
1622  if (str_literal == nullptr) {
1623  throw std::runtime_error("Option line_start_regex must be a string.");
1624  }
1625  const auto string_val = *str_literal->get_stringval();
1626  copy_params.line_start_regex = string_val;
1627  } else if (boost::iequals(*p->get_name(), "line_regex")) {
1628  const StringLiteral* str_literal =
1629  dynamic_cast<const StringLiteral*>(p->get_value());
1630  if (str_literal == nullptr) {
1631  throw std::runtime_error("Option line_regex must be a string.");
1632  }
1633  const auto string_val = *str_literal->get_stringval();
1634  copy_params.line_regex = string_val;
1635  } else if (boost::iequals(*p->get_name(), "add_metadata_columns") &&
1637  const StringLiteral* str_literal =
1638  dynamic_cast<const StringLiteral*>(p->get_value());
1639  if (str_literal == nullptr) {
1640  throw std::runtime_error("'add_metadata_columns' option must be a string.");
1641  }
1642  copy_params.add_metadata_columns = *str_literal->get_stringval();
1643  } else {
1644  throw std::runtime_error("Invalid option for COPY: " + *p->get_name());
1645  }
1646  }
1647  }
1648 }
1649 
1650 bool expr_is_null(const Analyzer::Expr* expr) {
1651  if (expr->get_type_info().get_type() == kNULLT) {
1652  return true;
1653  }
1654  auto array_expr = dynamic_cast<const Analyzer::ArrayExpr*>(expr);
1655  if (array_expr && array_expr->isNull()) {
1656  return true;
1657  }
1658  const auto const_expr = dynamic_cast<const Analyzer::Constant*>(expr);
1659  return const_expr && const_expr->get_is_null();
1660 }
1661 
1662 } // namespace
1663 
// Deduces a single result type for all THEN expressions in expr_pair_list and
// for the optional ELSE expression, casts every branch to that type with
// add_cast(), and returns a new Analyzer::CaseExpr built from the casted
// branches. When no ELSE expression is supplied, a NULL constant of the result
// type is created so that the returned expression always has an ELSE branch.
// Throws std::runtime_error when branch types are incompatible or when the
// result type is still kNULLT after all branches have been inspected.
1664 std::shared_ptr<Analyzer::Expr> CaseExpr::normalize(
1665  const std::list<std::pair<std::shared_ptr<Analyzer::Expr>,
1666  std::shared_ptr<Analyzer::Expr>>>& expr_pair_list,
1667  const std::shared_ptr<Analyzer::Expr> else_e_in,
1668  const Executor* executor) {
      // ti accumulates the common result type; it stays kNULLT until a branch
      // contributes a type.
1669  SQLTypeInfo ti;
1670  bool has_agg = false;
1671  // We need to keep track of whether there was at
1672  // least one none-encoded string literal expression
1673  // type among any of the case sub-expressions separately
1674  // from rest of type determination logic, as it will
1675  // be casted to the output dictionary type if all output
1676  // types are either dictionary encoded or none-encoded
1677  // literals, or a transient encoded dictionary if all
1678  // types are none-encoded (column or literal)
1679  SQLTypeInfo none_encoded_literal_ti;
1680 
      // Pass over the WHEN/THEN pairs: p.first is the condition (CHECKed to be
      // boolean), p.second is the THEN value whose type feeds the deduction.
1681  for (auto& p : expr_pair_list) {
1682  auto e1 = p.first;
1683  CHECK(e1->get_type_info().is_boolean());
1684  auto e2 = p.second;
1685  if (e2->get_contains_agg()) {
1686  has_agg = true;
1687  }
1688  const auto& e2_ti = e2->get_type_info();
1689  const auto col_var = std::dynamic_pointer_cast<const Analyzer::ColumnVar>(e2);
      // String values that are neither dictionary encoded nor a ColumnVar are
      // tracked in none_encoded_literal_ti only and do not update ti.
1690  if (e2_ti.is_string() && !e2_ti.is_dict_encoded_string() && !col_var) {
1691  CHECK(e2_ti.is_none_encoded_string());
1692  none_encoded_literal_ti =
1693  none_encoded_literal_ti.get_type() == kNULLT
1694  ? e2_ti
1695  : common_string_type(none_encoded_literal_ti, e2_ti, executor);
1696  continue;
1697  }
1698  if (ti.get_type() == kNULLT) {
      // First typed branch: adopt its type unless the branch itself is NULL.
1699  if (!expr_is_null(e2.get())) {
1700  ti = e2_ti;
1701  }
1702  } else if (expr_is_null(e2.get())) {
      // A NULL branch makes the result nullable and takes the deduced type.
1703  ti.set_notnull(false);
1704  e2->set_type_info(ti);
1705  } else if (ti != e2_ti) {
1706  if (ti.is_string() && e2_ti.is_string()) {
1707  // Executor is needed to determine which dictionary is the largest
1708  // in case of two dictionary types with different encodings
1709  ti = common_string_type(ti, e2_ti, executor);
1710  } else if (ti.is_number() && e2_ti.is_number()) {
      // NOTE(review): the body of this branch (listing line 1711) is not
      // visible in this view.
1712  } else if (ti.is_boolean() && e2_ti.is_boolean()) {
      // NOTE(review): the body of this branch (listing line 1713) is not
      // visible in this view.
1714  } else {
1715  throw std::runtime_error(
1716  "Expressions in THEN clause must be of the same or compatible types.");
1717  }
1718  }
1719  }
1720  auto else_e = else_e_in;
      // NOTE(review): else_e is dereferenced on the next line before the
      // `if (else_e)` null check, although the code further down explicitly
      // handles else_e == nullptr. Confirm callers never pass a null ELSE
      // expression, or move this lookup inside the guarded block.
1721  const auto& else_ti = else_e->get_type_info();
1722  if (else_e) {
1723  const auto col_var = std::dynamic_pointer_cast<const Analyzer::ColumnVar>(else_e);
1724  if (else_e->get_contains_agg()) {
1725  has_agg = true;
1726  }
      // Same none-encoded, non-ColumnVar string handling as for THEN values.
1727  if (else_ti.is_string() && !else_ti.is_dict_encoded_string() && !col_var) {
1728  CHECK(else_ti.is_none_encoded_string());
1729  none_encoded_literal_ti =
1730  none_encoded_literal_ti.get_type() == kNULLT
1731  ? else_ti
1732  : common_string_type(none_encoded_literal_ti, else_ti, executor);
1733  } else {
1734  if (ti.get_type() == kNULLT) {
1735  ti = else_ti;
1736  } else if (expr_is_null(else_e.get())) {
1737  ti.set_notnull(false);
1738  else_e->set_type_info(ti);
1739  } else if (ti != else_ti) {
1740  ti.set_notnull(false);
1741  if (ti.is_string() && else_ti.is_string()) {
1742  // Executor is needed to determine which dictionary is the largest
1743  // in case of two dictionary types with different encodings
1744  ti = common_string_type(ti, else_ti, executor);
1745  } else if (ti.is_number() && else_ti.is_number()) {
1746  ti = Analyzer::BinOper::common_numeric_type(ti, else_ti);
1747  } else if (ti.is_boolean() && else_ti.is_boolean()) {
1748  ti = Analyzer::BinOper::common_numeric_type(ti, else_ti);
1749  } else if (get_logical_type_info(ti) != get_logical_type_info(else_ti)) {
1750  throw std::runtime_error(
1751  // types differing by encoding will be resolved at decode
1752  "Expressions in ELSE clause must be of the same or compatible types as "
1753  "those in the THEN clauses.");
1754  }
1755  }
1756  }
1757  }
1758 
1759  if (ti.get_type() == kNULLT && none_encoded_literal_ti.get_type() != kNULLT) {
1760  // If we haven't set a type so far it's because
1761  // every case sub-expression has a none-encoded
1762  // literal output. Output a transient-encoded dictionary
1763  // so we can use the output downstream
      // NOTE(review): the statement that assigns ti here (listing line 1764)
      // is not visible in this view.
1765  }
1766 
      // Cast every THEN value to the deduced type; the condition expressions
      // are carried over unchanged. ti is forced nullable before each cast.
1767  std::list<std::pair<std::shared_ptr<Analyzer::Expr>, std::shared_ptr<Analyzer::Expr>>>
1768  cast_expr_pair_list;
1769  for (auto p : expr_pair_list) {
1770  ti.set_notnull(false);
1771  cast_expr_pair_list.emplace_back(p.first, p.second->add_cast(ti));
1772  }
1773  if (else_e != nullptr) {
1774  else_e = else_e->add_cast(ti);
1775  } else {
1776  Datum d;
1777  // always create an else expr so that executor doesn't need to worry about it
1778  ti.set_notnull(false);
1779  else_e = makeExpr<Analyzer::Constant>(ti, true, d);
1780  }
      // No branch contributed a type: report the failure to the caller.
1781  if (ti.get_type() == kNULLT) {
1782  throw std::runtime_error(
1783  "Cannot deduce the type for case expressions, all branches null");
1784  }
1785 
1786  auto case_expr = makeExpr<Analyzer::CaseExpr>(ti, has_agg, cast_expr_pair_list, else_e);
1787  return case_expr;
1788 }
1789 
1790 std::string CaseExpr::to_string() const {
1791  std::string str("CASE ");
1792  for (auto& p : when_then_list_) {
1793  str += "WHEN " + p->get_expr1()->to_string() + " THEN " +
1794  p->get_expr2()->to_string() + " ";
1795  }
1796  if (else_expr_ != nullptr) {
1797  str += "ELSE " + else_expr_->to_string();
1798  }
1799  str += " END";
1800  return str;
1801 }
1802 
1803 void UnionQuery::analyze(const Catalog_Namespace::Catalog& catalog,
1804  Analyzer::Query& query) const {
1805  left_->analyze(catalog, query);
1806  Analyzer::Query* right_query = new Analyzer::Query();
1807  right_->analyze(catalog, *right_query);
1808  query.set_next_query(right_query);
1809  query.set_is_unionall(is_unionall_);
1810 }
1811 
1812 void QuerySpec::analyze_having_clause(const Catalog_Namespace::Catalog& catalog,
1813  Analyzer::Query& query) const {
1814  std::shared_ptr<Analyzer::Expr> p;
1815  if (having_clause_ != nullptr) {
1816  p = having_clause_->analyze(catalog, query, Expr::TlistRefType::TLIST_COPY);
1817  if (p->get_type_info().get_type() != kBOOLEAN) {
1818  throw std::runtime_error("Only boolean expressions can be in HAVING clause.");
1819  }
1820  p->check_group_by(query.get_group_by());
1821  }
1822  query.set_having_predicate(p);
1823 }
1824 
// Builds the GROUP BY expression list for `query` from groupby_clause_ and
// stores it with query.set_group_by(). Integer literals in the clause are
// treated as 1-based ordinals into the query's target list. When the query has
// aggregates or a non-empty GROUP BY list, every target-list expression is
// validated with check_group_by() before the list is stored.
// Throws std::runtime_error for non-integer literals, out-of-range ordinals,
// and ordinals that reference an expression containing aggregates.
1825 void QuerySpec::analyze_group_by(const Catalog_Namespace::Catalog& catalog,
1826  Analyzer::Query& query) const {
1827  std::list<std::shared_ptr<Analyzer::Expr>> groupby;
1828  if (!groupby_clause_.empty()) {
      // gexpr_no is the 1-based position of the current GROUP BY expression.
1829  int gexpr_no = 1;
1830  std::shared_ptr<Analyzer::Expr> gexpr;
1831  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& tlist =
1832  query.get_targetlist();
1833  for (auto& c : groupby_clause_) {
1834  // special-case ordinal numbers in GROUP BY
1835  if (dynamic_cast<Literal*>(c.get())) {
1836  IntLiteral* i = dynamic_cast<IntLiteral*>(c.get());
1837  if (!i) {
1838  throw std::runtime_error("Invalid literal in GROUP BY clause.");
1839  }
1840  int varno = (int)i->get_intval();
1841  if (varno <= 0 || varno > static_cast<int>(tlist.size())) {
1842  throw std::runtime_error("Invalid ordinal number in GROUP BY clause.");
1843  }
1844  if (tlist[varno - 1]->get_expr()->get_contains_agg()) {
1845  throw std::runtime_error(
1846  "Ordinal number in GROUP BY cannot reference an expression containing "
1847  "aggregate "
1848  "functions.");
1849  }
      // Represent the ordinal as a Var that refers to target-list entry varno.
1850  gexpr = makeExpr<Analyzer::Var>(
1851  tlist[varno - 1]->get_expr()->get_type_info(), Analyzer::Var::kOUTPUT, varno);
1852  } else {
1853  gexpr = c->analyze(catalog, query, Expr::TlistRefType::TLIST_REF);
1854  }
1855  const SQLTypeInfo gti = gexpr->get_type_info();
1856  bool set_new_type = false;
1857  SQLTypeInfo ti(gti);
      // Strings with kENCODING_NONE get a modified type and are cast below.
1858  if (gti.is_string() && gti.get_compression() == kENCODING_NONE) {
1859  set_new_type = true;
      // NOTE(review): listing lines 1860-1862 are not visible in this view;
      // whatever they change on ti happens between these two statements.
1863  ti.set_fixed_size();
1864  }
1865  std::shared_ptr<Analyzer::Var> v;
      // When the GROUP BY expression is a Var, group on the target-list entry
      // it refers to and repoint that entry at the Var (now marked kGROUPBY
      // and numbered by gexpr_no).
1866  if (std::dynamic_pointer_cast<Analyzer::Var>(gexpr)) {
1867  v = std::static_pointer_cast<Analyzer::Var>(gexpr);
1868  int n = v->get_varno();
1869  gexpr = tlist[n - 1]->get_own_expr();
1870  auto cv = std::dynamic_pointer_cast<Analyzer::ColumnVar>(gexpr);
1871  if (cv != nullptr) {
1872  // inherit all ColumnVar info for lineage.
1873  *std::static_pointer_cast<Analyzer::ColumnVar>(v) = *cv;
1874  }
1875  v->set_which_row(Analyzer::Var::kGROUPBY);
1876  v->set_varno(gexpr_no);
1877  tlist[n - 1]->set_expr(v);
1878  }
1879  if (set_new_type) {
      // Group on the casted expression and keep the Var's type in sync with it.
1880  auto new_e = gexpr->add_cast(ti);
1881  groupby.push_back(new_e);
1882  if (v != nullptr) {
1883  v->set_type_info(new_e->get_type_info());
1884  }
1885  } else {
1886  groupby.push_back(gexpr);
1887  }
1888  gexpr_no++;
1889  }
1890  }
      // Validate each target-list expression against the GROUP BY list.
1891  if (query.get_num_aggs() > 0 || !groupby.empty()) {
1892  for (auto t : query.get_targetlist()) {
1893  auto e = t->get_expr();
1894  e->check_group_by(groupby);
1895  }
1896  }
1897  query.set_group_by(groupby);
1898 }
1899 
1900 void QuerySpec::analyze_where_clause(const Catalog_Namespace::Catalog& catalog,
1901  Analyzer::Query& query) const {
1902  if (where_clause_ == nullptr) {
1903  query.set_where_predicate(nullptr);
1904  return;
1905  }
1906  auto p = where_clause_->analyze(catalog, query, Expr::TlistRefType::TLIST_COPY);
1907  if (p->get_type_info().get_type() != kBOOLEAN) {
1908  throw std::runtime_error("Only boolean expressions can be in WHERE clause.");
1909  }
1910  query.set_where_predicate(p);
1911 }
1912 
1913 void QuerySpec::analyze_select_clause(const Catalog_Namespace::Catalog& catalog,
1914  Analyzer::Query& query) const {
1915  std::vector<std::shared_ptr<Analyzer::TargetEntry>>& tlist =
1916  query.get_targetlist_nonconst();
1917  if (select_clause_.empty()) {
1918  // this means SELECT *
1919  int rte_idx = 0;
1920  for (auto rte : query.get_rangetable()) {
1921  rte->expand_star_in_targetlist(catalog, tlist, rte_idx++);
1922  }
1923  } else {
1924  for (auto& p : select_clause_) {
1925  const Parser::Expr* select_expr = p->get_select_expr();
1926  // look for the case of range_var.*
1927  if (typeid(*select_expr) == typeid(ColumnRef) &&
1928  dynamic_cast<const ColumnRef*>(select_expr)->get_column() == nullptr) {
1929  const std::string* range_var_name =
1930  dynamic_cast<const ColumnRef*>(select_expr)->get_table();
1931  int rte_idx = query.get_rte_idx(*range_var_name);
1932  if (rte_idx < 0) {
1933  throw std::runtime_error("invalid range variable name: " + *range_var_name);
1934  }
1935  Analyzer::RangeTableEntry* rte = query.get_rte(rte_idx);
1936  rte->expand_star_in_targetlist(catalog, tlist, rte_idx);
1937  } else {
1938  auto e = select_expr->analyze(catalog, query);
1939  std::string resname;
1940 
1941  if (p->get_alias() != nullptr) {
1942  resname = *p->get_alias();
1943  } else if (std::dynamic_pointer_cast<Analyzer::ColumnVar>(e) &&
1944  !std::dynamic_pointer_cast<Analyzer::Var>(e)) {
1945  auto colvar = std::static_pointer_cast<Analyzer::ColumnVar>(e);
1946  const auto& column_key = colvar->getColumnKey();
1947  const ColumnDescriptor* col_desc =
1948  catalog.getMetadataForColumn(column_key.table_id, column_key.column_id);
1949  resname = col_desc->columnName;
1950  }
1951  if (e->get_type_info().get_type() == kNULLT) {
1952  throw std::runtime_error(
1953  "Untyped NULL in SELECT clause. Use CAST to specify a type.");
1954  }
1955  auto o = std::static_pointer_cast<Analyzer::UOper>(e);
1956  bool unnest = (o != nullptr && o->get_optype() == kUNNEST);
1957  auto tle = std::make_shared<Analyzer::TargetEntry>(resname, e, unnest);
1958  tlist.push_back(tle);
1959  }
1960  }
1961  }
1962 }
1963 
// Adds one Analyzer::RangeTableEntry to `query` for every table reference in
// the FROM clause. The entry's range variable is the explicit range variable
// when one is given, otherwise the table name.
// Throws std::runtime_error when a referenced table has no metadata in the
// catalog.
1964 void QuerySpec::analyze_from_clause(const Catalog_Namespace::Catalog& catalog,
1965  Analyzer::Query& query) const {
      // NOTE(review): `rte` is assigned below but its declaration is not
      // visible in this view (listing line 1966 is missing) - confirm.
1967  for (auto& p : from_clause_) {
1968  const TableDescriptor* table_desc;
1969  table_desc = catalog.getMetadataForTable(*p->get_table_name());
1970  if (table_desc == nullptr) {
1971  throw std::runtime_error("Table " + *p->get_table_name() + " does not exist.");
1972  }
1973  std::string range_var;
1974  if (p->get_range_var() == nullptr) {
1975  range_var = *p->get_table_name();
1976  } else {
1977  range_var = *p->get_range_var();
1978  }
      // The new entry is passed to the query as a raw pointer via add_rte().
1979  rte = new Analyzer::RangeTableEntry(range_var, table_desc, nullptr);
1980  query.add_rte(rte);
1981  }
1982 }
1983 
1984 void QuerySpec::analyze(const Catalog_Namespace::Catalog& catalog,
1985  Analyzer::Query& query) const {
1986  query.set_is_distinct(is_distinct_);
1987  analyze_from_clause(catalog, query);
1988  analyze_select_clause(catalog, query);
1989  analyze_where_clause(catalog, query);
1990  analyze_group_by(catalog, query);
1991  analyze_having_clause(catalog, query);
1992 }
1993 
1994 namespace {
1995 
1996 // clean known escape'd chars without having to do a full json parse
1997 std::string unescape(std::string s) {
1998  boost::replace_all(s, "\\\\t", "\t");
1999  boost::replace_all(s, "\\t", "\t");
2000  boost::replace_all(s, "\\\\n", "\n");
2001  boost::replace_all(s, "\\n", "\n");
2002 
2003  // handle numerics
2004  std::smatch m;
2005 
2006  // "\x00"
2007  std::regex e1("(\\\\x[0-9A-Fa-f][0-9A-Fa-f])");
2008  while (std::regex_search(s, m, e1)) {
2009  std::string original(m[0].first, m[0].second);
2010  std::string replacement;
2011  long val = strtol(original.substr(2, 2).c_str(), NULL, 16);
2012  replacement.push_back(val);
2013  boost::replace_all(s, original, replacement);
2014  }
2015 
2016  // "\u0000"
2017  std::regex e2("(\\\\u[0-9A-Fa-f][0-9A-Fa-f][0-9A-Fa-f][0-9A-Fa-f])");
2018  while (std::regex_search(s, m, e2)) {
2019  std::string original(m[0].first, m[0].second);
2020  std::string replacement;
2021  long val = strtol(original.substr(2, 4).c_str(), NULL, 16);
2022  replacement.push_back(val);
2023  boost::replace_all(s, original, replacement);
2024  }
2025 
2026  return s;
2027 }
2028 
2029 void parse_options(const rapidjson::Value& payload,
2030  std::list<std::unique_ptr<NameValueAssign>>& nameValueList,
2031  bool stringToNull = false,
2032  bool stringToInteger = false) {
2033  if (payload.HasMember("options") && payload["options"].IsObject()) {
2034  const auto& options = payload["options"];
2035  for (auto itr = options.MemberBegin(); itr != options.MemberEnd(); ++itr) {
2036  auto option_name = std::make_unique<std::string>(itr->name.GetString());
2037  std::unique_ptr<Literal> literal_value;
2038  if (itr->value.IsString()) {
2039  std::string str = itr->value.GetString();
2040  if (stringToNull && str == "") {
2041  literal_value = std::make_unique<NullLiteral>();
2042  } else if (stringToInteger && std::all_of(str.begin(), str.end(), ::isdigit)) {
2043  int iVal = std::stoi(str);
2044  literal_value = std::make_unique<IntLiteral>(iVal);
2045  } else {
2046  // Rapidjson will deliberately provide escape'd strings when accessed
2047  // ... but the literal should have a copy of the raw unescaped string
2048  auto unique_literal_string = std::make_unique<std::string>(unescape(str));
2049  literal_value =
2050  std::make_unique<StringLiteral>(unique_literal_string.release());
2051  }
2052  } else if (itr->value.IsInt() || itr->value.IsInt64()) {
2053  literal_value = std::make_unique<IntLiteral>(json_i64(itr->value));
2054  } else if (itr->value.IsDouble()) {
2055  literal_value = std::make_unique<DoubleLiteral>(json_double(itr->value));
2056  } else if (itr->value.IsNull()) {
2057  literal_value = std::make_unique<NullLiteral>();
2058  } else {
2059  throw std::runtime_error("Unable to handle literal for " + *option_name);
2060  }
2061  CHECK(literal_value);
2062 
2063  nameValueList.emplace_back(std::make_unique<NameValueAssign>(
2064  option_name.release(), literal_value.release()));
2065  }
2066  }
2067 }
2068 } // namespace
2069 
2070 void SelectStmt::analyze(const Catalog_Namespace::Catalog& catalog,
2071  Analyzer::Query& query) const {
2072  query.set_stmt_type(kSELECT);
2073  query.set_limit(limit_);
2074  if (offset_ < 0) {
2075  throw std::runtime_error("OFFSET cannot be negative.");
2076  }
2077  query.set_offset(offset_);
2078  query_expr_->analyze(catalog, query);
2079  if (orderby_clause_.empty() && !query.get_is_distinct()) {
2080  query.set_order_by(nullptr);
2081  return;
2082  }
2083  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& tlist =
2084  query.get_targetlist();
2085  std::list<Analyzer::OrderEntry>* order_by = new std::list<Analyzer::OrderEntry>();
2086  if (!orderby_clause_.empty()) {
2087  for (auto& p : orderby_clause_) {
2088  int tle_no = p->get_colno();
2089  if (tle_no == 0) {
2090  // use column name
2091  // search through targetlist for matching name
2092  const std::string* name = p->get_column()->get_column();
2093  tle_no = 1;
2094  bool found = false;
2095  for (auto tle : tlist) {
2096  if (tle->get_resname() == *name) {
2097  found = true;
2098  break;
2099  }
2100  tle_no++;
2101  }
2102  if (!found) {
2103  throw std::runtime_error("invalid name in order by: " + *name);
2104  }
2105  }
2106  order_by->push_back(
2107  Analyzer::OrderEntry(tle_no, p->get_is_desc(), p->get_nulls_first()));
2108  }
2109  }
2110  if (query.get_is_distinct()) {
2111  // extend order_by to include all targetlist entries.
2112  for (int i = 1; i <= static_cast<int>(tlist.size()); i++) {
2113  bool in_orderby = false;
2114  std::for_each(order_by->begin(),
2115  order_by->end(),
2116  [&in_orderby, i](const Analyzer::OrderEntry& oe) {
2117  in_orderby = in_orderby || (i == oe.tle_no);
2118  });
2119  if (!in_orderby) {
2120  order_by->push_back(Analyzer::OrderEntry(i, false, false));
2121  }
2122  }
2123  }
2124  query.set_order_by(order_by);
2125 }
2126 
2127 std::string SelectEntry::to_string() const {
2128  std::string str = select_expr_->to_string();
2129  if (alias_ != nullptr) {
2130  str += " AS " + *alias_;
2131  }
2132  return str;
2133 }
2134 
2135 std::string TableRef::to_string() const {
2136  std::string str = *table_name_;
2137  if (range_var_ != nullptr) {
2138  str += " " + *range_var_;
2139  }
2140  return str;
2141 }
2142 
2143 std::string ColumnRef::to_string() const {
2144  std::string str;
2145  if (table_ == nullptr) {
2146  str = *column_;
2147  } else if (column_ == nullptr) {
2148  str = *table_ + ".*";
2149  } else {
2150  str = *table_ + "." + *column_;
2151  }
2152  return str;
2153 }
2154 
2155 std::string OperExpr::to_string() const {
2156  std::string op_str[] = {
2157  "=", "===", "<>", "<", ">", "<=", ">=", " AND ", " OR ", "NOT", "-", "+", "*", "/"};
2158  std::string str;
2159  if (optype_ == kUMINUS) {
2160  str = "-(" + left_->to_string() + ")";
2161  } else if (optype_ == kNOT) {
2162  str = "NOT (" + left_->to_string() + ")";
2163  } else if (optype_ == kARRAY_AT) {
2164  str = left_->to_string() + "[" + right_->to_string() + "]";
2165  } else if (optype_ == kUNNEST) {
2166  str = "UNNEST(" + left_->to_string() + ")";
2167  } else if (optype_ == kIN) {
2168  str = "(" + left_->to_string() + " IN " + right_->to_string() + ")";
2169  } else {
2170  str = "(" + left_->to_string() + op_str[optype_] + right_->to_string() + ")";
2171  }
2172  return str;
2173 }
2174 
2175 std::string InExpr::to_string() const {
2176  std::string str = arg_->to_string();
2177  if (is_not_) {
2178  str += " NOT IN ";
2179  } else {
2180  str += " IN ";
2181  }
2182  return str;
2183 }
2184 
2185 std::string ExistsExpr::to_string() const {
2186  return "EXISTS (" + query_->to_string() + ")";
2187 }
2188 
2189 std::string SubqueryExpr::to_string() const {
2190  std::string str;
2191  str = "(";
2192  str += query_->to_string();
2193  str += ")";
2194  return str;
2195 }
2196 
2197 std::string IsNullExpr::to_string() const {
2198  std::string str = arg_->to_string();
2199  if (is_not_) {
2200  str += " IS NOT NULL";
2201  } else {
2202  str += " IS NULL";
2203  }
2204  return str;
2205 }
2206 
2207 std::string InSubquery::to_string() const {
2208  std::string str = InExpr::to_string();
2209  str += subquery_->to_string();
2210  return str;
2211 }
2212 
2213 std::string InValues::to_string() const {
2214  std::string str = InExpr::to_string() + "(";
2215  bool notfirst = false;
2216  for (auto& p : value_list_) {
2217  if (notfirst) {
2218  str += ", ";
2219  } else {
2220  notfirst = true;
2221  }
2222  str += p->to_string();
2223  }
2224  str += ")";
2225  return str;
2226 }
2227 
2228 std::string BetweenExpr::to_string() const {
2229  std::string str = arg_->to_string();
2230  if (is_not_) {
2231  str += " NOT BETWEEN ";
2232  } else {
2233  str += " BETWEEN ";
2234  }
2235  str += lower_->to_string() + " AND " + upper_->to_string();
2236  return str;
2237 }
2238 
2239 std::string CharLengthExpr::to_string() const {
2240  std::string str;
2241  if (calc_encoded_length_) {
2242  str = "CHAR_LENGTH (" + arg_->to_string() + ")";
2243  } else {
2244  str = "LENGTH (" + arg_->to_string() + ")";
2245  }
2246  return str;
2247 }
2248 
2249 std::string CardinalityExpr::to_string() const {
2250  std::string str = "CARDINALITY(" + arg_->to_string() + ")";
2251  return str;
2252 }
2253 
2254 std::string LikeExpr::to_string() const {
2255  std::string str = arg_->to_string();
2256  if (is_not_) {
2257  str += " NOT LIKE ";
2258  } else {
2259  str += " LIKE ";
2260  }
2261  str += like_string_->to_string();
2262  if (escape_string_ != nullptr) {
2263  str += " ESCAPE " + escape_string_->to_string();
2264  }
2265  return str;
2266 }
2267 
2268 std::string RegexpExpr::to_string() const {
2269  std::string str = arg_->to_string();
2270  if (is_not_) {
2271  str += " NOT REGEXP ";
2272  } else {
2273  str += " REGEXP ";
2274  }
2275  str += pattern_string_->to_string();
2276  if (escape_string_ != nullptr) {
2277  str += " ESCAPE " + escape_string_->to_string();
2278  }
2279  return str;
2280 }
2281 
2282 std::string WidthBucketExpr::to_string() const {
2283  std::string str = " WIDTH_BUCKET ";
2284  str += target_value_->to_string();
2285  str += " ";
2286  str += lower_bound_->to_string();
2287  str += " ";
2288  str += upper_bound_->to_string();
2289  str += " ";
2290  str += partition_count_->to_string();
2291  str += " ";
2292  return str;
2293 }
2294 
2295 std::string LikelihoodExpr::to_string() const {
2296  std::string str = " LIKELIHOOD ";
2297  str += arg_->to_string();
2298  str += " ";
2299  str += boost::lexical_cast<std::string>(is_not_ ? 1.0 - likelihood_ : likelihood_);
2300  return str;
2301 }
2302 
2303 std::string FunctionRef::to_string() const {
2304  std::string str = *name_ + "(";
2305  if (distinct_) {
2306  str += "DISTINCT ";
2307  }
2308  if (arg_ == nullptr) {
2309  str += "*)";
2310  } else {
2311  str += arg_->to_string() + ")";
2312  }
2313  return str;
2314 }
2315 
2316 std::string QuerySpec::to_string() const {
2317  std::string query_str = "SELECT ";
2318  if (is_distinct_) {
2319  query_str += "DISTINCT ";
2320  }
2321  if (select_clause_.empty()) {
2322  query_str += "* ";
2323  } else {
2324  bool notfirst = false;
2325  for (auto& p : select_clause_) {
2326  if (notfirst) {
2327  query_str += ", ";
2328  } else {
2329  notfirst = true;
2330  }
2331  query_str += p->to_string();
2332  }
2333  }
2334  query_str += " FROM ";
2335  bool notfirst = false;
2336  for (auto& p : from_clause_) {
2337  if (notfirst) {
2338  query_str += ", ";
2339  } else {
2340  notfirst = true;
2341  }
2342  query_str += p->to_string();
2343  }
2344  if (where_clause_) {
2345  query_str += " WHERE " + where_clause_->to_string();
2346  }
2347  if (!groupby_clause_.empty()) {
2348  query_str += " GROUP BY ";
2349  bool notfirst = false;
2350  for (auto& p : groupby_clause_) {
2351  if (notfirst) {
2352  query_str += ", ";
2353  } else {
2354  notfirst = true;
2355  }
2356  query_str += p->to_string();
2357  }
2358  }
2359  if (having_clause_) {
2360  query_str += " HAVING " + having_clause_->to_string();
2361  }
2362  query_str += ";";
2363  return query_str;
2364 }
2365 
// Common analysis for INSERT statements: marks `query` as kINSERT, resolves the
// target table, and records the result table id and the list of target column
// ids on `query`.
// Without an explicit column list, the ids come from
// getAllColumnMetadataForTable(). With one, each named column is looked up and,
// for columns that have physical columns (CHECKed to be geometry), the ids of
// those physical columns are appended right after it.
// Throws std::runtime_error when the table or a named column does not exist,
// when the table is a view, or when a physical column's metadata is missing.
2366 void InsertStmt::analyze(const Catalog_Namespace::Catalog& catalog,
2367  Analyzer::Query& query) const {
2368  query.set_stmt_type(kINSERT);
2369  const TableDescriptor* td = catalog.getMetadataForTable(*table_);
2370  if (td == nullptr) {
2371  throw std::runtime_error("Table " + *table_ + " does not exist.");
2372  }
2373  if (td->isView) {
2374  throw std::runtime_error("Insert to views is not supported yet.");
2375  }
      // NOTE(review): listing line 2376 is not visible in this view; one
      // statement may be missing between the view check and the next line.
2377  query.set_result_table_id(td->tableId);
2378  std::list<int> result_col_list;
2379  if (column_list_.empty()) {
      // No explicit column list: take the column ids the catalog returns for
      // this table.
2380  const std::list<const ColumnDescriptor*> all_cols =
2381  catalog.getAllColumnMetadataForTable(td->tableId, false, false, true);
2382  for (auto cd : all_cols) {
2383  result_col_list.push_back(cd->columnId);
2384  }
2385  } else {
2386  for (auto& c : column_list_) {
2387  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
2388  if (cd == nullptr) {
2389  throw std::runtime_error("Column " + *c + " does not exist.");
2390  }
2391  result_col_list.push_back(cd->columnId);
2392  const auto& col_ti = cd->columnType;
      // Physical columns are looked up at the ids directly following the
      // logical column's id (columnId + 1 .. columnId + physical count).
2393  if (col_ti.get_physical_cols() > 0) {
2394  CHECK(cd->columnType.is_geometry());
2395  for (auto i = 1; i <= col_ti.get_physical_cols(); i++) {
2396  const ColumnDescriptor* pcd =
2397  catalog.getMetadataForColumn(td->tableId, cd->columnId + i);
2398  if (pcd == nullptr) {
2399  throw std::runtime_error("Column " + *c + "'s metadata is incomplete.");
2400  }
2401  result_col_list.push_back(pcd->columnId);
2402  }
2403  }
2404  }
2405  }
2406  query.set_result_col_list(result_col_list);
2407 }
2408 
2409 namespace {
2410 Literal* parse_insert_literal(const rapidjson::Value& literal) {
2411  CHECK(literal.IsObject());
2412  CHECK(literal.HasMember("literal"));
2413  CHECK(literal.HasMember("type"));
2414  auto type = json_str(literal["type"]);
2415  if (type == "NULL") {
2416  return new NullLiteral();
2417  } else if (type == "CHAR" || type == "BOOLEAN") {
2418  auto* val = new std::string(json_str(literal["literal"]));
2419  return new StringLiteral(val);
2420  } else if (type == "DECIMAL") {
2421  CHECK(literal.HasMember("scale"));
2422  CHECK(literal.HasMember("precision"));
2423  auto scale = json_i64(literal["scale"]);
2424  auto precision = json_i64(literal["precision"]);
2425  if (scale == 0) {
2426  auto int_val = std::stol(json_str(literal["literal"]));
2427  return new IntLiteral(int_val);
2428  } else if (precision > sql_constants::kMaxNumericPrecision) {
2429  auto dbl_val = std::stod(json_str(literal["literal"]));
2430  return new DoubleLiteral(dbl_val);
2431  } else {
2432  auto* val = new std::string(json_str(literal["literal"]));
2433  return new FixedPtLiteral(val);
2434  }
2435  } else if (type == "DOUBLE") {
2436  auto dbl_val = std::stod(json_str(literal["literal"]));
2437  return new DoubleLiteral(dbl_val);
2438  } else {
2439  CHECK(false) << "Unexpected calcite data type: " << type;
2440  }
2441  return nullptr;
2442 }
2443 
2444 ArrayLiteral* parse_insert_array_literal(const rapidjson::Value& array) {
2445  CHECK(array.IsArray());
2446  auto json_elements = array.GetArray();
2447  auto* elements = new std::list<Expr*>();
2448  for (const auto& e : json_elements) {
2449  elements->push_back(parse_insert_literal(e));
2450  }
2451  return new ArrayLiteral(elements);
2452 }
2453 } // namespace
2454 
2455 InsertValuesStmt::InsertValuesStmt(const Catalog_Namespace::Catalog& catalog,
2456  const rapidjson::Value& payload)
2457  : InsertStmt(nullptr, nullptr) {
2458  CHECK(payload.HasMember("name"));
2459  table_ = std::make_unique<std::string>(json_str(payload["name"]));
2460 
2461  if (payload.HasMember("columns")) {
2462  CHECK(payload["columns"].IsArray());
2463  for (auto& column : payload["columns"].GetArray()) {
2464  std::string s = json_str(column);
2465  column_list_.emplace_back(std::make_unique<std::string>(s));
2466  }
2467  }
2468 
2469  CHECK(payload.HasMember("values") && payload["values"].IsArray());
2470  auto tuples = payload["values"].GetArray();
2471  if (tuples.Empty()) {
2472  throw std::runtime_error("Values statement cannot be empty");
2473  }
2474  values_lists_.reserve(tuples.Size());
2475  int column_offset = 0;
2476  try {
2477  for (const auto& json_tuple : tuples) {
2478  auto values_list = std::make_unique<ValuesList>();
2479  CHECK(json_tuple.IsArray());
2480  auto tuple = json_tuple.GetArray();
2481  column_offset = 0;
2482  for (const auto& value : tuple) {
2483  CHECK(value.IsObject());
2484  if (value.HasMember("array")) {
2485  values_list->push_back(parse_insert_array_literal(value["array"]));
2486  } else {
2487  values_list->push_back(parse_insert_literal(value));
2488  }
2489  ++column_offset;
2490  }
2491  values_lists_.push_back(std::move(values_list));
2492  }
2493  } catch (std::out_of_range const& e) {
2494  auto* td = catalog.getMetadataForTable(*table_, false);
2495  CHECK(td);
2496  auto cds = catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
2497  auto target_col_iter = cds.begin();
2498  std::advance(target_col_iter, column_offset);
2499  auto* cd = *target_col_iter;
2500  CHECK(cd);
2501  auto const col_identifier = td->tableName + "." + cd->columnName;
2502  throw std::runtime_error(
2503  "Detected an out-of-range exception when inserting a value into column \"" +
2504  col_identifier + "\"");
2505  }
2506 }
2507 
2509  Analyzer::Query& query) const {
// NOTE(review): the first line of this signature is missing from this listing.
// Fills query.get_values_lists() from values_lists_: checks that every tuple has the
// expected number of values, casts each value to its target column type, rejects NULL
// constants for NOT NULL columns, and for geometry columns appends constants for the
// physical columns that follow them (coords, ring sizes, poly rings, bounds).
2510  InsertStmt::analyze(catalog, query);
// The first tuple defines the expected number of values for all tuples.
2511  size_t list_size = values_lists_[0]->get_value_list().size();
2512  if (!column_list_.empty()) {
2513  if (list_size != column_list_.size()) {
2514  throw std::runtime_error(
2515  "Numbers of columns and values don't match for the "
2516  "insert.");
2517  }
2518  } else {
// No explicit column list: compare against the table's column metadata instead.
2519  const auto tableId = query.get_result_table_id();
2520  const std::list<const ColumnDescriptor*> non_phys_cols =
2521  catalog.getAllColumnMetadataForTable(tableId, false, false, false);
2522  if (non_phys_cols.size() != list_size) {
2523  throw std::runtime_error(
2524  "Number of columns in table does not match the list of values given in the "
2525  "insert.");
2526  }
2527  }
// Resolve a descriptor for every result column id; cds is indexed by cds_id below.
2528  std::vector<const ColumnDescriptor*> cds;
2529  cds.reserve(query.get_result_col_list().size());
2530  for (auto id : query.get_result_col_list()) {
2531  const auto* cd = catalog.getMetadataForColumn(query.get_result_table_id(), id);
2532  CHECK(cd);
2533  cds.push_back(cd);
2534  }
2535  auto& query_values_lists = query.get_values_lists();
2536  query_values_lists.resize(values_lists_.size());
2537  for (size_t i = 0; i < values_lists_.size(); ++i) {
2538  const auto& values_list = values_lists_[i]->get_value_list();
2539  if (values_list.size() != list_size) {
2540  throw std::runtime_error(
2541  "Insert values lists should be of the same size. Expected: " +
2542  std::to_string(list_size) + ", Got: " + std::to_string(values_list.size()));
2543  }
2544  auto& query_values_list = query_values_lists[i];
// cds_id advances once per value and once more per physical geo column emitted.
2545  size_t cds_id = 0;
2546  for (auto& v : values_list) {
2547  auto e = v->analyze(catalog, query);
2548  const auto* cd = cds[cds_id];
2549  const auto& col_ti = cd->columnType;
2550  if (col_ti.get_notnull()) {
2551  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2552  if (c != nullptr && c->get_is_null()) {
2553  throw std::runtime_error("Cannot insert NULL into column " + cd->columnName);
2554  }
2555  }
2556  e = e->add_cast(col_ti);
2557  query_values_list.emplace_back(new Analyzer::TargetEntry("", e, false));
2558  ++cds_id;
2559 
// Columns with physical columns must be geometry; expand the literal into them.
2560  if (col_ti.get_physical_cols() > 0) {
2561  CHECK(cd->columnType.is_geometry());
// Find the constant, either directly or as the operand of a kCAST unary operator.
2562  auto c = dynamic_cast<const Analyzer::Constant*>(e.get());
2563  if (!c) {
2564  auto uoper = std::dynamic_pointer_cast<Analyzer::UOper>(e);
2565  if (uoper && uoper->get_optype() == kCAST) {
2566  c = dynamic_cast<const Analyzer::Constant*>(uoper->get_operand());
2567  }
2568  }
2569  bool is_null = false;
2570  std::string* geo_string{nullptr};
2571  if (c) {
2572  is_null = c->get_is_null();
2573  if (!is_null) {
2574  geo_string = c->get_constval().stringval;
2575  }
2576  }
2577  if (!is_null && !geo_string) {
2578  throw std::runtime_error("Expecting a WKT or WKB hex string for column " +
2579  cd->columnName);
2580  }
2581  std::vector<double> coords;
2582  std::vector<double> bounds;
2583  std::vector<int> ring_sizes;
2584  std::vector<int> poly_rings;
2585  SQLTypeInfo import_ti{cd->columnType};
2586  if (!is_null) {
// NOTE(review): the line opening the call below (listing line 2588) is missing from
// this listing; the arguments that follow are its continuation.
2587  const bool validate_with_geos_if_available = false;
2589  *geo_string,
2590  import_ti,
2591  coords,
2592  bounds,
2593  ring_sizes,
2594  poly_rings,
2595  validate_with_geos_if_available)) {
2596  throw std::runtime_error("Cannot read geometry to insert into column " +
2597  cd->columnName);
2598  }
2599  if (coords.empty()) {
2600  // Importing from geo_string WKT resulted in empty coords: dealing with a NULL
2601  is_null = true;
2602  }
2603  if (!geo_promoted_type_match(import_ti.get_type(), cd->columnType.get_type())) {
2604  throw std::runtime_error(
2605  "Imported geometry doesn't match the type of column " + cd->columnName);
2606  }
2607  } else {
2608  // Special case for NULL POINT, push NULL representation to coords
2609  if (cd->columnType.get_type() == kPOINT) {
2610  if (!coords.empty()) {
2611  throw std::runtime_error(
2612  "NULL POINT with unexpected coordinates in column " + cd->columnName);
2613  }
2614  coords.push_back(NULL_ARRAY_DOUBLE);
2615  coords.push_back(NULL_DOUBLE);
2616  }
2617  }
2618 
2619  // TODO: check if import SRID matches columns SRID, may need to transform before
2620  // inserting
2621 
// Coords physical column: an array of TINYINT holding the compressed coordinates.
2622  const auto* cd_coords = cds[cds_id];
2623  CHECK_EQ(cd_coords->columnType.get_type(), kARRAY);
2624  CHECK_EQ(cd_coords->columnType.get_subtype(), kTINYINT);
2625  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
// A NULL POINT still emits its (NULL sentinel) coords; other NULL geo emits none.
2626  if (!is_null || cd->columnType.get_type() == kPOINT) {
2627  auto compressed_coords = Geospatial::compress_coords(coords, col_ti);
2628  for (auto cc : compressed_coords) {
2629  Datum d;
2630  d.tinyintval = cc;
2631  auto e = makeExpr<Analyzer::Constant>(kTINYINT, false, d);
2632  value_exprs.push_back(e);
2633  }
2634  }
2635  query_values_list.emplace_back(new Analyzer::TargetEntry(
2636  "",
2637  makeExpr<Analyzer::Constant>(cd_coords->columnType, is_null, value_exprs),
2638  false));
2639  ++cds_id;
2640 
2641  if (cd->columnType.get_type() == kMULTILINESTRING ||
2642  cd->columnType.get_type() == kPOLYGON ||
2643  cd->columnType.get_type() == kMULTIPOLYGON) {
2644  // Put [linest]ring sizes array into separate physical column
2645  const auto* cd_ring_sizes = cds[cds_id];
2646  CHECK(cd_ring_sizes);
2647  CHECK_EQ(cd_ring_sizes->columnType.get_type(), kARRAY);
2648  CHECK_EQ(cd_ring_sizes->columnType.get_subtype(), kINT);
2649  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
2650  if (!is_null) {
2651  for (auto c : ring_sizes) {
2652  Datum d;
2653  d.intval = c;
2654  auto e = makeExpr<Analyzer::Constant>(kINT, false, d);
2655  value_exprs.push_back(e);
2656  }
2657  }
2658  query_values_list.emplace_back(new Analyzer::TargetEntry(
2659  "",
2660  makeExpr<Analyzer::Constant>(
2661  cd_ring_sizes->columnType, is_null, value_exprs),
2662  false));
2663  ++cds_id;
2664 
2665  if (cd->columnType.get_type() == kMULTIPOLYGON) {
2666  // Put poly_rings array into separate physical column
2667  const auto* cd_poly_rings = cds[cds_id];
2668  CHECK(cd_poly_rings);
2669  CHECK_EQ(cd_poly_rings->columnType.get_type(), kARRAY);
2670  CHECK_EQ(cd_poly_rings->columnType.get_subtype(), kINT);
2671  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
2672  if (!is_null) {
2673  for (auto c : poly_rings) {
2674  Datum d;
2675  d.intval = c;
2676  auto e = makeExpr<Analyzer::Constant>(kINT, false, d);
2677  value_exprs.push_back(e);
2678  }
2679  }
2680  query_values_list.emplace_back(new Analyzer::TargetEntry(
2681  "",
2682  makeExpr<Analyzer::Constant>(
2683  cd_poly_rings->columnType, is_null, value_exprs),
2684  false));
2685  ++cds_id;
2686  }
2687  }
2688 
// Every geo type listed here (all but POINT in this block) gets a DOUBLE bounds array.
2689  if (cd->columnType.get_type() == kMULTIPOINT ||
2690  cd->columnType.get_type() == kLINESTRING ||
2691  cd->columnType.get_type() == kMULTILINESTRING ||
2692  cd->columnType.get_type() == kPOLYGON ||
2693  cd->columnType.get_type() == kMULTIPOLYGON) {
2694  const auto* cd_bounds = cds[cds_id];
2695  CHECK(cd_bounds);
2696  CHECK_EQ(cd_bounds->columnType.get_type(), kARRAY);
2697  CHECK_EQ(cd_bounds->columnType.get_subtype(), kDOUBLE);
2698  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
2699  if (!is_null) {
2700  for (auto b : bounds) {
2701  Datum d;
2702  d.doubleval = b;
2703  auto e = makeExpr<Analyzer::Constant>(kDOUBLE, false, d);
2704  value_exprs.push_back(e);
2705  }
2706  }
2707  query_values_list.emplace_back(new Analyzer::TargetEntry(
2708  "",
2709  makeExpr<Analyzer::Constant>(cd_bounds->columnType, is_null, value_exprs),
2710  false));
2711  ++cds_id;
2712  }
2713  }
2714  }
2715  }
2716 }
2717 
2719  bool read_only_mode) {
// NOTE(review): several lines of this function (its signature start, the table lock
// acquisition, the privilege check condition, the insert lock and the executor/loader
// setup) are missing from this listing.
// Executes an INSERT ... VALUES: refuses to run in read-only mode, analyzes the
// statement, runs the insert, rolls back on failure and checkpoints non-temporary
// tables on success.
2720  if (read_only_mode) {
2721  throw std::runtime_error("INSERT values invalid in read only mode.");
2722  }
2723  auto execute_read_lock = legacylockmgr::getExecuteReadLock();
2724  auto& catalog = session.getCatalog();
2725  const auto td_with_lock =
2727  catalog, *table_);
2730  *table_)) {
2731  throw std::runtime_error("User has no insert privileges on " + *table_ + ".");
2732  }
2733  Analyzer::Query query;
2734  analyze(catalog, query);
2735 
2736  // Take an insert data write lock, which prevents concurrent inserts.
2737  const auto insert_data_lock =
2739 
2740  // NOTE(max): we do the same checks as below just a few calls earlier in analyze().
2741  // Do we keep those intentionally to make sure nothing changed in between w/o
2742  // catalog locks or is it just a duplicate work?
2743  auto td = td_with_lock();
2744  CHECK(td);
2745  if (td->isView) {
2746  throw std::runtime_error("Singleton inserts on views is not supported.");
2747  }
2749 
2751  RelAlgExecutor ra_executor(executor.get());
2752 
// Lazily create a local connector when none has been set on this statement.
2753  if (!leafs_connector_) {
2754  leafs_connector_ = std::make_unique<Fragmenter_Namespace::LocalInsertConnector>();
2755  }
2757  try {
2758  ra_executor.executeSimpleInsert(query, insert_data_loader, session);
2759  } catch (...) {
// Best-effort rollback: a rollback failure is only logged so that the original
// insert exception is the one rethrown to the caller.
2760  try {
2761  leafs_connector_->rollback(session, td->tableId);
2762  } catch (std::exception& e) {
2763  LOG(ERROR) << "An error occurred during insert rollback attempt. Table id: "
2764  << td->tableId << ", Error: " << e.what();
2765  }
2766  throw;
2767  }
// Temporary tables are not checkpointed.
2768  if (!td->isTemporaryTable()) {
2769  leafs_connector_->checkpoint(session, td->tableId);
2770  }
2771 }
2772 
2774  Analyzer::Query& query) const {
// NOTE(review): the first line of this signature is missing from this listing.
// Always throws std::runtime_error: UPDATE is not handled by this analysis path.
2775  throw std::runtime_error("UPDATE statement not supported yet.");
2776 }
2777 
2779  Analyzer::Query& query) const {
// NOTE(review): the first line of this signature is missing from this listing.
// Always throws std::runtime_error: DELETE is not handled by this analysis path.
2780  throw std::runtime_error("DELETE statement not supported yet.");
2781 }
2782 
2783 namespace {
2784 
// NOTE(review): the signature line of this helper is missing from this listing; it
// receives a column descriptor `cd`.
// Throws std::runtime_error unless the column type is an integer, a time type, or a
// dictionary-encoded string.
2786  const auto& col_ti = cd.columnType;
2787  if (!col_ti.is_integer() && !col_ti.is_time() &&
2788  !(col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT)) {
2789  throw std::runtime_error("Cannot shard on type " + col_ti.get_type_name() +
2790  ", encoding " + col_ti.get_compression_name());
2791  }
2792 }
2793 
// Returns the 1-based position of the column named exactly `name`, counting the
// physical columns that follow each geometry column, or 0 when no column matches.
2794 size_t shard_column_index(const std::string& name,
2795  const std::list<ColumnDescriptor>& columns) {
2796  size_t index = 1;
2797  for (const auto& cd : columns) {
2798  if (cd.columnName == name) {
// NOTE(review): listing line 2799 (between the match and the return) is missing here.
2800  return index;
2801  }
2802  ++index;
// Skip over the physical columns stored after a geometry column.
2803  if (cd.columnType.is_geometry()) {
2804  index += cd.columnType.get_physical_cols();
2805  }
2806  }
2807  // Not found, return 0
2808  return 0;
2809 }
2810 
2811 size_t sort_column_index(const std::string& name,
2812  const std::list<ColumnDescriptor>& columns) {
2813  size_t index = 1;
2814  for (const auto& cd : columns) {
2815  if (boost::to_upper_copy<std::string>(cd.columnName) == name) {
2816  return index;
2817  }
2818  ++index;
2819  if (cd.columnType.is_geometry()) {
2820  index += cd.columnType.get_physical_cols();
2821  }
2822  }
2823  // Not found, return 0
2824  return 0;
2825 }
2826 
2827 void set_string_field(rapidjson::Value& obj,
2828  const std::string& field_name,
2829  const std::string& field_value,
2830  rapidjson::Document& document) {
2831  rapidjson::Value field_name_json_str;
2832  field_name_json_str.SetString(
2833  field_name.c_str(), field_name.size(), document.GetAllocator());
2834  rapidjson::Value field_value_json_str;
2835  field_value_json_str.SetString(
2836  field_value.c_str(), field_value.size(), document.GetAllocator());
2837  obj.AddMember(field_name_json_str, field_value_json_str, document.GetAllocator());
2838 }
2839 
2841  const ShardKeyDef* shard_key_def,
// NOTE(review): the first line of this signature is missing from this listing.
// Serializes the optional shard key and the shared dictionary definitions into a JSON
// array string. Each entry is an object with "type" and "name"; shared dictionary
// entries also carry "foreign_table" and "foreign_column".
2842  const std::vector<SharedDictionaryDef>& shared_dict_defs) {
2843  rapidjson::Document document;
2844  auto& allocator = document.GetAllocator();
2845  rapidjson::Value arr(rapidjson::kArrayType);
// The shard key entry, when present, is always the first element of the array.
2846  if (shard_key_def) {
2847  rapidjson::Value shard_key_obj(rapidjson::kObjectType);
2848  set_string_field(shard_key_obj, "type", "SHARD KEY", document);
2849  set_string_field(shard_key_obj, "name", shard_key_def->get_column(), document);
2850  arr.PushBack(shard_key_obj, allocator);
2851  }
2852  for (const auto& shared_dict_def : shared_dict_defs) {
2853  rapidjson::Value shared_dict_obj(rapidjson::kObjectType);
2854  set_string_field(shared_dict_obj, "type", "SHARED DICTIONARY", document);
2855  set_string_field(shared_dict_obj, "name", shared_dict_def.get_column(), document);
// NOTE(review): the line opening the next call (listing line 2856) is missing here.
2857  shared_dict_obj, "foreign_table", shared_dict_def.get_foreign_table(), document);
2858  set_string_field(shared_dict_obj,
2859  "foreign_column",
2860  shared_dict_def.get_foreign_column(),
2861  document);
2862  arr.PushBack(shared_dict_obj, allocator);
2863  }
// Write the array itself (not the document) to the output buffer.
2864  rapidjson::StringBuffer buffer;
2865  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
2866  arr.Accept(writer);
2867  return buffer.GetString();
2868 }
2869 
2870 template <typename LITERAL_TYPE,
2871  typename ASSIGNMENT,
2872  typename VALIDATE = DefaultValidate<LITERAL_TYPE>>
2873 decltype(auto) get_property_value(const NameValueAssign* p,
2874  ASSIGNMENT op,
2875  VALIDATE validate = VALIDATE()) {
2876  const auto val = validate(p);
2877  return op(val);
2878 }
2879 
2881  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Stores the option value in td.storageType, validated with CaseSensitiveValidate.
// `columns` is not used.
2882  const std::list<ColumnDescriptor>& columns) {
2883  auto assignment = [&td](const auto val) { td.storageType = val; };
2884  return get_property_value<StringLiteral, decltype(assignment), CaseSensitiveValidate>(
2885  p, assignment);
2886 }
2887 
2889  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets td.maxFragRows from an integer option; the value is passed as a string through
// validate_and_get_fragment_size. `columns` is not used.
2890  const std::list<ColumnDescriptor>& columns) {
2891  return get_property_value<IntLiteral>(p, [&td](const auto val) {
2892  td.maxFragRows = validate_and_get_fragment_size(std::to_string(val));
2893  });
2894 }
2895 
2897  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets df_td.maxFragRows from an integer option. `columns` is not used.
2898  const std::list<ColumnDescriptor>& columns) {
2899  return get_property_value<IntLiteral>(
2900  p, [&df_td](const auto val) { df_td.maxFragRows = val; });
2901 }
2902 
2904  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets td.maxChunkSize from an integer option. `columns` is not used.
2905  const std::list<ColumnDescriptor>& columns) {
2906  return get_property_value<IntLiteral>(p,
2907  [&td](const auto val) { td.maxChunkSize = val; });
2908 }
2909 
2911  DataframeTableDescriptor& df_td,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets df_td.maxChunkSize from an integer option. `columns` is not used.
2912  const NameValueAssign* p,
2913  const std::list<ColumnDescriptor>& columns) {
2914  return get_property_value<IntLiteral>(
2915  p, [&df_td](const auto val) { df_td.maxChunkSize = val; });
2916 }
2917 
2919  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets df_td.delimiter from a string option; throws std::runtime_error unless the
// value has size exactly 1. `columns` is not used.
2920  const std::list<ColumnDescriptor>& columns) {
2921  return get_property_value<StringLiteral>(p, [&df_td](const auto val) {
2922  if (val.size() != 1) {
2923  throw std::runtime_error("Length of DELIMITER must be equal to 1.");
2924  }
2925  df_td.delimiter = val;
2926  });
2927 }
2928 
2930  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets df_td.hasHeader from a string option that must compare equal to "TRUE" or
// "FALSE"; any other value throws std::runtime_error. `columns` is not used.
// NOTE(review): the comparison is against upper-case text, so the validator
// presumably upper-cases the value - confirm against DefaultValidate.
2931  const std::list<ColumnDescriptor>& columns) {
2932  return get_property_value<StringLiteral>(p, [&df_td](const auto val) {
2933  if (val == "FALSE") {
2934  df_td.hasHeader = false;
2935  } else if (val == "TRUE") {
2936  df_td.hasHeader = true;
2937  } else {
2938  throw std::runtime_error("Option HEADER support only 'true' or 'false' values.");
2939  }
2940  });
2941 }
2942 
2944  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets td.fragPageSize from an integer option; throws std::runtime_error when the
// value is below File_Namespace::FileBuffer::getMinPageSize(). `columns` is not used.
2945  const std::list<ColumnDescriptor>& columns) {
2946  return get_property_value<IntLiteral>(p, [&td](const auto val) {
2947  const auto min_page_size = File_Namespace::FileBuffer::getMinPageSize();
2948  if (val < min_page_size) {
2949  throw std::runtime_error("page_size cannot be less than " +
2950  std::to_string(min_page_size));
2951  }
2952  td.fragPageSize = val;
2953  });
2954 }
2956  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets td.maxRows from an integer option. `columns` is not used.
2957  const std::list<ColumnDescriptor>& columns) {
2958  return get_property_value<IntLiteral>(p, [&td](const auto val) { td.maxRows = val; });
2959 }
2960 
2962  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets df_td.skipRows from an integer option. `columns` is not used.
2963  const std::list<ColumnDescriptor>& columns) {
2964  return get_property_value<IntLiteral>(
2965  p, [&df_td](const auto val) { df_td.skipRows = val; });
2966 }
2967 
2969  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets td.partitions from a string option that must be "SHARDED" or "REPLICATED".
// "REPLICATED" is rejected when td.shardedColumnId is already non-zero.
// `columns` is not used.
2970  const std::list<ColumnDescriptor>& columns) {
2971  return get_property_value<StringLiteral>(p, [&td](const auto partitions_uc) {
2972  if (partitions_uc != "SHARDED" && partitions_uc != "REPLICATED") {
2973  throw std::runtime_error("PARTITIONS must be SHARDED or REPLICATED");
2974  }
2975  if (td.shardedColumnId != 0 && partitions_uc == "REPLICATED") {
2976  throw std::runtime_error(
2977  "A table cannot be sharded and replicated at the same time");
2978  };
2979  td.partitions = partitions_uc;
2980  });
2981 }
2983  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets td.nShards from the integer SHARD_COUNT option. Requires td.shardedColumnId to
// be set already. When g_leaf_count is non-zero the count must be a multiple of it
// and nShards stores shard_count / g_leaf_count. `columns` is not used.
2984  const std::list<ColumnDescriptor>& columns) {
2985  if (!td.shardedColumnId) {
2986  throw std::runtime_error("SHARD KEY must be defined.");
2987  }
2988  return get_property_value<IntLiteral>(p, [&td](const auto shard_count) {
2989  if (g_leaf_count && shard_count % g_leaf_count) {
2990  throw std::runtime_error(
2991  "SHARD_COUNT must be a multiple of the number of leaves in the cluster.");
2992  }
2993  td.nShards = g_leaf_count ? shard_count / g_leaf_count : shard_count;
// NOTE(review): this condition cannot hold here, because td.shardedColumnId was
// already verified to be non-zero before the lambda runs.
2994  if (!td.shardedColumnId && !td.nShards) {
2995  throw std::runtime_error(
2996  "Must specify the number of shards through the SHARD_COUNT option");
2997  };
2998  });
2999 }
3000 
3001 decltype(auto) get_vacuum_def(TableDescriptor& td,
3002  const NameValueAssign* p,
3003  const std::list<ColumnDescriptor>& columns) {
3004  return get_property_value<StringLiteral>(p, [&td](const auto vacuum_uc) {
3005  if (vacuum_uc != "IMMEDIATE" && vacuum_uc != "DELAYED") {
3006  throw std::runtime_error("VACUUM must be IMMEDIATE or DELAYED");
3007  }
3008  td.hasDeletedCol = boost::iequals(vacuum_uc, "IMMEDIATE") ? false : true;
3009  });
3010 }
3011 
3013  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets td.sortedColumnId to the index returned by sort_column_index for the option
// value; throws std::runtime_error when that index is 0 (no such column).
3014  const std::list<ColumnDescriptor>& columns) {
3015  return get_property_value<StringLiteral>(p, [&td, &columns](const auto sort_upper) {
3016  td.sortedColumnId = sort_column_index(sort_upper, columns);
3017  if (!td.sortedColumnId) {
3018  throw std::runtime_error("Specified sort column " + sort_upper + " doesn't exist");
3019  }
3020  });
3021 }
3022 
3024  const NameValueAssign* p,
// NOTE(review): the first line of this signature is missing from this listing.
// Sets td.maxRollbackEpochs from an integer option validated with
// PositiveOrZeroValidate; negative values are stored as -1. `columns` is not used.
3025  const std::list<ColumnDescriptor>& columns) {
3026  auto assignment = [&td](const auto val) {
3027  td.maxRollbackEpochs =
3028  val < 0 ? -1 : val; // Anything < 0 means unlimited rollbacks. Note that 0
3029  // still means keeping a shadow copy of data/metadata
3030  // between epochs so bad writes can be rolled back
3031  };
3032  return get_property_value<IntLiteral, decltype(assignment), PositiveOrZeroValidate>(
3033  p, assignment);
3034 }
3035 
3036 static const std::map<const std::string, const TableDefFuncPtr> tableDefFuncMap = {
3037  {"fragment_size"s, get_frag_size_def},
3038  {"max_chunk_size"s, get_max_chunk_size_def},
3039  {"page_size"s, get_page_size_def},
3040  {"max_rows"s, get_max_rows_def},
3041  {"partitions"s, get_partions_def},
3042  {"shard_count"s, get_shard_count_def},
3043  {"vacuum"s, get_vacuum_def},
3044  {"sort_column"s, get_sort_column_def},
3045  {"storage_type"s, get_storage_type},
3046  {"max_rollback_epochs", get_max_rollback_epochs_def}};
3047 
3049  const std::unique_ptr<NameValueAssign>& p,
// NOTE(review): the first line of this signature is missing from this listing.
// Looks up the handler for the option name (lower-cased) in tableDefFuncMap and
// applies it to td; throws std::runtime_error for an unknown option name.
3050  const std::list<ColumnDescriptor>& columns) {
3051  const auto it = tableDefFuncMap.find(boost::to_lower_copy<std::string>(*p->get_name()));
3052  if (it == tableDefFuncMap.end()) {
3053  throw std::runtime_error(
3054  "Invalid CREATE TABLE option " + *p->get_name() +
3055  ". Should be FRAGMENT_SIZE, MAX_CHUNK_SIZE, PAGE_SIZE, MAX_ROLLBACK_EPOCHS, "
3056  "MAX_ROWS, "
3057  "PARTITIONS, SHARD_COUNT, VACUUM, SORT_COLUMN, STORAGE_TYPE.");
3058  }
3059  return it->second(td, p.get(), columns);
3060 }
3061 
3063  const std::unique_ptr<NameValueAssign>& p,
// NOTE(review): the first line of this signature is missing from this listing.
// CREATE TABLE AS variant of the option dispatch: same tableDefFuncMap lookup, with
// an error message that also lists USE_SHARED_DICTIONARIES and FORCE_GEO_COMPRESSION.
// NOTE(review): those two names are not keys of tableDefFuncMap, so they are
// presumably handled by the caller before reaching this function - confirm.
3064  const std::list<ColumnDescriptor>& columns) {
3065  const auto it = tableDefFuncMap.find(boost::to_lower_copy<std::string>(*p->get_name()));
3066  if (it == tableDefFuncMap.end()) {
3067  throw std::runtime_error(
3068  "Invalid CREATE TABLE AS option " + *p->get_name() +
3069  ". Should be FRAGMENT_SIZE, MAX_CHUNK_SIZE, PAGE_SIZE, MAX_ROLLBACK_EPOCHS, "
3070  "MAX_ROWS, "
3071  "PARTITIONS, SHARD_COUNT, VACUUM, SORT_COLUMN, STORAGE_TYPE, "
3072  "USE_SHARED_DICTIONARIES or FORCE_GEO_COMPRESSION.");
3073  }
3074  return it->second(td, p.get(), columns);
3075 }
3076 
3077 static const std::map<const std::string, const DataframeDefFuncPtr> dataframeDefFuncMap =
3078  {{"fragment_size"s, get_frag_size_dataframe_def},
3079  {"max_chunk_size"s, get_max_chunk_size_dataframe_def},
3080  {"skip_rows"s, get_skip_rows_def},
3081  {"delimiter"s, get_delimiter_def},
3082  {"header"s, get_header_def}};
3083 
3085  const std::unique_ptr<NameValueAssign>& p,
// NOTE(review): the first line of this signature is missing from this listing.
// Looks up the handler for the option name (lower-cased) in dataframeDefFuncMap and
// applies it to df_td; throws std::runtime_error for an unknown option name.
3086  const std::list<ColumnDescriptor>& columns) {
3087  const auto it =
3088  dataframeDefFuncMap.find(boost::to_lower_copy<std::string>(*p->get_name()));
3089  if (it == dataframeDefFuncMap.end()) {
3090  throw std::runtime_error(
3091  "Invalid CREATE DATAFRAME option " + *p->get_name() +
3092  ". Should be FRAGMENT_SIZE, MAX_CHUNK_SIZE, SKIP_ROWS, DELIMITER or HEADER.");
3093  }
3094  return it->second(df_td, p.get(), columns);
3095 }
3096 
3097 void parse_elements(const rapidjson::Value& payload,
3098  std::string element_name,
3099  std::string& table_name,
3100  std::list<std::unique_ptr<TableElement>>& table_element_list) {
3101  const auto elements = payload[element_name].GetArray();
3102  for (const auto& element : elements) {
3103  CHECK(element.IsObject());
3104  CHECK(element.HasMember("type"));
3105  if (json_str(element["type"]) == "SQL_COLUMN_DECLARATION") {
3106  auto col_def = column_from_json(element);
3107  table_element_list.emplace_back(std::move(col_def));
3108  } else if (json_str(element["type"]) == "SQL_COLUMN_CONSTRAINT") {
3109  CHECK(element.HasMember("name"));
3110  if (json_str(element["name"]) == "SHARD_KEY") {
3111  CHECK(element.HasMember("columns"));
3112  CHECK(element["columns"].IsArray());
3113  const auto& columns = element["columns"].GetArray();
3114  if (columns.Size() != size_t(1)) {
3115  throw std::runtime_error("Only one shard column is currently supported.");
3116  }
3117  auto shard_key_def = std::make_unique<ShardKeyDef>(json_str(columns[0]));
3118  table_element_list.emplace_back(std::move(shard_key_def));
3119  } else if (json_str(element["name"]) == "SHARED_DICT") {
3120  CHECK(element.HasMember("columns"));
3121  CHECK(element["columns"].IsArray());
3122  const auto& columns = element["columns"].GetArray();
3123  if (columns.Size() != size_t(1)) {
3124  throw std::runtime_error(
3125  R"(Only one column per shared dictionary entry is currently supported. Use multiple SHARED DICT statements to share dictionaries from multiple columns.)");
3126  }
3127  CHECK(element.HasMember("references") && element["references"].IsObject());
3128  const auto& references = element["references"].GetObject();
3129  std::string references_table_name;
3130  if (references.HasMember("table")) {
3131  references_table_name = json_str(references["table"]);
3132  } else {
3133  references_table_name = table_name;
3134  }
3135  CHECK(references.HasMember("column"));
3136 
3137  auto shared_dict_def = std::make_unique<SharedDictionaryDef>(
3138  json_str(columns[0]), references_table_name, json_str(references["column"]));
3139  table_element_list.emplace_back(std::move(shared_dict_def));
3140 
3141  } else {
3142  LOG(FATAL) << "Unsupported type for SQL_COLUMN_CONSTRAINT: "
3143  << json_str(element["name"]);
3144  }
3145  } else {
3146  LOG(FATAL) << "Unsupported element type for CREATE TABLE: "
3147  << element["type"].GetString();
3148  }
3149  }
3150 }
3151 } // namespace
3152 
3153 std::unique_ptr<ColumnDef> column_from_json(const rapidjson::Value& element) {
3154  CHECK(element.HasMember("name"));
3155  auto col_name = std::make_unique<std::string>(json_str(element["name"]));
3156  CHECK(element.HasMember("sqltype"));
3157  const auto sql_types = to_sql_type(json_str(element["sqltype"]));
3158 
3159  // decimal / numeric precision / scale
3160  int precision = -1;
3161  int scale = -1;
3162  if (element.HasMember("precision")) {
3163  precision = json_i64(element["precision"]);
3164  }
3165  if (element.HasMember("scale")) {
3166  scale = json_i64(element["scale"]);
3167  }
3168 
3169  std::optional<int64_t> array_size;
3170  if (element.HasMember("arraySize")) {
3171  // We do not yet support geo arrays
3172  array_size = json_i64(element["arraySize"]);
3173  }
3174  std::unique_ptr<SQLType> sql_type;
3175  if (element.HasMember("subtype")) {
3176  CHECK(element.HasMember("coordinateSystem"));
3177  const auto subtype_sql_types = to_sql_type(json_str(element["subtype"]));
3178  sql_type =
3179  std::make_unique<SQLType>(subtype_sql_types,
3180  static_cast<int>(sql_types),
3181  static_cast<int>(json_i64(element["coordinateSystem"])),
3182  false);
3183  } else if (precision > 0 && scale > 0) {
3184  sql_type = std::make_unique<SQLType>(sql_types,
3185  precision,
3186  scale,
3187  /*is_array=*/array_size.has_value(),
3188  array_size ? *array_size : -1);
3189  } else if (precision > 0) {
3190  sql_type = std::make_unique<SQLType>(sql_types,
3191  precision,
3192  0,
3193  /*is_array=*/array_size.has_value(),
3194  array_size ? *array_size : -1);
3195  } else {
3196  sql_type = std::make_unique<SQLType>(sql_types,
3197  /*is_array=*/array_size.has_value(),
3198  array_size ? *array_size : -1);
3199  }
3200  CHECK(sql_type);
3201 
3202  CHECK(element.HasMember("nullable"));
3203  const auto nullable = json_bool(element["nullable"]);
3204  std::unique_ptr<ColumnConstraintDef> constraint_def;
3205  StringLiteral* str_literal = nullptr;
3206  if (element.HasMember("default") && !element["default"].IsNull()) {
3207  std::string* defaultval = new std::string(json_str(element["default"]));
3208  boost::algorithm::trim_if(*defaultval, boost::is_any_of(" \"'`"));
3209  str_literal = new StringLiteral(defaultval);
3210  }
3211 
3212  constraint_def = std::make_unique<ColumnConstraintDef>(/*notnull=*/!nullable,
3213  /*unique=*/false,
3214  /*primarykey=*/false,
3215  /*defaultval=*/str_literal);
3216  std::unique_ptr<CompressDef> compress_def;
3217  if (element.HasMember("encodingType") && !element["encodingType"].IsNull()) {
3218  std::string encoding_type = json_str(element["encodingType"]);
3219  CHECK(element.HasMember("encodingSize"));
3220  auto encoding_name = std::make_unique<std::string>(json_str(element["encodingType"]));
3221  compress_def = std::make_unique<CompressDef>(encoding_name.release(),
3222  json_i64(element["encodingSize"]));
3223  }
3224  return std::make_unique<ColumnDef>(col_name.release(),
3225  sql_type.release(),
3226  compress_def ? compress_def.release() : nullptr,
3227  constraint_def ? constraint_def.release() : nullptr);
3228 }
3229 
3230 std::list<ColumnDef> get_columns_from_json_payload(const std::string& payload_key,
3231  const rapidjson::Value& payload) {
3232  std::list<ColumnDef> table_element_list;
3233  CHECK(payload[payload_key].IsArray());
3234 
3235  const auto elements = payload[payload_key].GetArray();
3236  for (const auto& element : elements) {
3237  CHECK(element.IsObject());
3238  CHECK(element.HasMember("type"));
3239  if (json_str(element["type"]) == "SQL_COLUMN_DECLARATION") {
3240  auto col_def = column_from_json(element);
3241  table_element_list.emplace_back(std::move(*col_def));
3242  } else {
3243  LOG(FATAL) << "Unsupported element type for ALTER TABLE: "
3244  << element["type"].GetString();
3245  }
3246  }
3247  return table_element_list;
3248 }
3249 
3250 CreateTableStmt::CreateTableStmt(const rapidjson::Value& payload) {
3251  CHECK(payload.HasMember("name"));
3252  table_ = std::make_unique<std::string>(json_str(payload["name"]));
3253  CHECK(payload.HasMember("elements"));
3254  CHECK(payload["elements"].IsArray());
3255 
3256  is_temporary_ = false;
3257  if (payload.HasMember("temporary")) {
3258  is_temporary_ = json_bool(payload["temporary"]);
3259  }
3260 
3261  if_not_exists_ = false;
3262  if (payload.HasMember("ifNotExists")) {
3263  if_not_exists_ = json_bool(payload["ifNotExists"]);
3264  }
3265 
3266  parse_elements(payload, "elements", *table_, table_element_list_);
3267 
3268  parse_options(payload, storage_options_);
3269 }
3270 
3272  TableDescriptor& td,
// NOTE(review): the first line of this signature and several body lines (listing
// lines 3281, 3299, 3314, 3316) are missing from this listing.
// Fills td, columns and shared_dict_defs from the parsed table elements and storage
// options; the visible code does not call any catalog create method.
3273  std::list<ColumnDescriptor>& columns,
3274  std::vector<SharedDictionaryDef>& shared_dict_defs) {
3275  std::unordered_set<std::string> uc_col_names;
3276  const auto& catalog = session.getCatalog();
3277  const ShardKeyDef* shard_key_def{nullptr};
// Classify each element: shared dictionary, shard key (at most one) or column.
3278  for (auto& e : table_element_list_) {
3279  if (dynamic_cast<SharedDictionaryDef*>(e.get())) {
3280  auto shared_dict_def = static_cast<SharedDictionaryDef*>(e.get());
3282  this, shared_dict_def, columns, shared_dict_defs, catalog);
3283  shared_dict_defs.push_back(*shared_dict_def);
3284  continue;
3285  }
3286  if (dynamic_cast<ShardKeyDef*>(e.get())) {
3287  if (shard_key_def) {
3288  throw std::runtime_error("Specified more than one shard key");
3289  }
3290  shard_key_def = static_cast<const ShardKeyDef*>(e.get());
3291  continue;
3292  }
3293  if (!dynamic_cast<ColumnDef*>(e.get())) {
3294  throw std::runtime_error("Table constraints are not supported yet.");
3295  }
3296  ColumnDef* coldef = static_cast<ColumnDef*>(e.get());
3297  ColumnDescriptor cd;
3298  cd.columnName = *coldef->get_column_name();
3300  setColumnDescriptor(cd, coldef);
3301  columns.push_back(cd);
3302  }
3303 
3304  ddl_utils::set_default_table_attributes(*table_, td, columns.size());
3305 
// Resolve the shard key to a column index; 0 means the column was not found.
3306  if (shard_key_def) {
3307  td.shardedColumnId = shard_column_index(shard_key_def->get_column(), columns);
3308  if (!td.shardedColumnId) {
3309  throw std::runtime_error("Specified shard column " + shard_key_def->get_column() +
3310  " doesn't exist");
3311  }
3312  }
3313  if (is_temporary_) {
3315  } else {
3317  }
// Storage options are applied after the shard key so that SHARD_COUNT can check it.
3318  if (!storage_options_.empty()) {
3319  for (auto& p : storage_options_) {
3320  get_table_definitions(td, p, columns);
3321  }
3322  }
3323  if (td.shardedColumnId && !td.nShards) {
3324  throw std::runtime_error("SHARD_COUNT needs to be specified with SHARD_KEY.");
3325  }
3326  td.keyMetainfo = serialize_key_metainfo(shard_key_def, shared_dict_defs);
3327 }
3328 
3330  bool read_only_mode) {
// NOTE(review): the first line of this signature and the privilege-check condition
// (listing lines 3342-3343) are missing from this listing.
// Executes CREATE TABLE: refuses to run in read-only mode, takes the global execute
// write lock, builds the descriptors via executeDryRun and creates the table and its
// DB object.
3331  if (read_only_mode) {
3332  throw std::runtime_error("CREATE TABLE invalid in read only mode.");
3333  }
3334  auto& catalog = session.getCatalog();
3335 
3336  // Until we create the table we don't have a table descriptor to lock and guarantee
3337  // exclusive use of. Because of that we need a global write lock to make sure we have
3338  // exclusive access to the system for now.
3339  const auto execute_write_lock = legacylockmgr::getExecuteWriteLock();
3340 
3341  // check access privileges
3344  throw std::runtime_error("Table " + *table_ +
3345  " will not be created. User has no create privileges.");
3346  }
3347 
// Returns without creating anything when the validation call reports false.
// NOTE(review): presumably that is the "already exists with IF NOT EXISTS" case -
// confirm against Catalog::validateNonExistentTableOrView.
3348  if (!catalog.validateNonExistentTableOrView(*table_, if_not_exists_)) {
3349  return;
3350  }
3351 
3352  TableDescriptor td;
3353  std::list<ColumnDescriptor> columns;
3354  std::vector<SharedDictionaryDef> shared_dict_defs;
3355 
3356  executeDryRun(session, td, columns, shared_dict_defs);
3357  td.userId = session.get_currentUser().userId;
3358 
3359  catalog.createShardedTable(td, columns, shared_dict_defs);
3360  // TODO (max): It's transactionally unsafe, should be fixed: we may create object w/o
3361  // privileges
3362  SysCatalog::instance().createDBObject(
3363  session.get_currentUser(), td.tableName, TableDBObjectType, catalog);
3364 }
3365 
3366 CreateDataframeStmt::CreateDataframeStmt(const rapidjson::Value& payload) {
3367  CHECK(payload.HasMember("name"));
3368  table_ = std::make_unique<std::string>(json_str(payload["name"]));
3369 
3370  CHECK(payload.HasMember("elementList"));
3371  parse_elements(payload, "elementList", *table_, table_element_list_);
3372 
3373  CHECK(payload.HasMember("filePath"));
3374  std::string fs = json_str(payload["filePath"]);
3375  // strip leading/trailing spaces/quotes/single quotes
3376  boost::algorithm::trim_if(fs, boost::is_any_of(" \"'`"));
3377  filename_ = std::make_unique<std::string>(fs);
3378 
3379  parse_options(payload, storage_options_);
3380 }
3381 
3383  bool read_only_mode) {
// NOTE(review): the first line of this signature and several body lines (the
// privilege-check condition, the df_td declaration, part of the shared dictionary
// handling and some df_td defaults) are missing from this listing.
// Executes CREATE DATAFRAME: refuses to run in read-only mode, rejects an existing
// table name, builds the column descriptors and the dataframe table descriptor, then
// creates the table and its DB object.
3384  if (read_only_mode) {
3385  throw std::runtime_error("CREATE DATAFRAME invalid in read only mode.");
3386  }
3387  auto& catalog = session.getCatalog();
3388 
3389  const auto execute_write_lock = legacylockmgr::getExecuteWriteLock();
3390 
3391  // check access privileges
3394  throw std::runtime_error("Table " + *table_ +
3395  " will not be created. User has no create privileges.");
3396  }
3397 
3398  if (catalog.getMetadataForTable(*table_) != nullptr) {
3399  throw std::runtime_error("Table " + *table_ + " already exists.");
3400  }
3402  std::list<ColumnDescriptor> columns;
3403  std::vector<SharedDictionaryDef> shared_dict_defs;
3404 
// Column names are tracked upper-cased so duplicates are detected case-insensitively.
3405  std::unordered_set<std::string> uc_col_names;
3406  for (auto& e : table_element_list_) {
3407  if (dynamic_cast<SharedDictionaryDef*>(e.get())) {
3408  auto shared_dict_def = static_cast<SharedDictionaryDef*>(e.get());
3410  this, shared_dict_def, columns, shared_dict_defs, catalog);
3411  shared_dict_defs.push_back(*shared_dict_def);
3412  continue;
3413  }
3414  if (!dynamic_cast<ColumnDef*>(e.get())) {
3415  throw std::runtime_error("Table constraints are not supported yet.");
3416  }
3417  ColumnDef* coldef = static_cast<ColumnDef*>(e.get());
3418  ColumnDescriptor cd;
3419  cd.columnName = *coldef->get_column_name();
3420  const auto uc_col_name = boost::to_upper_copy<std::string>(cd.columnName);
3421  const auto it_ok = uc_col_names.insert(uc_col_name);
3422  if (!it_ok.second) {
3423  throw std::runtime_error("Column '" + cd.columnName + "' defined more than once");
3424  }
3425  setColumnDescriptor(cd, coldef);
3426  columns.push_back(cd);
3427  }
3428 
// Defaults first; the storage options below may override some of them.
3429  df_td.tableName = *table_;
3430  df_td.nColumns = columns.size();
3431  df_td.isView = false;
3432  df_td.fragmenter = nullptr;
3437  df_td.maxRows = DEFAULT_MAX_ROWS;
3439  if (!storage_options_.empty()) {
3440  for (auto& p : storage_options_) {
3441  get_dataframe_definitions(df_td, p, columns);
3442  }
3443  }
3444  df_td.keyMetainfo = serialize_key_metainfo(nullptr, shared_dict_defs);
3445  df_td.userId = session.get_currentUser().userId;
// The source file path is stored in the storageType field.
3446  df_td.storageType = *filename_;
3447 
3448  catalog.createShardedTable(df_td, columns, shared_dict_defs);
3449  // TODO (max): It's transactionally unsafe, should be fixed: we may create object w/o
3450  // privileges
3451  SysCatalog::instance().createDBObject(
3452  session.get_currentUser(), df_td.tableName, TableDBObjectType, catalog);
3453 }
3454 
3455 CreateModelStmt::CreateModelStmt(const rapidjson::Value& payload) {
3456  if (!g_enable_ml_functions) {
3457  throw std::runtime_error("Cannot create model. ML functions are disabled.");
3458  }
3459  CHECK(payload.HasMember("name"));
3460  const std::string model_type_str = json_str(payload["type"]);
3461  model_type_ = get_ml_model_type_from_str(model_type_str);
3462  model_name_ = json_str(payload["name"]);
3463  replace_ = false;
3464  if (payload.HasMember("replace")) {
3465  replace_ = json_bool(payload["replace"]);
3466  }
3467 
3468  if_not_exists_ = false;
3469  if (payload.HasMember("ifNotExists")) {
3470  if_not_exists_ = json_bool(payload["ifNotExists"]);
3471  }
3472 
3473  CHECK(payload.HasMember("query"));
3474  select_query_ = json_str(payload["query"]);
3475  std::regex newline_re("\\n");
3476  std::regex backtick_re("`");
3477  select_query_ = std::regex_replace(select_query_, newline_re, " ");
3478  select_query_ = std::regex_replace(select_query_, backtick_re, "");
3479 
3480  // No need to ensure trailing semicolon as we will wrap this select statement
3481  // in a CURSOR as input to the train model table function
3482  parse_options(payload, model_options_);
3483 }
3484 
3485 std::string write_model_params_to_json(const std::string& predicted,
3486  const std::vector<std::string>& features,
3487  const std::string& training_query,
3488  const double data_split_train_fraction,
3489  const double data_split_eval_fraction,
3490  const std::vector<int64_t>& feature_permutations) {
3491  // Create a RapidJSON document
3492  rapidjson::Document doc;
3493  doc.SetObject();
3494 
3495  // Add the fields to the document
3496  rapidjson::Value predicted_value;
3497  predicted_value.SetString(predicted.c_str(), predicted.length(), doc.GetAllocator());
3498  doc.AddMember("predicted", predicted_value, doc.GetAllocator());
3499 
3500  rapidjson::Value features_array(rapidjson::kArrayType);
3501  for (const auto& feature : features) {
3502  rapidjson::Value feature_value;
3503  feature_value.SetString(feature.c_str(), feature.length(), doc.GetAllocator());
3504  features_array.PushBack(feature_value, doc.GetAllocator());
3505  }
3506  doc.AddMember("features", features_array, doc.GetAllocator());
3507 
3508  rapidjson::Value training_query_value;
3509  training_query_value.SetString(
3510  training_query.c_str(), training_query.length(), doc.GetAllocator());
3511  doc.AddMember("training_query", training_query_value, doc.GetAllocator());
3512 
3513  rapidjson::Value data_split_train_fraction_key("data_split_train_fraction",
3514  doc.GetAllocator());
3515 
3516  rapidjson::Value data_split_train_fraction_value(data_split_train_fraction);
3517 
3518  doc.AddMember(
3519  data_split_train_fraction_key, data_split_train_fraction_value, doc.GetAllocator());
3520 
3521  rapidjson::Value data_split_eval_fraction_key("data_split_eval_fraction",
3522  doc.GetAllocator());
3523 
3524  rapidjson::Value data_split_eval_fraction_value(data_split_eval_fraction);
3525 
3526  doc.AddMember(
3527  data_split_eval_fraction_key, data_split_eval_fraction_value, doc.GetAllocator());
3528 
3529  rapidjson::Value feature_permutations_array(rapidjson::kArrayType);
3530  for (const auto& feature_permutation : feature_permutations) {
3531  rapidjson::Value feature_permutation_value;
3532  feature_permutation_value.SetInt64(feature_permutation);
3533  feature_permutations_array.PushBack(feature_permutation_value, doc.GetAllocator());
3534  }
3535  doc.AddMember("feature_permutations", feature_permutations_array, doc.GetAllocator());
3536 
3537  // Convert the document to a JSON string
3538  rapidjson::StringBuffer buffer;
3539  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
3540  doc.Accept(writer);
3541 
3542  return buffer.GetString();
3543 }
3544 
// Tail of the model-existence check (the training path calls check_model_exists() and
// returns early when it yields true).
// NOTE(review): the function header and the enclosing condition that opens the outer
// block are missing from this listing; presumably that condition tests whether a model
// named get_model_name() already exists -- confirm against the full file.
// Returns true only when if_not_exists_ is set; throws std::runtime_error when neither
// if_not_exists_ nor replace_ is set; otherwise returns false.
3547  if (if_not_exists_) {
3548  // Returning true tells the caller we should just return early and silently (without
3549  // error)
3550  return true;
3551  }
3552  if (!replace_) {
3553  std::ostringstream error_oss;
3554  error_oss << "Model " << get_model_name() << " already exists.";
3555  throw std::runtime_error(error_oss.str());
3556  }
3557  }
3558  // Returning false tells the caller all is clear to proceed with the create model,
3559  // whether that means creating a new one or overwriting an existing model
3560  return false;
3561 }
3562 
// Walks model_options_ and splits them into two groups: the train/eval data-split
// fractions, which are stored in data_split_train_fraction_ / data_split_eval_fraction_,
// and every other option, which is appended to options_oss_ as "key => value".
// NOTE(review): the function header is missing from this listing, as are the lines
// that balance the two fractions against each other and the condition guarding the
// final throw -- confirm against the full file before editing.
3564  bool train_fraction_specified = false;
3565  bool eval_fraction_specified = false;
3566  for (auto& p : model_options_) {
// option names are matched case-insensitively (lower-cased first)
3567  const auto key = boost::to_lower_copy<std::string>(*p->get_name());
// train fraction: accepted under a short and a long name, at most once,
// must be a double literal in (0.0, 1.0]
3568  if (key == "train_fraction" || key == "data_split_train_fraction") {
3569  if (train_fraction_specified) {
3570  throw std::runtime_error(
3571  "Error parsing DATA_SPLIT_TRAIN_FRACTION value. "
3572  "Expected only one value.");
3573  }
3574  const DoubleLiteral* fp_literal =
3575  dynamic_cast<const DoubleLiteral*>(p->get_value());
3576  if (fp_literal != nullptr) {
3577  data_split_train_fraction_ = fp_literal->get_doubleval();
3578  if (data_split_train_fraction_ <= 0.0 || data_split_train_fraction_ > 1.0) {
3579  throw std::runtime_error(
3580  "Error parsing DATA_SPLIT_TRAIN_FRACTION value. "
3581  "Expected value between 0.0 and 1.0.");
3582  }
3583  } else {
// NOTE(review): the message below misspells "between"; it is a runtime string, so it
// is left untouched here
3584  throw std::runtime_error(
3585  "Error parsing DATA_SPLIT_TRAIN_FRACTION value. "
3586  "Expected floating point value betwen 0.0 and 1.0.");
3587  }
3588  train_fraction_specified = true;
3589  continue;
3590  }
// eval fraction: same rules, except the accepted range is [0.0, 1.0)
3591  if (key == "eval_fraction" || key == "data_split_eval_fraction") {
3592  if (eval_fraction_specified) {
3593  throw std::runtime_error(
3594  "Error parsing DATA_SPLIT_EVAL_FRACTION value. "
3595  "Expected only one value.");
3596  }
3597  const DoubleLiteral* fp_literal =
3598  dynamic_cast<const DoubleLiteral*>(p->get_value());
3599  if (fp_literal != nullptr) {
3600  data_split_eval_fraction_ = fp_literal->get_doubleval();
3601  if (data_split_eval_fraction_ < 0.0 || data_split_eval_fraction_ >= 1.0) {
3602  throw std::runtime_error(
3603  "Error parsing DATA_SPLIT_EVAL_FRACTION value. "
3604  "Expected value between 0.0 and 1.0.");
3605  }
3606  } else {
// NOTE(review): same "betwen" misspelling as in the train-fraction message
3607  throw std::runtime_error(
3608  "Error parsing DATA_SPLIT_EVAL_FRACTION value. "
3609  "Expected floating point value betwen 0.0 and 1.0.");
3610  }
3611  eval_fraction_specified = true;
3612  continue;
3613  }
// every other option is appended to options_oss_, comma-separated, as "key => value"
3614  if (num_options_) {
3615  options_oss_ << ", ";
3616  }
3617  num_options_++;
3618  options_oss_ << key << " => ";
// string values are lower-cased and wrapped in single quotes
3619  const StringLiteral* str_literal = dynamic_cast<const StringLiteral*>(p->get_value());
3620  if (str_literal != nullptr) {
3621  options_oss_ << "'"
3622  << boost::to_lower_copy<std::string>(*str_literal->get_stringval())
3623  << "'";
3624  continue;
3625  }
3626  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
3627  if (int_literal != nullptr) {
3628  options_oss_ << int_literal->get_intval();
3629  continue;
3630  }
3631  const DoubleLiteral* fp_literal = dynamic_cast<const DoubleLiteral*>(p->get_value());
3632  if (fp_literal != nullptr) {
3633  options_oss_ << fp_literal->get_doubleval();
3634  continue;
3635  }
// the value is not a string, integer or double literal
3636  throw std::runtime_error("Error parsing value.");
3637  }
3638 
3639  // First handle case where data_split_train_fraction was left to default value
3640  // and data_split_eval_fraction was specified. We shouldn't error here,
3641  // but rather set data_split_train_fraction to 1.0 - data_split_eval_fraction
3642  // Likewise if data_split_eval_fraction was left to default value and we have
3643  // a specified data_split_train_fraction, we should set data_split_eval_fraction
3644  // to 1.0 - data_split_train_fraction
// NOTE(review): the opening `if` of this chain and both assignment lines are missing
// from this listing
3647  } else if (data_split_eval_fraction_ == 0.0 && data_split_train_fraction_ < 1.0) {
3649  }
3650 
3651  // If data_split_train_fraction was specified, and data_split_train_fraction +
3652  // data_split_eval_fraction > 1.0, then we should error
3654  throw std::runtime_error(
3655  "Error parsing DATA_SPLIT_TRAIN_FRACTION and DATA_SPLIT_EVAL_FRACTION values. "
3656  "Expected sum of values to be less than or equal to 1.0.");
3657  }
3658 }
// Validates select_query_, classifies its projected columns into the predicted
// variable and numeric / categorical features, and returns the query text to train on.
// The returned query is wrapped so that categorical features come first when needed,
// and is filtered with SAMPLE_RATIO when the train fraction is below 1.0.
// Side effects: fills model_predicted_var_, model_feature_vars_ and, when reordering
// is needed, feature_permutations_.
// NOTE(review): the first line of this signature (return type and function name) is
// missing from this listing; the name is taken from the build_model_query(session_ptr)
// call in the training path.
3660  const std::shared_ptr<Catalog_Namespace::SessionInfo> session_ptr) {
3661  auto validate_query_state = query_state::QueryState::create(session_ptr, select_query_);
3662 
3663  LocalQueryConnector local_connector;
3664 
// run the query with validate_only = true, only to learn the projected column types
3665  auto validate_result = local_connector.query(
3666  validate_query_state->createQueryStateProxy(), select_query_, {}, true, false);
3667 
3668  auto column_descriptors_for_model_create =
3669  local_connector.getColumnDescriptors(validate_result, true);
3670 
3671  std::vector<size_t> categorical_feature_idxs;
3672  std::vector<size_t> numeric_feature_idxs;
3673  bool numeric_feature_seen = false;
3674  bool all_categorical_features_placed_first = true;
3675  bool model_has_predicted_var = is_regression_model(model_type_);
// NOTE(review): size() - 1 wraps around if the query projects no columns while a
// predicted variable is expected -- confirm that case cannot reach this point
3676  model_feature_vars_.reserve(column_descriptors_for_model_create.size() -
3677  (model_has_predicted_var ? 1 : 0));
// when the model has a predicted variable, the first projected column is taken as it
3678  bool is_predicted = model_has_predicted_var ? true : false;
3679  size_t feature_idx = 0;
3680  for (auto& cd : column_descriptors_for_model_create) {
3681  // Check to see if the projected column is an expression without a user-provided
3682  // alias, as we don't allow this.
3683  if (cd.columnName.rfind("EXPR$", 0) == 0) {
3684  throw std::runtime_error(
3685  "All projected expressions (i.e. col * 2) that are not column references (i.e. "
3686  "col) must be aliased.");
3687  }
3688  if (is_predicted) {
3689  model_predicted_var_ = cd.columnName;
3690  if (!cd.columnType.is_number()) {
3691  throw std::runtime_error(
3692  "Numeric predicted column expression should be first argument to CREATE "
3693  "MODEL.");
3694  }
3695  is_predicted = false;
3696  } else {
3697  if (cd.columnType.is_number()) {
3698  numeric_feature_idxs.emplace_back(feature_idx);
3699  numeric_feature_seen = true;
3700  } else if (cd.columnType.is_string()) {
3701  categorical_feature_idxs.emplace_back(feature_idx);
// a string feature after a numeric one means the projection has to be reordered
3702  if (numeric_feature_seen) {
3703  all_categorical_features_placed_first = false;
3704  }
3705  } else {
3706  throw std::runtime_error("Feature column expression should be numeric or TEXT.");
3707  }
3708  model_feature_vars_.emplace_back(cd.columnName);
3709  feature_idx++;
3710  }
3711  }
3712  auto modified_select_query = select_query_;
// wrap the query in an outer SELECT listing the predicted variable (if any), then
// categorical features, then numeric features; record the original feature index of
// each emitted column in feature_permutations_
3713  if (!all_categorical_features_placed_first) {
3714  std::ostringstream modified_query_oss;
3715  modified_query_oss << "SELECT ";
3716  if (model_has_predicted_var) {
3717  modified_query_oss << model_predicted_var_ << ", ";
3718  }
3719  for (auto categorical_feature_idx : categorical_feature_idxs) {
3720  modified_query_oss << model_feature_vars_[categorical_feature_idx] << ", ";
3721  feature_permutations_.emplace_back(static_cast<int64_t>(categorical_feature_idx));
3722  }
3723  for (auto numeric_feature_idx : numeric_feature_idxs) {
3724  modified_query_oss << model_feature_vars_[numeric_feature_idx];
3725  feature_permutations_.emplace_back(static_cast<int64_t>(numeric_feature_idx));
// no separator after the last numeric feature
3726  if (numeric_feature_idx != numeric_feature_idxs.back()) {
3727  modified_query_oss << ", ";
3728  }
3729  }
3730  modified_query_oss << " FROM (" << modified_select_query << ")";
3731  modified_select_query = modified_query_oss.str();
3732  }
3733 
// a train fraction below 1.0 adds a SAMPLE_RATIO filter; the query is wrapped in
// SELECT * FROM (...) first unless it was already wrapped above
3734  if (data_split_train_fraction_ < 1.0) {
3735  std::ostringstream modified_query_oss;
3736  if (all_categorical_features_placed_first) {
3737  modified_query_oss << "SELECT * FROM (" << modified_select_query << ")";
3738  } else {
3739  modified_query_oss << modified_select_query;
3740  }
3741  modified_query_oss << " WHERE SAMPLE_RATIO(" << data_split_train_fraction_ << ")";
3742  modified_select_query = modified_query_oss.str();
3743  }
3744  return modified_select_query;
3745 }
3746 
// Trains the model by wrapping the (possibly rewritten) select query in a call to the
// "<model type>_FIT" table function and running it through a LocalQueryConnector.
// NOTE(review): the function header is missing from this listing, as are one statement
// after the early return and most of the expression that builds model_metadata --
// confirm against the full file before editing.
3748  if (check_model_exists()) {
3749  // Will return true if model exists and if_not_exists_ is true, in this
3750  // case we should return only
3751  return;
3752  }
3753 
3755 
// session_ptr is a non-owning shared_ptr (null_deleter) to the local session_copy, so
// it must not be used after this function returns
3756  auto session_copy = session;
3757  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
3758  &session_copy, boost::null_deleter());
3759 
3760  // We need to do various manipulations on the raw select query, such
3761  // as adding in any sampling or feature permutation logic. All of this
3762  // work is encapsulated in build_model_query
3763 
3764  const auto modified_select_query = build_model_query(session_ptr);
3765 
3766  // We have to base64 encode the model metadata because depending on the query,
3767  // the training data can have single quotes that trips up the parsing of the combined
3768  // select query with this metadata embedded.
3769 
3770  // This is just a temporary workaround until we store this info in the Catalog
3771  // rather than in the stored model pointer itself (and have to pass the metadata
3772  // down through the table function call)
3773  const auto model_metadata =
3776  select_query_,
3780  if (num_options_) {
3781  // The options string does not have a trailing comma,
3782  // so add it
3783  options_oss_ << ", ";
3784  }
3785  options_oss_ << "model_metadata => '" << model_metadata << "'";
3786 
3787  const std::string options_str = options_oss_.str();
3788 
3789  const std::string model_train_func = get_ml_model_type_str(model_type_) + "_FIT";
3790 
// resulting query shape:
//   SELECT * FROM TABLE(<func>(model_name=>'<name>', data=>CURSOR(<query>), <options>))
3791  std::ostringstream model_query_oss;
3792  model_query_oss << "SELECT * FROM TABLE(" << model_train_func << "(model_name=>'"
3793  << get_model_name() << "', data=>CURSOR(" << modified_select_query
3794  << ")";
3795  model_query_oss << ", " << options_str;
3796  model_query_oss << "))";
3797 
3798  std::string wrapped_model_query = model_query_oss.str();
3799  auto query_state = query_state::QueryState::create(session_ptr, wrapped_model_query);
3800  // Don't need result back from query, as the query will create the model
3801  LocalQueryConnector local_connector;
3802  local_connector.query(
3803  query_state->createQueryStateProxy(), wrapped_model_query, {}, false);
3804 }
3805 
// CREATE MODEL entry point: refuses to run in read-only mode, then trains the model.
// Any std::exception raised by train_model is rethrown as a std::runtime_error of the
// form "Could not create model <name>. <detail>".
// NOTE(review): the first line of this signature (function name, first parameter) is
// missing from this listing.
3807  bool read_only_mode) {
3808  if (read_only_mode) {
3809  throw std::runtime_error("CREATE MODEL invalid in read only mode.");
3810  }
3811 
3812  try {
3813  train_model(session);
3814  } catch (std::exception& e) {
3815  std::ostringstream error_oss;
3816  // Error messages from table functions come back like this:
3817  // Error executing table function: MLTableFunctions.hpp:269 linear_reg_fit_impl: No
3818  // rows exist in training input. Training input must at least contain 1 row.
3819 
3820  // We want to take everything after the function name, so we will search for the
3821  // third colon.
3822  // Todo(todd): Look at making this less hacky by setting a mode for the table
3823  // function that will return only the core error string and not the prepending
3824  // metadata
3825 
// Returns the text that starts two characters after the third ':' in the message. The
// whole message is returned unchanged when it has fewer than three colons or nothing
// remains at that offset.
3826  auto get_error_substring = [](const std::string& message) -> std::string {
// npos + 1 wraps to 0, so the first search starts at the beginning of the message
3827  size_t colon_position = std::string::npos;
3828  for (int i = 0; i < 3; ++i) {
3829  colon_position = message.find(':', colon_position + 1);
3830  if (colon_position == std::string::npos) {
3831  return message;
3832  }
3833  }
3834 
3835  if (colon_position + 2 >= message.length()) {
3836  return message;
3837  }
3838  return message.substr(colon_position + 2);
3839  };
3840 
3841  const auto error_substr = get_error_substring(e.what());
3842 
3843  error_oss << "Could not create model " << model_name_ << ". " << error_substr;
3844  throw std::runtime_error(error_oss.str());
3845  }
3846 }
3847 
3848 DropModelStmt::DropModelStmt(const rapidjson::Value& payload) {
3849  CHECK(payload.HasMember("modelName"));
3850  model_name_ = json_str(payload["modelName"]);
3851 
3852  if_exists_ = false;
3853  if (payload.HasMember("ifExists")) {
3854  if_exists_ = json_bool(payload["ifExists"]);
3855  }
3856 }
3857 
// DROP MODEL entry point: refuses to run in read-only mode; a std::runtime_error from
// the drop is swallowed when if_exists_ is set and propagated otherwise.
// NOTE(review): the first line of this signature and the statement inside the try
// block are missing from this listing -- confirm against the full file.
3859  bool read_only_mode) {
3860  if (read_only_mode) {
3861  throw std::runtime_error("DROP MODEL invalid in read only mode.");
3862  }
3863  try {
3865  } catch (std::runtime_error& e) {
3866  if (!if_exists_) {
// NOTE(review): `throw e;` throws a copy typed as std::runtime_error, so a more derived
// exception would be sliced; a bare `throw;` would rethrow the original object
3867  throw e;
3868  }
3869  // If IF EXISTS is set, ignore the error
3870  }
3871 }
3872 
// Plans select_stmt through the catalog's Calcite manager, executes the resulting plan
// with a RelAlgExecutor, stores the result's target metadata in `targets` and returns
// the result rows.
// validate_only and allow_interrupt are forwarded into the ExecutionOptions initializer;
// outer_fragment_indices is its last entry.
// NOTE(review): several lines are missing from this listing, including the declarations
// of `executor` and `co`, parts of the Calcite optimization-option call, and parts of
// the ExecutionOptions / ExecutionResult initializers -- confirm against the full file.
3873 std::shared_ptr<ResultSet> getResultSet(QueryStateProxy query_state_proxy,
3874  const std::string select_stmt,
3875  std::vector<TargetMetaInfo>& targets,
3876  bool validate_only = false,
3877  std::vector<size_t> outer_fragment_indices = {},
3878  bool allow_interrupt = false) {
3879  auto const session = query_state_proxy->getConstSessionInfo();
3880  auto& catalog = session->getCatalog();
3881 
// without CUDA support the device type is always CPU; otherwise it comes from the session
3883 #ifdef HAVE_CUDA
3884  const auto device_type = session->get_executor_device_type();
3885 #else
3886  const auto device_type = ExecutorDeviceType::CPU;
3887 #endif // HAVE_CUDA
3888  auto calcite_mgr = catalog.getCalciteMgr();
3889 
3890  // TODO MAT this should actually get the global or the session parameter for
3891  // view optimization
3892  const auto calciteQueryParsingOption =
3893  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
3894  const auto calciteOptimizationOption = calcite_mgr->getCalciteOptimizationOption(
3895  false,
3897  {},
// the statement is passed through pg_shim before being handed to Calcite
3899  const auto query_ra = calcite_mgr
3900  ->process(query_state_proxy,
3901  pg_shim(select_stmt),
3902  calciteQueryParsingOption,
3903  calciteOptimizationOption)
3904  .plan_result;
3905  RelAlgExecutor ra_executor(
3906  executor.get(), query_ra, query_state_proxy->shared_from_this());
3908  // TODO(adb): Need a better method of dropping constants into this ExecutionOptions
3909  // struct
3910  ExecutionOptions eo = {false,
3911  false,
3912  true,
3913  false,
3914  true,
3915  false,
3916  false,
3917  validate_only,
3918  false,
3919  10000,
3920  false,
3921  false,
3922  1000,
3923  allow_interrupt,
3928  false,
3929  std::numeric_limits<size_t>::max(),
3931  outer_fragment_indices};
3932 
// placeholder result built around an empty ResultSet; overwritten by the executor below
3933  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
3936  nullptr,
3937  0,
3938  0),
3939  {}};
3940  result = ra_executor.executeRelAlgQuery(co, eo, false, false, nullptr);
3941  targets = result.getTargetsMeta();
3942 
3943  return result.getRows();
3944 }
3945 
// Plans sql_query_string through the catalog's Calcite manager and returns whatever
// RelAlgExecutor::getOuterFragmentCount reports for the resulting plan.
// NOTE(review): the first line of this signature (return type, name, first parameter)
// is missing from this listing, as are the `executor` declaration and parts of the
// Calcite optimization-option call. The name is taken from the
// leafs_connector_->getOuterFragmentCount(...) call later in the file.
3947  std::string& sql_query_string) {
3948  auto const session = query_state_proxy->getConstSessionInfo();
3949  auto& catalog = session->getCatalog();
3950 
// without CUDA support the device type is always CPU; otherwise it comes from the session
3952 #ifdef HAVE_CUDA
3953  const auto device_type = session->get_executor_device_type();
3954 #else
3955  const auto device_type = ExecutorDeviceType::CPU;
3956 #endif // HAVE_CUDA
3957  auto calcite_mgr = catalog.getCalciteMgr();
3958 
3959  // TODO MAT this should actually get the global or the session parameter for
3960  // view optimization
3961  const auto calciteQueryParsingOption =
3962  calcite_mgr->getCalciteQueryParsingOption(true, false, true, false);
3963  const auto calciteOptimizationOption = calcite_mgr->getCalciteOptimizationOption(
3964  false,
3966  {},
// the statement is passed through pg_shim before being handed to Calcite
3968  const auto query_ra = calcite_mgr
3969  ->process(query_state_proxy,
3970  pg_shim(sql_query_string),
3971  calciteQueryParsingOption,
3972  calciteOptimizationOption)
3973  .plan_result;
3974  RelAlgExecutor ra_executor(executor.get(), query_ra);
3975  CompilationOptions co = {device_type, true, ExecutorOptLevel::Default, false};
3976  // TODO(adb): Need a better method of dropping constants into this ExecutionOptions
3977  // struct
3978  ExecutionOptions eo = {false,
3979  false,
3980  true,
3981  false,
3982  true,
3983  false,
3984  false,
3985  false,
3986  false,
3987  10000,
3988  false,
3989  false,
3990  0.9,
3991  false,
3992  false};
3993  return ra_executor.getOuterFragmentCount(co, eo);
3994 }
3995 
// Runs sql_query_string via getResultSet and returns its rows together with the target
// metadata as one AggregatedResult. When the query is interruptible, is not
// validate-only and the session id is non-empty, the query session is first enrolled
// with the executor in PENDING_EXECUTOR state.
// NOTE(review): the first line of this signature, the `executor` declaration and one
// argument of enrollQuerySession are missing from this listing.
3997  std::string& sql_query_string,
3998  std::vector<size_t> outer_frag_indices,
3999  bool validate_only,
4000  bool allow_interrupt) {
4001  // TODO(PS): Should we be using the shimmed query in getResultSet?
// NOTE(review): pg_shimmed_select_query is not used by any line visible here;
// getResultSet is called with the unshimmed string and applies pg_shim itself
4002  std::string pg_shimmed_select_query = pg_shim(sql_query_string);
4003 
4004  std::vector<TargetMetaInfo> target_metainfos;
4006  auto const session = query_state_proxy->getConstSessionInfo();
4007  auto query_session = session ? session->get_session_id() : "";
4008  auto query_submitted_time = query_state_proxy->getQuerySubmittedTime();
4009  if (allow_interrupt && !validate_only && !query_session.empty()) {
4010  executor->enrollQuerySession(query_session,
4011  sql_query_string,
4012  query_submitted_time,
4014  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR);
4015  }
4016  auto result_rows = getResultSet(query_state_proxy,
4017  sql_query_string,
4018  target_metainfos,
4019  validate_only,
4020  outer_frag_indices,
4021  allow_interrupt);
4022  AggregatedResult res = {result_rows, target_metainfos};
4023  return res;
4024 }
4025 
4026 std::vector<AggregatedResult> LocalQueryConnector::query(
4027  QueryStateProxy query_state_proxy,
4028  std::string& sql_query_string,
4029  std::vector<size_t> outer_frag_indices,
4030  bool allow_interrupt) {
4031  auto res = query(
4032  query_state_proxy, sql_query_string, outer_frag_indices, false, allow_interrupt);
4033  return {res};
4034 }
4035 
// Builds one ColumnDescriptor per entry of result.targets_meta. Two parallel lists are
// produced: the descriptors as reported by the result, and a variant adjusted for
// creating a table from them. for_create selects which list is returned.
// NOTE(review): the parameter line declaring `result`, the condition that opens the
// comp-param block and one statement of the date block are missing from this listing.
4036 std::list<ColumnDescriptor> LocalQueryConnector::getColumnDescriptors(
4038  bool for_create) {
4039  std::list<ColumnDescriptor> column_descriptors;
4040  std::list<ColumnDescriptor> column_descriptors_for_create;
4041 
// every result column named "rowid" gets a running numeric suffix (rowid0, rowid1, ...)
4042  int rowid_suffix = 0;
4043  for (const auto& target_metainfo : result.targets_meta) {
4044  ColumnDescriptor cd;
4045  cd.columnName = target_metainfo.get_resname();
4046  if (cd.columnName == "rowid") {
4047  cd.columnName += std::to_string(rowid_suffix++);
4048  }
4049  cd.columnType = target_metainfo.get_physical_type_info();
4050 
4051  ColumnDescriptor cd_for_create = cd;
4052 
4054  // we need to reset the comp param (as this points to the actual dictionary)
4055  if (cd.columnType.is_array()) {
4056  // for dict encoded arrays, it is always 4 bytes
4057  cd_for_create.columnType.set_comp_param(32);
4058  } else {
// otherwise the comp param becomes the column's size in bits
4059  cd_for_create.columnType.set_comp_param(cd.columnType.get_size() * 8);
4060  }
4061  }
4062 
4063  if (cd.columnType.is_date() && !cd.columnType.is_date_in_days()) {
4064  // default to kENCODING_DATE_IN_DAYS encoding
4066  cd_for_create.columnType.set_comp_param(0);
4067  }
4068 
4069  column_descriptors_for_create.push_back(cd_for_create);
4070  column_descriptors.push_back(cd);
4071  }
4072 
4073  if (for_create) {
4074  return column_descriptors_for_create;
4075  }
4076 
4077  return column_descriptors;
4078 }
4079 
// Constructor for an INSERT ... SELECT style statement: reads the mandatory "name" and
// "query" members, flattens the query onto one line and wraps it in parentheses, and
// copies the optional "columns" array into column_list_.
// NOTE(review): the first line of this signature (the constructor name) is missing
// from this listing; presumably InsertIntoTableAsSelectStmt -- confirm against the
// full file.
4081  const rapidjson::Value& payload) {
4082  CHECK(payload.HasMember("name"));
4083  table_name_ = json_str(payload["name"]);
4084 
4085  CHECK(payload.HasMember("query"));
4086  select_query_ = json_str(payload["query"]);
4087 
// newlines become blanks, then the query is parenthesized
4088  boost::replace_all(select_query_, "\n", " ");
4089  select_query_ = "(" + select_query_ + ")";
4090 
// "columns" is optional; when present it must be a JSON array
4091  if (payload.HasMember("columns")) {
4092  CHECK(payload["columns"].IsArray());
4093  for (auto& column : payload["columns"].GetArray()) {
4094  std::string s = json_str(column);
4095  column_list_.emplace_back(std::unique_ptr<std::string>(new std::string(s)));
4096  }
4097  }
4098 }
4099 
4101  const TableDescriptor* td,
4102  bool validate_table,
4103  bool for_CTAS) {
4104  auto const session = query_state_proxy->getConstSessionInfo();
4105  auto& catalog = session->getCatalog();
4107  bool populate_table = false;
4108 
4109  if (leafs_connector_) {
4110  populate_table = true;
4111  } else {
4112  leafs_connector_ = std::make_unique<LocalQueryConnector>();
4113  if (!g_cluster) {
4114  populate_table = true;
4115  }
4116  }
4117 
4118  auto get_target_column_descriptors = [this, &catalog](const TableDescriptor* td) {
4119  std::vector<const ColumnDescriptor*> target_column_descriptors;
4120  if (column_list_.empty()) {
4121  auto list = catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
4122  target_column_descriptors = {std::begin(list), std::end(list)};
4123  } else {
4124  for (auto& c : column_list_) {
4125  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
4126  if (cd == nullptr) {
4127  throw std::runtime_error("Column " + *c + " does not exist.");
4128  }
4129  target_column_descriptors.push_back(cd);
4130  }
4131  }
4132 
4133  return target_column_descriptors;
4134  };
4135 
4136  bool is_temporary = table_is_temporary(td);
4137 
4138  if (validate_table) {
4139  // check access privileges
4140  if (!td) {
4141  throw std::runtime_error("Table " + table_name_ + " does not exist.");
4142  }
4143  if (td->isView) {
4144  throw std::runtime_error("Insert to views is not supported yet.");
4145  }
4146 
4147  if (!session->checkDBAccessPrivileges(DBObjectType::TableDBObjectType,
4149  table_name_)) {
4150  throw std::runtime_error("User has no insert privileges on " + table_name_ + ".");
4151  }
4152 
4153  // only validate the select query so we get the target types
4154  // correctly, but do not populate the result set
4155  LocalQueryConnector local_connector;
4156  auto result = local_connector.query(query_state_proxy, select_query_, {}, true, true);
4157  auto source_column_descriptors = local_connector.getColumnDescriptors(result, false);
4158 
4159  std::vector<const ColumnDescriptor*> target_column_descriptors =
4160  get_target_column_descriptors(td);
4161 
4162  if (source_column_descriptors.size() != target_column_descriptors.size()) {
4163  throw std::runtime_error("The number of source and target columns does not match.");
4164  }
4165 
4166  for (int i = 0; i < source_column_descriptors.size(); i++) {
4167  const ColumnDescriptor* source_cd =
4168  &(*std::next(source_column_descriptors.begin(), i));
4169  const ColumnDescriptor* target_cd = target_column_descriptors.at(i);
4170 
4171  if (source_cd->columnType.get_type() != target_cd->columnType.get_type()) {
4172  auto type_cannot_be_cast = [](const auto& col_type) {
4173  return (col_type.is_time() || col_type.is_geometry() || col_type.is_array() ||
4174  col_type.is_boolean());
4175  };
4176 
4177  if (type_cannot_be_cast(source_cd->columnType) ||
4178  type_cannot_be_cast(target_cd->columnType)) {
4179  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4180  source_cd->columnType.get_type_name() +
4181  "' and target '" + target_cd->columnName + " " +
4182  target_cd->columnType.get_type_name() +
4183  "' column types do not match.");
4184  }
4185  }
4186  if (source_cd->columnType.is_array()) {
4187  if (source_cd->columnType.get_subtype() != target_cd->columnType.get_subtype()) {
4188  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4189  source_cd->columnType.get_type_name() +
4190  "' and target '" + target_cd->columnName + " " +
4191  target_cd->columnType.get_type_name() +
4192  "' array column element types do not match.");
4193  }
4194  }
4195 
4196  if (target_cd->columnType.is_string() && !source_cd->columnType.is_string()) {
4197  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4198  source_cd->columnType.get_type_name() +
4199  "' and target '" + target_cd->columnName + " " +
4200  target_cd->columnType.get_type_name() +
4201  "' column types do not match.");
4202  }
4203 
4204  if (source_cd->columnType.is_decimal() ||
4205  source_cd->columnType.get_elem_type().is_decimal()) {
4206  SQLTypeInfo sourceType = source_cd->columnType;
4207  SQLTypeInfo targetType = target_cd->columnType;
4208 
4209  if (source_cd->columnType.is_array()) {
4210  sourceType = source_cd->columnType.get_elem_type();
4211  targetType = target_cd->columnType.get_elem_type();
4212  }
4213 
4214  if (sourceType.get_scale() != targetType.get_scale()) {
4215  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4216  source_cd->columnType.get_type_name() +
4217  "' and target '" + target_cd->columnName + " " +
4218  target_cd->columnType.get_type_name() +
4219  "' decimal columns scales do not match.");
4220  }
4221  }
4222 
4223  if (source_cd->columnType.is_string()) {
4224  if (!target_cd->columnType.is_string()) {
4225  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4226  source_cd->columnType.get_type_name() +
4227  "' and target '" + target_cd->columnName + " " +
4228  target_cd->columnType.get_type_name() +
4229  "' column types do not match.");
4230  }
4231  if (source_cd->columnType.get_compression() !=
4232  target_cd->columnType.get_compression()) {
4233  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4234  source_cd->columnType.get_type_name() +
4235  "' and target '" + target_cd->columnName + " " +
4236  target_cd->columnType.get_type_name() +
4237  "' columns string encodings do not match.");
4238  }
4239  }
4240 
4241  if (source_cd->columnType.is_timestamp() && target_cd->columnType.is_timestamp()) {
4242  if (source_cd->columnType.get_dimension() !=
4243  target_cd->columnType.get_dimension()) {
4244  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4245  source_cd->columnType.get_type_name() +
4246  "' and target '" + target_cd->columnName + " " +
4247  target_cd->columnType.get_type_name() +
4248  "' timestamp column precisions do not match.");
4249  }
4250  }
4251 
4252  if (!source_cd->columnType.is_string() && !source_cd->columnType.is_geometry() &&
4253  !source_cd->columnType.is_integer() && !source_cd->columnType.is_decimal() &&
4254  !source_cd->columnType.is_date() && !source_cd->columnType.is_time() &&
4255  !source_cd->columnType.is_timestamp() &&
4256  source_cd->columnType.get_size() > target_cd->columnType.get_size()) {
4257  throw std::runtime_error("Source '" + source_cd->columnName + " " +
4258  source_cd->columnType.get_type_name() +
4259  "' and target '" + target_cd->columnName + " " +
4260  target_cd->columnType.get_type_name() +
4261  "' column encoding sizes do not match.");
4262  }
4263  }
4264  }
4265 
4266  if (!populate_table) {
4267  return;
4268  }
4269 
4270  int64_t total_row_count = 0;
4271  int64_t total_source_query_time_ms = 0;
4272  int64_t total_target_value_translate_time_ms = 0;
4273  int64_t total_data_load_time_ms = 0;
4274 
4276  auto target_column_descriptors = get_target_column_descriptors(td);
4277  auto outer_frag_count =
4278  leafs_connector_->getOuterFragmentCount(query_state_proxy, select_query_);
4279 
4280  size_t outer_frag_end = outer_frag_count == 0 ? 1 : outer_frag_count;
4281  auto query_session = session ? session->get_session_id() : "";
4283  std::string work_type_str = for_CTAS ? "CTAS" : "ITAS";
4284  try {
4285  for (size_t outer_frag_idx = 0; outer_frag_idx < outer_frag_end; outer_frag_idx++) {
4286  std::vector<size_t> allowed_outer_fragment_indices;
4287 
4288  if (outer_frag_count) {
4289  allowed_outer_fragment_indices.push_back(outer_frag_idx);
4290  }
4291 
4292  const auto query_clock_begin = timer_start();
4293  std::vector<AggregatedResult> query_results =
4294  leafs_connector_->query(query_state_proxy,
4295  select_query_,
4296  allowed_outer_fragment_indices,
4298  total_source_query_time_ms += timer_stop(query_clock_begin);
4299 
4300  auto start_time = query_state_proxy->getQuerySubmittedTime();
4301  auto query_str = "INSERT_DATA for " + work_type_str;
4303  // In the clean-up phase of the query execution for collecting aggregated result
4304  // of SELECT query, we remove its query session info, so we need to enroll the
4305  // session info again
4306  executor->enrollQuerySession(query_session,
4307  query_str,
4308  start_time,
4310  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
4311  }
4312 
4313  ScopeGuard clearInterruptStatus = [executor, &query_session, &start_time] {
4314  // this data population is non-kernel operation, so we manually cleanup
4315  // the query session info in the cleanup phase
4317  executor->clearQuerySessionStatus(query_session, start_time);
4318  }
4319  };
4320 
4321  for (auto& res : query_results) {
4322  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
4323  throw std::runtime_error(
4324  "Query execution has been interrupted while performing " + work_type_str);
4325  }
4326  auto& result_rows = res.rs;
4327  result_rows->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
4328  const auto num_rows = result_rows->rowCount();
4329 
4330  if (0 == num_rows) {
4331  continue;
4332  }
4333 
4334  total_row_count += num_rows;
4335 
4336  size_t leaf_count = leafs_connector_->leafCount();
4337 
4338  // ensure that at least 1 row is processed per block up to a maximum of 65536 rows
4339  const size_t rows_per_block =
4340  std::max(std::min(num_rows / leaf_count, size_t(64 * 1024)), size_t(1));
4341 
4342  std::vector<std::unique_ptr<TargetValueConverter>> value_converters;
4343 
4345 
4346  const int num_worker_threads = std::thread::hardware_concurrency();
4347 
4348  std::vector<size_t> thread_start_idx(num_worker_threads),
4349  thread_end_idx(num_worker_threads);
4350  bool can_go_parallel = !result_rows->isTruncated() && rows_per_block > 20000;
4351 
4352  std::atomic<size_t> crt_row_idx{0};
4353 
4354  auto do_work = [&result_rows, &value_converters, &crt_row_idx](
4355  const size_t idx,
4356  const size_t block_end,
4357  const size_t num_cols,
4358  const size_t thread_id,
4359  bool& stop_convert) {
4360  const auto result_row = result_rows->getRowAtNoTranslations(idx);
4361  if (!result_row.empty()) {
4362  size_t target_row = crt_row_idx.fetch_add(1);
4363  if (target_row >= block_end) {
4364  stop_convert = true;
4365  return;
4366  }
4367  for (unsigned int col = 0; col < num_cols; col++) {
4368  const auto& mapd_variant = result_row[col];
4369  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
4370  }
4371  }
4372  };
4373 
4374  auto convert_function = [&thread_start_idx,
4375  &thread_end_idx,
4376  &value_converters,
4377  &executor,
4378  &query_session,
4379  &work_type_str,
4380  &do_work](const int thread_id, const size_t block_end) {
4381  const int num_cols = value_converters.size();
4382  const size_t start = thread_start_idx[thread_id];
4383  const size_t end = thread_end_idx[thread_id];
4384  size_t idx = 0;
4385  bool stop_convert = false;
4387  size_t local_idx = 0;
4388  for (idx = start; idx < end; ++idx, ++local_idx) {
4389  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
4390  check_session_interrupted(query_session, executor))) {
4391  throw std::runtime_error(
4392  "Query execution has been interrupted while performing " +
4393  work_type_str);
4394  }
4395  do_work(idx, block_end, num_cols, thread_id, stop_convert);
4396  if (stop_convert) {
4397  break;
4398  }
4399  }
4400  } else {
4401  for (idx = start; idx < end; ++idx) {
4402  do_work(idx, block_end, num_cols, thread_id, stop_convert);
4403  if (stop_convert) {
4404  break;
4405  }
4406  }
4407  }
4408  thread_start_idx[thread_id] = idx;
4409  };
4410 
4411  auto single_threaded_value_converter =
4412  [&crt_row_idx, &value_converters, &result_rows](const size_t idx,
4413  const size_t block_end,
4414  const size_t num_cols,
4415  bool& stop_convert) {
4416  size_t target_row = crt_row_idx.fetch_add(1);
4417  if (target_row >= block_end) {
4418  stop_convert = true;
4419  return;
4420  }
4421  const auto result_row = result_rows->getNextRow(false, false);
4422  CHECK(!result_row.empty());
4423  for (unsigned int col = 0; col < num_cols; col++) {
4424  const auto& mapd_variant = result_row[col];
4425  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
4426  }
4427  };
4428 
4429  auto single_threaded_convert_function = [&value_converters,
4430  &thread_start_idx,
4431  &thread_end_idx,
4432  &executor,
4433  &query_session,
4434  &work_type_str,
4435  &single_threaded_value_converter](
4436  const int thread_id,
4437  const size_t block_end) {
4438  const int num_cols = value_converters.size();
4439  const size_t start = thread_start_idx[thread_id];
4440  const size_t end = thread_end_idx[thread_id];
4441  size_t idx = 0;
4442  bool stop_convert = false;
4444  size_t local_idx = 0;
4445  for (idx = start; idx < end; ++idx, ++local_idx) {
4446  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
4447  check_session_interrupted(query_session, executor))) {
4448  throw std::runtime_error(
4449  "Query execution has been interrupted while performing " +
4450  work_type_str);
4451  }
4452  single_threaded_value_converter(idx, block_end, num_cols, stop_convert);
4453  if (stop_convert) {
4454  break;
4455  }
4456  }
4457  } else {
4458  for (idx = start; idx < end; ++idx) {
4459  single_threaded_value_converter(idx, end, num_cols, stop_convert);
4460  if (stop_convert) {
4461  break;
4462  }
4463  }
4464  }
4465  thread_start_idx[thread_id] = idx;
4466  };
4467 
4468  if (can_go_parallel) {
4469  const size_t entry_count = result_rows->entryCount();
4470  for (size_t
4471  i = 0,
4472  start_entry = 0,
4473  stride = (entry_count + num_worker_threads - 1) / num_worker_threads;
4474  i < num_worker_threads && start_entry < entry_count;
4475  ++i, start_entry += stride) {
4476  const auto end_entry = std::min(start_entry + stride, entry_count);
4477  thread_start_idx[i] = start_entry;
4478  thread_end_idx[i] = end_entry;
4479  }
4480  } else {
4481  thread_start_idx[0] = 0;
4482  thread_end_idx[0] = result_rows->entryCount();
4483  }
4484 
4485  for (size_t block_start = 0; block_start < num_rows;
4486  block_start += rows_per_block) {
4487  const auto num_rows_this_itr = block_start + rows_per_block < num_rows
4488  ? rows_per_block
4489  : num_rows - block_start;
4490  crt_row_idx = 0; // reset block tracker
4491  value_converters.clear();
4492  int colNum = 0;
4493  for (const auto targetDescriptor : target_column_descriptors) {
4494  auto sourceDataMetaInfo = res.targets_meta[colNum++];
4496  num_rows_this_itr,
4497  sourceDataMetaInfo,
4498  targetDescriptor,
4499  catalog,
4500  targetDescriptor->columnType,
4501  !targetDescriptor->columnType.get_notnull(),
4502  result_rows->getRowSetMemOwner()->getLiteralStringDictProxy(),
4504  sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
4505  ? executor->getStringDictionaryProxy(
4506  sourceDataMetaInfo.get_type_info().getStringDictKey(),
4507  result_rows->getRowSetMemOwner(),
4508  true)
4509  : nullptr};
4510  auto converter = factory.create(param);
4511  value_converters.push_back(std::move(converter));
4512  }
4513 
4514  const auto translate_clock_begin = timer_start();
4515  if (can_go_parallel) {
4516  std::vector<std::future<void>> worker_threads;
4517  for (int i = 0; i < num_worker_threads; ++i) {
4518  worker_threads.push_back(
4519  std::async(std::launch::async, convert_function, i, num_rows_this_itr));
4520  }
4521 
4522  for (auto& child : worker_threads) {
4523  child.wait();
4524  }
4525  for (auto& child : worker_threads) {
4526  child.get();
4527  }
4528 
4529  } else {
4530  single_threaded_convert_function(0, num_rows_this_itr);
4531  }
4532 
4533  // finalize the insert data
4534  auto finalizer_func =
4535  [](std::unique_ptr<TargetValueConverter>::pointer targetValueConverter) {
4536  targetValueConverter->finalizeDataBlocksForInsertData();
4537  };
4538 
4539  std::vector<std::future<void>> worker_threads;
4540  for (auto& converterPtr : value_converters) {
4541  worker_threads.push_back(
4542  std::async(std::launch::async, finalizer_func, converterPtr.get()));
4543  }
4544 
4545  for (auto& child : worker_threads) {
4546  child.wait();
4547  }
4548  for (auto& child : worker_threads) {
4549  child.get();
4550  }
4551 
4553  insert_data.databaseId = catalog.getCurrentDB().dbId;
4554  CHECK(td);
4555  insert_data.tableId = td->tableId;
4556  insert_data.numRows = num_rows_this_itr;
4557 
4558  for (int col_idx = 0; col_idx < target_column_descriptors.size(); col_idx++) {
4560  check_session_interrupted(query_session, executor))) {
4561  throw std::runtime_error(
4562  "Query execution has been interrupted while performing " +
4563  work_type_str);
4564  }
4565  value_converters[col_idx]->addDataBlocksToInsertData(insert_data);
4566  }
4567  total_target_value_translate_time_ms += timer_stop(translate_clock_begin);
4568 
4569  const auto data_load_clock_begin = timer_start();
4570  auto data_memory_holder =
4571  import_export::fill_missing_columns(&catalog, insert_data);
4572  insertDataLoader.insertData(*session, insert_data);
4573  total_data_load_time_ms += timer_stop(data_load_clock_begin);
4574  }
4575  }
4576  }
4577  } catch (...) {
4578  try {
4579  leafs_connector_->rollback(*session, td->tableId);
4580  } catch (std::exception& e) {
4581  LOG(ERROR) << "An error occurred during ITAS rollback attempt. Table id: "
4582  << td->tableId << ", Error: " << e.what();
4583  }
4584  throw;
4585  }
4586 
4587  int64_t total_time_ms = total_source_query_time_ms +
4588  total_target_value_translate_time_ms + total_data_load_time_ms;
4589 
4590  VLOG(1) << "CTAS/ITAS " << total_row_count << " rows loaded in " << total_time_ms
4591  << "ms (outer_frag_count=" << outer_frag_count
4592  << ", query_time=" << total_source_query_time_ms
4593  << "ms, translation_time=" << total_target_value_translate_time_ms
4594  << "ms, data_load_time=" << total_data_load_time_ms
4595  << "ms)\nquery: " << select_query_;
4596 
4597  if (!is_temporary) {
4598  leafs_connector_->checkpoint(*session, td->tableId);
4599  }
4600 }
4601 
4602 namespace {
4603 shared::TableKey get_table_key(const std::vector<std::string>& table) {
4604  const auto catalog = SysCatalog::instance().getCatalog(table[1]);
4605  CHECK(catalog);
4606  const auto table_id = catalog->getTableId(table[0]);
4607  if (!table_id.has_value()) {
4608  throw std::runtime_error{"Table \"" + table[0] +
4609  "\" does not exist in catalog: " + table[1] + "."};
4610  }
4611  return {catalog->getDatabaseId(), table_id.value()};
4612 }
4613 
4615  const std::string& insert_table_db_name,
4616  const std::string& query_str,
4617  const QueryStateProxy& query_state_proxy,
4618  const std::optional<std::string>& insert_table_name = {}) {
      // Builds and returns the list of locks needed to run `query_str`.
      //
      // The query is passed through pg_shim() and processed by the Calcite manager
      // to learn which tables it selects from. When `insert_table_name` is set,
      // the pair {insert_table_name, insert_table_db_name} is added to that set.
      // Two lock entries are then appended to `locks` for every table.
4619  auto& sys_catalog = SysCatalog::instance();
4620  auto& calcite_mgr = sys_catalog.getCalciteMgr();
4621  const auto calciteQueryParsingOption =
4622  calcite_mgr.getCalciteQueryParsingOption(true, false, true, false);
4623  const auto calciteOptimizationOption = calcite_mgr.getCalciteOptimizationOption(
4624  false, g_enable_watchdog, {}, sys_catalog.isAggregator());
4625  const auto result = calcite_mgr.process(query_state_proxy,
4626  pg_shim(query_str),
4627  calciteQueryParsingOption,
4628  calciteOptimizationOption);
4629  // force sort into tableid order in case of name change to guarantee fixed order of
4630  // mutex access
      // Each entry is a {table name, database name} vector. Ordering is by
      // get_table_key(), so two entries resolving to the same key count as
      // duplicates. get_table_key() throws std::runtime_error for an unknown
      // table, so inserting into the set can throw as well.
4631  auto comparator = [](const std::vector<std::string>& table_1,
4632  const std::vector<std::string>& table_2) {
4633  return get_table_key(table_1) < get_table_key(table_2);
4634  };
4635  std::set<std::vector<std::string>, decltype(comparator)> tables(comparator);
4636  for (auto& tab : result.resolved_accessed_objects.tables_selected_from) {
4637  tables.emplace(tab);
4638  }
4639  if (insert_table_name.has_value()) {
4640  tables.emplace(
4641  std::vector<std::string>{insert_table_name.value(), insert_table_db_name});
4642  }
4644  for (const auto& table : tables) {
4645  const auto catalog = sys_catalog.getCatalog(table[1]);
4646  CHECK(catalog);
      // First entry for this table is built from the catalog and the table name.
4647  locks.emplace_back(
4650  *catalog, table[0])));
      // Second entry is built from the database id and the value obtained by
      // invoking the entry just added. The insert target takes a different
      // branch from tables that are only selected from.
      // NOTE(review): presumably a write-side data lock for the insert target
      // and a read-side one otherwise -- confirm against the lock types used.
4651  if (insert_table_name.has_value() && table[0] == insert_table_name.value() &&
4652  table[1] == insert_table_db_name) {
4653  locks.emplace_back(
4656  catalog->getDatabaseId(), (*locks.back())())));
4657  } else {
4658  locks.emplace_back(
4661  catalog->getDatabaseId(), (*locks.back())())));
4662  }
4663  }
4664  return locks;
4665 }
4666 } // namespace
4667 
4669  bool read_only_mode) {
      // Executes INSERT INTO <table_name_> ... SELECT: verifies the target table
      // exists, takes the executor read lock and the per-table locks for the
      // select query plus the target, clears external caches for the target and
      // then calls populateData().
      // Throws std::runtime_error in read-only mode or when the target is missing.
4670  if (read_only_mode) {
4671  throw std::runtime_error("INSERT INTO TABLE invalid in read only mode.");
4672  }
      // The session is copied and the copy is wrapped in a shared_ptr with a
      // null deleter, i.e. the pointer does not own the stack object it refers to.
4673  auto session_copy = session;
4674  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
4675  &session_copy, boost::null_deleter());
4676  auto query_state = query_state::QueryState::create(session_ptr, select_query_);
4677  auto stdlog = STDLOG(query_state);
4678  auto& catalog = session_ptr->getCatalog();
4679 
4680  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
4681 
4682  if (catalog.getMetadataForTable(table_name_) == nullptr) {
4683  throw std::runtime_error("ITAS failed: table " + table_name_ + " does not exist.");
4684  }
4685 
4686  auto locks = acquire_query_table_locks(
4687  catalog.name(), select_query_, query_state->createQueryStateProxy(), table_name_);
      // The descriptor is looked up again after the table locks are held.
4688  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
4689 
4690  Executor::clearExternalCaches(true, td, catalog.getCurrentDB().dbId);
4691 
      // NOTE(review): this try/catch only rethrows, so it has no effect of its own.
4692  try {
4693  populateData(query_state->createQueryStateProxy(), td, true, false);
4694  } catch (...) {
4695  throw;
4696  }
4697 }
4698 
4700  : InsertIntoTableAsSelectStmt(payload) {
      // After the base class has consumed the payload, read the CTAS-specific
      // members: the optional "temporary" and "ifNotExists" flags (both false
      // when absent) and the storage options.
4701  if (payload.HasMember("temporary")) {
4702  is_temporary_ = json_bool(payload["temporary"]);
4703  } else {
4704  is_temporary_ = false;
4705  }
4706 
4707  if (payload.HasMember("ifNotExists")) {
4708  if_not_exists_ = json_bool(payload["ifNotExists"]);
4709  } else {
4710  if_not_exists_ = false;
4711  }
4712 
4713  parse_options(payload, storage_options_);
4714 }
4715 
4717  bool read_only_mode) {
      // Executes CREATE TABLE ... AS SELECT in two phases:
      //  1. when no leafs connector is set, create the target table under the
      //     executor write lock, deriving its columns from a validate-only run of
      //     the select query;
      //  2. under the executor read lock and the per-table locks, populate it via
      //     populateData().
      // Throws std::runtime_error in read-only mode, on missing create
      // privileges, when the table already exists (unless IF NOT EXISTS was
      // given, in which case it returns silently), and on invalid options.
4718  if (read_only_mode) {
4719  throw std::runtime_error("CREATE TABLE invalid in read only mode.");
4720  }
      // Non-owning shared_ptr (null deleter) to a local copy of the session.
4721  auto session_copy = session;
4722  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
4723  &session_copy, boost::null_deleter());
4724  auto query_state = query_state::QueryState::create(session_ptr, select_query_);
4725  auto stdlog = STDLOG(query_state);
4726  LocalQueryConnector local_connector;
4727  auto& catalog = session.getCatalog();
      // The table is created here only when no leafs connector has been set.
4728  bool create_table = nullptr == leafs_connector_;
4729 
      // NOTE(review): select_tables is not referenced again in the visible part
      // of this function.
4730  std::set<std::string> select_tables;
4731  if (create_table) {
4732  const auto execute_write_lock = legacylockmgr::getExecuteWriteLock();
4733 
4734  // check access privileges
4737  throw std::runtime_error("CTAS failed. Table " + table_name_ +
4738  " will not be created. User has no create privileges.");
4739  }
4740 
4741  if (catalog.getMetadataForTable(table_name_) != nullptr) {
4742  if (if_not_exists_) {
4743  return;
4744  }
4745  throw std::runtime_error("Table " + table_name_ +
4746  " already exists and no data was loaded.");
4747  }
4748 
4749  // only validate the select query so we get the target types
4750  // correctly, but do not populate the result set
4751  // we currently have exclusive access to the system so this is safe
4752  auto validate_result = local_connector.query(
4753  query_state->createQueryStateProxy(), select_query_, {}, true, false);
4754 
4755  auto column_descriptors_for_create =
4756  local_connector.getColumnDescriptors(validate_result, true);
4757 
4758  // some validation as the QE might return some out of range column types
4759  for (auto& cd : column_descriptors_for_create) {
4760  if (cd.columnType.is_decimal() &&
4761  cd.columnType.get_precision() > sql_constants::kMaxNumericPrecision) {
4762  throw std::runtime_error(cd.columnName + ": Precision too high, max " +
4764  ".");
4765  }
4766  // flatbuffer storage in real tables is not implemented, so we
4767  // reset the flatbuffer storage flag for CTAS cases that select
4768  // from UDTF columns which may use flatbuffer storage:
4769  cd.columnType.setUsesFlatBuffer(false);
4770  }
4771 
      // Descriptor of the table to be created, owned by the current user.
4772  TableDescriptor td;
4773  td.tableName = table_name_;
4774  td.userId = session.get_currentUser().userId;
4775  td.nColumns = column_descriptors_for_create.size();
4776  td.isView = false;
4777  td.fragmenter = nullptr;
4784  if (is_temporary_) {
4786  } else {
4788  }
4789 
      // Both behaviours default to on; the WITH options below can switch them
      // off. Only the strings "true", "1" and "t" (case-insensitive) enable an
      // option; any other string disables it.
4790  bool use_shared_dictionaries = true;
4791  bool force_geo_compression = true;
4792 
4793  if (!storage_options_.empty()) {
4794  for (auto& p : storage_options_) {
4795  if (boost::to_lower_copy<std::string>(*p->get_name()) ==
4796  "use_shared_dictionaries") {
4797  const StringLiteral* literal =
4798  dynamic_cast<const StringLiteral*>(p->get_value());
4799  if (nullptr == literal) {
4800  throw std::runtime_error(
4801  "USE_SHARED_DICTIONARIES must be a string parameter");
4802  }
4803  std::string val = boost::to_lower_copy<std::string>(*literal->get_stringval());
4804  use_shared_dictionaries = val == "true" || val == "1" || val == "t";
4805  } else if (boost::to_lower_copy<std::string>(*p->get_name()) ==
4806  "force_geo_compression") {
4807  const StringLiteral* literal =
4808  dynamic_cast<const StringLiteral*>(p->get_value());
4809  if (nullptr == literal) {
4810  throw std::runtime_error("FORCE_GEO_COMPRESSION must be a string parameter");
4811  }
4812  std::string val = boost::to_lower_copy<std::string>(*literal->get_stringval());
4813  force_geo_compression = val == "true" || val == "1" || val == "t";
4814  } else {
      // Every other option is handed to the generic CTAS table-definition handler.
4815  get_table_definitions_for_ctas(td, p, column_descriptors_for_create);
4816  }
4817  }
4818  }
4819 
4820  std::vector<SharedDictionaryDef> sharedDictionaryRefs;
4821 
4822  if (use_shared_dictionaries) {
      // For each dictionary-encoded string column of the source, look up its
      // dictionary id in the catalog's dictionary-to-column mapping; when found,
      // record a shared-dictionary reference to that existing column.
4823  const auto source_column_descriptors =
4824  local_connector.getColumnDescriptors(validate_result, false);
4825  const auto mapping = catalog.getDictionaryToColumnMapping();
4826 
4827  for (auto& source_cd : source_column_descriptors) {
4828  const auto& ti = source_cd.columnType;
4829  if (ti.is_string()) {
4830  if (ti.get_compression() == kENCODING_DICT) {
4831  int dict_id = ti.get_comp_param();
4832  auto it = mapping.find(dict_id);
4833  if (mapping.end() != it) {
4834  const auto targetColumn = it->second;
4835  auto targetTable =
4836  catalog.getMetadataForTable(targetColumn->tableId, false);
4837  CHECK(targetTable);
4838  LOG(INFO) << "CTAS: sharing text dictionary on column "
4839  << source_cd.columnName << " with " << targetTable->tableName
4840  << "." << targetColumn->columnName;
4841  sharedDictionaryRefs.emplace_back(
4842  source_cd.columnName, targetTable->tableName, targetColumn->columnName);
4843  }
4844  }
4845  }
4846  }
4847  }
4848 
4849  if (force_geo_compression) {
      // Only geometry columns whose output SRID is 4326 are affected.
4850  for (auto& cd_for_create : column_descriptors_for_create) {
4851  auto& ti = cd_for_create.columnType;
4852  if (ti.is_geometry() && ti.get_output_srid() == 4326) {
4853  // turn on GEOINT32 compression
4854  ti.set_compression(kENCODING_GEOINT);
4855  ti.set_comp_param(32);
4856  }
4857  }
4858  }
4859 
4860  // currently no means of defining sharding in CTAS
4861  td.keyMetainfo = serialize_key_metainfo(nullptr, sharedDictionaryRefs);
4862 
4863  catalog.createTable(td, column_descriptors_for_create, sharedDictionaryRefs, true);
4864  // TODO (max): It's transactionally unsafe, should be fixed: we may create object
4865  // w/o privileges
4866  SysCatalog::instance().createDBObject(
4867  session.get_currentUser(), td.tableName, TableDBObjectType, catalog);
4868  }
4869 
4870  // note there is a time where we do not have any executor outer lock here. someone could
4871  // come along and mess with the data or other tables.
4872  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
4873 
4874  auto locks = acquire_query_table_locks(
4875  catalog.name(), select_query_, query_state->createQueryStateProxy(), table_name_);
4876  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
4877  try {
4878  populateData(query_state->createQueryStateProxy(), td, false, true);
4879  } catch (...) {
      // On failure the table is dropped again, but only when g_cluster is not
      // set; the original exception is rethrown either way.
4880  if (!g_cluster) {
4881  const TableDescriptor* created_td = catalog.getMetadataForTable(table_name_);
4882  if (created_td) {
4883  catalog.dropTable(created_td);
4884  }
4885  }
4886  throw;
4887  }
4888 }
4889 
4890 DropTableStmt::DropTableStmt(const rapidjson::Value& payload) {
4891  CHECK(payload.HasMember("tableName"));
4892  table_ = std::make_unique<std::string>(json_str(payload["tableName"]));
4893 
4894  if_exists_ = false;
4895  if (payload.HasMember("ifExists")) {
4896  if_exists_ = json_bool(payload["ifExists"]);
4897  }
4898 }
4899 
4901  bool read_only_mode) {
      // Executes DROP TABLE: takes the schema write lock on the table, checks
      // privileges, clears external caches for it and drops it from the catalog.
      // Throws std::runtime_error in read-only mode or on missing privileges.
      // A runtime_error raised while acquiring the schema lock is swallowed when
      // IF EXISTS was given and rethrown otherwise.
4902  if (read_only_mode) {
4903  throw std::runtime_error("DROP TABLE invalid in read only mode.");
4904  }
4905  // Because we are able to acquire a unique_lock on the table descriptor to be dropped we
4906  // can get away with only using a shared_lock on the executor, as anything that will
4907  // touch the table being dropped will block, but other transactions are ok.
4908  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
4909  auto& catalog = session.getCatalog();
4910  const TableDescriptor* td{nullptr};
4911  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>> td_with_lock;
4912  try {
4913  td_with_lock =
4914  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>>(
4916  catalog, *table_, false));
4917  td = (*td_with_lock)();
4918  } catch (const std::runtime_error&) {
4919  if (if_exists_) {
4920  return;
4921  } else {
      // Rethrow the in-flight exception object itself. `throw e;` would throw a
      // copy with static type std::runtime_error and lose any derived type.
4922  throw;
4923  }
4924  }
4925 
4926  CHECK(td);
4927  CHECK(td_with_lock);
4928 
4929  // check access privileges
4930  if (!session.checkDBAccessPrivileges(
4932  throw std::runtime_error("Table " + *table_ +
4933  " will not be dropped. User has no proper privileges.");
4934  }
4935 
4937 
      // Caches are cleared inside this scope; the scope ends before the drop.
4938  {
4939  auto table_data_read_lock =
4941  Executor::clearExternalCaches(false, td, catalog.getCurrentDB().dbId);
4942  }
4943 
4944  auto table_data_write_lock =
4946  catalog.dropTable(td);
4947 }
4948 
4949 TruncateTableStmt::TruncateTableStmt(const rapidjson::Value& payload) {
4950  CHECK(payload.HasMember("tableName"));
4951  table_ = std::make_unique<std::string>(json_str(payload["tableName"]));
4952 }
4953 
4955  bool read_only_mode) {
      // Executes TRUNCATE TABLE: resolves the table under a schema lock, checks
      // the caller's privileges, rejects views, clears external caches and then
      // truncates the table through the catalog.
      // Throws std::runtime_error in read-only mode, when the table does not
      // exist, when privileges are missing, or when the target is a view.
4956  if (read_only_mode) {
4957  throw std::runtime_error("TRUNCATE TABLE invalid in read only mode.");
4958  }
4959  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
4960  auto& catalog = session.getCatalog();
4961  const auto td_with_lock =
4963  catalog, *table_, true);
4964  const auto td = td_with_lock();
4965  if (!td) {
4966  throw std::runtime_error("Table " + *table_ + " does not exist.");
4967  }
4968 
4969  // check access privileges
4970  std::vector<DBObject> privObjects;
4971  DBObject dbObject(*table_, TableDBObjectType);
4972  dbObject.loadKey(catalog);
4974  privObjects.push_back(dbObject);
4975  if (!SysCatalog::instance().checkPrivileges(session.get_currentUser(), privObjects)) {
4976  throw std::runtime_error("Table " + *table_ + " will not be truncated. User " +
4977  session.get_currentUser().userLoggable() +
4978  " has no proper privileges.");
4979  }
4980 
4981  if (td->isView) {
4982  throw std::runtime_error(*table_ + " is a view. Cannot Truncate.");
4983  }
4985 
4986  // invalidate cached item
      // The cache clear happens inside its own scope, which ends before the
      // truncate below.
4987  {
4988  auto table_data_read_lock =
4990  Executor::clearExternalCaches(false, td, catalog.getCurrentDB().dbId);
4991  }
4992 
4993  auto table_data_write_lock =
4995  catalog.truncateTable(td);
4996 }
4997 
4998 OptimizeTableStmt::OptimizeTableStmt(const rapidjson::Value& payload) {
4999  CHECK(payload.HasMember("tableName"));
5000  table_ = std::make_unique<std::string>(json_str(payload["tableName"]));
5001  parse_options(payload, options_);
5002 }
5003 
5004 namespace {
5006  const TableDescriptor* td,
5007  const AccessPrivileges access_priv) {
      // Returns whether the session's current user holds `access_priv` on table
      // `td`, as reported by SysCatalog::checkPrivileges(). `td` must be non-null
      // (CHECKed).
5008  CHECK(td);
5009  auto& cat = session_info.getCatalog();
      // Describe the table as a DBObject carrying the requested privileges and
      // submit it as the only element of the privilege check.
5010  std::vector<DBObject> privObjects;
5011  DBObject dbObject(td->tableName, TableDBObjectType);
5012  dbObject.loadKey(cat);
5013  dbObject.setPrivileges(access_priv);
5014  privObjects.push_back(dbObject);
5015  return SysCatalog::instance().checkPrivileges(session_info.get_currentUser(),
5016  privObjects);
5017 };
5018 } // namespace
5019 
5021  bool read_only_mode) {
      // Executes OPTIMIZE TABLE: resolves the table under a lock, checks access,
      // clears external caches, optionally vacuums deleted rows and finally
      // recomputes the table's metadata.
      // Throws std::runtime_error in read-only mode, when the table is missing
      // or inaccessible, or when the target is a view.
5022  if (read_only_mode) {
5023  throw std::runtime_error("OPTIMIZE TABLE invalid in read only mode.");
5024  }
5025  auto& catalog = session.getCatalog();
5026 
5027  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
5028 
5029  const auto td_with_lock =
5031  catalog, *table_);
5032  const auto td = td_with_lock();
5033 
      // A missing table and a table the user lacks DELETE_FROM_TABLE on produce
      // the same "does not exist" error.
5034  if (!td || !user_can_access_table(session, td, AccessPrivileges::DELETE_FROM_TABLE)) {
5035  throw std::runtime_error("Table " + *table_ + " does not exist.");
5036  }
5037 
5038  if (td->isView) {
5039  throw std::runtime_error("OPTIMIZE TABLE command is not supported on views.");
5040  }
5041 
5042  // invalidate cached item
5043  Executor::clearExternalCaches(true, td, catalog.getDatabaseId());
5044 
5046  const TableOptimizer optimizer(td, executor, catalog);
      // Vacuuming is optional; metadata is recomputed in every case.
5047  if (shouldVacuumDeletedRows()) {
5048  optimizer.vacuumDeletedRows();
5049  }
5050  optimizer.recomputeMetadata();
5051 }
5052 
5053 bool repair_type(std::list<std::unique_ptr<NameValueAssign>>& options) {
5054  for (const auto& opt : options) {
5055  if (boost::iequals(*opt->get_name(), "REPAIR_TYPE")) {
5056  const auto repair_type =
5057  static_cast<const StringLiteral*>(opt->get_value())->get_stringval();
5058  CHECK(repair_type);
5059  if (boost::iequals(*repair_type, "REMOVE")) {
5060  return true;
5061  } else {
5062  throw std::runtime_error("REPAIR_TYPE must be REMOVE.");
5063  }
5064  } else {
5065  throw std::runtime_error("The only VALIDATE WITH options is REPAIR_TYPE.");
5066  }
5067  }
5068  return false;
5069 }
5070 
5071 ValidateStmt::ValidateStmt(std::string* type, std::list<NameValueAssign*>* with_opts)
5072  : type_(type) {
5073  if (!type) {
5074  throw std::runtime_error("Validation Type is required for VALIDATE command.");
5075  }
5076  std::list<std::unique_ptr<NameValueAssign>> options;
5077  if (with_opts) {
5078  for (const auto e : *with_opts) {
5079  options.emplace_back(e);
5080  }
5081  delete with_opts;
5082 
5083  isRepairTypeRemove_ = repair_type(options);
5084  }
5085 }
5086 
5087 ValidateStmt::ValidateStmt(const rapidjson::Value& payload) {
5088  CHECK(payload.HasMember("type"));
5089  type_ = std::make_unique<std::string>(json_str(payload["type"]));
5090 
5091  std::list<std::unique_ptr<NameValueAssign>> options;
5092  parse_options(payload, options);
5093 
5094  isRepairTypeRemove_ = repair_type(options);
5095 }
5096 
5098  const TableDescriptor* td) {
      // Verifies that the session's current user may alter table `td`.
      // Super users and the table's owner (matching userId) pass without any
      // further check; everyone else needs the privileges attached to the
      // DBObject built below.
      // Throws std::runtime_error when the privilege check fails.
5099  if (session.get_currentUser().isSuper ||
5100  session.get_currentUser().userId == td->userId) {
5101  return;
5102  }
5103  std::vector<DBObject> privObjects;
5104  DBObject dbObject(td->tableName, TableDBObjectType);
5105  dbObject.loadKey(session.getCatalog());
5107  privObjects.push_back(dbObject);
5108  if (!SysCatalog::instance().checkPrivileges(session.get_currentUser(), privObjects)) {
5109  throw std::runtime_error("Current user does not have the privilege to alter table: " +
5110  td->tableName);
5111  }
5112 }
5113 
5114 RenameUserStmt::RenameUserStmt(const rapidjson::Value& payload) {
5115  CHECK(payload.HasMember("name"));
5116  username_ = std::make_unique<std::string>(json_str(payload["name"]));
5117  CHECK(payload.HasMember("newName"));
5118  new_username_ = std::make_unique<std::string>(json_str(payload["newName"]));
5119 }
5120 
5122  bool read_only_mode) {
      // Renames user *username_ to *new_username_ through the system catalog.
      // Throws std::runtime_error in read-only mode, when the caller is not a
      // super user, or when the user to rename does not exist.
5123  if (read_only_mode) {
      // This statement renames a user, so the message names the user command
      // (it previously said "RENAME TABLE").
5124  throw std::runtime_error("RENAME USER invalid in read only mode.");
5125  }
5126  if (!session.get_currentUser().isSuper) {
5127  throw std::runtime_error("Only a super user can rename users.");
5128  }
5129 
5131  if (!SysCatalog::instance().getMetadataForUser(*username_, user)) {
5132  throw std::runtime_error("User " + *username_ + " does not exist.");
5133  }
5134 
5135  SysCatalog::instance().renameUser(*username_, *new_username_);
5136 }
5137 
5138 RenameDBStmt::RenameDBStmt(const rapidjson::Value& payload) {
5139  CHECK(payload.HasMember("name"));
5140  database_name_ = std::make_unique<std::string>(json_str(payload["name"]));
5141  CHECK(payload.HasMember("newName"));
5142  new_database_name_ = std::make_unique<std::string>(json_str(payload["newName"]));
5143 }
5144 
5146  bool read_only_mode) {
      // Validates a rename-database request: the database *database_name_ must
      // exist and the caller must be a super user or the database owner.
      // Throws std::runtime_error in read-only mode or when a check fails.
      // NOTE(review): the statement that performs the rename itself is not
      // visible in this listing -- confirm it follows the ownership check.
5147  if (read_only_mode) {
5148  throw std::runtime_error("RENAME DATABASE invalid in read only mode.");
5149  }
5151 
5152  // TODO: use database lock instead
5153  const auto execute_write_lock = legacylockmgr::getExecuteWriteLock();
5154 
5155  if (!SysCatalog::instance().getMetadataForDB(*database_name_, db)) {
5156  throw std::runtime_error("Database " + *database_name_ + " does not exist.");
5157  }
5158 
5159  if (!session.get_currentUser().isSuper &&
5160  session.get_currentUser().userId != db.dbOwner) {
5161  throw std::runtime_error("Only a super user or the owner can rename the database.");
5162  }
5163 
5165 }
5166 
5167 RenameTableStmt::RenameTableStmt(const rapidjson::Value& payload) {
5168  CHECK(payload.HasMember("tableNames"));
5169  CHECK(payload["tableNames"].IsArray());
5170  const auto elements = payload["tableNames"].GetArray();
5171  for (const auto& element : elements) {
5172  CHECK(element.HasMember("name"));
5173  CHECK(element.HasMember("newName"));
5174  tablesToRename_.emplace_back(new std::string(json_str(element["name"])),
5175  new std::string(json_str(element["newName"])));
5176  }
5177 }
5178 
5179 RenameTableStmt::RenameTableStmt(std::string* tab_name, std::string* new_tab_name) {
5180  tablesToRename_.emplace_back(tab_name, new_tab_name);
5181 }
5182 
5184  std::list<std::pair<std::string, std::string>> tableNames) {
      // Queues one rename per {old name, new name} pair, in list order. Each
      // name is copied into a freshly allocated std::string handed to
      // tablesToRename_.
5185  for (auto item : tableNames) {
5186  tablesToRename_.emplace_back(new std::string(item.first),
5187  new std::string(item.second));
5188  }
5189 }
5190 
// Bookkeeping for a multi-table RENAME: maps a table name to the name recorded
// for it so far. An empty string marks a name with no table behind it.
5191 using SubstituteMap = std::map<std::string, std::string>;
5192 
5193 // Namespace fns used to track a left-to-right execution of RENAME TABLE
5194 // and verify that the command should be (entirely/mostly) valid
5195 //
5196 namespace {
5197 
// Sentinel stored in a SubstituteMap for a name that has no table.
5198 static constexpr char const* EMPTY_NAME{""};
5199 
/**
 * Derives a temporary table name from `name` by appending "_tmp" followed by
 * the current std::time() value in decimal.
 *
 * TODO - is there a "better" way to create a tmp name for the table
 */
std::string generateUniqueTableName(std::string name) {
  const std::time_t now = std::time(nullptr);
  name += "_tmp";
  name += std::to_string(now);
  return name;
}
5205 
5206 void recordRename(SubstituteMap& sMap, std::string oldName, std::string newName) {
5207  sMap[oldName] = newName;
5208 }
5209 
5211  SubstituteMap& sMap,
5212  std::string tableName) {
      // Returns the name under which `tableName` is currently tracked.
      // Already-tracked names return their recorded substitute, or `tableName`
      // itself when the recorded value is EMPTY_NAME. Unknown names are first
      // registered from the catalog: mapped to themselves when the catalog has
      // such a table, to EMPTY_NAME otherwise. `tableName` is then returned.
5213  if (sMap.find(tableName) != sMap.end()) {
5214  if (sMap[tableName] == EMPTY_NAME) {
5215  return tableName;
5216  }
5217  return sMap[tableName];
5218  } else {
5219  // lookup table in src catalog
5220  const TableDescriptor* td = catalog.getMetadataForTable(tableName);
5221  if (td) {
5222  sMap[tableName] = tableName;
5223  } else {
5224  sMap[tableName] = EMPTY_NAME;
5225  }
5226  }
5227  return tableName;
5228 }
5229 
5230 bool hasData(SubstituteMap& sMap, std::string tableName) {
5231  // assumes loadTable has been previously called
5232  return (sMap[tableName] != EMPTY_NAME);
5233 }
5234 
5236  // Substition map should be clean at end of rename:
5237  // all items in map must (map to self) or (map to EMPTY_STRING) by end
5238 
5239  for (auto it : sMap) {
5240  if ((it.second) != EMPTY_NAME && (it.first) != (it.second)) {
5241  throw std::runtime_error(
5242  "Error: Attempted to overwrite and lose data in table: \'" + (it.first) + "\'");
5243  }
5244  }
5245 }
5246 } // namespace
5247 
5248 namespace {
5251  throw std::runtime_error(td->tableName + " is a foreign table. " +
5252  "Use ALTER FOREIGN TABLE.");
5253  }
5254 }
5255 } // namespace
5256 
5258  bool read_only_mode) {
5259  if (read_only_mode) {
5260  throw std::runtime_error("RENAME TABLE invalid in read only mode.");
5261  }
5262  auto& catalog = session.getCatalog();
5263 
5264  // TODO(adb): the catalog should be handling this locking (see AddColumnStmt)
5265  const auto execute_write_lock = legacylockmgr::getExecuteWriteLock();
5266 
5267  // accumulated vector of table names: oldName->newName
5268  std::vector<std::pair<std::string, std::string>> names;
5269 
5270  SubstituteMap tableSubtituteMap;
5271 
5272  for (auto& item : tablesToRename_) {
5273  std::string curTableName = *(item.first);
5274  std::string newTableName = *(item.second);
5275 
5276  // Note: if rename (a->b, b->a)
5277  // requires a tmp name change (b->tmp, a->b, tmp->a),
5278  // inject that here because
5279  // catalog.renameTable() assumes cleanliness else will fail
5280 
5281  std::string altCurTableName = loadTable(catalog, tableSubtituteMap, curTableName);
5282  std::string altNewTableName = loadTable(catalog, tableSubtituteMap, newTableName);
5283 
5284  if (altCurTableName != curTableName && altCurTableName != EMPTY_NAME) {
5285  // rename is a one-shot deal, reset the mapping once used
5286  recordRename(tableSubtituteMap, curTableName, curTableName);
5287  }
5288 
5289  // Check to see if the command (as-entered) will likely execute cleanly (logic-wise)
5290  // src tables exist before copying from
5291  // destination table collisions
5292  // handled (a->b, b->a)
5293  // or flagged (pre-existing a,b ... "RENAME TABLE a->c, b->c" )
5294  // handle multiple chained renames, tmp names (b->tmp, a->b, tmp->a)
5295  // etc.
5296  //
5297  if (hasData(tableSubtituteMap, altCurTableName)) {
5298  const TableDescriptor* td = catalog.getMetadataForTable(altCurTableName);
5299  if (td) {
5300  // Tables *and* views may be renamed here, foreign tables not
5301  // -> just block foreign tables
5303  check_alter_table_privilege(session, td);
5304  }
5305 
5306  if (hasData(tableSubtituteMap, altNewTableName)) {
5307  std::string tmpNewTableName = generateUniqueTableName(altNewTableName);
5308  // rename: newTableName to tmpNewTableName to get it out of the way
5309  // because it was full
5310  recordRename(tableSubtituteMap, altCurTableName, EMPTY_NAME);
5311  recordRename(tableSubtituteMap, altNewTableName, tmpNewTableName);
5312  recordRename(tableSubtituteMap, tmpNewTableName, tmpNewTableName);
5313  names.emplace_back(altNewTableName, tmpNewTableName);
5314  names.emplace_back(altCurTableName, altNewTableName);
5315  } else {
5316  // rename: curNewTableName to newTableName
5317  recordRename(tableSubtituteMap, altCurTableName, EMPTY_NAME);
5318  recordRename(tableSubtituteMap, altNewTableName, altNewTableName);
5319  names.emplace_back(altCurTableName, altNewTableName);
5320  }
5321  } else {
5322  throw std::runtime_error("Source table \'" + curTableName + "\' does not exist.");
5323  }
5324  }
5325  checkNameSubstition(tableSubtituteMap);
5326 
5327  catalog.renameTables(names);
5328 
5329  // just to be explicit, clean out the list, the unique_ptr will delete
5330  while (!tablesToRename_.empty()) {
5331  tablesToRename_.pop_front();
5332  }
5333 } // namespace Parser
5334 
5336  bool not_null;
5337  const ColumnConstraintDef* cc = coldef->get_column_constraint();
5338  if (cc == nullptr) {
5339  not_null = false;
5340  } else {
5341  not_null = cc->get_notnull();
5342  }
5343  std::string default_value;
5344  const std::string* default_value_ptr = nullptr;
5345  if (cc) {
5346  if (auto def_val_literal = cc->get_defaultval()) {
5347  auto defaultsp = dynamic_cast<const StringLiteral*>(def_val_literal);
5348  default_value =
5349  defaultsp ? *defaultsp->get_stringval() : def_val_literal->to_string();
5350  // The preprocessing below is needed because:
5351  // a) TypedImportBuffer expects arrays in the {...} format
5352  // b) TypedImportBuffer expects string literals inside arrays w/o any quotes
5353  if (coldef->get_column_type()->get_is_array()) {
5354  std::regex array_re(R"(^ARRAY\s*\[(.*)\]$)", std::regex_constants::icase);
5355  default_value = std::regex_replace(default_value, array_re, "{$1}");
5356  boost::erase_all(default_value, "\'");
5357  }
5358  default_value_ptr = &default_value;
5359  }
5360  }
5362  cd,
5363  coldef->get_column_type(),
5364  not_null,
5365  coldef->get_compression(),
5366  default_value_ptr);
5367 }
5368 
5370  set_column_descriptor(cd, coldef);
5371 }
5372 
5374  const TableDescriptor* td) {
5375  auto& catalog = session.getCatalog();
5376  if (!td) {
5377  throw std::runtime_error("Table " + *table_ + " does not exist.");
5378  } else {
5379  if (td->isView) {
5380  throw std::runtime_error("Adding columns to a view is not supported.");
5381  }
5383  if (table_is_temporary(td)) {
5384  throw std::runtime_error(
5385  "Adding columns to temporary tables is not yet supported.");
5386  }
5387  }
5388 
5389  check_alter_table_privilege(session, td);
5390 
5391  if (0 == coldefs_.size()) {
5392  coldefs_.push_back(std::move(coldef_));
5393  }
5394 
5395  for (const auto& coldef : coldefs_) {
5396  auto& new_column_name = *coldef->get_column_name();
5397  if (catalog.getMetadataForColumn(td->tableId, new_column_name) != nullptr) {
5398  throw std::runtime_error("Column " + new_column_name + " already exists.");
5399  }
5400  }
5401 }
5402 
5404  bool read_only_mode) {
5405  if (read_only_mode) {
5406  throw std::runtime_error("ADD COLUMN invalid in read only mode.");
5407  }
5408  // TODO: Review add and drop column implementation
5409  const auto execute_write_lock = legacylockmgr::getExecuteWriteLock();
5410  auto& catalog = session.getCatalog();
5411  const auto td_with_lock =
5413  catalog, *table_, true);
5414  const auto td = td_with_lock();
5415 
5416  check_executable(session, td);
5417 
5418  CHECK(td->fragmenter);
5419  if (std::dynamic_pointer_c