OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParserNode.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "ParserNode.h"
24 
25 #include <boost/algorithm/string.hpp>
26 #include <boost/core/null_deleter.hpp>
27 #include <boost/filesystem.hpp>
28 #include <boost/function.hpp>
29 
30 #include <rapidjson/document.h>
31 #include <rapidjson/stringbuffer.h>
32 #include <rapidjson/writer.h>
33 
34 #include <cassert>
35 #include <cmath>
36 #include <limits>
37 #include <random>
38 #include <regex>
39 #include <stdexcept>
40 #include <string>
41 #include <type_traits>
42 #include <typeinfo>
43 
45 #include "Catalog/Catalog.h"
51 #include "Geospatial/Compression.h"
52 #include "Geospatial/Types.h"
54 #include "ImportExport/Importer.h"
56 #include "LockMgr/LockMgr.h"
60 #include "QueryEngine/Execute.h"
65 #include "ReservedKeywords.h"
66 #include "Shared/StringTransform.h"
67 #include "Shared/SysDefinitions.h"
69 #include "Shared/measure.h"
70 #include "Shared/shard_key.h"
72 #include "Utils/FsiUtils.h"
73 
74 #include "gen-cpp/CalciteServer.h"
75 
// NOTE(review): g_leaf_count is defined (zero-initialized) in this file; its
// meaning is not visible in this chunk -- presumably the number of leaf nodes in a
// distributed deployment; confirm before relying on it.
76 size_t g_leaf_count{0};
// Feature flags declared here but defined in another translation unit.
78 extern bool g_enable_string_functions;
79 extern bool g_enable_fsi;
80
// Defined only when Parquet import support is compiled in.
82 #ifdef ENABLE_IMPORT_PARQUET
83 bool g_enable_legacy_parquet_import{false};
84 #endif
86
88
90 using namespace std::string_literals;
91
// Callback signatures taking a table / dataframe descriptor and its column list,
// presumably used to apply a table-definition option to the descriptor.
// NOTE(review): source lines 93 and 98 (a middle parameter of each signature) are
// missing from this listing; confirm the full signatures before editing.
92 using TableDefFuncPtr = boost::function<void(TableDescriptor&,
94  const std::list<ColumnDescriptor>& columns)>;
95
96 using DataframeDefFuncPtr =
97  boost::function<void(DataframeTableDescriptor&,
99  const std::list<ColumnDescriptor>& columns)>;
100
100 
101 namespace Parser {
// Asks `executor` whether `query_session` has been marked as interrupted, via
// checkIsQuerySessionInterrupted() under the lock obtained from getSessionLock().
// When the (elided) guard below is not taken, returns false.
// NOTE(review): source lines 106-107 are missing from this listing. They
// presumably open the guarding `if (...) {` that is closed on line 110 and declare
// `session_read_lock` from the lock returned by getSessionLock() -- confirm
// against the original file before editing.
102 bool check_session_interrupted(const QuerySessionId& query_session, Executor* executor) {
103  // We call this function with a unitary executor, but that is okay since
104  // we know the exact session info from a global session map object
105  // in the executor.
108  executor->getSessionLock());
109  return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
110  }
111  return false;
112 }
113 
114 std::vector<int> getTableChunkKey(const TableDescriptor* td,
115  Catalog_Namespace::Catalog& catalog) {
116  std::vector<int> table_chunk_key_prefix;
117  if (td) {
118  if (td->fragmenter) {
119  table_chunk_key_prefix = td->fragmenter->getFragmentsForQuery().chunkKeyPrefix;
120  } else {
121  table_chunk_key_prefix.push_back(catalog.getCurrentDB().dbId);
122  table_chunk_key_prefix.push_back(td->tableId);
123  }
124  }
125  return table_chunk_key_prefix;
126 }
127 
128 std::shared_ptr<Analyzer::Expr> NullLiteral::analyze(
129  const Catalog_Namespace::Catalog& catalog,
130  Analyzer::Query& query,
131  TlistRefType allow_tlist_ref) const {
132  return makeExpr<Analyzer::Constant>(kNULLT, true);
133 }
134 
135 std::shared_ptr<Analyzer::Expr> StringLiteral::analyze(
136  const Catalog_Namespace::Catalog& catalog,
137  Analyzer::Query& query,
138  TlistRefType allow_tlist_ref) const {
139  return analyzeValue(*stringval_, false);
140 }
141 
142 std::shared_ptr<Analyzer::Expr> StringLiteral::analyzeValue(const std::string& stringval,
143  const bool is_null) {
144  if (!is_null) {
145  const SQLTypeInfo ti(kVARCHAR, stringval.length(), 0, true);
146  Datum d;
147  d.stringval = new std::string(stringval);
148  return makeExpr<Analyzer::Constant>(ti, false, d);
149  }
150  // Null value
151  return makeExpr<Analyzer::Constant>(kVARCHAR, true);
152 }
153 
154 std::shared_ptr<Analyzer::Expr> IntLiteral::analyze(
155  const Catalog_Namespace::Catalog& catalog,
156  Analyzer::Query& query,
157  TlistRefType allow_tlist_ref) const {
158  return analyzeValue(intval_);
159 }
160 
161 std::shared_ptr<Analyzer::Expr> IntLiteral::analyzeValue(const int64_t intval) {
162  SQLTypes t;
163  Datum d;
164  if (intval >= INT16_MIN && intval <= INT16_MAX) {
165  t = kSMALLINT;
166  d.smallintval = (int16_t)intval;
167  } else if (intval >= INT32_MIN && intval <= INT32_MAX) {
168  t = kINT;
169  d.intval = (int32_t)intval;
170  } else {
171  t = kBIGINT;
172  d.bigintval = intval;
173  }
174  return makeExpr<Analyzer::Constant>(t, false, d);
175 }
176 
177 std::shared_ptr<Analyzer::Expr> FixedPtLiteral::analyze(
178  const Catalog_Namespace::Catalog& catalog,
179  Analyzer::Query& query,
180  TlistRefType allow_tlist_ref) const {
181  SQLTypeInfo ti(kNUMERIC, 0, 0, false);
182  Datum d = StringToDatum(*fixedptval_, ti);
183  return makeExpr<Analyzer::Constant>(ti, false, d);
184 }
185 
186 std::shared_ptr<Analyzer::Expr> FixedPtLiteral::analyzeValue(const int64_t numericval,
187  const int scale,
188  const int precision) {
189  SQLTypeInfo ti(kNUMERIC, 0, 0, false);
190  ti.set_scale(scale);
191  ti.set_precision(precision);
192  Datum d;
193  d.bigintval = numericval;
194  return makeExpr<Analyzer::Constant>(ti, false, d);
195 }
196 
197 std::shared_ptr<Analyzer::Expr> FloatLiteral::analyze(
198  const Catalog_Namespace::Catalog& catalog,
199  Analyzer::Query& query,
200  TlistRefType allow_tlist_ref) const {
201  Datum d;
202  d.floatval = floatval_;
203  return makeExpr<Analyzer::Constant>(kFLOAT, false, d);
204 }
205 
206 std::shared_ptr<Analyzer::Expr> DoubleLiteral::analyze(
207  const Catalog_Namespace::Catalog& catalog,
208  Analyzer::Query& query,
209  TlistRefType allow_tlist_ref) const {
210  Datum d;
211  d.doubleval = doubleval_;
212  return makeExpr<Analyzer::Constant>(kDOUBLE, false, d);
213 }
214 
215 std::shared_ptr<Analyzer::Expr> TimestampLiteral::analyze(
216  const Catalog_Namespace::Catalog& catalog,
217  Analyzer::Query& query,
218  TlistRefType allow_tlist_ref) const {
219  return get(timestampval_);
220 }
221 
222 std::shared_ptr<Analyzer::Expr> TimestampLiteral::get(const int64_t timestampval) {
223  Datum d;
224  d.bigintval = timestampval;
225  return makeExpr<Analyzer::Constant>(kTIMESTAMP, false, d);
226 }
227 
228 std::shared_ptr<Analyzer::Expr> UserLiteral::analyze(
229  const Catalog_Namespace::Catalog& catalog,
230  Analyzer::Query& query,
231  TlistRefType allow_tlist_ref) const {
232  Datum d;
233  return makeExpr<Analyzer::Constant>(kTEXT, false, d);
234 }
235 
236 std::shared_ptr<Analyzer::Expr> UserLiteral::get(const std::string& user) {
237  Datum d;
238  d.stringval = new std::string(user);
239  return makeExpr<Analyzer::Constant>(kTEXT, false, d);
240 }
241 
242 std::shared_ptr<Analyzer::Expr> ArrayLiteral::analyze(
243  const Catalog_Namespace::Catalog& catalog,
244  Analyzer::Query& query,
245  TlistRefType allow_tlist_ref) const {
246  SQLTypeInfo ti = SQLTypeInfo(kARRAY, true);
247  bool set_subtype = true;
248  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
249  for (auto& p : value_list_) {
250  auto e = p->analyze(catalog, query, allow_tlist_ref);
251  CHECK(e);
252  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
253  if (c != nullptr && c->get_is_null()) {
254  value_exprs.push_back(c);
255  continue;
256  }
257  auto subtype = e->get_type_info().get_type();
258  if (subtype == kNULLT) {
259  // NULL element
260  } else if (set_subtype) {
261  ti.set_subtype(subtype);
262  set_subtype = false;
263  }
264  value_exprs.push_back(e);
265  }
266  std::shared_ptr<Analyzer::Expr> result =
267  makeExpr<Analyzer::Constant>(ti, false, value_exprs);
268  return result;
269 }
270 
271 std::string ArrayLiteral::to_string() const {
272  std::string str = "{";
273  bool notfirst = false;
274  for (auto& p : value_list_) {
275  if (notfirst) {
276  str += ", ";
277  } else {
278  notfirst = true;
279  }
280  str += p->to_string();
281  }
282  str += "}";
283  return str;
284 }
285 
286 std::shared_ptr<Analyzer::Expr> OperExpr::analyze(
287  const Catalog_Namespace::Catalog& catalog,
288  Analyzer::Query& query,
289  TlistRefType allow_tlist_ref) const {
290  auto left_expr = left_->analyze(catalog, query, allow_tlist_ref);
291  const auto& left_type = left_expr->get_type_info();
292  if (right_ == nullptr) {
293  return makeExpr<Analyzer::UOper>(
294  left_type, left_expr->get_contains_agg(), optype_, left_expr->decompress());
295  }
296  if (optype_ == kARRAY_AT) {
297  if (left_type.get_type() != kARRAY) {
298  throw std::runtime_error(left_->to_string() + " is not of array type.");
299  }
300  auto right_expr = right_->analyze(catalog, query, allow_tlist_ref);
301  const auto& right_type = right_expr->get_type_info();
302  if (!right_type.is_integer()) {
303  throw std::runtime_error(right_->to_string() + " is not of integer type.");
304  }
305  return makeExpr<Analyzer::BinOper>(
306  left_type.get_elem_type(), false, kARRAY_AT, kONE, left_expr, right_expr);
307  }
308  auto right_expr = right_->analyze(catalog, query, allow_tlist_ref);
309  return normalize(optype_, opqualifier_, left_expr, right_expr);
310 }
311 
312 bool exprs_share_one_and_same_rte_idx(const std::shared_ptr<Analyzer::Expr>& lhs_expr,
313  const std::shared_ptr<Analyzer::Expr>& rhs_expr) {
314  std::set<int> lhs_rte_idx;
315  lhs_expr->collect_rte_idx(lhs_rte_idx);
316  CHECK(!lhs_rte_idx.empty());
317  std::set<int> rhs_rte_idx;
318  rhs_expr->collect_rte_idx(rhs_rte_idx);
319  CHECK(!rhs_rte_idx.empty());
320  return lhs_rte_idx.size() == 1UL && lhs_rte_idx == rhs_rte_idx;
321 }
322 
323 SQLTypeInfo const& get_str_dict_cast_type(const SQLTypeInfo& lhs_type_info,
324  const SQLTypeInfo& rhs_type_info,
325  const Executor* executor) {
326  CHECK(lhs_type_info.is_string());
327  CHECK(lhs_type_info.get_compression() == kENCODING_DICT);
328  CHECK(rhs_type_info.is_string());
329  CHECK(rhs_type_info.get_compression() == kENCODING_DICT);
330  const auto lhs_comp_param = lhs_type_info.get_comp_param();
331  const auto rhs_comp_param = rhs_type_info.get_comp_param();
332  CHECK_NE(lhs_comp_param, rhs_comp_param);
333  if (lhs_type_info.get_comp_param() == TRANSIENT_DICT_ID) {
334  return rhs_type_info;
335  }
336  if (rhs_type_info.get_comp_param() == TRANSIENT_DICT_ID) {
337  return lhs_type_info;
338  }
339  // If here then neither lhs or rhs type was transient, we should see which
340  // type has the largest dictionary and make that the destination type
341  const auto lhs_sdp = executor->getStringDictionaryProxy(lhs_comp_param, true);
342  const auto rhs_sdp = executor->getStringDictionaryProxy(rhs_comp_param, true);
343  return lhs_sdp->entryCount() >= rhs_sdp->entryCount() ? lhs_type_info : rhs_type_info;
344 }
345 
// NOTE(review): source line 346 -- the start of this function's signature (return
// type, name and the `lhs_type_info` parameter) -- is missing from this listing;
// confirm against the original file before editing.
//
// Chooses a common type for two string types:
// - Both dictionary-encoded, with equal comp params or lhs's comp param equal to
//   TRANSIENT_DICT(rhs's): the side with the smaller comp param.
// - Both dictionary-encoded otherwise: the result of get_str_dict_cast_type.
// - At least one side none-encoded (CHECKed): the encoded side if there is one.
//   If the result is none-encoded, its dimension becomes the larger of the two.
347  const SQLTypeInfo& rhs_type_info,
348  const Executor* executor) {
349  CHECK(lhs_type_info.is_string());
350  CHECK(rhs_type_info.is_string());
351  const auto lhs_comp_param = lhs_type_info.get_comp_param();
352  const auto rhs_comp_param = rhs_type_info.get_comp_param();
353  if (lhs_type_info.is_dict_encoded_string() && rhs_type_info.is_dict_encoded_string()) {
// NOTE(review): only lhs == TRANSIENT_DICT(rhs) is tested here, not the mirrored
// rhs == TRANSIENT_DICT(lhs); the mirrored case falls through to
// get_str_dict_cast_type. Confirm this asymmetry is intended.
354  if (lhs_comp_param == rhs_comp_param ||
355  lhs_comp_param == TRANSIENT_DICT(rhs_comp_param)) {
356  return lhs_comp_param <= rhs_comp_param ? lhs_type_info : rhs_type_info;
357  }
358  return get_str_dict_cast_type(lhs_type_info, rhs_type_info, executor);
359  }
360  CHECK(lhs_type_info.is_none_encoded_string() || rhs_type_info.is_none_encoded_string());
// Prefer the encoded side's type; if both are none-encoded, start from lhs.
361  SQLTypeInfo ret_ti =
362  rhs_type_info.is_none_encoded_string() ? lhs_type_info : rhs_type_info;
363  if (ret_ti.is_none_encoded_string()) {
364  ret_ti.set_dimension(
365  std::max(lhs_type_info.get_dimension(), rhs_type_info.get_dimension()));
366  }
367  return ret_ti;
368 }
369 
// Builds a BinOper for `optype` (with qualifier `qual`) over the two operands.
// Casts are inserted so each side gets the type chosen by
// Analyzer::BinOper::analyze_type_info. For comparisons, string dictionary
// encodings are reconciled.
// - Date-in-days operands are decompressed first.
// - For qual != kONE (existential/universal qualifier), the right operand must be
//   an array. Its element type is used for type analysis, and the right side is
//   cast to the array of the chosen element type.
// - A time-interval result type returns the BinOper immediately, without casts.
// - `executor` is only used (CHECKed non-null) for = / <> between two
//   dictionary-encoded strings with different comp params whose operands share a
//   single range-table entry.
// Throws std::runtime_error for a qualifier on a non-array operand, and for
// comparing two geometry types with any operator other than kOVERLAPS.
370 std::shared_ptr<Analyzer::Expr> OperExpr::normalize(
371  const SQLOps optype,
372  const SQLQualifier qual,
373  std::shared_ptr<Analyzer::Expr> left_expr,
374  std::shared_ptr<Analyzer::Expr> right_expr,
375  const Executor* executor) {
376  if (left_expr->get_type_info().is_date_in_days() ||
377  right_expr->get_type_info().is_date_in_days()) {
378  // Do not propagate the date-in-days encoding
379  left_expr = left_expr->decompress();
380  right_expr = right_expr->decompress();
381  }
382  const auto& left_type = left_expr->get_type_info();
383  auto right_type = right_expr->get_type_info();
384  if (qual != kONE) {
385  // subquery not supported yet.
386  CHECK(!std::dynamic_pointer_cast<Analyzer::Subquery>(right_expr));
387  if (right_type.get_type() != kARRAY) {
388  throw std::runtime_error(
389  "Existential or universal qualifiers can only be used in front of a subquery "
390  "or an "
391  "expression of array type.");
392  }
393  right_type = right_type.get_elem_type();
394  }
395  SQLTypeInfo new_left_type;
396  SQLTypeInfo new_right_type;
397  auto result_type = Analyzer::BinOper::analyze_type_info(
398  optype, left_type, right_type, &new_left_type, &new_right_type);
399  if (result_type.is_timeinterval()) {
400  return makeExpr<Analyzer::BinOper>(
401  result_type, false, optype, qual, left_expr, right_expr);
402  }
403  if (left_type != new_left_type) {
404  left_expr = left_expr->add_cast(new_left_type);
405  }
406  if (right_type != new_right_type) {
407  if (qual == kONE) {
408  right_expr = right_expr->add_cast(new_right_type);
409  } else {
410  right_expr = right_expr->add_cast(new_right_type.get_array_type());
411  }
412  }
413 
414  if (IS_COMPARISON(optype)) {
415  if (optype != kOVERLAPS && new_left_type.is_geometry() &&
416  new_right_type.is_geometry()) {
417  throw std::runtime_error(
418  "Comparison operators are not yet supported for geospatial types.");
419  }
420 
421  if (new_left_type.get_compression() == kENCODING_DICT &&
422  new_right_type.get_compression() == kENCODING_DICT) {
423  if (new_left_type.get_comp_param() != new_right_type.get_comp_param()) {
424  if (optype == kEQ || optype == kNE) {
425  // Join framework does its own string dictionary translation
426  // (at least partly since the rhs table projection does not use
427  // the normal runtime execution framework), so if we detect
428  // that the rte idxs of the two tables are different, bail
429  // on translating
430  const bool should_translate_strings =
431  exprs_share_one_and_same_rte_idx(left_expr, right_expr);
432  if (should_translate_strings && (optype == kEQ || optype == kNE)) {
433  CHECK(executor);
434  // If either side uses TRANSIENT_DICT_ID, cast to the other side's type;
435  // otherwise cast to the larger dictionary (see get_str_dict_cast_type).
436  SQLTypeInfo ti(
437  get_str_dict_cast_type(new_left_type, new_right_type, executor));
438  auto& expr_to_cast = ti == new_left_type ? right_expr : left_expr;
439  ti.set_fixed_size();
// NOTE(review): source line 440 is missing from this listing; confirm what it
// does before editing this sequence.
441  expr_to_cast = expr_to_cast->add_cast(ti);
442  } else { // kEQ/kNE, but the operands do not share one range-table entry
443  // Dictionary translation is skipped for this case (see above), so
444  // fall back to comparing decompressed values; this is the same path
445  // taken for ordered comparisons below.
446  left_expr = left_expr->decompress();
447  right_expr = right_expr->decompress();
448  }
449  } else { // Ordered comparison operator
450  // We do not currently support ordered (i.e. >, <=) comparisons between
451  // dictionary-encoded columns, and need to decompress when translation
452  // is turned off even for kEQ and kNE
453  left_expr = left_expr->decompress();
454  right_expr = right_expr->decompress();
455  }
456  } else { // Both sides share the same dictionary (comp param)
457  if (!(optype == kEQ || optype == kNE)) {
458  // We do not currently support ordered (i.e. >, <=) comparisons between
459  // encoded columns, so try to decode (will only succeed with watchdog off)
460  left_expr = left_expr->decompress();
461  right_expr = right_expr->decompress();
462  } else {
463  // do nothing, can directly support equals/non-equals comparisons between two
464  // dictionary encoded columns sharing the same dictionary as these are
465  // effectively integer comparisons in the same dictionary space
466  }
467  }
468  } else if (new_left_type.get_compression() == kENCODING_DICT &&
469  new_right_type.get_compression() == kENCODING_NONE) {
// Dictionary-encoded left vs none-encoded right: encode the right side with the
// left side's dictionary.
470  SQLTypeInfo ti(new_right_type);
471  ti.set_compression(new_left_type.get_compression());
472  ti.set_comp_param(new_left_type.get_comp_param());
473  ti.set_fixed_size();
474  right_expr = right_expr->add_cast(ti);
475  } else if (new_right_type.get_compression() == kENCODING_DICT &&
476  new_left_type.get_compression() == kENCODING_NONE) {
// Mirror case: encode the left side with the right side's dictionary.
477  SQLTypeInfo ti(new_left_type);
478  ti.set_compression(new_right_type.get_compression());
479  ti.set_comp_param(new_right_type.get_comp_param());
480  ti.set_fixed_size();
481  left_expr = left_expr->add_cast(ti);
482  } else {
483  left_expr = left_expr->decompress();
484  right_expr = right_expr->decompress();
485  }
486  } else {
487  // NOTE(review): is this now a no-op except for pairs of none-encoded string columns?
488  left_expr = left_expr->decompress();
489  right_expr = right_expr->decompress();
490  }
491  bool has_agg = (left_expr->get_contains_agg() || right_expr->get_contains_agg());
492  return makeExpr<Analyzer::BinOper>(
493  result_type, has_agg, optype, qual, left_expr, right_expr);
494 }
495 
496 std::shared_ptr<Analyzer::Expr> SubqueryExpr::analyze(
497  const Catalog_Namespace::Catalog& catalog,
498  Analyzer::Query& query,
499  TlistRefType allow_tlist_ref) const {
500  throw std::runtime_error("Subqueries are not supported yet.");
501  return nullptr;
502 }
503 
504 std::shared_ptr<Analyzer::Expr> IsNullExpr::analyze(
505  const Catalog_Namespace::Catalog& catalog,
506  Analyzer::Query& query,
507  TlistRefType allow_tlist_ref) const {
508  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
509  auto result = makeExpr<Analyzer::UOper>(kBOOLEAN, kISNULL, arg_expr);
510  if (is_not_) {
511  result = makeExpr<Analyzer::UOper>(kBOOLEAN, kNOT, result);
512  }
513  return result;
514 }
515 
516 std::shared_ptr<Analyzer::Expr> InSubquery::analyze(
517  const Catalog_Namespace::Catalog& catalog,
518  Analyzer::Query& query,
519  TlistRefType allow_tlist_ref) const {
520  throw std::runtime_error("Subqueries are not supported yet.");
521  return nullptr;
522 }
523 
524 std::shared_ptr<Analyzer::Expr> InValues::analyze(
525  const Catalog_Namespace::Catalog& catalog,
526  Analyzer::Query& query,
527  TlistRefType allow_tlist_ref) const {
528  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
529  SQLTypeInfo ti = arg_expr->get_type_info();
530  bool dict_comp = ti.get_compression() == kENCODING_DICT;
531  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
532  for (auto& p : value_list_) {
533  auto e = p->analyze(catalog, query, allow_tlist_ref);
534  if (ti != e->get_type_info()) {
535  if (ti.is_string() && e->get_type_info().is_string()) {
536  // Todo(todd): Can we have this leverage the cast framework as well
537  ti = Analyzer::BinOper::common_string_type(ti, e->get_type_info());
538  } else if (ti.is_number() && e->get_type_info().is_number()) {
539  ti = Analyzer::BinOper::common_numeric_type(ti, e->get_type_info());
540  } else {
541  throw std::runtime_error("IN expressions must contain compatible types.");
542  }
543  }
544  if (dict_comp) {
545  value_exprs.push_back(e->add_cast(arg_expr->get_type_info()));
546  } else {
547  value_exprs.push_back(e);
548  }
549  }
550  if (!dict_comp) {
551  arg_expr = arg_expr->decompress();
552  arg_expr = arg_expr->add_cast(ti);
553  std::list<std::shared_ptr<Analyzer::Expr>> cast_vals;
554  for (auto p : value_exprs) {
555  cast_vals.push_back(p->add_cast(ti));
556  }
557  value_exprs.swap(cast_vals);
558  }
559  std::shared_ptr<Analyzer::Expr> result =
560  makeExpr<Analyzer::InValues>(arg_expr, value_exprs);
561  if (is_not_) {
562  result = makeExpr<Analyzer::UOper>(kBOOLEAN, kNOT, result);
563  }
564  return result;
565 }
566 
567 std::shared_ptr<Analyzer::Expr> BetweenExpr::analyze(
568  const Catalog_Namespace::Catalog& catalog,
569  Analyzer::Query& query,
570  TlistRefType allow_tlist_ref) const {
571  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
572  auto lower_expr = lower_->analyze(catalog, query, allow_tlist_ref);
573  auto upper_expr = upper_->analyze(catalog, query, allow_tlist_ref);
574  SQLTypeInfo new_left_type, new_right_type;
576  arg_expr->get_type_info(),
577  lower_expr->get_type_info(),
578  &new_left_type,
579  &new_right_type);
580  auto lower_pred =
581  makeExpr<Analyzer::BinOper>(kBOOLEAN,
582  kGE,
583  kONE,
584  arg_expr->add_cast(new_left_type)->decompress(),
585  lower_expr->add_cast(new_right_type)->decompress());
587  arg_expr->get_type_info(),
588  lower_expr->get_type_info(),
589  &new_left_type,
590  &new_right_type);
591  auto upper_pred = makeExpr<Analyzer::BinOper>(
592  kBOOLEAN,
593  kLE,
594  kONE,
595  arg_expr->deep_copy()->add_cast(new_left_type)->decompress(),
596  upper_expr->add_cast(new_right_type)->decompress());
597  std::shared_ptr<Analyzer::Expr> result =
598  makeExpr<Analyzer::BinOper>(kBOOLEAN, kAND, kONE, lower_pred, upper_pred);
599  if (is_not_) {
600  result = makeExpr<Analyzer::UOper>(kBOOLEAN, kNOT, result);
601  }
602  return result;
603 }
604 
605 std::shared_ptr<Analyzer::Expr> CharLengthExpr::analyze(
606  const Catalog_Namespace::Catalog& catalog,
607  Analyzer::Query& query,
608  TlistRefType allow_tlist_ref) const {
609  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
610  if (!arg_expr->get_type_info().is_string()) {
611  throw std::runtime_error(
612  "expression in char_length clause must be of a string type.");
613  }
614  std::shared_ptr<Analyzer::Expr> result =
615  makeExpr<Analyzer::CharLengthExpr>(arg_expr->decompress(), calc_encoded_length_);
616  return result;
617 }
618 
619 std::shared_ptr<Analyzer::Expr> CardinalityExpr::analyze(
620  const Catalog_Namespace::Catalog& catalog,
621  Analyzer::Query& query,
622  TlistRefType allow_tlist_ref) const {
623  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
624  if (!arg_expr->get_type_info().is_array()) {
625  throw std::runtime_error(
626  "expression in cardinality clause must be of an array type.");
627  }
628  std::shared_ptr<Analyzer::Expr> result =
629  makeExpr<Analyzer::CardinalityExpr>(arg_expr->decompress());
630  return result;
631 }
632 
633 void LikeExpr::check_like_expr(const std::string& like_str, char escape_char) {
634  if (like_str.back() == escape_char) {
635  throw std::runtime_error("LIKE pattern must not end with escape character.");
636  }
637 }
638 
639 bool LikeExpr::test_is_simple_expr(const std::string& like_str, char escape_char) {
640  // if not bounded by '%' then not a simple string
641  if (like_str.size() < 2 || like_str[0] != '%' || like_str[like_str.size() - 1] != '%') {
642  return false;
643  }
644  // if the last '%' is escaped then not a simple string
645  if (like_str[like_str.size() - 2] == escape_char &&
646  like_str[like_str.size() - 3] != escape_char) {
647  return false;
648  }
649  for (size_t i = 1; i < like_str.size() - 1; i++) {
650  if (like_str[i] == '%' || like_str[i] == '_' || like_str[i] == '[' ||
651  like_str[i] == ']') {
652  if (like_str[i - 1] != escape_char) {
653  return false;
654  }
655  }
656  }
657  return true;
658 }
659 
660 void LikeExpr::erase_cntl_chars(std::string& like_str, char escape_char) {
661  char prev_char = '\0';
662  // easier to create new string of allowable chars
663  // rather than erase chars from
664  // existing string
665  std::string new_str;
666  for (char& cur_char : like_str) {
667  if (cur_char == '%' || cur_char == escape_char) {
668  if (prev_char != escape_char) {
669  prev_char = cur_char;
670  continue;
671  }
672  }
673  new_str.push_back(cur_char);
674  prev_char = cur_char;
675  }
676  like_str = new_str;
677 }
678 
679 std::shared_ptr<Analyzer::Expr> LikeExpr::analyze(
680  const Catalog_Namespace::Catalog& catalog,
681  Analyzer::Query& query,
682  TlistRefType allow_tlist_ref) const {
683  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
684  auto like_expr = like_string_->analyze(catalog, query, allow_tlist_ref);
685  auto escape_expr = escape_string_ == nullptr
686  ? nullptr
687  : escape_string_->analyze(catalog, query, allow_tlist_ref);
688  return LikeExpr::get(arg_expr, like_expr, escape_expr, is_ilike_, is_not_);
689 }
690 
691 std::shared_ptr<Analyzer::Expr> LikeExpr::get(std::shared_ptr<Analyzer::Expr> arg_expr,
692  std::shared_ptr<Analyzer::Expr> like_expr,
693  std::shared_ptr<Analyzer::Expr> escape_expr,
694  const bool is_ilike,
695  const bool is_not) {
696  if (!arg_expr->get_type_info().is_string()) {
697  throw std::runtime_error("expression before LIKE must be of a string type.");
698  }
699  if (!like_expr->get_type_info().is_string()) {
700  throw std::runtime_error("expression after LIKE must be of a string type.");
701  }
702  char escape_char = '\\';
703  if (escape_expr != nullptr) {
704  if (!escape_expr->get_type_info().is_string()) {
705  throw std::runtime_error("expression after ESCAPE must be of a string type.");
706  }
707  if (!escape_expr->get_type_info().is_string()) {
708  throw std::runtime_error("expression after ESCAPE must be of a string type.");
709  }
710  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(escape_expr);
711  if (c != nullptr && c->get_constval().stringval->length() > 1) {
712  throw std::runtime_error("String after ESCAPE must have a single character.");
713  }
714  escape_char = (*c->get_constval().stringval)[0];
715  }
716  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(like_expr);
717  bool is_simple = false;
718  if (c != nullptr) {
719  std::string& pattern = *c->get_constval().stringval;
720  if (is_ilike) {
721  std::transform(pattern.begin(), pattern.end(), pattern.begin(), ::tolower);
722  }
723  check_like_expr(pattern, escape_char);
724  is_simple = test_is_simple_expr(pattern, escape_char);
725  if (is_simple) {
726  erase_cntl_chars(pattern, escape_char);
727  }
728  }
729  std::shared_ptr<Analyzer::Expr> result = makeExpr<Analyzer::LikeExpr>(
730  arg_expr->decompress(), like_expr, escape_expr, is_ilike, is_simple);
731  if (is_not) {
732  result = makeExpr<Analyzer::UOper>(kBOOLEAN, kNOT, result);
733  }
734  return result;
735 }
736 
737 void RegexpExpr::check_pattern_expr(const std::string& pattern_str, char escape_char) {
738  if (pattern_str.back() == escape_char) {
739  throw std::runtime_error("REGEXP pattern must not end with escape character.");
740  }
741 }
742 
// Tries to rewrite a REGEXP pattern as an equivalent LIKE pattern, in place.
// - Alphanumerics and ' ' are copied; '.' becomes '_'.
// - ".*" becomes "%" and ".+" becomes "_%".
// - Any other character makes the function return false and leave
//   `pattern_str` unchanged.
// - On success, `pattern_str` is overwritten with the LIKE pattern and true is
//   returned.
// NOTE(review): with escape_char == '\\' (the only value RegexpExpr::get accepts),
// the `prev_char == escape_char` case looks unreachable. An escape character is
// not alnum, ' ' or '.', and is not '*' or '+', so meeting one returns false
// before it can become prev_char. Escaped patterns therefore always stay on the
// regex path. Confirm this is intended.
// NOTE(review): isalnum() is called with a plain char; negative values (e.g. UTF-8
// bytes) are undefined behavior for <cctype> functions.
743 bool RegexpExpr::translate_to_like_pattern(std::string& pattern_str, char escape_char) {
744  char prev_char = '\0';
745  char prev_prev_char = '\0';
746  std::string like_str;
747  for (char& cur_char : pattern_str) {
748  if (prev_char == escape_char || isalnum(cur_char) || cur_char == ' ' ||
749  cur_char == '.') {
750  like_str.push_back((cur_char == '.') ? '_' : cur_char);
751  prev_prev_char = prev_char;
752  prev_char = cur_char;
753  continue;
754  }
// A '*' or '+' directly after an unescaped '.' becomes a LIKE '%' wildcard.
755  if (prev_char == '.' && prev_prev_char != escape_char) {
756  if (cur_char == '*' || cur_char == '+') {
757  if (cur_char == '*') {
// Drop the '_' emitted for the preceding '.'.
758  like_str.pop_back();
759  }
760  // .* --> %
761  // .+ --> _%
762  like_str.push_back('%');
763  prev_prev_char = prev_char;
764  prev_char = cur_char;
765  continue;
766  }
767  }
// Anything else has no LIKE equivalent here; keep the original regex.
768  return false;
769  }
770  pattern_str = like_str;
771  return true;
772 }
773 
774 std::shared_ptr<Analyzer::Expr> RegexpExpr::analyze(
775  const Catalog_Namespace::Catalog& catalog,
776  Analyzer::Query& query,
777  TlistRefType allow_tlist_ref) const {
778  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
779  auto pattern_expr = pattern_string_->analyze(catalog, query, allow_tlist_ref);
780  auto escape_expr = escape_string_ == nullptr
781  ? nullptr
782  : escape_string_->analyze(catalog, query, allow_tlist_ref);
783  return RegexpExpr::get(arg_expr, pattern_expr, escape_expr, is_not_);
784 }
785 
786 std::shared_ptr<Analyzer::Expr> RegexpExpr::get(
787  std::shared_ptr<Analyzer::Expr> arg_expr,
788  std::shared_ptr<Analyzer::Expr> pattern_expr,
789  std::shared_ptr<Analyzer::Expr> escape_expr,
790  const bool is_not) {
791  if (!arg_expr->get_type_info().is_string()) {
792  throw std::runtime_error("expression before REGEXP must be of a string type.");
793  }
794  if (!pattern_expr->get_type_info().is_string()) {
795  throw std::runtime_error("expression after REGEXP must be of a string type.");
796  }
797  char escape_char = '\\';
798  if (escape_expr != nullptr) {
799  if (!escape_expr->get_type_info().is_string()) {
800  throw std::runtime_error("expression after ESCAPE must be of a string type.");
801  }
802  if (!escape_expr->get_type_info().is_string()) {
803  throw std::runtime_error("expression after ESCAPE must be of a string type.");
804  }
805  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(escape_expr);
806  if (c != nullptr && c->get_constval().stringval->length() > 1) {
807  throw std::runtime_error("String after ESCAPE must have a single character.");
808  }
809  escape_char = (*c->get_constval().stringval)[0];
810  if (escape_char != '\\') {
811  throw std::runtime_error("Only supporting '\\' escape character.");
812  }
813  }
814  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(pattern_expr);
815  if (c != nullptr) {
816  std::string& pattern = *c->get_constval().stringval;
817  if (translate_to_like_pattern(pattern, escape_char)) {
818  return LikeExpr::get(arg_expr, pattern_expr, escape_expr, false, is_not);
819  }
820  }
821  std::shared_ptr<Analyzer::Expr> result =
822  makeExpr<Analyzer::RegexpExpr>(arg_expr->decompress(), pattern_expr, escape_expr);
823  if (is_not) {
824  result = makeExpr<Analyzer::UOper>(kBOOLEAN, kNOT, result);
825  }
826  return result;
827 }
828 
829 std::shared_ptr<Analyzer::Expr> LikelihoodExpr::analyze(
830  const Catalog_Namespace::Catalog& catalog,
831  Analyzer::Query& query,
832  TlistRefType allow_tlist_ref) const {
833  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
834  return LikelihoodExpr::get(arg_expr, likelihood_, is_not_);
835 }
836 
837 std::shared_ptr<Analyzer::Expr> LikelihoodExpr::get(
838  std::shared_ptr<Analyzer::Expr> arg_expr,
839  float likelihood,
840  const bool is_not) {
841  if (!arg_expr->get_type_info().is_boolean()) {
842  throw std::runtime_error("likelihood expression expects boolean type.");
843  }
844  std::shared_ptr<Analyzer::Expr> result = makeExpr<Analyzer::LikelihoodExpr>(
845  arg_expr->decompress(), is_not ? 1 - likelihood : likelihood);
846  return result;
847 }
848 
849 std::shared_ptr<Analyzer::Expr> WidthBucketExpr::analyze(
850  const Catalog_Namespace::Catalog& catalog,
851  Analyzer::Query& query,
852  TlistRefType allow_tlist_ref) const {
853  auto target_value = target_value_->analyze(catalog, query, allow_tlist_ref);
854  auto lower_bound = lower_bound_->analyze(catalog, query, allow_tlist_ref);
855  auto upper_bound = upper_bound_->analyze(catalog, query, allow_tlist_ref);
856  auto partition_count = partition_count_->analyze(catalog, query, allow_tlist_ref);
857  return WidthBucketExpr::get(target_value, lower_bound, upper_bound, partition_count);
858 }
859 
860 std::shared_ptr<Analyzer::Expr> WidthBucketExpr::get(
861  std::shared_ptr<Analyzer::Expr> target_value,
862  std::shared_ptr<Analyzer::Expr> lower_bound,
863  std::shared_ptr<Analyzer::Expr> upper_bound,
864  std::shared_ptr<Analyzer::Expr> partition_count) {
865  std::shared_ptr<Analyzer::Expr> result = makeExpr<Analyzer::WidthBucketExpr>(
866  target_value, lower_bound, upper_bound, partition_count);
867  return result;
868 }
869 
870 std::shared_ptr<Analyzer::Expr> ExistsExpr::analyze(
871  const Catalog_Namespace::Catalog& catalog,
872  Analyzer::Query& query,
873  TlistRefType allow_tlist_ref) const {
874  throw std::runtime_error("Subqueries are not supported yet.");
875  return nullptr;
876 }
877 
878 std::shared_ptr<Analyzer::Expr> ColumnRef::analyze(
879  const Catalog_Namespace::Catalog& catalog,
880  Analyzer::Query& query,
881  TlistRefType allow_tlist_ref) const {
882  int table_id{0};
883  int rte_idx{0};
884  const ColumnDescriptor* cd{nullptr};
885  if (column_ == nullptr) {
886  throw std::runtime_error("invalid column name *.");
887  }
888  if (table_ != nullptr) {
889  rte_idx = query.get_rte_idx(*table_);
890  if (rte_idx < 0) {
891  throw std::runtime_error("range variable or table name " + *table_ +
892  " does not exist.");
893  }
894  Analyzer::RangeTableEntry* rte = query.get_rte(rte_idx);
895  cd = rte->get_column_desc(catalog, *column_);
896  if (cd == nullptr) {
897  throw std::runtime_error("Column name " + *column_ + " does not exist.");
898  }
899  table_id = rte->get_table_id();
900  } else {
901  bool found = false;
902  int i = 0;
903  for (auto rte : query.get_rangetable()) {
904  cd = rte->get_column_desc(catalog, *column_);
905  if (cd != nullptr && !found) {
906  found = true;
907  rte_idx = i;
908  table_id = rte->get_table_id();
909  } else if (cd != nullptr && found) {
910  throw std::runtime_error("Column name " + *column_ + " is ambiguous.");
911  }
912  i++;
913  }
914  if (cd == nullptr && allow_tlist_ref != TlistRefType::TLIST_NONE) {
915  // check if this is a reference to a targetlist entry
916  bool found = false;
917  int varno = -1;
918  int i = 1;
919  std::shared_ptr<Analyzer::TargetEntry> tle;
920  for (auto p : query.get_targetlist()) {
921  if (*column_ == p->get_resname() && !found) {
922  found = true;
923  varno = i;
924  tle = p;
925  } else if (*column_ == p->get_resname() && found) {
926  throw std::runtime_error("Output alias " + *column_ + " is ambiguous.");
927  }
928  i++;
929  }
930  if (found) {
931  if (dynamic_cast<Analyzer::Var*>(tle->get_expr())) {
932  Analyzer::Var* v = static_cast<Analyzer::Var*>(tle->get_expr());
934  return v->deep_copy();
935  }
936  }
937  if (allow_tlist_ref == TlistRefType::TLIST_COPY) {
938  return tle->get_expr()->deep_copy();
939  } else {
940  return makeExpr<Analyzer::Var>(
941  tle->get_expr()->get_type_info(), Analyzer::Var::kOUTPUT, varno);
942  }
943  }
944  }
945  if (cd == nullptr) {
946  throw std::runtime_error("Column name " + *column_ + " does not exist.");
947  }
948  }
949  return makeExpr<Analyzer::ColumnVar>(cd->columnType, table_id, cd->columnId, rte_idx);
950 }
951 
952 std::shared_ptr<Analyzer::Expr> FunctionRef::analyze(
953  const Catalog_Namespace::Catalog& catalog,
954  Analyzer::Query& query,
955  TlistRefType allow_tlist_ref) const {
956  SQLTypeInfo result_type;
957  SQLAgg agg_type;
958  std::shared_ptr<Analyzer::Expr> arg_expr;
959  bool is_distinct = false;
960  if (boost::iequals(*name_, "count")) {
961  result_type = SQLTypeInfo(kBIGINT, false);
962  agg_type = kCOUNT;
963  if (arg_) {
964  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
965  const SQLTypeInfo& ti = arg_expr->get_type_info();
966  if (ti.is_string() && (ti.get_compression() != kENCODING_DICT || !distinct_)) {
967  throw std::runtime_error(
968  "Strings must be dictionary-encoded in COUNT(DISTINCT).");
969  }
970  if (ti.get_type() == kARRAY && !distinct_) {
971  throw std::runtime_error("Only COUNT(DISTINCT) is supported on arrays.");
972  }
973  }
974  is_distinct = distinct_;
975  } else {
976  if (!arg_) {
977  throw std::runtime_error("Cannot compute " + *name_ + " with argument '*'.");
978  }
979  if (boost::iequals(*name_, "min")) {
980  agg_type = kMIN;
981  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
982  arg_expr = arg_expr->decompress();
983  result_type = arg_expr->get_type_info();
984  } else if (boost::iequals(*name_, "max")) {
985  agg_type = kMAX;
986  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
987  arg_expr = arg_expr->decompress();
988  result_type = arg_expr->get_type_info();
989  } else if (boost::iequals(*name_, "avg")) {
990  agg_type = kAVG;
991  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
992  if (!arg_expr->get_type_info().is_number()) {
993  throw std::runtime_error("Cannot compute AVG on non-number-type arguments.");
994  }
995  arg_expr = arg_expr->decompress();
996  result_type = SQLTypeInfo(kDOUBLE, false);
997  } else if (boost::iequals(*name_, "sum")) {
998  agg_type = kSUM;
999  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
1000  if (!arg_expr->get_type_info().is_number()) {
1001  throw std::runtime_error("Cannot compute SUM on non-number-type arguments.");
1002  }
1003  arg_expr = arg_expr->decompress();
1004  result_type = arg_expr->get_type_info().is_integer() ? SQLTypeInfo(kBIGINT, false)
1005  : arg_expr->get_type_info();
1006  } else if (boost::iequals(*name_, "unnest")) {
1007  arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
1008  const SQLTypeInfo& arg_ti = arg_expr->get_type_info();
1009  if (arg_ti.get_type() != kARRAY) {
1010  throw std::runtime_error(arg_->to_string() + " is not of array type.");
1011  }
1012  return makeExpr<Analyzer::UOper>(arg_ti.get_elem_type(), false, kUNNEST, arg_expr);
1013  } else {
1014  throw std::runtime_error("invalid function name: " + *name_);
1015  }
1016  if (arg_expr->get_type_info().is_string() ||
1017  arg_expr->get_type_info().get_type() == kARRAY) {
1018  throw std::runtime_error(
1019  "Only COUNT(DISTINCT ) aggregate is supported on strings and arrays.");
1020  }
1021  }
1022  int naggs = query.get_num_aggs();
1023  query.set_num_aggs(naggs + 1);
1024  return makeExpr<Analyzer::AggExpr>(
1025  result_type, agg_type, arg_expr, is_distinct, nullptr);
1026 }
1027 
1028 std::shared_ptr<Analyzer::Expr> CastExpr::analyze(
1029  const Catalog_Namespace::Catalog& catalog,
1030  Analyzer::Query& query,
1031  TlistRefType allow_tlist_ref) const {
1032  target_type_->check_type();
1033  auto arg_expr = arg_->analyze(catalog, query, allow_tlist_ref);
1034  SQLTypeInfo ti(target_type_->get_type(),
1035  target_type_->get_param1(),
1036  target_type_->get_param2(),
1037  arg_expr->get_type_info().get_notnull());
1038  if (arg_expr->get_type_info().get_type() != target_type_->get_type() &&
1039  arg_expr->get_type_info().get_compression() != kENCODING_NONE) {
1040  arg_expr->decompress();
1041  }
1042  return arg_expr->add_cast(ti);
1043 }
1044 
1045 std::shared_ptr<Analyzer::Expr> CaseExpr::analyze(
1046  const Catalog_Namespace::Catalog& catalog,
1047  Analyzer::Query& query,
1048  TlistRefType allow_tlist_ref) const {
1049  SQLTypeInfo ti;
1050  std::list<std::pair<std::shared_ptr<Analyzer::Expr>, std::shared_ptr<Analyzer::Expr>>>
1051  expr_pair_list;
1052  for (auto& p : when_then_list_) {
1053  auto e1 = p->get_expr1()->analyze(catalog, query, allow_tlist_ref);
1054  if (e1->get_type_info().get_type() != kBOOLEAN) {
1055  throw std::runtime_error("Only boolean expressions can be used after WHEN.");
1056  }
1057  auto e2 = p->get_expr2()->analyze(catalog, query, allow_tlist_ref);
1058  expr_pair_list.emplace_back(e1, e2);
1059  }
1060  auto else_e =
1061  else_expr_ ? else_expr_->analyze(catalog, query, allow_tlist_ref) : nullptr;
1062  return normalize(expr_pair_list, else_e);
1063 }
1064 
1065 namespace {
1066 
1068  const std::string* s = str_literal->get_stringval();
1069  if (*s == "t" || *s == "true" || *s == "T" || *s == "True") {
1070  return true;
1071  } else if (*s == "f" || *s == "false" || *s == "F" || *s == "False") {
1072  return false;
1073  } else {
1074  throw std::runtime_error("Invalid string for boolean " + *s);
1075  }
1076 }
1077 
1078 void parse_copy_params(const std::list<std::unique_ptr<NameValueAssign>>& options_,
1079  import_export::CopyParams& copy_params,
1080  std::vector<std::string>& warnings,
1081  std::string& deferred_copy_from_partitions_) {
1082  if (!options_.empty()) {
1083  for (auto& p : options_) {
1084  if (boost::iequals(*p->get_name(), "max_reject")) {
1085  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1086  if (int_literal == nullptr) {
1087  throw std::runtime_error("max_reject option must be an integer.");
1088  }
1089  copy_params.max_reject = int_literal->get_intval();
1090  } else if (boost::iequals(*p->get_name(), "buffer_size")) {
1091  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1092  if (int_literal == nullptr) {
1093  throw std::runtime_error("buffer_size option must be an integer.");
1094  }
1095  copy_params.buffer_size = int_literal->get_intval();
1096  } else if (boost::iequals(*p->get_name(), "threads")) {
1097  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1098  if (int_literal == nullptr) {
1099  throw std::runtime_error("Threads option must be an integer.");
1100  }
1101  copy_params.threads = int_literal->get_intval();
1102  } else if (boost::iequals(*p->get_name(), "delimiter")) {
1103  const StringLiteral* str_literal =
1104  dynamic_cast<const StringLiteral*>(p->get_value());
1105  if (str_literal == nullptr) {
1106  throw std::runtime_error("Delimiter option must be a string.");
1107  } else if (str_literal->get_stringval()->length() != 1) {
1108  throw std::runtime_error("Delimiter must be a single character string.");
1109  }
1110  copy_params.delimiter = (*str_literal->get_stringval())[0];
1111  } else if (boost::iequals(*p->get_name(), "nulls")) {
1112  const StringLiteral* str_literal =
1113  dynamic_cast<const StringLiteral*>(p->get_value());
1114  if (str_literal == nullptr) {
1115  throw std::runtime_error("Nulls option must be a string.");
1116  }
1117  copy_params.null_str = *str_literal->get_stringval();
1118  } else if (boost::iequals(*p->get_name(), "header")) {
1119  const StringLiteral* str_literal =
1120  dynamic_cast<const StringLiteral*>(p->get_value());
1121  if (str_literal == nullptr) {
1122  throw std::runtime_error("Header option must be a boolean.");
1123  }
1124  copy_params.has_header = bool_from_string_literal(str_literal)
1127 #ifdef ENABLE_IMPORT_PARQUET
1128  } else if (boost::iequals(*p->get_name(), "parquet")) {
1129  warnings.push_back(
1130  "Deprecation Warning: COPY FROM WITH (parquet='true') is deprecated. Use "
1131  "WITH (source_type='parquet_file') instead.");
1132  const StringLiteral* str_literal =
1133  dynamic_cast<const StringLiteral*>(p->get_value());
1134  if (str_literal == nullptr) {
1135  throw std::runtime_error("'parquet' option must be a boolean.");
1136  }
1137  if (bool_from_string_literal(str_literal)) {
1138  // not sure a parquet "table" type is proper, but to make code
1139  // look consistent in some places, let's set "table" type too
1141  }
1142 #endif // ENABLE_IMPORT_PARQUET
1143  } else if (boost::iequals(*p->get_name(), "s3_access_key")) {
1144  const StringLiteral* str_literal =
1145  dynamic_cast<const StringLiteral*>(p->get_value());
1146  if (str_literal == nullptr) {
1147  throw std::runtime_error("Option s3_access_key must be a string.");
1148  }
1149  copy_params.s3_access_key = *str_literal->get_stringval();
1150  } else if (boost::iequals(*p->get_name(), "s3_secret_key")) {
1151  const StringLiteral* str_literal =
1152  dynamic_cast<const StringLiteral*>(p->get_value());
1153  if (str_literal == nullptr) {
1154  throw std::runtime_error("Option s3_secret_key must be a string.");
1155  }
1156  copy_params.s3_secret_key = *str_literal->get_stringval();
1157  } else if (boost::iequals(*p->get_name(), "s3_session_token")) {
1158  const StringLiteral* str_literal =
1159  dynamic_cast<const StringLiteral*>(p->get_value());
1160  if (str_literal == nullptr) {
1161  throw std::runtime_error("Option s3_session_token must be a string.");
1162  }
1163  copy_params.s3_session_token = *str_literal->get_stringval();
1164  } else if (boost::iequals(*p->get_name(), "s3_region")) {
1165  const StringLiteral* str_literal =
1166  dynamic_cast<const StringLiteral*>(p->get_value());
1167  if (str_literal == nullptr) {
1168  throw std::runtime_error("Option s3_region must be a string.");
1169  }
1170  copy_params.s3_region = *str_literal->get_stringval();
1171  } else if (boost::iequals(*p->get_name(), "s3_endpoint")) {
1172  const StringLiteral* str_literal =
1173  dynamic_cast<const StringLiteral*>(p->get_value());
1174  if (str_literal == nullptr) {
1175  throw std::runtime_error("Option s3_endpoint must be a string.");
1176  }
1177  copy_params.s3_endpoint = *str_literal->get_stringval();
1178  } else if (boost::iequals(*p->get_name(), "s3_max_concurrent_downloads")) {
1179  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1180  if (int_literal == nullptr) {
1181  throw std::runtime_error(
1182  "'s3_max_concurrent_downloads' option must be an integer");
1183  }
1184  const int s3_max_concurrent_downloads = int_literal->get_intval();
1185  if (s3_max_concurrent_downloads > 0) {
1186  copy_params.s3_max_concurrent_downloads = s3_max_concurrent_downloads;
1187  } else {
1188  throw std::runtime_error(
1189  "Invalid value for 's3_max_concurrent_downloads' option (must be > 0): " +
1190  std::to_string(s3_max_concurrent_downloads));
1191  }
1192  } else if (boost::iequals(*p->get_name(), "quote")) {
1193  const StringLiteral* str_literal =
1194  dynamic_cast<const StringLiteral*>(p->get_value());
1195  if (str_literal == nullptr) {
1196  throw std::runtime_error("Quote option must be a string.");
1197  } else if (str_literal->get_stringval()->length() != 1) {
1198  throw std::runtime_error("Quote must be a single character string.");
1199  }
1200  copy_params.quote = (*str_literal->get_stringval())[0];
1201  } else if (boost::iequals(*p->get_name(), "escape")) {
1202  const StringLiteral* str_literal =
1203  dynamic_cast<const StringLiteral*>(p->get_value());
1204  if (str_literal == nullptr) {
1205  throw std::runtime_error("Escape option must be a string.");
1206  } else if (str_literal->get_stringval()->length() != 1) {
1207  throw std::runtime_error("Escape must be a single character string.");
1208  }
1209  copy_params.escape = (*str_literal->get_stringval())[0];
1210  } else if (boost::iequals(*p->get_name(), "line_delimiter")) {
1211  const StringLiteral* str_literal =
1212  dynamic_cast<const StringLiteral*>(p->get_value());
1213  if (str_literal == nullptr) {
1214  throw std::runtime_error("Line_delimiter option must be a string.");
1215  } else if (str_literal->get_stringval()->length() != 1) {
1216  throw std::runtime_error("Line_delimiter must be a single character string.");
1217  }
1218  copy_params.line_delim = (*str_literal->get_stringval())[0];
1219  } else if (boost::iequals(*p->get_name(), "quoted")) {
1220  const StringLiteral* str_literal =
1221  dynamic_cast<const StringLiteral*>(p->get_value());
1222  if (str_literal == nullptr) {
1223  throw std::runtime_error("Quoted option must be a boolean.");
1224  }
1225  copy_params.quoted = bool_from_string_literal(str_literal);
1226  } else if (boost::iequals(*p->get_name(), "plain_text")) {
1227  const StringLiteral* str_literal =
1228  dynamic_cast<const StringLiteral*>(p->get_value());
1229  if (str_literal == nullptr) {
1230  throw std::runtime_error("plain_text option must be a boolean.");
1231  }
1232  copy_params.plain_text = bool_from_string_literal(str_literal);
1233  } else if (boost::iequals(*p->get_name(), "trim_spaces")) {
1234  const StringLiteral* str_literal =
1235  dynamic_cast<const StringLiteral*>(p->get_value());
1236  if (str_literal == nullptr) {
1237  throw std::runtime_error("trim_spaces option must be a boolean.");
1238  }
1239  copy_params.trim_spaces = bool_from_string_literal(str_literal);
1240  } else if (boost::iequals(*p->get_name(), "array_marker")) {
1241  const StringLiteral* str_literal =
1242  dynamic_cast<const StringLiteral*>(p->get_value());
1243  if (str_literal == nullptr) {
1244  throw std::runtime_error("Array Marker option must be a string.");
1245  } else if (str_literal->get_stringval()->length() != 2) {
1246  throw std::runtime_error(
1247  "Array Marker option must be exactly two characters. Default is {}.");
1248  }
1249  copy_params.array_begin = (*str_literal->get_stringval())[0];
1250  copy_params.array_end = (*str_literal->get_stringval())[1];
1251  } else if (boost::iequals(*p->get_name(), "array_delimiter")) {
1252  const StringLiteral* str_literal =
1253  dynamic_cast<const StringLiteral*>(p->get_value());
1254  if (str_literal == nullptr) {
1255  throw std::runtime_error("Array Delimiter option must be a string.");
1256  } else if (str_literal->get_stringval()->length() != 1) {
1257  throw std::runtime_error("Array Delimiter must be a single character string.");
1258  }
1259  copy_params.array_delim = (*str_literal->get_stringval())[0];
1260  } else if (boost::iequals(*p->get_name(), "lonlat")) {
1261  const StringLiteral* str_literal =
1262  dynamic_cast<const StringLiteral*>(p->get_value());
1263  if (str_literal == nullptr) {
1264  throw std::runtime_error("Lonlat option must be a boolean.");
1265  }
1266  copy_params.lonlat = bool_from_string_literal(str_literal);
1267  } else if (boost::iequals(*p->get_name(), "geo")) {
1268  warnings.push_back(
1269  "Deprecation Warning: COPY FROM WITH (geo='true') is deprecated. Use WITH "
1270  "(source_type='geo_file') instead.");
1271  const StringLiteral* str_literal =
1272  dynamic_cast<const StringLiteral*>(p->get_value());
1273  if (str_literal == nullptr) {
1274  throw std::runtime_error("'geo' option must be a boolean.");
1275  }
1276  if (bool_from_string_literal(str_literal)) {
1278  }
1279  } else if (boost::iequals(*p->get_name(), "source_type")) {
1280  const StringLiteral* str_literal =
1281  dynamic_cast<const StringLiteral*>(p->get_value());
1282  if (str_literal == nullptr) {
1283  throw std::runtime_error("'source_type' option must be a string.");
1284  }
1285  const std::string* s = str_literal->get_stringval();
1286  if (boost::iequals(*s, "delimited_file")) {
1288  } else if (boost::iequals(*s, "geo_file")) {
1290 #if ENABLE_IMPORT_PARQUET
1291  } else if (boost::iequals(*s, "parquet_file")) {
1293 #endif
1294  } else if (boost::iequals(*s, "raster_file")) {
1296  } else if (boost::iequals(*s, "regex_parsed_file")) {
1298  } else {
1299  throw std::runtime_error(
1300  "Invalid string for 'source_type' option (must be 'GEO_FILE', 'RASTER_FILE'"
1301 #if ENABLE_IMPORT_PARQUET
1302  ", 'PARQUET_FILE'"
1303 #endif
1304  ", 'REGEX_PARSED_FILE'"
1305  " or 'DELIMITED_FILE'): " +
1306  *s);
1307  }
1308  } else if (boost::iequals(*p->get_name(), "geo_coords_type")) {
1309  const StringLiteral* str_literal =
1310  dynamic_cast<const StringLiteral*>(p->get_value());
1311  if (str_literal == nullptr) {
1312  throw std::runtime_error("'geo_coords_type' option must be a string");
1313  }
1314  const std::string* s = str_literal->get_stringval();
1315  if (boost::iequals(*s, "geography")) {
1316  throw std::runtime_error(
1317  "GEOGRAPHY coords type not yet supported. Please use GEOMETRY.");
1318  // copy_params.geo_coords_type = kGEOGRAPHY;
1319  } else if (boost::iequals(*s, "geometry")) {
1320  copy_params.geo_coords_type = kGEOMETRY;
1321  } else {
1322  throw std::runtime_error(
1323  "Invalid string for 'geo_coords_type' option (must be 'GEOGRAPHY' or "
1324  "'GEOMETRY'): " +
1325  *s);
1326  }
1327  } else if (boost::iequals(*p->get_name(), "raster_point_type")) {
1328  const StringLiteral* str_literal =
1329  dynamic_cast<const StringLiteral*>(p->get_value());
1330  if (str_literal == nullptr) {
1331  throw std::runtime_error("'raster_point_type' option must be a string");
1332  }
1333  const std::string* s = str_literal->get_stringval();
1334  if (boost::iequals(*s, "none")) {
1336  } else if (boost::iequals(*s, "auto")) {
1338  } else if (boost::iequals(*s, "smallint")) {
1340  } else if (boost::iequals(*s, "int")) {
1342  } else if (boost::iequals(*s, "float")) {
1344  } else if (boost::iequals(*s, "double")) {
1346  } else if (boost::iequals(*s, "point")) {
1348  } else {
1349  throw std::runtime_error(
1350  "Invalid string for 'raster_point_type' option (must be 'NONE', 'AUTO', "
1351  "'SMALLINT', 'INT', 'FLOAT', 'DOUBLE' or 'POINT'): " +
1352  *s);
1353  }
1354  } else if (boost::iequals(*p->get_name(), "raster_point_transform")) {
1355  const StringLiteral* str_literal =
1356  dynamic_cast<const StringLiteral*>(p->get_value());
1357  if (str_literal == nullptr) {
1358  throw std::runtime_error("'raster_point_transform' option must be a string");
1359  }
1360  const std::string* s = str_literal->get_stringval();
1361  if (boost::iequals(*s, "none")) {
1363  } else if (boost::iequals(*s, "auto")) {
1365  } else if (boost::iequals(*s, "file")) {
1367  } else if (boost::iequals(*s, "world")) {
1368  copy_params.raster_point_transform =
1370  } else {
1371  throw std::runtime_error(
1372  "Invalid string for 'raster_point_transform' option (must be 'NONE', "
1373  "'AUTO', 'FILE' or 'WORLD'): " +
1374  *s);
1375  }
1376  } else if (boost::iequals(*p->get_name(), "raster_import_bands")) {
1377  const StringLiteral* str_literal =
1378  dynamic_cast<const StringLiteral*>(p->get_value());
1379  if (str_literal == nullptr) {
1380  throw std::runtime_error("'raster_import_bands' option must be a string");
1381  }
1382  const std::string* raster_import_bands = str_literal->get_stringval();
1383  if (raster_import_bands) {
1384  copy_params.raster_import_bands = *raster_import_bands;
1385  } else {
1386  throw std::runtime_error("Invalid value for 'raster_import_bands' option");
1387  }
1388  } else if (boost::iequals(*p->get_name(), "raster_import_dimensions")) {
1389  const StringLiteral* str_literal =
1390  dynamic_cast<const StringLiteral*>(p->get_value());
1391  if (str_literal == nullptr) {
1392  throw std::runtime_error("'raster_import_dimensions' option must be a string");
1393  }
1394  const std::string* raster_import_dimensions = str_literal->get_stringval();
1395  if (raster_import_dimensions) {
1396  copy_params.raster_import_dimensions = *raster_import_dimensions;
1397  } else {
1398  throw std::runtime_error("Invalid value for 'raster_import_dimensions' option");
1399  }
1400  } else if (boost::iequals(*p->get_name(), "geo_coords_encoding")) {
1401  const StringLiteral* str_literal =
1402  dynamic_cast<const StringLiteral*>(p->get_value());
1403  if (str_literal == nullptr) {
1404  throw std::runtime_error("'geo_coords_encoding' option must be a string");
1405  }
1406  const std::string* s = str_literal->get_stringval();
1407  if (boost::iequals(*s, "none")) {
1408  copy_params.geo_coords_encoding = kENCODING_NONE;
1409  copy_params.geo_coords_comp_param = 0;
1410  } else if (boost::iequals(*s, "compressed(32)")) {
1411  copy_params.geo_coords_encoding = kENCODING_GEOINT;
1412  copy_params.geo_coords_comp_param = 32;
1413  } else {
1414  throw std::runtime_error(
1415  "Invalid string for 'geo_coords_encoding' option (must be 'NONE' or "
1416  "'COMPRESSED(32)'): " +
1417  *s);
1418  }
1419  } else if (boost::iequals(*p->get_name(), "raster_scanlines_per_thread")) {
1420  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1421  if (int_literal == nullptr) {
1422  throw std::runtime_error(
1423  "'raster_scanlines_per_thread' option must be an integer");
1424  }
1425  const int raster_scanlines_per_thread = int_literal->get_intval();
1426  if (raster_scanlines_per_thread < 0) {
1427  throw std::runtime_error(
1428  "'raster_scanlines_per_thread' option must be >= 0, with 0 denoting auto "
1429  "sizing");
1430  }
1431  copy_params.raster_scanlines_per_thread = raster_scanlines_per_thread;
1432  } else if (boost::iequals(*p->get_name(), "geo_coords_srid")) {
1433  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1434  if (int_literal == nullptr) {
1435  throw std::runtime_error("'geo_coords_srid' option must be an integer");
1436  }
1437  const int srid = int_literal->get_intval();
1438  if (srid == 4326 || srid == 3857 || srid == 900913) {
1439  copy_params.geo_coords_srid = srid;
1440  } else {
1441  throw std::runtime_error(
1442  "Invalid value for 'geo_coords_srid' option (must be 4326, 3857, or "
1443  "900913): " +
1444  std::to_string(srid));
1445  }
1446  } else if (boost::iequals(*p->get_name(), "geo_layer_name")) {
1447  const StringLiteral* str_literal =
1448  dynamic_cast<const StringLiteral*>(p->get_value());
1449  if (str_literal == nullptr) {
1450  throw std::runtime_error("'geo_layer_name' option must be a string");
1451  }
1452  const std::string* layer_name = str_literal->get_stringval();
1453  if (layer_name) {
1454  copy_params.geo_layer_name = *layer_name;
1455  } else {
1456  throw std::runtime_error("Invalid value for 'geo_layer_name' option");
1457  }
1458  } else if (boost::iequals(*p->get_name(), "partitions")) {
1459  const auto partitions =
1460  static_cast<const StringLiteral*>(p->get_value())->get_stringval();
1461  CHECK(partitions);
1462  const auto partitions_uc = boost::to_upper_copy<std::string>(*partitions);
1463  if (partitions_uc != "REPLICATED") {
1464  throw std::runtime_error(
1465  "Invalid value for 'partitions' option. Must be 'REPLICATED'.");
1466  }
1467  deferred_copy_from_partitions_ = partitions_uc;
1468  } else if (boost::iequals(*p->get_name(), "geo_assign_render_groups")) {
1469  const StringLiteral* str_literal =
1470  dynamic_cast<const StringLiteral*>(p->get_value());
1471  if (str_literal == nullptr) {
1472  throw std::runtime_error("geo_assign_render_groups option must be a boolean.");
1473  }
1474  copy_params.geo_assign_render_groups = bool_from_string_literal(str_literal);
1475  } else if (boost::iequals(*p->get_name(), "geo_explode_collections")) {
1476  const StringLiteral* str_literal =
1477  dynamic_cast<const StringLiteral*>(p->get_value());
1478  if (str_literal == nullptr) {
1479  throw std::runtime_error("geo_explode_collections option must be a boolean.");
1480  }
1481  copy_params.geo_explode_collections = bool_from_string_literal(str_literal);
1482  } else if (boost::iequals(*p->get_name(), "source_srid")) {
1483  const IntLiteral* int_literal = dynamic_cast<const IntLiteral*>(p->get_value());
1484  if (int_literal == nullptr) {
1485  throw std::runtime_error("'source_srid' option must be an integer");
1486  }
1487  const int srid = int_literal->get_intval();
1489  copy_params.source_srid = srid;
1490  } else {
1491  throw std::runtime_error(
1492  "'source_srid' option can only be used on csv/tsv files");
1493  }
1494  } else if (boost::iequals(*p->get_name(), "regex_path_filter")) {
1495  const StringLiteral* str_literal =
1496  dynamic_cast<const StringLiteral*>(p->get_value());
1497  if (str_literal == nullptr) {
1498  throw std::runtime_error("Option regex_path_filter must be a string.");
1499  }
1500  const auto string_val = *str_literal->get_stringval();
1501  copy_params.regex_path_filter =
1502  string_val.empty() ? std::nullopt : std::optional<std::string>{string_val};
1503  } else if (boost::iequals(*p->get_name(), "file_sort_order_by")) {
1504  const StringLiteral* str_literal =
1505  dynamic_cast<const StringLiteral*>(p->get_value());
1506  if (str_literal == nullptr) {
1507  throw std::runtime_error("Option file_sort_order_by must be a string.");
1508  }
1509  const auto string_val = *str_literal->get_stringval();
1510  copy_params.file_sort_order_by =
1511  string_val.empty() ? std::nullopt : std::optional<std::string>{string_val};
1512  } else if (boost::iequals(*p->get_name(), "file_sort_regex")) {
1513  const StringLiteral* str_literal =
1514  dynamic_cast<const StringLiteral*>(p->get_value());
1515  if (str_literal == nullptr) {
1516  throw std::runtime_error("Option file_sort_regex must be a string.");
1517  }
1518  const auto string_val = *str_literal->get_stringval();
1519  copy_params.file_sort_regex =
1520  string_val.empty() ? std::nullopt : std::optional<std::string>{string_val};
1521  } else if (boost::iequals(*p->get_name(), "raster_point_compute_angle")) {
1522  const StringLiteral* str_literal =
1523  dynamic_cast<const StringLiteral*>(p->get_value());
1524  if (str_literal == nullptr) {
1525  throw std::runtime_error(
1526  "'raster_point_compute_angle' option must be a boolean.");
1527  }
1528  if (bool_from_string_literal(str_literal)) {
1529  copy_params.raster_point_compute_angle = true;
1530  }
1531  } else if (boost::iequals(*p->get_name(), "sql_order_by")) {
1532  if (auto str_literal = dynamic_cast<const StringLiteral*>(p->get_value())) {
1533  copy_params.sql_order_by = *str_literal->get_stringval();
1534  } else {
1535  throw std::runtime_error("Option sql_order_by must be a string.");
1536  }
1537  } else if (boost::iequals(*p->get_name(), "username")) {
1538  const StringLiteral* str_literal =
1539  dynamic_cast<const StringLiteral*>(p->get_value());
1540  if (str_literal == nullptr) {
1541  throw std::runtime_error("Option username must be a string.");
1542  }
1543  const auto string_val = *str_literal->get_stringval();
1544  copy_params.username = string_val;
1545  } else if (boost::iequals(*p->get_name(), "password")) {
1546  const StringLiteral* str_literal =
1547  dynamic_cast<const StringLiteral*>(p->get_value());
1548  if (str_literal == nullptr) {
1549  throw std::runtime_error("Option password must be a string.");
1550  }
1551  const auto string_val = *str_literal->get_stringval();
1552  copy_params.password = string_val;
1553  } else if (boost::iequals(*p->get_name(), "credential_string")) {
1554  const StringLiteral* str_literal =
1555  dynamic_cast<const StringLiteral*>(p->get_value());
1556  if (str_literal == nullptr) {
1557  throw std::runtime_error("Option credential_string must be a string.");
1558  }
1559  const auto string_val = *str_literal->get_stringval();
1560  copy_params.credential_string = string_val;
1561  } else if (boost::iequals(*p->get_name(), "dsn")) {
1562  const StringLiteral* str_literal =
1563  dynamic_cast<const StringLiteral*>(p->get_value());
1564  if (str_literal == nullptr) {
1565  throw std::runtime_error("Option dsn must be a string.");
1566  }
1567  const auto string_val = *str_literal->get_stringval();
1568  copy_params.dsn = string_val;
1569  } else if (boost::iequals(*p->get_name(), "connection_string")) {
1570  const StringLiteral* str_literal =
1571  dynamic_cast<const StringLiteral*>(p->get_value());
1572  if (str_literal == nullptr) {
1573  throw std::runtime_error("Option connection_string must be a string.");
1574  }
1575  const auto string_val = *str_literal->get_stringval();
1576  copy_params.connection_string = string_val;
1577  } else if (boost::iequals(*p->get_name(), "line_start_regex")) {
1578  const StringLiteral* str_literal =
1579  dynamic_cast<const StringLiteral*>(p->get_value());
1580  if (str_literal == nullptr) {
1581  throw std::runtime_error("Option line_start_regex must be a string.");
1582  }
1583  const auto string_val = *str_literal->get_stringval();
1584  copy_params.line_start_regex = string_val;
1585  } else if (boost::iequals(*p->get_name(), "line_regex")) {
1586  const StringLiteral* str_literal =
1587  dynamic_cast<const StringLiteral*>(p->get_value());
1588  if (str_literal == nullptr) {
1589  throw std::runtime_error("Option line_regex must be a string.");
1590  }
1591  const auto string_val = *str_literal->get_stringval();
1592  copy_params.line_regex = string_val;
1593  } else if (boost::iequals(*p->get_name(), "add_metadata_columns") &&
1595  const StringLiteral* str_literal =
1596  dynamic_cast<const StringLiteral*>(p->get_value());
1597  if (str_literal == nullptr) {
1598  throw std::runtime_error("'add_metadata_columns' option must be a string.");
1599  }
1600  copy_params.add_metadata_columns = *str_literal->get_stringval();
1601  } else {
1602  throw std::runtime_error("Invalid option for COPY: " + *p->get_name());
1603  }
1604  }
1605  }
1606 }
1607 
1608 bool expr_is_null(const Analyzer::Expr* expr) {
1609  if (expr->get_type_info().get_type() == kNULLT) {
1610  return true;
1611  }
1612  const auto const_expr = dynamic_cast<const Analyzer::Constant*>(expr);
1613  return const_expr && const_expr->get_is_null();
1614 }
1615 
1616 } // namespace
1617 
1618 std::shared_ptr<Analyzer::Expr> CaseExpr::normalize(
1619  const std::list<std::pair<std::shared_ptr<Analyzer::Expr>,
1620  std::shared_ptr<Analyzer::Expr>>>& expr_pair_list,
1621  const std::shared_ptr<Analyzer::Expr> else_e_in,
1622  const Executor* executor) {
1623  SQLTypeInfo ti;
1624  bool has_agg = false;
1625  // We need to keep track of whether there was at
1626  // least one none-encoded string literal expression
1627  // type among any of the case sub-expressions separately
1628  // from rest of type determination logic, as it will
1629  // be casted to the output dictionary type if all output
1630  // types are either dictionary encoded or none-encoded
1631  // literals, or kept as none-encoded if all sub-expression
1632  // types are none-encoded (column or literal)
1633  SQLTypeInfo none_encoded_literal_ti;
1634 
1635  for (auto& p : expr_pair_list) {
1636  auto e1 = p.first;
1637  CHECK(e1->get_type_info().is_boolean());
1638  auto e2 = p.second;
1639  if (e2->get_contains_agg()) {
1640  has_agg = true;
1641  }
1642  const auto& e2_ti = e2->get_type_info();
1643  if (e2_ti.is_string() && !e2_ti.is_dict_encoded_string() &&
1644  !std::dynamic_pointer_cast<const Analyzer::ColumnVar>(e2)) {
1645  CHECK(e2_ti.is_none_encoded_string());
1646  none_encoded_literal_ti =
1647  none_encoded_literal_ti.get_type() == kNULLT
1648  ? e2_ti
1649  : common_string_type(none_encoded_literal_ti, e2_ti, executor);
1650  continue;
1651  }
1652  if (ti.get_type() == kNULLT) {
1653  ti = e2_ti;
1654  } else if (e2_ti.get_type() == kNULLT) {
1655  ti.set_notnull(false);
1656  e2->set_type_info(ti);
1657  } else if (ti != e2_ti) {
1658  if (ti.is_string() && e2_ti.is_string()) {
1659  // Executor is needed to determine which dictionary is the largest
1660  // in case of two dictionary types with different encodings
1661  ti = common_string_type(ti, e2_ti, executor);
1662  } else if (ti.is_number() && e2_ti.is_number()) {
1664  } else if (ti.is_boolean() && e2_ti.is_boolean()) {
1666  } else {
1667  throw std::runtime_error(
1668  "Expressions in THEN clause must be of the same or compatible types.");
1669  }
1670  }
1671  }
1672  auto else_e = else_e_in;
1673  const auto& else_ti = else_e->get_type_info();
1674  if (else_e) {
1675  if (else_e->get_contains_agg()) {
1676  has_agg = true;
1677  }
1678  if (else_ti.is_string() && !else_ti.is_dict_encoded_string() &&
1679  !std::dynamic_pointer_cast<const Analyzer::ColumnVar>(else_e)) {
1680  CHECK(else_ti.is_none_encoded_string());
1681  none_encoded_literal_ti =
1682  none_encoded_literal_ti.get_type() == kNULLT
1683  ? else_ti
1684  : common_string_type(none_encoded_literal_ti, else_ti, executor);
1685  } else {
1686  if (ti.get_type() == kNULLT) {
1687  ti = else_ti;
1688  } else if (expr_is_null(else_e.get())) {
1689  ti.set_notnull(false);
1690  else_e->set_type_info(ti);
1691  } else if (ti != else_ti) {
1692  ti.set_notnull(false);
1693  if (ti.is_string() && else_ti.is_string()) {
1694  // Executor is needed to determine which dictionary is the largest
1695  // in case of two dictionary types with different encodings
1696  ti = common_string_type(ti, else_ti, executor);
1697  } else if (ti.is_number() && else_ti.is_number()) {
1698  ti = Analyzer::BinOper::common_numeric_type(ti, else_ti);
1699  } else if (ti.is_boolean() && else_ti.is_boolean()) {
1700  ti = Analyzer::BinOper::common_numeric_type(ti, else_ti);
1701  } else if (get_logical_type_info(ti) != get_logical_type_info(else_ti)) {
1702  throw std::runtime_error(
1703  // types differing by encoding will be resolved at decode
1704  "Expressions in ELSE clause must be of the same or compatible types as "
1705  "those in the THEN clauses.");
1706  }
1707  }
1708  }
1709  }
1710 
1711  if (ti.get_type() == kNULLT && none_encoded_literal_ti.get_type() != kNULLT) {
1712  // If we haven't set a type so far it's because
1713  // every case sub-expression has a none-encoded
1714  // literal output. Make this our output type
1715  ti = none_encoded_literal_ti;
1716  }
1717 
1718  std::list<std::pair<std::shared_ptr<Analyzer::Expr>, std::shared_ptr<Analyzer::Expr>>>
1719  cast_expr_pair_list;
1720  for (auto p : expr_pair_list) {
1721  ti.set_notnull(false);
1722  cast_expr_pair_list.emplace_back(p.first, p.second->add_cast(ti));
1723  }
1724  if (else_e != nullptr) {
1725  else_e = else_e->add_cast(ti);
1726  } else {
1727  Datum d;
1728  // always create an else expr so that executor doesn't need to worry about it
1729  ti.set_notnull(false);
1730  else_e = makeExpr<Analyzer::Constant>(ti, true, d);
1731  }
1732  if (ti.get_type() == kNULLT) {
1733  throw std::runtime_error(
1734  "Cannot deduce the type for case expressions, all branches null");
1735  }
1736 
1737  auto case_expr = makeExpr<Analyzer::CaseExpr>(ti, has_agg, cast_expr_pair_list, else_e);
1738  return case_expr;
1739 }
1740 
1741 std::string CaseExpr::to_string() const {
1742  std::string str("CASE ");
1743  for (auto& p : when_then_list_) {
1744  str += "WHEN " + p->get_expr1()->to_string() + " THEN " +
1745  p->get_expr2()->to_string() + " ";
1746  }
1747  if (else_expr_ != nullptr) {
1748  str += "ELSE " + else_expr_->to_string();
1749  }
1750  str += " END";
1751  return str;
1752 }
1753 
1754 void UnionQuery::analyze(const Catalog_Namespace::Catalog& catalog,
1755  Analyzer::Query& query) const {
1756  left_->analyze(catalog, query);
1757  Analyzer::Query* right_query = new Analyzer::Query();
1758  right_->analyze(catalog, *right_query);
1759  query.set_next_query(right_query);
1760  query.set_is_unionall(is_unionall_);
1761 }
1762 
1763 void QuerySpec::analyze_having_clause(const Catalog_Namespace::Catalog& catalog,
1764  Analyzer::Query& query) const {
1765  std::shared_ptr<Analyzer::Expr> p;
1766  if (having_clause_ != nullptr) {
1767  p = having_clause_->analyze(catalog, query, Expr::TlistRefType::TLIST_COPY);
1768  if (p->get_type_info().get_type() != kBOOLEAN) {
1769  throw std::runtime_error("Only boolean expressions can be in HAVING clause.");
1770  }
1771  p->check_group_by(query.get_group_by());
1772  }
1773  query.set_having_predicate(p);
1774 }
1775 
1776 void QuerySpec::analyze_group_by(const Catalog_Namespace::Catalog& catalog,
1777  Analyzer::Query& query) const {
1778  std::list<std::shared_ptr<Analyzer::Expr>> groupby;
1779  if (!groupby_clause_.empty()) {
1780  int gexpr_no = 1;
1781  std::shared_ptr<Analyzer::Expr> gexpr;
1782  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& tlist =
1783  query.get_targetlist();
1784  for (auto& c : groupby_clause_) {
1785  // special-case ordinal numbers in GROUP BY
1786  if (dynamic_cast<Literal*>(c.get())) {
1787  IntLiteral* i = dynamic_cast<IntLiteral*>(c.get());
1788  if (!i) {
1789  throw std::runtime_error("Invalid literal in GROUP BY clause.");
1790  }
1791  int varno = (int)i->get_intval();
1792  if (varno <= 0 || varno > static_cast<int>(tlist.size())) {
1793  throw std::runtime_error("Invalid ordinal number in GROUP BY clause.");
1794  }
1795  if (tlist[varno - 1]->get_expr()->get_contains_agg()) {
1796  throw std::runtime_error(
1797  "Ordinal number in GROUP BY cannot reference an expression containing "
1798  "aggregate "
1799  "functions.");
1800  }
1801  gexpr = makeExpr<Analyzer::Var>(
1802  tlist[varno - 1]->get_expr()->get_type_info(), Analyzer::Var::kOUTPUT, varno);
1803  } else {
1804  gexpr = c->analyze(catalog, query, Expr::TlistRefType::TLIST_REF);
1805  }
1806  const SQLTypeInfo gti = gexpr->get_type_info();
1807  bool set_new_type = false;
1808  SQLTypeInfo ti(gti);
1809  if (gti.is_string() && gti.get_compression() == kENCODING_NONE) {
1810  set_new_type = true;
1813  ti.set_fixed_size();
1814  }
1815  std::shared_ptr<Analyzer::Var> v;
1816  if (std::dynamic_pointer_cast<Analyzer::Var>(gexpr)) {
1817  v = std::static_pointer_cast<Analyzer::Var>(gexpr);
1818  int n = v->get_varno();
1819  gexpr = tlist[n - 1]->get_own_expr();
1820  auto cv = std::dynamic_pointer_cast<Analyzer::ColumnVar>(gexpr);
1821  if (cv != nullptr) {
1822  // inherit all ColumnVar info for lineage.
1823  *std::static_pointer_cast<Analyzer::ColumnVar>(v) = *cv;
1824  }
1825  v->set_which_row(Analyzer::Var::kGROUPBY);
1826  v->set_varno(gexpr_no);
1827  tlist[n - 1]->set_expr(v);
1828  }
1829  if (set_new_type) {
1830  auto new_e = gexpr->add_cast(ti);
1831  groupby.push_back(new_e);
1832  if (v != nullptr) {
1833  v->set_type_info(new_e->get_type_info());
1834  }
1835  } else {
1836  groupby.push_back(gexpr);
1837  }
1838  gexpr_no++;
1839  }
1840  }
1841  if (query.get_num_aggs() > 0 || !groupby.empty()) {
1842  for (auto t : query.get_targetlist()) {
1843  auto e = t->get_expr();
1844  e->check_group_by(groupby);
1845  }
1846  }
1847  query.set_group_by(groupby);
1848 }
1849 
1850 void QuerySpec::analyze_where_clause(const Catalog_Namespace::Catalog& catalog,
1851  Analyzer::Query& query) const {
1852  if (where_clause_ == nullptr) {
1853  query.set_where_predicate(nullptr);
1854  return;
1855  }
1856  auto p = where_clause_->analyze(catalog, query, Expr::TlistRefType::TLIST_COPY);
1857  if (p->get_type_info().get_type() != kBOOLEAN) {
1858  throw std::runtime_error("Only boolean expressions can be in WHERE clause.");
1859  }
1860  query.set_where_predicate(p);
1861 }
1862 
1863 void QuerySpec::analyze_select_clause(const Catalog_Namespace::Catalog& catalog,
1864  Analyzer::Query& query) const {
1865  std::vector<std::shared_ptr<Analyzer::TargetEntry>>& tlist =
1866  query.get_targetlist_nonconst();
1867  if (select_clause_.empty()) {
1868  // this means SELECT *
1869  int rte_idx = 0;
1870  for (auto rte : query.get_rangetable()) {
1871  rte->expand_star_in_targetlist(catalog, tlist, rte_idx++);
1872  }
1873  } else {
1874  for (auto& p : select_clause_) {
1875  const Parser::Expr* select_expr = p->get_select_expr();
1876  // look for the case of range_var.*
1877  if (typeid(*select_expr) == typeid(ColumnRef) &&
1878  dynamic_cast<const ColumnRef*>(select_expr)->get_column() == nullptr) {
1879  const std::string* range_var_name =
1880  dynamic_cast<const ColumnRef*>(select_expr)->get_table();
1881  int rte_idx = query.get_rte_idx(*range_var_name);
1882  if (rte_idx < 0) {
1883  throw std::runtime_error("invalid range variable name: " + *range_var_name);
1884  }
1885  Analyzer::RangeTableEntry* rte = query.get_rte(rte_idx);
1886  rte->expand_star_in_targetlist(catalog, tlist, rte_idx);
1887  } else {
1888  auto e = select_expr->analyze(catalog, query);
1889  std::string resname;
1890 
1891  if (p->get_alias() != nullptr) {
1892  resname = *p->get_alias();
1893  } else if (std::dynamic_pointer_cast<Analyzer::ColumnVar>(e) &&
1894  !std::dynamic_pointer_cast<Analyzer::Var>(e)) {
1895  auto colvar = std::static_pointer_cast<Analyzer::ColumnVar>(e);
1896  const ColumnDescriptor* col_desc = catalog.getMetadataForColumn(
1897  colvar->get_table_id(), colvar->get_column_id());
1898  resname = col_desc->columnName;
1899  }
1900  if (e->get_type_info().get_type() == kNULLT) {
1901  throw std::runtime_error(
1902  "Untyped NULL in SELECT clause. Use CAST to specify a type.");
1903  }
1904  auto o = std::static_pointer_cast<Analyzer::UOper>(e);
1905  bool unnest = (o != nullptr && o->get_optype() == kUNNEST);
1906  auto tle = std::make_shared<Analyzer::TargetEntry>(resname, e, unnest);
1907  tlist.push_back(tle);
1908  }
1909  }
1910  }
1911 }
1912 
1913 void QuerySpec::analyze_from_clause(const Catalog_Namespace::Catalog& catalog,
1914  Analyzer::Query& query) const {
1916  for (auto& p : from_clause_) {
1917  const TableDescriptor* table_desc;
1918  table_desc = catalog.getMetadataForTable(*p->get_table_name());
1919  if (table_desc == nullptr) {
1920  throw std::runtime_error("Table " + *p->get_table_name() + " does not exist.");
1921  }
1922  std::string range_var;
1923  if (p->get_range_var() == nullptr) {
1924  range_var = *p->get_table_name();
1925  } else {
1926  range_var = *p->get_range_var();
1927  }
1928  rte = new Analyzer::RangeTableEntry(range_var, table_desc, nullptr);
1929  query.add_rte(rte);
1930  }
1931 }
1932 
1933 void QuerySpec::analyze(const Catalog_Namespace::Catalog& catalog,
1934  Analyzer::Query& query) const {
1935  query.set_is_distinct(is_distinct_);
1936  analyze_from_clause(catalog, query);
1937  analyze_select_clause(catalog, query);
1938  analyze_where_clause(catalog, query);
1939  analyze_group_by(catalog, query);
1940  analyze_having_clause(catalog, query);
1941 }
1942 
1943 namespace {
1944 
// Cleans known escaped characters without a full JSON parse. The input is decoded in
// a single left-to-right pass:
//   "\\t" and "\t"  -> TAB           "\\n" and "\n" -> newline
//   "\xHH"          -> the byte 0xHH
//   "\uHHHH"        -> the code point converted to a single char; values above
//                      0xFF keep only their low byte, as before
// Any other backslash is copied through unchanged. Decoded characters are never
// re-scanned, so "\x5Cx41" yields the text "\x41". The previous replace-and-rescan
// loops decoded such output a second time and produced "A".
std::string unescape(std::string s) {
  // Value of one hexadecimal digit, or -1 when `c` is not a hex digit.
  const auto hex_digit = [](char c) -> int {
    if (c >= '0' && c <= '9') {
      return c - '0';
    }
    if (c >= 'a' && c <= 'f') {
      return c - 'a' + 10;
    }
    if (c >= 'A' && c <= 'F') {
      return c - 'A' + 10;
    }
    return -1;
  };
  // Parses exactly `width` hex digits starting at `pos`; -1 if any is missing/invalid.
  const auto read_hex = [&s, &hex_digit](size_t pos, size_t width) -> long {
    if (pos + width > s.size()) {
      return -1;
    }
    long value = 0;
    for (size_t k = 0; k < width; ++k) {
      const int digit = hex_digit(s[pos + k]);
      if (digit < 0) {
        return -1;
      }
      value = value * 16 + digit;
    }
    return value;
  };

  std::string out;
  out.reserve(s.size());
  size_t i = 0;
  while (i < s.size()) {
    if (s[i] == '\\' && i + 1 < s.size()) {
      const char next = s[i + 1];
      // "\\t" / "\\n": a doubly escaped tab or newline.
      if (next == '\\' && i + 2 < s.size() && (s[i + 2] == 't' || s[i + 2] == 'n')) {
        out.push_back(s[i + 2] == 't' ? '\t' : '\n');
        i += 3;
        continue;
      }
      if (next == 't' || next == 'n') {
        out.push_back(next == 't' ? '\t' : '\n');
        i += 2;
        continue;
      }
      if (next == 'x' || next == 'u') {
        const size_t width = next == 'x' ? 2 : 4;
        const long value = read_hex(i + 2, width);
        if (value >= 0) {
          out.push_back(static_cast<char>(value));
          i += 2 + width;
          continue;
        }
      }
    }
    out.push_back(s[i]);
    ++i;
  }
  return out;
}
1977 
1978 void parse_options(const rapidjson::Value& payload,
1979  std::list<std::unique_ptr<NameValueAssign>>& nameValueList,
1980  bool stringToNull = false,
1981  bool stringToInteger = false) {
1982  if (payload.HasMember("options") && payload["options"].IsObject()) {
1983  for (const auto& option : payload["options"].GetObject()) {
1984  auto option_name = std::make_unique<std::string>(option.name.GetString());
1985  std::unique_ptr<Literal> literal_value;
1986  if (option.value.IsString()) {
1987  std::string str = option.value.GetString();
1988  if (stringToNull && str == "") {
1989  literal_value = std::make_unique<NullLiteral>();
1990  } else if (stringToInteger && std::all_of(str.begin(), str.end(), ::isdigit)) {
1991  int iVal = std::stoi(str);
1992  literal_value = std::make_unique<IntLiteral>(iVal);
1993  } else {
1994  // Rapidjson will deliberately provide escape'd strings when accessed
1995  // ... but the literal should have a copy of the raw unescaped string
1996  auto unique_literal_string = std::make_unique<std::string>(unescape(str));
1997  literal_value =
1998  std::make_unique<StringLiteral>(unique_literal_string.release());
1999  }
2000  } else if (option.value.IsInt() || option.value.IsInt64()) {
2001  literal_value = std::make_unique<IntLiteral>(json_i64(option.value));
2002  } else if (option.value.IsNull()) {
2003  literal_value = std::make_unique<NullLiteral>();
2004  } else {
2005  throw std::runtime_error("Unable to handle literal for " + *option_name);
2006  }
2007  CHECK(literal_value);
2008 
2009  nameValueList.emplace_back(std::make_unique<NameValueAssign>(
2010  option_name.release(), literal_value.release()));
2011  }
2012  }
2013 }
2014 } // namespace
2015 
2016 void SelectStmt::analyze(const Catalog_Namespace::Catalog& catalog,
2017  Analyzer::Query& query) const {
2018  query.set_stmt_type(kSELECT);
2019  query.set_limit(limit_);
2020  if (offset_ < 0) {
2021  throw std::runtime_error("OFFSET cannot be negative.");
2022  }
2023  query.set_offset(offset_);
2024  query_expr_->analyze(catalog, query);
2025  if (orderby_clause_.empty() && !query.get_is_distinct()) {
2026  query.set_order_by(nullptr);
2027  return;
2028  }
2029  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& tlist =
2030  query.get_targetlist();
2031  std::list<Analyzer::OrderEntry>* order_by = new std::list<Analyzer::OrderEntry>();
2032  if (!orderby_clause_.empty()) {
2033  for (auto& p : orderby_clause_) {
2034  int tle_no = p->get_colno();
2035  if (tle_no == 0) {
2036  // use column name
2037  // search through targetlist for matching name
2038  const std::string* name = p->get_column()->get_column();
2039  tle_no = 1;
2040  bool found = false;
2041  for (auto tle : tlist) {
2042  if (tle->get_resname() == *name) {
2043  found = true;
2044  break;
2045  }
2046  tle_no++;
2047  }
2048  if (!found) {
2049  throw std::runtime_error("invalid name in order by: " + *name);
2050  }
2051  }
2052  order_by->push_back(
2053  Analyzer::OrderEntry(tle_no, p->get_is_desc(), p->get_nulls_first()));
2054  }
2055  }
2056  if (query.get_is_distinct()) {
2057  // extend order_by to include all targetlist entries.
2058  for (int i = 1; i <= static_cast<int>(tlist.size()); i++) {
2059  bool in_orderby = false;
2060  std::for_each(order_by->begin(),
2061  order_by->end(),
2062  [&in_orderby, i](const Analyzer::OrderEntry& oe) {
2063  in_orderby = in_orderby || (i == oe.tle_no);
2064  });
2065  if (!in_orderby) {
2066  order_by->push_back(Analyzer::OrderEntry(i, false, false));
2067  }
2068  }
2069  }
2070  query.set_order_by(order_by);
2071 }
2072 
2073 std::string SelectEntry::to_string() const {
2074  std::string str = select_expr_->to_string();
2075  if (alias_ != nullptr) {
2076  str += " AS " + *alias_;
2077  }
2078  return str;
2079 }
2080 
2081 std::string TableRef::to_string() const {
2082  std::string str = *table_name_;
2083  if (range_var_ != nullptr) {
2084  str += " " + *range_var_;
2085  }
2086  return str;
2087 }
2088 
2089 std::string ColumnRef::to_string() const {
2090  std::string str;
2091  if (table_ == nullptr) {
2092  str = *column_;
2093  } else if (column_ == nullptr) {
2094  str = *table_ + ".*";
2095  } else {
2096  str = *table_ + "." + *column_;
2097  }
2098  return str;
2099 }
2100 
2101 std::string OperExpr::to_string() const {
2102  std::string op_str[] = {
2103  "=", "===", "<>", "<", ">", "<=", ">=", " AND ", " OR ", "NOT", "-", "+", "*", "/"};
2104  std::string str;
2105  if (optype_ == kUMINUS) {
2106  str = "-(" + left_->to_string() + ")";
2107  } else if (optype_ == kNOT) {
2108  str = "NOT (" + left_->to_string() + ")";
2109  } else if (optype_ == kARRAY_AT) {
2110  str = left_->to_string() + "[" + right_->to_string() + "]";
2111  } else if (optype_ == kUNNEST) {
2112  str = "UNNEST(" + left_->to_string() + ")";
2113  } else if (optype_ == kIN) {
2114  str = "(" + left_->to_string() + " IN " + right_->to_string() + ")";
2115  } else {
2116  str = "(" + left_->to_string() + op_str[optype_] + right_->to_string() + ")";
2117  }
2118  return str;
2119 }
2120 
2121 std::string InExpr::to_string() const {
2122  std::string str = arg_->to_string();
2123  if (is_not_) {
2124  str += " NOT IN ";
2125  } else {
2126  str += " IN ";
2127  }
2128  return str;
2129 }
2130 
2131 std::string ExistsExpr::to_string() const {
2132  return "EXISTS (" + query_->to_string() + ")";
2133 }
2134 
2135 std::string SubqueryExpr::to_string() const {
2136  std::string str;
2137  str = "(";
2138  str += query_->to_string();
2139  str += ")";
2140  return str;
2141 }
2142 
2143 std::string IsNullExpr::to_string() const {
2144  std::string str = arg_->to_string();
2145  if (is_not_) {
2146  str += " IS NOT NULL";
2147  } else {
2148  str += " IS NULL";
2149  }
2150  return str;
2151 }
2152 
2153 std::string InSubquery::to_string() const {
2154  std::string str = InExpr::to_string();
2155  str += subquery_->to_string();
2156  return str;
2157 }
2158 
2159 std::string InValues::to_string() const {
2160  std::string str = InExpr::to_string() + "(";
2161  bool notfirst = false;
2162  for (auto& p : value_list_) {
2163  if (notfirst) {
2164  str += ", ";
2165  } else {
2166  notfirst = true;
2167  }
2168  str += p->to_string();
2169  }
2170  str += ")";
2171  return str;
2172 }
2173 
2174 std::string BetweenExpr::to_string() const {
2175  std::string str = arg_->to_string();
2176  if (is_not_) {
2177  str += " NOT BETWEEN ";
2178  } else {
2179  str += " BETWEEN ";
2180  }
2181  str += lower_->to_string() + " AND " + upper_->to_string();
2182  return str;
2183 }
2184 
2185 std::string CharLengthExpr::to_string() const {
2186  std::string str;
2187  if (calc_encoded_length_) {
2188  str = "CHAR_LENGTH (" + arg_->to_string() + ")";
2189  } else {
2190  str = "LENGTH (" + arg_->to_string() + ")";
2191  }
2192  return str;
2193 }
2194 
2195 std::string CardinalityExpr::to_string() const {
2196  std::string str = "CARDINALITY(" + arg_->to_string() + ")";
2197  return str;
2198 }
2199 
2200 std::string LikeExpr::to_string() const {
2201  std::string str = arg_->to_string();
2202  if (is_not_) {
2203  str += " NOT LIKE ";
2204  } else {
2205  str += " LIKE ";
2206  }
2207  str += like_string_->to_string();
2208  if (escape_string_ != nullptr) {
2209  str += " ESCAPE " + escape_string_->to_string();
2210  }
2211  return str;
2212 }
2213 
2214 std::string RegexpExpr::to_string() const {
2215  std::string str = arg_->to_string();
2216  if (is_not_) {
2217  str += " NOT REGEXP ";
2218  } else {
2219  str += " REGEXP ";
2220  }
2221  str += pattern_string_->to_string();
2222  if (escape_string_ != nullptr) {
2223  str += " ESCAPE " + escape_string_->to_string();
2224  }
2225  return str;
2226 }
2227 
2228 std::string WidthBucketExpr::to_string() const {
2229  std::string str = " WIDTH_BUCKET ";
2230  str += target_value_->to_string();
2231  str += " ";
2232  str += lower_bound_->to_string();
2233  str += " ";
2234  str += upper_bound_->to_string();
2235  str += " ";
2236  str += partition_count_->to_string();
2237  str += " ";
2238  return str;
2239 }
2240 
2241 std::string LikelihoodExpr::to_string() const {
2242  std::string str = " LIKELIHOOD ";
2243  str += arg_->to_string();
2244  str += " ";
2245  str += boost::lexical_cast<std::string>(is_not_ ? 1.0 - likelihood_ : likelihood_);
2246  return str;
2247 }
2248 
2249 std::string FunctionRef::to_string() const {
2250  std::string str = *name_ + "(";
2251  if (distinct_) {
2252  str += "DISTINCT ";
2253  }
2254  if (arg_ == nullptr) {
2255  str += "*)";
2256  } else {
2257  str += arg_->to_string() + ")";
2258  }
2259  return str;
2260 }
2261 
2262 std::string QuerySpec::to_string() const {
2263  std::string query_str = "SELECT ";
2264  if (is_distinct_) {
2265  query_str += "DISTINCT ";
2266  }
2267  if (select_clause_.empty()) {
2268  query_str += "* ";
2269  } else {
2270  bool notfirst = false;
2271  for (auto& p : select_clause_) {
2272  if (notfirst) {
2273  query_str += ", ";
2274  } else {
2275  notfirst = true;
2276  }
2277  query_str += p->to_string();
2278  }
2279  }
2280  query_str += " FROM ";
2281  bool notfirst = false;
2282  for (auto& p : from_clause_) {
2283  if (notfirst) {
2284  query_str += ", ";
2285  } else {
2286  notfirst = true;
2287  }
2288  query_str += p->to_string();
2289  }
2290  if (where_clause_) {
2291  query_str += " WHERE " + where_clause_->to_string();
2292  }
2293  if (!groupby_clause_.empty()) {
2294  query_str += " GROUP BY ";
2295  bool notfirst = false;
2296  for (auto& p : groupby_clause_) {
2297  if (notfirst) {
2298  query_str += ", ";
2299  } else {
2300  notfirst = true;
2301  }
2302  query_str += p->to_string();
2303  }
2304  }
2305  if (having_clause_) {
2306  query_str += " HAVING " + having_clause_->to_string();
2307  }
2308  query_str += ";";
2309  return query_str;
2310 }
2311 
2312 void InsertStmt::analyze(const Catalog_Namespace::Catalog& catalog,
2313  Analyzer::Query& query) const {
2314  query.set_stmt_type(kINSERT);
2315  const TableDescriptor* td = catalog.getMetadataForTable(*table_);
2316  if (td == nullptr) {
2317  throw std::runtime_error("Table " + *table_ + " does not exist.");
2318  }
2319  if (td->isView) {
2320  throw std::runtime_error("Insert to views is not supported yet.");
2321  }
2323  query.set_result_table_id(td->tableId);
2324  std::list<int> result_col_list;
2325  if (column_list_.empty()) {
2326  const std::list<const ColumnDescriptor*> all_cols =
2327  catalog.getAllColumnMetadataForTable(td->tableId, false, false, true);
2328  for (auto cd : all_cols) {
2329  result_col_list.push_back(cd->columnId);
2330  }
2331  } else {
2332  for (auto& c : column_list_) {
2333  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
2334  if (cd == nullptr) {
2335  throw std::runtime_error("Column " + *c + " does not exist.");
2336  }
2337  result_col_list.push_back(cd->columnId);
2338  const auto& col_ti = cd->columnType;
2339  if (col_ti.get_physical_cols() > 0) {
2340  CHECK(cd->columnType.is_geometry());
2341  for (auto i = 1; i <= col_ti.get_physical_cols(); i++) {
2342  const ColumnDescriptor* pcd =
2343  catalog.getMetadataForColumn(td->tableId, cd->columnId + i);
2344  if (pcd == nullptr) {
2345  throw std::runtime_error("Column " + *c + "'s metadata is incomplete.");
2346  }
2347  result_col_list.push_back(pcd->columnId);
2348  }
2349  }
2350  }
2351  }
2352  query.set_result_col_list(result_col_list);
2353 }
2354 
2355 namespace {
2356 Literal* parse_insert_literal(const rapidjson::Value& literal) {
2357  CHECK(literal.IsObject());
2358  CHECK(literal.HasMember("literal"));
2359  CHECK(literal.HasMember("type"));
2360  auto type = json_str(literal["type"]);
2361  if (type == "NULL") {
2362  return new NullLiteral();
2363  } else if (type == "CHAR" || type == "BOOLEAN") {
2364  auto* val = new std::string(json_str(literal["literal"]));
2365  return new StringLiteral(val);
2366  } else if (type == "DECIMAL") {
2367  CHECK(literal.HasMember("scale"));
2368  CHECK(literal.HasMember("precision"));
2369  auto scale = json_i64(literal["scale"]);
2370  auto precision = json_i64(literal["precision"]);
2371  if (scale == 0) {
2372  auto int_val = std::stol(json_str(literal["literal"]));
2373  return new IntLiteral(int_val);
2374  } else if (precision > 18) {
2375  auto dbl_val = std::stod(json_str(literal["literal"]));
2376  return new DoubleLiteral(dbl_val);
2377  } else {
2378  auto* val = new std::string(json_str(literal["literal"]));
2379  return new FixedPtLiteral(val);
2380  }
2381  } else if (type == "DOUBLE") {
2382  auto dbl_val = std::stod(json_str(literal["literal"]));
2383  return new DoubleLiteral(dbl_val);
2384  } else {
2385  CHECK(false) << "Unexpected calcite data type: " << type;
2386  }
2387  return nullptr;
2388 }
2389 
2390 ArrayLiteral* parse_insert_array_literal(const rapidjson::Value& array) {
2391  CHECK(array.IsArray());
2392  auto json_elements = array.GetArray();
2393  auto* elements = new std::list<Expr*>();
2394  for (const auto& e : json_elements) {
2395  elements->push_back(parse_insert_literal(e));
2396  }
2397  return new ArrayLiteral(elements);
2398 }
2399 } // namespace
2400 
2401 InsertValuesStmt::InsertValuesStmt(const rapidjson::Value& payload)
2402  : InsertStmt(nullptr, nullptr) {
2403  CHECK(payload.HasMember("name"));
2404  table_ = std::make_unique<std::string>(json_str(payload["name"]));
2405 
2406  if (payload.HasMember("columns")) {
2407  CHECK(payload["columns"].IsArray());
2408  for (auto& column : payload["columns"].GetArray()) {
2409  std::string s = json_str(column);
2410  column_list_.emplace_back(std::make_unique<std::string>(s));
2411  }
2412  }
2413 
2414  CHECK(payload.HasMember("values") && payload["values"].IsArray());
2415  auto tuples = payload["values"].GetArray();
2416  if (tuples.Empty()) {
2417  throw std::runtime_error("Values statement cannot be empty");
2418  }
2419  values_lists_.reserve(tuples.Size());
2420  for (const auto& json_tuple : tuples) {
2421  auto values_list = std::make_unique<ValuesList>();
2422  CHECK(json_tuple.IsArray());
2423  auto tuple = json_tuple.GetArray();
2424  for (const auto& value : tuple) {
2425  CHECK(value.IsObject());
2426  if (value.HasMember("array")) {
2427  values_list->push_back(parse_insert_array_literal(value["array"]));
2428  } else {
2429  values_list->push_back(parse_insert_literal(value));
2430  }
2431  }
2432  values_lists_.push_back(std::move(values_list));
2433  }
2434 }
2435 
2437  Analyzer::Query& query) const {
2438  InsertStmt::analyze(catalog, query);
2439  size_t list_size = values_lists_[0]->get_value_list().size();
2440  if (!column_list_.empty()) {
2441  if (list_size != column_list_.size()) {
2442  throw std::runtime_error(
2443  "Numbers of columns and values don't match for the "
2444  "insert.");
2445  }
2446  } else {
2447  const auto tableId = query.get_result_table_id();
2448  const std::list<const ColumnDescriptor*> non_phys_cols =
2449  catalog.getAllColumnMetadataForTable(tableId, false, false, false);
2450  if (non_phys_cols.size() != list_size) {
2451  throw std::runtime_error(
2452  "Number of columns in table does not match the list of values given in the "
2453  "insert.");
2454  }
2455  }
2456  std::vector<const ColumnDescriptor*> cds;
2457  cds.reserve(query.get_result_col_list().size());
2458  for (auto id : query.get_result_col_list()) {
2459  const auto* cd = catalog.getMetadataForColumn(query.get_result_table_id(), id);
2460  CHECK(cd);
2461  cds.push_back(cd);
2462  }
2463  auto& query_values_lists = query.get_values_lists();
2464  query_values_lists.resize(values_lists_.size());
2465  for (size_t i = 0; i < values_lists_.size(); ++i) {
2466  const auto& values_list = values_lists_[i]->get_value_list();
2467  if (values_list.size() != list_size) {
2468  throw std::runtime_error(
2469  "Insert values lists should be of the same size. Expected: " +
2470  std::to_string(list_size) + ", Got: " + std::to_string(values_list.size()));
2471  }
2472  auto& query_values_list = query_values_lists[i];
2473  size_t cds_id = 0;
2474  for (auto& v : values_list) {
2475  auto e = v->analyze(catalog, query);
2476  const auto* cd = cds[cds_id];
2477  const auto& col_ti = cd->columnType;
2478  if (col_ti.get_notnull()) {
2479  auto c = std::dynamic_pointer_cast<Analyzer::Constant>(e);
2480  if (c != nullptr && c->get_is_null()) {
2481  throw std::runtime_error("Cannot insert NULL into column " + cd->columnName);
2482  }
2483  }
2484  e = e->add_cast(col_ti);
2485  query_values_list.emplace_back(new Analyzer::TargetEntry("", e, false));
2486  ++cds_id;
2487 
2488  if (col_ti.get_physical_cols() > 0) {
2489  CHECK(cd->columnType.is_geometry());
2490  auto c = dynamic_cast<const Analyzer::Constant*>(e.get());
2491  if (!c) {
2492  auto uoper = std::dynamic_pointer_cast<Analyzer::UOper>(e);
2493  if (uoper && uoper->get_optype() == kCAST) {
2494  c = dynamic_cast<const Analyzer::Constant*>(uoper->get_operand());
2495  }
2496  }
2497  bool is_null = false;
2498  std::string* geo_string{nullptr};
2499  if (c) {
2500  is_null = c->get_is_null();
2501  if (!is_null) {
2502  geo_string = c->get_constval().stringval;
2503  }
2504  }
2505  if (!is_null && !geo_string) {
2506  throw std::runtime_error("Expecting a WKT or WKB hex string for column " +
2507  cd->columnName);
2508  }
2509  std::vector<double> coords;
2510  std::vector<double> bounds;
2511  std::vector<int> ring_sizes;
2512  std::vector<int> poly_rings;
2513  int render_group =
2514  0; // @TODO simon.eves where to get render_group from in this context?!
2515  SQLTypeInfo import_ti{cd->columnType};
2516  if (!is_null) {
2518  *geo_string, import_ti, coords, bounds, ring_sizes, poly_rings)) {
2519  throw std::runtime_error("Cannot read geometry to insert into column " +
2520  cd->columnName);
2521  }
2522  if (coords.empty()) {
2523  // Importing from geo_string WKT resulted in empty coords: dealing with a NULL
2524  is_null = true;
2525  }
2526  if (cd->columnType.get_type() != import_ti.get_type()) {
2527  // allow POLYGON to be inserted into MULTIPOLYGON column
2528  if (!(import_ti.get_type() == SQLTypes::kPOLYGON &&
2529  cd->columnType.get_type() == SQLTypes::kMULTIPOLYGON)) {
2530  throw std::runtime_error(
2531  "Imported geometry doesn't match the type of column " + cd->columnName);
2532  }
2533  }
2534  } else {
2535  // Special case for NULL POINT, push NULL representation to coords
2536  if (cd->columnType.get_type() == kPOINT) {
2537  if (!coords.empty()) {
2538  throw std::runtime_error(
2539  "NULL POINT with unexpected coordinates in column " + cd->columnName);
2540  }
2541  coords.push_back(NULL_ARRAY_DOUBLE);
2542  coords.push_back(NULL_DOUBLE);
2543  }
2544  }
2545 
2546  // TODO: check if import SRID matches columns SRID, may need to transform before
2547  // inserting
2548 
2549  const auto* cd_coords = cds[cds_id];
2550  CHECK_EQ(cd_coords->columnType.get_type(), kARRAY);
2551  CHECK_EQ(cd_coords->columnType.get_subtype(), kTINYINT);
2552  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
2553  if (!is_null || cd->columnType.get_type() == kPOINT) {
2554  auto compressed_coords = Geospatial::compress_coords(coords, col_ti);
2555  for (auto cc : compressed_coords) {
2556  Datum d;
2557  d.tinyintval = cc;
2558  auto e = makeExpr<Analyzer::Constant>(kTINYINT, false, d);
2559  value_exprs.push_back(e);
2560  }
2561  }
2562  query_values_list.emplace_back(new Analyzer::TargetEntry(
2563  "",
2564  makeExpr<Analyzer::Constant>(cd_coords->columnType, is_null, value_exprs),
2565  false));
2566  ++cds_id;
2567 
2568  if (cd->columnType.get_type() == kPOLYGON ||
2569  cd->columnType.get_type() == kMULTIPOLYGON) {
2570  // Put ring sizes array into separate physical column
2571  const auto* cd_ring_sizes = cds[cds_id];
2572  CHECK(cd_ring_sizes);
2573  CHECK_EQ(cd_ring_sizes->columnType.get_type(), kARRAY);
2574  CHECK_EQ(cd_ring_sizes->columnType.get_subtype(), kINT);
2575  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
2576  if (!is_null) {
2577  for (auto c : ring_sizes) {
2578  Datum d;
2579  d.intval = c;
2580  auto e = makeExpr<Analyzer::Constant>(kINT, false, d);
2581  value_exprs.push_back(e);
2582  }
2583  }
2584  query_values_list.emplace_back(new Analyzer::TargetEntry(
2585  "",
2586  makeExpr<Analyzer::Constant>(
2587  cd_ring_sizes->columnType, is_null, value_exprs),
2588  false));
2589  ++cds_id;
2590 
2591  if (cd->columnType.get_type() == kMULTIPOLYGON) {
2592  // Put poly_rings array into separate physical column
2593  const auto* cd_poly_rings = cds[cds_id];
2594  CHECK(cd_poly_rings);
2595  CHECK_EQ(cd_poly_rings->columnType.get_type(), kARRAY);
2596  CHECK_EQ(cd_poly_rings->columnType.get_subtype(), kINT);
2597  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
2598  if (!is_null) {
2599  for (auto c : poly_rings) {
2600  Datum d;
2601  d.intval = c;
2602  auto e = makeExpr<Analyzer::Constant>(kINT, false, d);
2603  value_exprs.push_back(e);
2604  }
2605  }
2606  query_values_list.emplace_back(new Analyzer::TargetEntry(
2607  "",
2608  makeExpr<Analyzer::Constant>(
2609  cd_poly_rings->columnType, is_null, value_exprs),
2610  false));
2611  ++cds_id;
2612  }
2613  }
2614 
2615  if (cd->columnType.get_type() == kLINESTRING ||
2616  cd->columnType.get_type() == kPOLYGON ||
2617  cd->columnType.get_type() == kMULTIPOLYGON) {
2618  const auto* cd_bounds = cds[cds_id];
2619  CHECK(cd_bounds);
2620  CHECK_EQ(cd_bounds->columnType.get_type(), kARRAY);
2621  CHECK_EQ(cd_bounds->columnType.get_subtype(), kDOUBLE);
2622  std::list<std::shared_ptr<Analyzer::Expr>> value_exprs;
2623  if (!is_null) {
2624  for (auto b : bounds) {
2625  Datum d;
2626  d.doubleval = b;
2627  auto e = makeExpr<Analyzer::Constant>(kDOUBLE, false, d);
2628  value_exprs.push_back(e);
2629  }
2630  }
2631  query_values_list.emplace_back(new Analyzer::TargetEntry(
2632  "",
2633  makeExpr<Analyzer::Constant>(cd_bounds->columnType, is_null, value_exprs),
2634  false));
2635  ++cds_id;
2636  }
2637 
2638  if (cd->columnType.get_type() == kPOLYGON ||
2639  cd->columnType.get_type() == kMULTIPOLYGON) {
2640  // Put render group into separate physical column
2641  const auto* cd_render_group = cds[cds_id];
2642  CHECK(cd_render_group);
2643  CHECK_EQ(cd_render_group->columnType.get_type(), kINT);
2644  Datum d;
2645  d.intval = render_group;
2646  query_values_list.emplace_back(new Analyzer::TargetEntry(
2647  "",
2648  makeExpr<Analyzer::Constant>(cd_render_group->columnType, is_null, d),
2649  false));
2650  ++cds_id;
2651  }
2652  }
2653  }
2654  }
2655 }
2656 
bool read_only_mode) {
  // Executes a single-row INSERT ... VALUES statement: rejected in read-only
  // mode, requires insert privileges on *table_, rejected for views, then run
  // through RelAlgExecutor::executeSimpleInsert. Any failure triggers a
  // best-effort rollback and is rethrown; on success, non-temporary tables are
  // checkpointed.
  if (read_only_mode) {
    throw std::runtime_error("INSERT values invalid in read only mode.");
  }
  auto execute_read_lock =
  auto& catalog = session.getCatalog();
  const auto td_with_lock =
  catalog, *table_);
  *table_)) {
    throw std::runtime_error("User has no insert privileges on " + *table_ + ".");
  }
  Analyzer::Query query;
  analyze(catalog, query);

  // Take an insert data write lock, which prevents concurrent inserts.
  const auto insert_data_lock =

  // NOTE(max): we do the same checks as below just a few calls earlier in analyze().
  // Do we keep those intentionally to make sure nothing changed in between w/o
  // catalog locks or is it just a duplicate work?
  auto td = td_with_lock();
  CHECK(td);
  if (td->isView) {
    throw std::runtime_error("Singleton inserts on views is not supported.");
  }

  RelAlgExecutor ra_executor(executor.get(), catalog);

  // Fall back to the local connector when no (distributed) leaf connector was
  // injected.
  // NOTE(review): if local_connector is a function-local object (its
  // declaration is not visible here), leafs_connector_ keeps pointing at it
  // after this call returns. A second execute() on the same statement object
  // would then skip this branch and use a dangling pointer. Confirm, and
  // consider resetting leafs_connector_ on exit.
  if (!leafs_connector_) {
    leafs_connector_ = &local_connector;
  }
  try {
    ra_executor.executeSimpleInsert(query, insert_data_loader, session);
  } catch (...) {
    // Best-effort rollback: a rollback failure is only logged, and the
    // original exception is always rethrown.
    try {
      leafs_connector_->rollback(session, td->tableId);
    } catch (std::exception& e) {
      LOG(ERROR) << "An error occurred during insert rollback attempt. Table id: "
                 << td->tableId << ", Error: " << e.what();
    }
    throw;
  }
  // Temporary tables are not checkpointed.
  if (!td->isTemporaryTable()) {
    leafs_connector_->checkpoint(session, td->tableId);
  }
}
2715 
Analyzer::Query& query) const {
  // UPDATE is not implemented on this parser path; analysis always throws.
  throw std::runtime_error("UPDATE statement not supported yet.");
}
2720 
Analyzer::Query& query) const {
  // DELETE is not implemented on this parser path; analysis always throws.
  throw std::runtime_error("DELETE statement not supported yet.");
}
2725 
2726 namespace {
2727 
  // Throws std::runtime_error unless the column can be used as a shard key:
  // only integer, time, or dictionary-encoded string columns are accepted.
  const auto& col_ti = cd.columnType;
  if (!col_ti.is_integer() && !col_ti.is_time() &&
      !(col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT)) {
    throw std::runtime_error("Cannot shard on type " + col_ti.get_type_name() +
                             ", encoding " + col_ti.get_compression_name());
  }
}
2736 
size_t shard_column_index(const std::string& name,
                          const std::list<ColumnDescriptor>& columns) {
  // Returns the 1-based id of the column whose name equals `name` exactly
  // (case-sensitive, unlike sort_column_index, which upper-cases), or 0 if no
  // column matches. A geometry column advances the running id by an extra
  // get_presumably-physical-column count (get_physical_cols()), so ids of later
  // columns skip over those slots.
  size_t index = 1;
  for (const auto& cd : columns) {
    if (cd.columnName == name) {
      return index;
    }
    ++index;
    if (cd.columnType.is_geometry()) {
      index += cd.columnType.get_physical_cols();
    }
  }
  // Not found, return 0
  return 0;
}
2753 
2754 size_t sort_column_index(const std::string& name,
2755  const std::list<ColumnDescriptor>& columns) {
2756  size_t index = 1;
2757  for (const auto& cd : columns) {
2758  if (boost::to_upper_copy<std::string>(cd.columnName) == name) {
2759  return index;
2760  }
2761  ++index;
2762  if (cd.columnType.is_geometry()) {
2763  index += cd.columnType.get_physical_cols();
2764  }
2765  }
2766  // Not found, return 0
2767  return 0;
2768 }
2769 
2770 void set_string_field(rapidjson::Value& obj,
2771  const std::string& field_name,
2772  const std::string& field_value,
2773  rapidjson::Document& document) {
2774  rapidjson::Value field_name_json_str;
2775  field_name_json_str.SetString(
2776  field_name.c_str(), field_name.size(), document.GetAllocator());
2777  rapidjson::Value field_value_json_str;
2778  field_value_json_str.SetString(
2779  field_value.c_str(), field_value.size(), document.GetAllocator());
2780  obj.AddMember(field_name_json_str, field_value_json_str, document.GetAllocator());
2781 }
2782 
const ShardKeyDef* shard_key_def,
const std::vector<SharedDictionaryDef>& shared_dict_defs) {
  // Serializes the optional shard key and every shared-dictionary definition
  // into a compact JSON array (rapidjson::Writer, no pretty-printing), e.g.
  //   [{"type":"SHARD KEY","name":"<col>"},
  //    {"type":"SHARED DICTIONARY","name":"<col>","foreign_table":"<tbl>",
  //     "foreign_column":"<col>"}]
  // The shard-key entry is present only when shard_key_def is non-null.
  rapidjson::Document document;
  auto& allocator = document.GetAllocator();
  rapidjson::Value arr(rapidjson::kArrayType);
  if (shard_key_def) {
    rapidjson::Value shard_key_obj(rapidjson::kObjectType);
    set_string_field(shard_key_obj, "type", "SHARD KEY", document);
    set_string_field(shard_key_obj, "name", shard_key_def->get_column(), document);
    arr.PushBack(shard_key_obj, allocator);
  }
  for (const auto& shared_dict_def : shared_dict_defs) {
    rapidjson::Value shared_dict_obj(rapidjson::kObjectType);
    set_string_field(shared_dict_obj, "type", "SHARED DICTIONARY", document);
    set_string_field(shared_dict_obj, "name", shared_dict_def.get_column(), document);
        shared_dict_obj, "foreign_table", shared_dict_def.get_foreign_table(), document);
    set_string_field(shared_dict_obj,
                     "foreign_column",
                     shared_dict_def.get_foreign_column(),
                     document);
    arr.PushBack(shared_dict_obj, allocator);
  }
  rapidjson::StringBuffer buffer;
  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
  arr.Accept(writer);
  return buffer.GetString();
}
2812 
2813 template <typename LITERAL_TYPE,
2814  typename ASSIGNMENT,
2815  typename VALIDATE = DefaultValidate<LITERAL_TYPE>>
2816 decltype(auto) get_property_value(const NameValueAssign* p,
2817  ASSIGNMENT op,
2818  VALIDATE validate = VALIDATE()) {
2819  const auto val = validate(p);
2820  return op(val);
2821 }
2822 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // STORAGE_TYPE option: stores the string literal in td.storageType.
  // CaseSensitiveValidate is used instead of the default validator, presumably
  // so the value's original case is kept — confirm against its definition.
  auto assignment = [&td](const auto val) { td.storageType = val; };
  return get_property_value<StringLiteral, decltype(assignment), CaseSensitiveValidate>(
      p, assignment);
}
2830 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // FRAGMENT_SIZE option: stores the integer literal in td.maxFragRows.
  return get_property_value<IntLiteral>(p,
                                        [&td](const auto val) { td.maxFragRows = val; });
}
2837 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // Dataframe FRAGMENT_SIZE option: stores the integer literal in df_td.maxFragRows.
  return get_property_value<IntLiteral>(
      p, [&df_td](const auto val) { df_td.maxFragRows = val; });
}
2844 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // MAX_CHUNK_SIZE option: stores the integer literal in td.maxChunkSize.
  return get_property_value<IntLiteral>(p,
                                        [&td](const auto val) { td.maxChunkSize = val; });
}
2851 
DataframeTableDescriptor& df_td,
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // Dataframe MAX_CHUNK_SIZE option: stores the integer literal in df_td.maxChunkSize.
  return get_property_value<IntLiteral>(
      p, [&df_td](const auto val) { df_td.maxChunkSize = val; });
}
2859 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // DELIMITER option: must be exactly one character long; stored in
  // df_td.delimiter.
  return get_property_value<StringLiteral>(p, [&df_td](const auto val) {
    if (val.size() != 1) {
      throw std::runtime_error("Length of DELIMITER must be equal to 1.");
    }
    df_td.delimiter = val;
  });
}
2870 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // HEADER option: "TRUE" / "FALSE" set df_td.hasHeader; anything else throws.
  // NOTE(review): comparison is against upper-case literals, so this presumably
  // relies on the default validator upper-casing the value — confirm.
  return get_property_value<StringLiteral>(p, [&df_td](const auto val) {
    if (val == "FALSE") {
      df_td.hasHeader = false;
    } else if (val == "TRUE") {
      df_td.hasHeader = true;
    } else {
      throw std::runtime_error("Option HEADER support only 'true' or 'false' values.");
    }
  });
}
2884 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // PAGE_SIZE option: stores the integer literal in td.fragPageSize.
  return get_property_value<IntLiteral>(p,
                                        [&td](const auto val) { td.fragPageSize = val; });
}
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // MAX_ROWS option: stores the integer literal in td.maxRows.
  return get_property_value<IntLiteral>(p, [&td](const auto val) { td.maxRows = val; });
}
2896 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // Dataframe SKIP_ROWS option: stores the integer literal in df_td.skipRows.
  return get_property_value<IntLiteral>(
      p, [&df_td](const auto val) { df_td.skipRows = val; });
}
2903 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // PARTITIONS option: must be "SHARDED" or "REPLICATED" and is stored in
  // td.partitions. REPLICATED is rejected when td.shardedColumnId is already
  // non-zero (a shard key was resolved before options are applied).
  return get_property_value<StringLiteral>(p, [&td](const auto partitions_uc) {
    if (partitions_uc != "SHARDED" && partitions_uc != "REPLICATED") {
      throw std::runtime_error("PARTITIONS must be SHARDED or REPLICATED");
    }
    if (td.shardedColumnId != 0 && partitions_uc == "REPLICATED") {
      throw std::runtime_error(
          "A table cannot be sharded and replicated at the same time");
    };
    td.partitions = partitions_uc;
  });
}
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // SHARD_COUNT option: requires td.shardedColumnId (the SHARD KEY) to be set
  // already. When g_leaf_count is non-zero the count must be a multiple of it,
  // and td.nShards stores shard_count / g_leaf_count; otherwise td.nShards is
  // shard_count itself.
  if (!td.shardedColumnId) {
    throw std::runtime_error("SHARD KEY must be defined.");
  }
  return get_property_value<IntLiteral>(p, [&td](const auto shard_count) {
    if (g_leaf_count && shard_count % g_leaf_count) {
      throw std::runtime_error(
          "SHARD_COUNT must be a multiple of the number of leaves in the cluster.");
    }
    td.nShards = g_leaf_count ? shard_count / g_leaf_count : shard_count;
    // NOTE(review): td.shardedColumnId was verified non-zero above and is not
    // changed before this lambda runs, so this condition is always false and
    // the throw is unreachable. Confirm the intended check (perhaps just
    // `!td.nShards`).
    if (!td.shardedColumnId && !td.nShards) {
      throw std::runtime_error(
          "Must specify the number of shards through the SHARD_COUNT option");
    };
  });
}
2936 
2937 decltype(auto) get_vacuum_def(TableDescriptor& td,
2938  const NameValueAssign* p,
2939  const std::list<ColumnDescriptor>& columns) {
2940  return get_property_value<StringLiteral>(p, [&td](const auto vacuum_uc) {
2941  if (vacuum_uc != "IMMEDIATE" && vacuum_uc != "DELAYED") {
2942  throw std::runtime_error("VACUUM must be IMMEDIATE or DELAYED");
2943  }
2944  td.hasDeletedCol = boost::iequals(vacuum_uc, "IMMEDIATE") ? false : true;
2945  });
2946 }
2947 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // SORT_COLUMN option: resolves the column name (compared upper-cased by
  // sort_column_index) to its 1-based id and stores it in td.sortedColumnId.
  // Throws if no column with that name exists.
  return get_property_value<StringLiteral>(p, [&td, &columns](const auto sort_upper) {
    td.sortedColumnId = sort_column_index(sort_upper, columns);
    if (!td.sortedColumnId) {
      throw std::runtime_error("Specified sort column " + sort_upper + " doesn't exist");
    }
  });
}
2958 
const NameValueAssign* p,
const std::list<ColumnDescriptor>& columns) {
  // MAX_ROLLBACK_EPOCHS option: stored in td.maxRollbackEpochs, with any
  // negative value normalized to -1.
  // NOTE(review): PositiveOrZeroValidate presumably already rejects negative
  // values, which would make the `val < 0` branch unreachable — confirm.
  auto assignment = [&td](const auto val) {
    td.maxRollbackEpochs =
        val < 0 ? -1 : val;  // Anything < 0 means unlimited rollbacks. Note that 0
                             // still means keeping a shadow copy of data/metadata
                             // between epochs so bad writes can be rolled back
  };
  return get_property_value<IntLiteral, decltype(assignment), PositiveOrZeroValidate>(
      p, assignment);
}
2971 
2972 static const std::map<const std::string, const TableDefFuncPtr> tableDefFuncMap = {
2973  {"fragment_size"s, get_frag_size_def},
2974  {"max_chunk_size"s, get_max_chunk_size_def},
2975  {"page_size"s, get_page_size_def},
2976  {"max_rows"s, get_max_rows_def},
2977  {"partitions"s, get_partions_def},
2978  {"shard_count"s, get_shard_count_def},
2979  {"vacuum"s, get_vacuum_def},
2980  {"sort_column"s, get_sort_column_def},
2981  {"storage_type"s, get_storage_type},
2982  {"max_rollback_epochs", get_max_rollback_epochs_def}};
2983 
const std::unique_ptr<NameValueAssign>& p,
const std::list<ColumnDescriptor>& columns) {
  // Applies one CREATE TABLE storage option to td. The option name is
  // lower-cased before lookup in tableDefFuncMap, so matching is
  // case-insensitive. Unknown names throw std::runtime_error listing the
  // accepted options.
  const auto it = tableDefFuncMap.find(boost::to_lower_copy<std::string>(*p->get_name()));
  if (it == tableDefFuncMap.end()) {
    throw std::runtime_error(
        "Invalid CREATE TABLE option " + *p->get_name() +
        ". Should be FRAGMENT_SIZE, MAX_CHUNK_SIZE, PAGE_SIZE, MAX_ROLLBACK_EPOCHS, "
        "MAX_ROWS, "
        "PARTITIONS, SHARD_COUNT, VACUUM, SORT_COLUMN, STORAGE_TYPE.");
  }
  return it->second(td, p.get(), columns);
}
2997 
const std::unique_ptr<NameValueAssign>& p,
const std::list<ColumnDescriptor>& columns) {
  // Applies one CREATE TABLE AS storage option to td, using the same
  // case-insensitive tableDefFuncMap lookup as plain CREATE TABLE.
  // NOTE(review): the error text also lists USE_SHARED_DICTIONARIES, which is
  // not a key of tableDefFuncMap; presumably the caller handles it before
  // reaching this lookup — confirm.
  const auto it = tableDefFuncMap.find(boost::to_lower_copy<std::string>(*p->get_name()));
  if (it == tableDefFuncMap.end()) {
    throw std::runtime_error(
        "Invalid CREATE TABLE AS option " + *p->get_name() +
        ". Should be FRAGMENT_SIZE, MAX_CHUNK_SIZE, PAGE_SIZE, MAX_ROLLBACK_EPOCHS, "
        "MAX_ROWS, "
        "PARTITIONS, SHARD_COUNT, VACUUM, SORT_COLUMN, STORAGE_TYPE or "
        "USE_SHARED_DICTIONARIES.");
  }
  return it->second(td, p.get(), columns);
}
3012 
3013 static const std::map<const std::string, const DataframeDefFuncPtr> dataframeDefFuncMap =
3014  {{"fragment_size"s, get_frag_size_dataframe_def},
3015  {"max_chunk_size"s, get_max_chunk_size_dataframe_def},
3016  {"skip_rows"s, get_skip_rows_def},
3017  {"delimiter"s, get_delimiter_def},
3018  {"header"s, get_header_def}};
3019 
const std::unique_ptr<NameValueAssign>& p,
const std::list<ColumnDescriptor>& columns) {
  // Applies one CREATE DATAFRAME option to df_td. The option name is
  // lower-cased before lookup in dataframeDefFuncMap, so matching is
  // case-insensitive. Unknown names throw std::runtime_error.
  const auto it =
      dataframeDefFuncMap.find(boost::to_lower_copy<std::string>(*p->get_name()));
  if (it == dataframeDefFuncMap.end()) {
    throw std::runtime_error(
        "Invalid CREATE DATAFRAME option " + *p->get_name() +
        ". Should be FRAGMENT_SIZE, MAX_CHUNK_SIZE, SKIP_ROWS, DELIMITER or HEADER.");
  }
  return it->second(df_td, p.get(), columns);
}
3032 
3033 std::unique_ptr<ColumnDef> column_from_json(const rapidjson::Value& element) {
3034  CHECK(element.HasMember("name"));
3035  auto col_name = std::make_unique<std::string>(json_str(element["name"]));
3036  CHECK(element.HasMember("sqltype"));
3037  const auto sql_types = to_sql_type(json_str(element["sqltype"]));
3038 
3039  // decimal / numeric precision / scale
3040  int precision = -1;
3041  int scale = -1;
3042  if (element.HasMember("precision")) {
3043  precision = json_i64(element["precision"]);
3044  }
3045  if (element.HasMember("scale")) {
3046  scale = json_i64(element["scale"]);
3047  }
3048 
3049  std::optional<int64_t> array_size;
3050  if (element.HasMember("arraySize")) {
3051  // We do not yet support geo arrays
3052  array_size = json_i64(element["arraySize"]);
3053  }
3054  std::unique_ptr<SQLType> sql_type;
3055  if (element.HasMember("subtype")) {
3056  CHECK(element.HasMember("coordinateSystem"));
3057  const auto subtype_sql_types = to_sql_type(json_str(element["subtype"]));
3058  sql_type =
3059  std::make_unique<SQLType>(subtype_sql_types,
3060  static_cast<int>(sql_types),
3061  static_cast<int>(json_i64(element["coordinateSystem"])),
3062  false);
3063  } else if (precision > 0 && scale > 0) {
3064  sql_type = std::make_unique<SQLType>(sql_types,
3065  precision,
3066  scale,
3067  /*is_array=*/array_size.has_value(),
3068  array_size ? *array_size : -1);
3069  } else if (precision > 0) {
3070  sql_type = std::make_unique<SQLType>(sql_types,
3071  precision,
3072  0,
3073  /*is_array=*/array_size.has_value(),
3074  array_size ? *array_size : -1);
3075  } else {
3076  sql_type = std::make_unique<SQLType>(sql_types,
3077  /*is_array=*/array_size.has_value(),
3078  array_size ? *array_size : -1);
3079  }
3080  CHECK(sql_type);
3081 
3082  CHECK(element.HasMember("nullable"));
3083  const auto nullable = json_bool(element["nullable"]);
3084  std::unique_ptr<ColumnConstraintDef> constraint_def;
3085  StringLiteral* str_literal = nullptr;
3086  if (element.HasMember("default") && !element["default"].IsNull()) {
3087  std::string* defaultval = new std::string(json_str(element["default"]));
3088  boost::algorithm::trim_if(*defaultval, boost::is_any_of(" \"'`"));
3089  str_literal = new StringLiteral(defaultval);
3090  }
3091 
3092  constraint_def = std::make_unique<ColumnConstraintDef>(/*notnull=*/!nullable,
3093  /*unique=*/false,
3094  /*primarykey=*/false,
3095  /*defaultval=*/str_literal);
3096  std::unique_ptr<CompressDef> compress_def;
3097  if (element.HasMember("encodingType") && !element["encodingType"].IsNull()) {
3098  std::string encoding_type = json_str(element["encodingType"]);
3099  CHECK(element.HasMember("encodingSize"));
3100  auto encoding_name = std::make_unique<std::string>(json_str(element["encodingType"]));
3101  compress_def = std::make_unique<CompressDef>(encoding_name.release(),
3102  json_i64(element["encodingSize"]));
3103  }
3104  return std::make_unique<ColumnDef>(col_name.release(),
3105  sql_type.release(),
3106  compress_def ? compress_def.release() : nullptr,
3107  constraint_def ? constraint_def.release() : nullptr);
3108 }
3109 
3110 void parse_elements(const rapidjson::Value& payload,
3111  std::string element_name,
3112  std::string& table_name,
3113  std::list<std::unique_ptr<TableElement>>& table_element_list) {
3114  const auto elements = payload[element_name].GetArray();
3115  for (const auto& element : elements) {
3116  CHECK(element.IsObject());
3117  CHECK(element.HasMember("type"));
3118  if (json_str(element["type"]) == "SQL_COLUMN_DECLARATION") {
3119  auto col_def = column_from_json(element);
3120  table_element_list.emplace_back(std::move(col_def));
3121  } else if (json_str(element["type"]) == "SQL_COLUMN_CONSTRAINT") {
3122  CHECK(element.HasMember("name"));
3123  if (json_str(element["name"]) == "SHARD_KEY") {
3124  CHECK(element.HasMember("columns"));
3125  CHECK(element["columns"].IsArray());
3126  const auto& columns = element["columns"].GetArray();
3127  if (columns.Size() != size_t(1)) {
3128  throw std::runtime_error("Only one shard column is currently supported.");
3129  }
3130  auto shard_key_def = std::make_unique<ShardKeyDef>(json_str(columns[0]));
3131  table_element_list.emplace_back(std::move(shard_key_def));
3132  } else if (json_str(element["name"]) == "SHARED_DICT") {
3133  CHECK(element.HasMember("columns"));
3134  CHECK(element["columns"].IsArray());
3135  const auto& columns = element["columns"].GetArray();
3136  if (columns.Size() != size_t(1)) {
3137  throw std::runtime_error(
3138  R"(Only one column per shared dictionary entry is currently supported. Use multiple SHARED DICT statements to share dictionaries from multiple columns.)");
3139  }
3140  CHECK(element.HasMember("references") && element["references"].IsObject());
3141  const auto& references = element["references"].GetObject();
3142  std::string references_table_name;
3143  if (references.HasMember("table")) {
3144  references_table_name = json_str(references["table"]);
3145  } else {
3146  references_table_name = table_name;
3147  }
3148  CHECK(references.HasMember("column"));
3149 
3150  auto shared_dict_def = std::make_unique<SharedDictionaryDef>(
3151  json_str(columns[0]), references_table_name, json_str(references["column"]));
3152  table_element_list.emplace_back(std::move(shared_dict_def));
3153 
3154  } else {
3155  LOG(FATAL) << "Unsupported type for SQL_COLUMN_CONSTRAINT: "
3156  << json_str(element["name"]);
3157  }
3158  } else {
3159  LOG(FATAL) << "Unsupported element type for CREATE TABLE: "
3160  << element["type"].GetString();
3161  }
3162  }
3163 }
3164 } // namespace
3165 
3166 CreateTableStmt::CreateTableStmt(const rapidjson::Value& payload) {
3167  CHECK(payload.HasMember("name"));
3168  table_ = std::make_unique<std::string>(json_str(payload["name"]));
3169  CHECK(payload.HasMember("elements"));
3170  CHECK(payload["elements"].IsArray());
3171 
3172  is_temporary_ = false;
3173  if (payload.HasMember("temporary")) {
3174  is_temporary_ = json_bool(payload["temporary"]);
3175  }
3176 
3177  if_not_exists_ = false;
3178  if (payload.HasMember("ifNotExists")) {
3179  if_not_exists_ = json_bool(payload["ifNotExists"]);
3180  }
3181 
3182  parse_elements(payload, "elements", *table_, table_element_list_);
3183 
3184  parse_options(payload, storage_options_);
3185 }
3186 
TableDescriptor& td,
std::list<ColumnDescriptor>& columns,
std::vector<SharedDictionaryDef>& shared_dict_defs) {
  // Fills td, columns and shared_dict_defs from the parsed table elements and
  // storage options without creating the table itself. Throws on multiple
  // shard keys, unsupported table constraints, an unknown shard column, or a
  // SHARD KEY given without SHARD_COUNT.
  std::unordered_set<std::string> uc_col_names;
  const auto& catalog = session.getCatalog();
  const ShardKeyDef* shard_key_def{nullptr};
  for (auto& e : table_element_list_) {
    if (dynamic_cast<SharedDictionaryDef*>(e.get())) {
      auto shared_dict_def = static_cast<SharedDictionaryDef*>(e.get());
      this, shared_dict_def, columns, shared_dict_defs, catalog);
      shared_dict_defs.push_back(*shared_dict_def);
      continue;
    }
    if (dynamic_cast<ShardKeyDef*>(e.get())) {
      if (shard_key_def) {
        throw std::runtime_error("Specified more than one shard key");
      }
      shard_key_def = static_cast<const ShardKeyDef*>(e.get());
      continue;
    }
    if (!dynamic_cast<ColumnDef*>(e.get())) {
      throw std::runtime_error("Table constraints are not supported yet.");
    }
    ColumnDef* coldef = static_cast<ColumnDef*>(e.get());
    ColumnDescriptor cd;
    cd.columnName = *coldef->get_column_name();
    setColumnDescriptor(cd, coldef);
    columns.push_back(cd);
  }

  ddl_utils::set_default_table_attributes(*table_, td, columns.size());

  // The shard column is resolved before storage options are applied because
  // the SHARD_COUNT option handler requires td.shardedColumnId to be set.
  if (shard_key_def) {
    td.shardedColumnId = shard_column_index(shard_key_def->get_column(), columns);
    if (!td.shardedColumnId) {
      throw std::runtime_error("Specified shard column " + shard_key_def->get_column() +
                               " doesn't exist");
    }
  }
  if (is_temporary_) {
  } else {
  }
  if (!storage_options_.empty()) {
    for (auto& p : storage_options_) {
      get_table_definitions(td, p, columns);
    }
  }
  if (td.shardedColumnId && !td.nShards) {
    throw std::runtime_error("SHARD_COUNT needs to be specified with SHARD_KEY.");
  }
  td.keyMetainfo = serialize_key_metainfo(shard_key_def, shared_dict_defs);
}
3244 
bool read_only_mode) {
  // Executes CREATE TABLE: rejected in read-only mode, requires create
  // privileges, builds the descriptors via executeDryRun, then creates the
  // table in the catalog and registers it as a DB object owned by the current
  // user.
  if (read_only_mode) {
    throw std::runtime_error("CREATE TABLE invalid in read only mode.");
  }
  auto& catalog = session.getCatalog();

  const auto execute_write_lock =

  // check access privileges
    throw std::runtime_error("Table " + *table_ +
                             " will not be created. User has no create privileges.");
  }

  // A false return ends the statement without creating anything (presumably
  // the "already exists" + IF NOT EXISTS case — confirm against the catalog).
  if (!catalog.validateNonExistentTableOrView(*table_, if_not_exists_)) {
    return;
  }

  TableDescriptor td;
  std::list<ColumnDescriptor> columns;
  std::vector<SharedDictionaryDef> shared_dict_defs;

  executeDryRun(session, td, columns, shared_dict_defs);
  td.userId = session.get_currentUser().userId;

  catalog.createShardedTable(td, columns, shared_dict_defs);
  // TODO (max): It's transactionally unsafe, should be fixed: we may create object w/o
  // privileges
  SysCatalog::instance().createDBObject(
      session.get_currentUser(), td.tableName, TableDBObjectType, catalog);
}
3281 
3282 CreateDataframeStmt::CreateDataframeStmt(const rapidjson::Value& payload) {
3283  CHECK(payload.HasMember("name"));
3284  table_ = std::make_unique<std::string>(json_str(payload["name"]));
3285 
3286  CHECK(payload.HasMember("elementList"));
3287  parse_elements(payload, "elementList", *table_, table_element_list_);
3288 
3289  CHECK(payload.HasMember("filePath"));
3290  std::string fs = json_str(payload["filePath"]);
3291  // strip leading/trailing spaces/quotes/single quotes
3292  boost::algorithm::trim_if(fs, boost::is_any_of(" \"'`"));
3293  filename_ = std::make_unique<std::string>(fs);
3294 
3295  parse_options(payload, storage_options_);
3296 }
3297 
bool read_only_mode) {
  // Executes CREATE DATAFRAME: rejected in read-only mode, requires create
  // privileges, and throws if the table already exists. It then builds the
  // column descriptors (rejecting case-insensitive duplicate column names and
  // any element other than a column or shared dictionary) and applies the
  // dataframe options. Finally it creates the table with df_td.storageType set
  // to the source file path.
  if (read_only_mode) {
    throw std::runtime_error("CREATE DATAFRAME invalid in read only mode.");
  }
  auto& catalog = session.getCatalog();

  const auto execute_write_lock =

  // check access privileges
    throw std::runtime_error("Table " + *table_ +
                             " will not be created. User has no create privileges.");
  }

  if (catalog.getMetadataForTable(*table_) != nullptr) {
    throw std::runtime_error("Table " + *table_ + " already exists.");
  }
  std::list<ColumnDescriptor> columns;
  std::vector<SharedDictionaryDef> shared_dict_defs;

  std::unordered_set<std::string> uc_col_names;
  for (auto& e : table_element_list_) {
    if (dynamic_cast<SharedDictionaryDef*>(e.get())) {
      auto shared_dict_def = static_cast<SharedDictionaryDef*>(e.get());
      this, shared_dict_def, columns, shared_dict_defs, catalog);
      shared_dict_defs.push_back(*shared_dict_def);
      continue;
    }
    // Note: a SHARD KEY element also ends up here and is rejected.
    if (!dynamic_cast<ColumnDef*>(e.get())) {
      throw std::runtime_error("Table constraints are not supported yet.");
    }
    ColumnDef* coldef = static_cast<ColumnDef*>(e.get());
    ColumnDescriptor cd;
    cd.columnName = *coldef->get_column_name();
    // Column names are unique case-insensitively.
    const auto uc_col_name = boost::to_upper_copy<std::string>(cd.columnName);
    const auto it_ok = uc_col_names.insert(uc_col_name);
    if (!it_ok.second) {
      throw std::runtime_error("Column '" + cd.columnName + "' defined more than once");
    }
    setColumnDescriptor(cd, coldef);
    columns.push_back(cd);
  }

  df_td.tableName = *table_;
  df_td.nColumns = columns.size();
  df_td.isView = false;
  df_td.fragmenter = nullptr;
  df_td.maxRows = DEFAULT_MAX_ROWS;
  if (!storage_options_.empty()) {
    for (auto& p : storage_options_) {
      get_dataframe_definitions(df_td, p, columns);
    }
  }
  df_td.keyMetainfo = serialize_key_metainfo(nullptr, shared_dict_defs);
  df_td.userId = session.get_currentUser().userId;
  // The dataframe's storage type carries the source file path.
  df_td.storageType = *filename_;

  catalog.createShardedTable(df_td, columns, shared_dict_defs);
  // TODO (max): It's transactionally unsafe, should be fixed: we may create object w/o
  // privileges
  SysCatalog::instance().createDBObject(
      session.get_currentUser(), df_td.tableName, TableDBObjectType, catalog);
}
3373 
std::shared_ptr<ResultSet> getResultSet(QueryStateProxy query_state_proxy,
                                        const std::string select_stmt,
                                        std::vector<TargetMetaInfo>& targets,
                                        bool validate_only = false,
                                        std::vector<size_t> outer_fragment_indices = {},
                                        bool allow_interrupt = false) {
  // Plans `select_stmt` through Calcite (after pg_shim), executes it with
  // RelAlgExecutor, writes the result's target metadata into `targets`, and
  // returns the result rows. validate_only, allow_interrupt and
  // outer_fragment_indices are forwarded positionally into ExecutionOptions.
  auto const session = query_state_proxy.getQueryState().getConstSessionInfo();
  auto& catalog = session->getCatalog();

#ifdef HAVE_CUDA
  const auto device_type = session->get_executor_device_type();
#else
  const auto device_type = ExecutorDeviceType::CPU;
#endif  // HAVE_CUDA
  auto calcite_mgr = catalog.getCalciteMgr();

  // TODO MAT this should actually get the global or the session parameter for
  // view optimization
  const auto calciteQueryParsingOption =
      calcite_mgr->getCalciteQueryParsingOption(true, false, true);
  const auto calciteOptimizationOption =
      calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {});
  const auto query_ra = calcite_mgr
                            ->process(query_state_proxy,
                                      pg_shim(select_stmt),
                                      calciteQueryParsingOption,
                                      calciteOptimizationOption)
                            .plan_result;
  RelAlgExecutor ra_executor(executor.get(),
                             catalog,
                             query_ra,
                             query_state_proxy.getQueryState().shared_from_this());
  // TODO(adb): Need a better method of dropping constants into this ExecutionOptions
  // struct
  // NOTE(review): the 13th positional value is 1000 here but 0.9 in the
  // otherwise similar initializer in getOuterFragmentCount; confirm which
  // ExecutionOptions field it maps to and which value is intended.
  ExecutionOptions eo = {false,
                         false,
                         true,
                         false,
                         true,
                         false,
                         false,
                         validate_only,
                         false,
                         10000,
                         false,
                         false,
                         1000,
                         allow_interrupt,
                         outer_fragment_indices};

  // Placeholder (empty) result, overwritten by executeRelAlgQuery below.
  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
                                                     nullptr,
                                                     nullptr,
                                                     0,
                                                     0),
                         {}};
  result = ra_executor.executeRelAlgQuery(co, eo, false, nullptr);
  targets = result.getTargetsMeta();

  return result.getRows();
}
3442 
std::string& sql_query_string) {
  // Plans `sql_query_string` through Calcite (after pg_shim) and returns
  // RelAlgExecutor::getOuterFragmentCount for the resulting plan.
  auto const session = query_state_proxy.getQueryState().getConstSessionInfo();
  auto& catalog = session->getCatalog();

#ifdef HAVE_CUDA
  const auto device_type = session->get_executor_device_type();
#else
  const auto device_type = ExecutorDeviceType::CPU;
#endif  // HAVE_CUDA
  auto calcite_mgr = catalog.getCalciteMgr();

  // TODO MAT this should actually get the global or the session parameter for
  // view optimization
  const auto calciteQueryParsingOption =
      calcite_mgr->getCalciteQueryParsingOption(true, false, true);
  const auto calciteOptimizationOption =
      calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {});
  const auto query_ra = calcite_mgr
                            ->process(query_state_proxy,
                                      pg_shim(sql_query_string),
                                      calciteQueryParsingOption,
                                      calciteOptimizationOption)
                            .plan_result;
  RelAlgExecutor ra_executor(executor.get(), catalog, query_ra);
  CompilationOptions co = {device_type, true, ExecutorOptLevel::Default, false};
  // TODO(adb): Need a better method of dropping constants into this ExecutionOptions
  // struct
  // NOTE(review): the 13th positional value is 0.9 here but 1000 in the
  // otherwise similar initializer in getResultSet; confirm the field and the
  // intended value.
  ExecutionOptions eo = {false,
                         false,
                         true,
                         false,
                         true,
                         false,
                         false,
                         false,
                         false,
                         10000,
                         false,
                         false,
                         0.9,
                         false};
  return ra_executor.getOuterFragmentCount(co, eo);
}
3488 
3490  std::string& sql_query_string,
3491  std::vector<size_t> outer_frag_indices,
3492  bool validate_only,
3493  bool allow_interrupt) {
3494  // TODO(PS): Should we be using the shimmed query in getResultSet?
3495  std::string pg_shimmed_select_query = pg_shim(sql_query_string);
3496 
3497  std::vector<TargetMetaInfo> target_metainfos;
3499  auto const session = query_state_proxy.getQueryState().getConstSessionInfo();
3500  auto query_session = session ? session->get_session_id() : "";
3501  auto query_submitted_time = query_state_proxy.getQueryState().getQuerySubmittedTime();
3502  if (allow_interrupt && !validate_only && !query_session.empty()) {
3503  executor->enrollQuerySession(query_session,
3504  sql_query_string,
3505  query_submitted_time,
3507  QuerySessionStatus::QueryStatus::PENDING_EXECUTOR);
3508  }
3509  auto result_rows = getResultSet(query_state_proxy,
3510  sql_query_string,
3511  target_metainfos,
3512  validate_only,
3513  outer_frag_indices,
3514  allow_interrupt);
3515  AggregatedResult res = {result_rows, target_metainfos};
3516  return res;
3517 }
3518 
3519 std::vector<AggregatedResult> LocalQueryConnector::query(
3520  QueryStateProxy query_state_proxy,
3521  std::string& sql_query_string,
3522  std::vector<size_t> outer_frag_indices,
3523  bool allow_interrupt) {
3524  auto res = query(
3525  query_state_proxy, sql_query_string, outer_frag_indices, false, allow_interrupt);
3526  return {res};
3527 }
3528 
3529 std::list<ColumnDescriptor> LocalQueryConnector::getColumnDescriptors(
3531  bool for_create) {
3532  std::list<ColumnDescriptor> column_descriptors;
3533  std::list<ColumnDescriptor> column_descriptors_for_create;
3534 
3535  int rowid_suffix = 0;
3536  for (const auto& target_metainfo : result.targets_meta) {
3537  ColumnDescriptor cd;
3538  cd.columnName = target_metainfo.get_resname();
3539  if (cd.columnName == "rowid") {
3540  cd.columnName += std::to_string(rowid_suffix++);
3541  }
3542  cd.columnType = target_metainfo.get_physical_type_info();
3543 
3544  ColumnDescriptor cd_for_create = cd;
3545 
3547  // we need to reset the comp param (as this points to the actual dictionary)
3548  if (cd.columnType.is_array()) {
3549  // for dict encoded arrays, it is always 4 bytes
3550  cd_for_create.columnType.set_comp_param(32);
3551  } else {
3552  cd_for_create.columnType.set_comp_param(cd.columnType.get_size() * 8);
3553  }
3554  }
3555 
3556  if (cd.columnType.is_date() && !cd.columnType.is_date_in_days()) {
3557  // default to kENCODING_DATE_IN_DAYS encoding
3559  cd_for_create.columnType.set_comp_param(0);
3560  }
3561 
3562  column_descriptors_for_create.push_back(cd_for_create);
3563  column_descriptors.push_back(cd);
3564  }
3565 
3566  if (for_create) {
3567  return column_descriptors_for_create;
3568  }
3569 
3570  return column_descriptors;
3571 }
3572 
3574  const rapidjson::Value& payload) {
3575  CHECK(payload.HasMember("name"));
3576  table_name_ = json_str(payload["name"]);
3577 
3578  CHECK(payload.HasMember("query"));
3579  select_query_ = json_str(payload["query"]);
3580 
3581  boost::replace_all(select_query_, "\n", " ");
3582  select_query_ = "(" + select_query_ + ")";
3583 
3584  if (payload.HasMember("columns")) {
3585  CHECK(payload["columns"].IsArray());
3586  for (auto& column : payload["columns"].GetArray()) {
3587  std::string s = json_str(column);
3588  column_list_.emplace_back(std::unique_ptr<std::string>(new std::string(s)));
3589  }
3590  }
3591 }
3592 
3594  const TableDescriptor* td,
3595  bool validate_table,
3596  bool for_CTAS) {
3597  auto const session = query_state_proxy.getQueryState().getConstSessionInfo();
3598  auto& catalog = session->getCatalog();
3600 
3601  LocalQueryConnector local_connector;
3602  bool populate_table = false;
3603 
3604  if (leafs_connector_) {
3605  populate_table = true;
3606  } else {
3607  leafs_connector_ = &local_connector;
3608  if (!g_cluster) {
3609  populate_table = true;
3610  }
3611  }
3612 
3613  auto get_target_column_descriptors = [this, &catalog](const TableDescriptor* td) {
3614  std::vector<const ColumnDescriptor*> target_column_descriptors;
3615  if (column_list_.empty()) {
3616  auto list = catalog.getAllColumnMetadataForTable(td->tableId, false, false, false);
3617  target_column_descriptors = {std::begin(list), std::end(list)};
3618  } else {
3619  for (auto& c : column_list_) {
3620  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *c);
3621  if (cd == nullptr) {
3622  throw std::runtime_error("Column " + *c + " does not exist.");
3623  }
3624  target_column_descriptors.push_back(cd);
3625  }
3626  }
3627 
3628  return target_column_descriptors;
3629  };
3630 
3631  bool is_temporary = table_is_temporary(td);
3632 
3633  if (validate_table) {
3634  // check access privileges
3635  if (!td) {
3636  throw std::runtime_error("Table " + table_name_ + " does not exist.");
3637  }
3638  if (td->isView) {
3639  throw std::runtime_error("Insert to views is not supported yet.");
3640  }
3641 
3642  if (!session->checkDBAccessPrivileges(DBObjectType::TableDBObjectType,
3644  table_name_)) {
3645  throw std::runtime_error("User has no insert privileges on " + table_name_ + ".");
3646  }
3647 
3648  // only validate the select query so we get the target types
3649  // correctly, but do not populate the result set
3650  auto result = local_connector.query(query_state_proxy, select_query_, {}, true, true);
3651  auto source_column_descriptors = local_connector.getColumnDescriptors(result, false);
3652 
3653  std::vector<const ColumnDescriptor*> target_column_descriptors =
3654  get_target_column_descriptors(td);
3655 
3656  if (source_column_descriptors.size() != target_column_descriptors.size()) {
3657  throw std::runtime_error("The number of source and target columns does not match.");
3658  }
3659 
3660  for (int i = 0; i < source_column_descriptors.size(); i++) {
3661  const ColumnDescriptor* source_cd =
3662  &(*std::next(source_column_descriptors.begin(), i));
3663  const ColumnDescriptor* target_cd = target_column_descriptors.at(i);
3664 
3665  if (source_cd->columnType.get_type() != target_cd->columnType.get_type()) {
3666  auto type_cannot_be_cast = [](const auto& col_type) {
3667  return (col_type.is_time() || col_type.is_geometry() || col_type.is_array() ||
3668  col_type.is_boolean());
3669  };
3670 
3671  if (type_cannot_be_cast(source_cd->columnType) ||
3672  type_cannot_be_cast(target_cd->columnType)) {
3673  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3674  source_cd->columnType.get_type_name() +
3675  "' and target '" + target_cd->columnName + " " +
3676  target_cd->columnType.get_type_name() +
3677  "' column types do not match.");
3678  }
3679  }
3680  if (source_cd->columnType.is_array()) {
3681  if (source_cd->columnType.get_subtype() != target_cd->columnType.get_subtype()) {
3682  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3683  source_cd->columnType.get_type_name() +
3684  "' and target '" + target_cd->columnName + " " +
3685  target_cd->columnType.get_type_name() +
3686  "' array column element types do not match.");
3687  }
3688  }
3689 
3690  if (source_cd->columnType.is_decimal() ||
3691  source_cd->columnType.get_elem_type().is_decimal()) {
3692  SQLTypeInfo sourceType = source_cd->columnType;
3693  SQLTypeInfo targetType = target_cd->columnType;
3694 
3695  if (source_cd->columnType.is_array()) {
3696  sourceType = source_cd->columnType.get_elem_type();
3697  targetType = target_cd->columnType.get_elem_type();
3698  }
3699 
3700  if (sourceType.get_scale() != targetType.get_scale()) {
3701  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3702  source_cd->columnType.get_type_name() +
3703  "' and target '" + target_cd->columnName + " " +
3704  target_cd->columnType.get_type_name() +
3705  "' decimal columns scales do not match.");
3706  }
3707  }
3708 
3709  if (source_cd->columnType.is_string()) {
3710  if (!target_cd->columnType.is_string()) {
3711  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3712  source_cd->columnType.get_type_name() +
3713  "' and target '" + target_cd->columnName + " " +
3714  target_cd->columnType.get_type_name() +
3715  "' column types do not match.");
3716  }
3717  if (source_cd->columnType.get_compression() !=
3718  target_cd->columnType.get_compression()) {
3719  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3720  source_cd->columnType.get_type_name() +
3721  "' and target '" + target_cd->columnName + " " +
3722  target_cd->columnType.get_type_name() +
3723  "' columns string encodings do not match.");
3724  }
3725  }
3726 
3727  if (source_cd->columnType.is_timestamp() && target_cd->columnType.is_timestamp()) {
3728  if (source_cd->columnType.get_dimension() !=
3729  target_cd->columnType.get_dimension()) {
3730  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3731  source_cd->columnType.get_type_name() +
3732  "' and target '" + target_cd->columnName + " " +
3733  target_cd->columnType.get_type_name() +
3734  "' timestamp column precisions do not match.");
3735  }
3736  }
3737 
3738  if (!source_cd->columnType.is_string() && !source_cd->columnType.is_geometry() &&
3739  !source_cd->columnType.is_integer() && !source_cd->columnType.is_decimal() &&
3740  !source_cd->columnType.is_date() && !source_cd->columnType.is_time() &&
3741  !source_cd->columnType.is_timestamp() &&
3742  source_cd->columnType.get_size() > target_cd->columnType.get_size()) {
3743  throw std::runtime_error("Source '" + source_cd->columnName + " " +
3744  source_cd->columnType.get_type_name() +
3745  "' and target '" + target_cd->columnName + " " +
3746  target_cd->columnType.get_type_name() +
3747  "' column encoding sizes do not match.");
3748  }
3749  }
3750  }
3751 
3752  if (!populate_table) {
3753  return;
3754  }
3755 
3756  int64_t total_row_count = 0;
3757  int64_t total_source_query_time_ms = 0;
3758  int64_t total_target_value_translate_time_ms = 0;
3759  int64_t total_data_load_time_ms = 0;
3760 
3762  auto target_column_descriptors = get_target_column_descriptors(td);
3763  auto outer_frag_count =
3765 
3766  size_t outer_frag_end = outer_frag_count == 0 ? 1 : outer_frag_count;
3767  auto query_session = session ? session->get_session_id() : "";
3769  std::string work_type_str = for_CTAS ? "CTAS" : "ITAS";
3770  try {
3771  for (size_t outer_frag_idx = 0; outer_frag_idx < outer_frag_end; outer_frag_idx++) {
3772  std::vector<size_t> allowed_outer_fragment_indices;
3773 
3774  if (outer_frag_count) {
3775  allowed_outer_fragment_indices.push_back(outer_frag_idx);
3776  }
3777 
3778  const auto query_clock_begin = timer_start();
3779  std::vector<AggregatedResult> query_results =
3780  leafs_connector_->query(query_state_proxy,
3781  select_query_,
3782  allowed_outer_fragment_indices,
3784  total_source_query_time_ms += timer_stop(query_clock_begin);
3785 
3786  auto start_time = query_state_proxy.getQueryState().getQuerySubmittedTime();
3787  auto query_str = "INSERT_DATA for " + work_type_str;
3789  // In the clean-up phase of the query execution for collecting aggregated result
3790  // of SELECT query, we remove its query session info, so we need to enroll the
3791  // session info again
3792  executor->enrollQuerySession(query_session,
3793  query_str,
3794  start_time,
3796  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
3797  }
3798 
3799  ScopeGuard clearInterruptStatus = [executor, &query_session, &start_time] {
3800  // this data population is non-kernel operation, so we manually cleanup
3801  // the query session info in the cleanup phase
3803  executor->clearQuerySessionStatus(query_session, start_time);
3804  }
3805  };
3806 
3807  for (auto& res : query_results) {
3808  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3809  throw std::runtime_error(
3810  "Query execution has been interrupted while performing " + work_type_str);
3811  }
3812  auto& result_rows = res.rs;
3813  result_rows->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
3814  const auto num_rows = result_rows->rowCount();
3815 
3816  if (0 == num_rows) {
3817  continue;
3818  }
3819 
3820  total_row_count += num_rows;
3821 
3822  size_t leaf_count = leafs_connector_->leafCount();
3823 
3824  // ensure that at least 1 row is processed per block up to a maximum of 65536 rows
3825  const size_t rows_per_block =
3826  std::max(std::min(num_rows / leaf_count, size_t(64 * 1024)), size_t(1));
3827 
3828  std::vector<std::unique_ptr<TargetValueConverter>> value_converters;
3829 
3831 
3832  const int num_worker_threads = std::thread::hardware_concurrency();
3833 
3834  std::vector<size_t> thread_start_idx(num_worker_threads),
3835  thread_end_idx(num_worker_threads);
3836  bool can_go_parallel = !result_rows->isTruncated() && rows_per_block > 20000;
3837 
3838  std::atomic<size_t> crt_row_idx{0};
3839 
3840  auto do_work = [&result_rows, &value_converters, &crt_row_idx](
3841  const size_t idx,
3842  const size_t block_end,
3843  const size_t num_cols,
3844  const size_t thread_id,
3845  bool& stop_convert) {
3846  const auto result_row = result_rows->getRowAtNoTranslations(idx);
3847  if (!result_row.empty()) {
3848  size_t target_row = crt_row_idx.fetch_add(1);
3849  if (target_row >= block_end) {
3850  stop_convert = true;
3851  return;
3852  }
3853  for (unsigned int col = 0; col < num_cols; col++) {
3854  const auto& mapd_variant = result_row[col];
3855  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
3856  }
3857  }
3858  };
3859 
3860  auto convert_function = [&thread_start_idx,
3861  &thread_end_idx,
3862  &value_converters,
3863  &executor,
3864  &query_session,
3865  &work_type_str,
3866  &do_work](const int thread_id, const size_t block_end) {
3867  const int num_cols = value_converters.size();
3868  const size_t start = thread_start_idx[thread_id];
3869  const size_t end = thread_end_idx[thread_id];
3870  size_t idx = 0;
3871  bool stop_convert = false;
3873  size_t local_idx = 0;
3874  for (idx = start; idx < end; ++idx, ++local_idx) {
3875  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
3876  check_session_interrupted(query_session, executor))) {
3877  throw std::runtime_error(
3878  "Query execution has been interrupted while performing " +
3879  work_type_str);
3880  }
3881  do_work(idx, block_end, num_cols, thread_id, stop_convert);
3882  if (stop_convert) {
3883  break;
3884  }
3885  }
3886  } else {
3887  for (idx = start; idx < end; ++idx) {
3888  do_work(idx, block_end, num_cols, thread_id, stop_convert);
3889  if (stop_convert) {
3890  break;
3891  }
3892  }
3893  }
3894  thread_start_idx[thread_id] = idx;
3895  };
3896 
3897  auto single_threaded_value_converter =
3898  [&crt_row_idx, &value_converters, &result_rows](const size_t idx,
3899  const size_t block_end,
3900  const size_t num_cols,
3901  bool& stop_convert) {
3902  size_t target_row = crt_row_idx.fetch_add(1);
3903  if (target_row >= block_end) {
3904  stop_convert = true;
3905  return;
3906  }
3907  const auto result_row = result_rows->getNextRow(false, false);
3908  CHECK(!result_row.empty());
3909  for (unsigned int col = 0; col < num_cols; col++) {
3910  const auto& mapd_variant = result_row[col];
3911  value_converters[col]->convertToColumnarFormat(target_row, &mapd_variant);
3912  }
3913  };
3914 
// Sequential (non-parallel) conversion of one block of rows. Each iteration
// calls single_threaded_value_converter, which claims the next target slot
// from crt_row_idx, pulls a row with getNextRow() and writes it into
// value_converters; it sets stop_convert once the slot index reaches the
// block limit it is given (crt_row_idx is reset to 0 by the caller before
// every block). Iteration covers [thread_start_idx[thread_id],
// thread_end_idx[thread_id]), and the position reached is written back to
// thread_start_idx[thread_id] so the next block resumes from there.
3915  auto single_threaded_convert_function = [&value_converters,
3916  &thread_start_idx,
3917  &thread_end_idx,
3918  &executor,
3919  &query_session,
3920  &work_type_str,
3921  &single_threaded_value_converter](
3922  const int thread_id,
3923  const size_t block_end) {
3924  const int num_cols = value_converters.size();
3925  const size_t start = thread_start_idx[thread_id];
3926  const size_t end = thread_end_idx[thread_id];
3927  size_t idx = 0;
3928  bool stop_convert = false;
// The loop below additionally polls check_session_interrupted() every 65536
// rows; the condition choosing it over the plain loop is on a line not shown
// in this listing.
3930  size_t local_idx = 0;
3931  for (idx = start; idx < end; ++idx, ++local_idx) {
3932  if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
3933  check_session_interrupted(query_session, executor))) {
3934  throw std::runtime_error(
3935  "Query execution has been interrupted while performing " +
3936  work_type_str);
3937  }
3938  single_threaded_value_converter(idx, block_end, num_cols, stop_convert);
3939  if (stop_convert) {
3940  break;
3941  }
3942  }
3943  } else {
// NOTE(review): this plain loop passes `end` (thread_end_idx[thread_id],
// which the caller sets to result_rows->entryCount() on the non-parallel
// path) as the block limit, whereas the interrupt-polling loop above passes
// `block_end`. Because the slot counter restarts at 0 for each block and
// advances once per iteration, it effectively never reaches `end` before
// idx does. The per-block cap is therefore not enforced here: a block may
// write more than block_end rows into converters created for
// num_rows_this_itr, or trip CHECK(!result_row.empty()) once getNextRow()
// runs out of rows. This looks like it should be `block_end`; confirm and fix.
3944  for (idx = start; idx < end; ++idx) {
3945  single_threaded_value_converter(idx, end, num_cols, stop_convert);
3946  if (stop_convert) {
3947  break;
3948  }
3949  }
3950  }
// Remember where this block stopped so the next block continues from here.
3951  thread_start_idx[thread_id] = idx;
3952  };
3953 
3954  if (can_go_parallel) {
3955  const size_t entry_count = result_rows->entryCount();
3956  for (size_t
3957  i = 0,
3958  start_entry = 0,
3959  stride = (entry_count + num_worker_threads - 1) / num_worker_threads;
3960  i < num_worker_threads && start_entry < entry_count;
3961  ++i, start_entry += stride) {
3962  const auto end_entry = std::min(start_entry + stride, entry_count);
3963  thread_start_idx[i] = start_entry;
3964  thread_end_idx[i] = end_entry;
3965  }
3966  } else {
3967  thread_start_idx[0] = 0;
3968  thread_end_idx[0] = result_rows->entryCount();
3969  }
3970 
3971  RenderGroupAnalyzerMap render_group_analyzer_map;
3972 
3973  for (size_t block_start = 0; block_start < num_rows;
3974  block_start += rows_per_block) {
3975  const auto num_rows_this_itr = block_start + rows_per_block < num_rows
3976  ? rows_per_block
3977  : num_rows - block_start;
3978  crt_row_idx = 0; // reset block tracker
3979  value_converters.clear();
3980  int colNum = 0;
3981  for (const auto targetDescriptor : target_column_descriptors) {
3982  auto sourceDataMetaInfo = res.targets_meta[colNum++];
3984  num_rows_this_itr,
3985  catalog,
3986  sourceDataMetaInfo,
3987  targetDescriptor,
3988  targetDescriptor->columnType,
3989  !targetDescriptor->columnType.get_notnull(),
3990  result_rows->getRowSetMemOwner()->getLiteralStringDictProxy(),
3992  sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
3993  ? executor->getStringDictionaryProxy(
3994  sourceDataMetaInfo.get_type_info().get_comp_param(),
3995  result_rows->getRowSetMemOwner(),
3996  true)
3997  : nullptr,
3998  IS_GEO_POLY(targetDescriptor->columnType.get_type()) &&
4000  ? &render_group_analyzer_map
4001  : nullptr};
4002  auto converter = factory.create(param);
4003  value_converters.push_back(std::move(converter));
4004  }
4005 
4006  const auto translate_clock_begin = timer_start();
4007  if (can_go_parallel) {
4008  std::vector<std::future<void>> worker_threads;
4009  for (int i = 0; i < num_worker_threads; ++i) {
4010  worker_threads.push_back(
4011  std::async(std::launch::async, convert_function, i, num_rows_this_itr));
4012  }
4013 
4014  for (auto& child : worker_threads) {
4015  child.wait();
4016  }
4017  for (auto& child : worker_threads) {
4018  child.get();
4019  }
4020 
4021  } else {
4022  single_threaded_convert_function(0, num_rows_this_itr);
4023  }
4024 
4025  // finalize the insert data
4026  auto finalizer_func =
4027  [](std::unique_ptr<TargetValueConverter>::pointer targetValueConverter) {
4028  targetValueConverter->finalizeDataBlocksForInsertData();
4029  };
4030 
4031  std::vector<std::future<void>> worker_threads;
4032  for (auto& converterPtr : value_converters) {
4033  worker_threads.push_back(
4034  std::async(std::launch::async, finalizer_func, converterPtr.get()));
4035  }
4036 
4037  for (auto& child : worker_threads) {
4038  child.wait();
4039  }
4040  for (auto& child : worker_threads) {
4041  child.get();
4042  }
4043 
4045  insert_data.databaseId = catalog.getCurrentDB().dbId;
4046  CHECK(td);
4047  insert_data.tableId = td->tableId;
4048  insert_data.numRows = num_rows_this_itr;
4049 
4050  for (int col_idx = 0; col_idx < target_column_descriptors.size(); col_idx++) {
4052  check_session_interrupted(query_session, executor))) {
4053  throw std::runtime_error(
4054  "Query execution has been interrupted while performing " +
4055  work_type_str);
4056  }
4057  value_converters[col_idx]->addDataBlocksToInsertData(insert_data);
4058  }
4059  total_target_value_translate_time_ms += timer_stop(translate_clock_begin);
4060 
4061  const auto data_load_clock_begin = timer_start();
4062  auto data_memory_holder =
4063  import_export::fill_missing_columns(&catalog, insert_data);
4064  insertDataLoader.insertData(*session, insert_data);
4065  total_data_load_time_ms += timer_stop(data_load_clock_begin);
4066  }
4067  }
4068  }
4069  } catch (...) {
4070  try {
4071  leafs_connector_->rollback(*session, td->tableId);
4072  } catch (std::exception& e) {
4073  LOG(ERROR) << "An error occurred during ITAS rollback attempt. Table id: "
4074  << td->tableId << ", Error: " << e.what();
4075  }
4076  throw;
4077  }
4078 
4079  int64_t total_time_ms = total_source_query_time_ms +
4080  total_target_value_translate_time_ms + total_data_load_time_ms;
4081 
4082  VLOG(1) << "CTAS/ITAS " << total_row_count << " rows loaded in " << total_time_ms
4083  << "ms (outer_frag_count=" << outer_frag_count
4084  << ", query_time=" << total_source_query_time_ms
4085  << "ms, translation_time=" << total_target_value_translate_time_ms
4086  << "ms, data_load_time=" << total_data_load_time_ms
4087  << "ms)\nquery: " << select_query_;
4088 
4089  if (!is_temporary) {
4090  leafs_connector_->checkpoint(*session, td->tableId);
4091  }
4092 }
4093 
4094 namespace {
4096  const std::string& table_name) {
4097  auto table_id = catalog.getTableId(table_name);
4098  if (!table_id.has_value()) {
4099  throw std::runtime_error{"Table \"" + table_name + "\" does not exist."};
4100  }
4101  return table_id.value();
4102 }
4103 
4105  Catalog_Namespace::Catalog& catalog,
4106  const std::string& query_str,
4107  const QueryStateProxy& query_state_proxy,
4108  const std::optional<std::string>& insert_table_name = {}) {
4109  auto calcite_mgr = catalog.getCalciteMgr();
4110  const auto calciteQueryParsingOption =
4111  calcite_mgr->getCalciteQueryParsingOption(true, false, true);
4112  const auto calciteOptimizationOption =
4113  calcite_mgr->getCalciteOptimizationOption(false, g_enable_watchdog, {});
4114  const auto result = calcite_mgr->process(query_state_proxy,
4115  pg_shim(query_str),
4116  calciteQueryParsingOption,
4117  calciteOptimizationOption);
4118  // force sort into tableid order in case of name change to guarantee fixed order of
4119  // mutex access
4120  auto comparator = [&catalog](const std::string& table_1, const std::string& table_2) {
4121  return get_table_id(catalog, table_1) < get_table_id(catalog, table_2);
4122  };
4123  std::set<std::string, decltype(comparator)> tables(comparator);
4124  for (auto& tab : result.resolved_accessed_objects.tables_selected_from) {
4125  tables.emplace(tab[0]);
4126  }
4127  if (insert_table_name.has_value()) {
4128  tables.emplace(insert_table_name.value());
4129  }
4131  for (const auto& table : tables) {
4132  locks.emplace_back(
4135  catalog, table)));
4136  if (insert_table_name.has_value() && table == insert_table_name.value()) {
4137  locks.emplace_back(
4140  catalog.getDatabaseId(), (*locks.back())())));
4141  } else {
4142  locks.emplace_back(
4145  catalog.getDatabaseId(), (*locks.back())())));
4146  }
4147  }
4148  return locks;
4149 }
4150 } // namespace
4151 
4153  bool read_only_mode) {
4154  if (read_only_mode) {
4155  throw std::runtime_error("INSERT INTO TABLE invalid in read only mode.");
4156  }
4157  auto session_copy = session;
4158  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
4159  &session_copy, boost::null_deleter());
4160  auto query_state = query_state::QueryState::create(session_ptr, select_query_);
4161  auto stdlog = STDLOG(query_state);
4162  auto& catalog = session_ptr->getCatalog();
4163 
4164  const auto execute_read_lock =
4168 
4169  if (catalog.getMetadataForTable(table_name_) == nullptr) {
4170  throw std::runtime_error("ITAS failed: table " + table_name_ + " does not exist.");
4171  }
4172 
4173  auto locks = acquire_query_table_locks(
4174  catalog, select_query_, query_state->createQueryStateProxy(), table_name_);
4175  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
4176 
4177  Executor::clearExternalCaches(true, td, catalog.getCurrentDB().dbId);
4178 
4179  try {
4180  populateData(query_state->createQueryStateProxy(), td, true, false);
4181  } catch (...) {
4182  throw;
4183  }
4184 }
4185 
4187  : InsertIntoTableAsSelectStmt(payload) {
4188  if (payload.HasMember("temporary")) {
4189  is_temporary_ = json_bool(payload["temporary"]);
4190  } else {
4191  is_temporary_ = false;
4192  }
4193 
4194  if (payload.HasMember("ifNotExists")) {
4195  if_not_exists_ = json_bool(payload["ifNotExists"]);
4196  } else {
4197  if_not_exists_ = false;
4198  }
4199 
4200  parse_options(payload, storage_options_);
4201 }
4202 
4204  bool read_only_mode) {
4205  if (read_only_mode) {
4206  throw std::runtime_error("CREATE TABLE invalid in read only mode.");
4207  }
4208  auto session_copy = session;
4209  auto session_ptr = std::shared_ptr<Catalog_Namespace::SessionInfo>(
4210  &session_copy, boost::null_deleter());
4211  auto query_state = query_state::QueryState::create(session_ptr, select_query_);
4212  auto stdlog = STDLOG(query_state);
4213  LocalQueryConnector local_connector;
4214  auto& catalog = session.getCatalog();
4215  bool create_table = nullptr == leafs_connector_;
4216 
4217  std::set<std::string> select_tables;
4218  if (create_table) {
4219  const auto execute_write_lock =
4223 
4224  // check access privileges
4227  throw std::runtime_error("CTAS failed. Table " + table_name_ +
4228  " will not be created. User has no create privileges.");
4229  }
4230 
4231  if (catalog.getMetadataForTable(table_name_) != nullptr) {
4232  if (if_not_exists_) {
4233  return;
4234  }
4235  throw std::runtime_error("Table " + table_name_ +
4236  " already exists and no data was loaded.");
4237  }
4238 
4239  // only validate the select query so we get the target types
4240  // correctly, but do not populate the result set
4241  // we currently have exclusive access to the system so this is safe
4242  auto validate_result = local_connector.query(
4243  query_state->createQueryStateProxy(), select_query_, {}, true, false);
4244 
4245  const auto column_descriptors_for_create =
4246  local_connector.getColumnDescriptors(validate_result, true);
4247 
4248  // some validation as the QE might return some out of range column types
4249  for (auto& cd : column_descriptors_for_create) {
4250  if (cd.columnType.is_decimal() && cd.columnType.get_precision() > 18) {
4251  throw std::runtime_error(cd.columnName + ": Precision too high, max 18.");
4252  }
4253  }
4254 
4255  TableDescriptor td;
4256  td.tableName = table_name_;
4257  td.userId = session.get_currentUser().userId;
4258  td.nColumns = column_descriptors_for_create.size();
4259  td.isView = false;
4260  td.fragmenter = nullptr;
4267  if (is_temporary_) {
4269  } else {
4271  }
4272 
4273  bool use_shared_dictionaries = true;
4274 
4275  if (!storage_options_.empty()) {
4276  for (auto& p : storage_options_) {
4277  if (boost::to_lower_copy<std::string>(*p->get_name()) ==
4278  "use_shared_dictionaries") {
4279  const StringLiteral* literal =
4280  dynamic_cast<const StringLiteral*>(p->get_value());
4281  if (nullptr == literal) {
4282  throw std::runtime_error(
4283  "USE_SHARED_DICTIONARIES must be a string parameter");
4284  }
4285  std::string val = boost::to_lower_copy<std::string>(*literal->get_stringval());
4286  use_shared_dictionaries = val == "true" || val == "1" || val == "t";
4287  } else {
4288  get_table_definitions_for_ctas(td, p, column_descriptors_for_create);
4289  }
4290  }
4291  }
4292 
4293  std::vector<SharedDictionaryDef> sharedDictionaryRefs;
4294 
4295  if (use_shared_dictionaries) {
4296  const auto source_column_descriptors =
4297  local_connector.getColumnDescriptors(validate_result, false);
4298  const auto mapping = catalog.getDictionaryToColumnMapping();
4299 
4300  for (auto& source_cd : source_column_descriptors) {
4301  const auto& ti = source_cd.columnType;
4302  if (ti.is_string()) {
4303  if (ti.get_compression() == kENCODING_DICT) {
4304  int dict_id = ti.get_comp_param();
4305  auto it = mapping.find(dict_id);
4306  if (mapping.end() != it) {
4307  const auto targetColumn = it->second;
4308  auto targetTable =
4309  catalog.getMetadataForTable(targetColumn->tableId, false);
4310  CHECK(targetTable);
4311  LOG(INFO) << "CTAS: sharing text dictionary on column "
4312  << source_cd.columnName << " with " << targetTable->tableName
4313  << "." << targetColumn->columnName;
4314  sharedDictionaryRefs.emplace_back(
4315  source_cd.columnName, targetTable->tableName, targetColumn->columnName);
4316  }
4317  }
4318  }
4319  }
4320  }
4321 
4322  // currently no means of defining sharding in CTAS
4323  td.keyMetainfo = serialize_key_metainfo(nullptr, sharedDictionaryRefs);
4324 
4325  catalog.createTable(td, column_descriptors_for_create, sharedDictionaryRefs, true);
4326  // TODO (max): It's transactionally unsafe, should be fixed: we may create object
4327  // w/o privileges
4328  SysCatalog::instance().createDBObject(
4329  session.get_currentUser(), td.tableName, TableDBObjectType, catalog);
4330  }
4331 
4332  // note there is a time where we do not have any executor outer lock here. someone could
4333  // come along and mess with the data or other tables.
4334  const auto execute_read_lock =
4338 
4339  auto locks = acquire_query_table_locks(
4340  catalog, select_query_, query_state->createQueryStateProxy(), table_name_);
4341  const TableDescriptor* td = catalog.getMetadataForTable(table_name_);
4342  try {
4343  populateData(query_state->createQueryStateProxy(), td, false, true);
4344  } catch (...) {
4345  if (!g_cluster) {
4346  const TableDescriptor* created_td = catalog.getMetadataForTable(table_name_);
4347  if (created_td) {
4348  catalog.dropTable(created_td);
4349  }
4350  }
4351  throw;
4352  }
4353 }
4354 
4355 DropTableStmt::DropTableStmt(const rapidjson::Value& payload) {
4356  CHECK(payload.HasMember("tableName"));
4357  table_ = std::make_unique<std::string>(json_str(payload["tableName"]));
4358 
4359  if_exists_ = false;
4360  if (payload.HasMember("ifExists")) {
4361  if_exists_ = json_bool(payload["ifExists"]);
4362  }
4363 }
4364 
4366  bool read_only_mode) {
4367  if (read_only_mode) {
4368  throw std::runtime_error("DROP TABLE invalid in read only mode.");
4369  }
4370  const auto execute_read_lock =
4374  auto& catalog = session.getCatalog();
4375  const TableDescriptor* td{nullptr};
4376  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>> td_with_lock;
4377  try {
4378  td_with_lock =
4379  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>>(
4381  catalog, *table_, false));
4382  td = (*td_with_lock)();
4383  } catch (const std::runtime_error& e) {
4384  if (if_exists_) {
4385  return;
4386  } else {
4387  throw e;
4388  }
4389  }
4390 
4391  CHECK(td);
4392  CHECK(td_with_lock);
4393 
4394  // check access privileges
4395  if (!session.checkDBAccessPrivileges(
4397  throw std::runtime_error("Table " + *table_ +
4398  " will not be dropped. User has no proper privileges.");
4399  }
4400 
4402 
4403  {
4404  auto table_data_read_lock =
4406  Executor::clearExternalCaches(false, td, catalog.getCurrentDB().dbId);
4407  }
4408 
4409  auto table_data_write_lock =
4411  catalog.dropTable(td);
4412 }
4413 
4415  bool read_only_mode) {}
4416 
4417 std::unique_ptr<DDLStmt> AlterTableStmt::delegate(const rapidjson::Value& payload) {
4418  CHECK(payload.HasMember("tableName"));
4419  auto tableName = json_str(payload["tableName"]);
4420 
4421  CHECK(payload.HasMember("alterType"));
4422  auto type = json_str(payload["alterType"]);
4423 
4424  if (type == "RENAME_TABLE") {
4425  CHECK(payload.HasMember("newTableName"));
4426  auto newTableName = json_str(payload["newTableName"]);
4427  return std::unique_ptr<DDLStmt>(new Parser::RenameTableStmt(
4428  new std::string(tableName), new std::string(newTableName)));
4429 
4430  } else if (type == "RENAME_COLUMN") {
4431  CHECK(payload.HasMember("columnName"));
4432  auto columnName = json_str(payload["columnName"]);
4433  CHECK(payload.HasMember("newColumnName"));
4434  auto newColumnName = json_str(payload["newColumnName"]);
4435  return std::unique_ptr<DDLStmt>(
4436  new Parser::RenameColumnStmt(new std::string(tableName),
4437  new std::string(columnName),
4438  new std::string(newColumnName)));
4439 
4440  } else if (type == "ADD_COLUMN") {
4441  CHECK(payload.HasMember("columnData"));
4442  CHECK(payload["columnData"].IsArray());
4443 
4444  // New Columns go into this list
4445  std::list<ColumnDef*>* table_element_list_ = new std::list<ColumnDef*>;
4446 
4447  const auto elements = payload["columnData"].GetArray();
4448  for (const auto& element : elements) {
4449  CHECK(element.IsObject());
4450  CHECK(element.HasMember("type"));
4451  if (json_str(element["type"]) == "SQL_COLUMN_DECLARATION") {
4452  auto col_def = column_from_json(element);
4453  table_element_list_->emplace_back(col_def.release());
4454  } else {
4455  LOG(FATAL) << "Unsupported element type for ALTER TABLE: "
4456  << element["type"].GetString();
4457  }
4458  }
4459 
4460  return std::unique_ptr<DDLStmt>(
4461  new Parser::AddColumnStmt(new std::string(tableName), table_element_list_));
4462 
4463  } else if (type == "DROP_COLUMN") {
4464  CHECK(payload.HasMember("columnData"));
4465  auto columnData = json_str(payload["columnData"]);
4466  // Convert columnData to std::list<std::string*>*
4467  // allocate std::list<> as DropColumnStmt will delete it;
4468  std::list<std::string*>* cols = new std::list<std::string*>;
4469  std::vector<std::string> cols1;
4470  boost::split(cols1, columnData, boost::is_any_of(","));
4471  for (auto s : cols1) {
4472  // strip leading/trailing spaces/quotes/single quotes
4473  boost::algorithm::trim_if(s, boost::is_any_of(" \"'`"));
4474  std::string* str = new std::string(s);
4475  cols->emplace_back(str);
4476  }
4477 
4478  return std::unique_ptr<DDLStmt>(
4479  new Parser::DropColumnStmt(new std::string(tableName), cols));
4480 
4481  } else if (type == "ALTER_OPTIONS") {
4482  CHECK(payload.HasMember("options"));
4483 
4484  if (payload["options"].IsObject()) {
4485  for (const auto& option : payload["options"].GetObject()) {
4486  std::string* option_name = new std::string(json_str(option.name));
4487  Literal* literal_value;
4488  if (option.value.IsString()) {
4489  std::string literal_string = json_str(option.value);
4490 
4491  // iff this string can be converted to INT
4492  // ... do so because it is necessary for AlterTableParamStmt
4493  std::size_t sz;
4494  int iVal = std::stoi(literal_string, &sz);
4495  if (sz == literal_string.size()) {
4496  literal_value = new IntLiteral(iVal);
4497  } else {
4498  literal_value = new StringLiteral(&literal_string);
4499  }
4500  } else if (option.value.IsInt() || option.value.IsInt64()) {
4501  literal_value = new IntLiteral(json_i64(option.value));
4502  } else if (option.value.IsNull()) {
4503  literal_value = new NullLiteral();
4504  } else {
4505  throw std::runtime_error("Unable to handle literal for " + *option_name);
4506  }
4507  CHECK(literal_value);
4508 
4509  NameValueAssign* nv = new NameValueAssign(option_name, literal_value);
4510  return std::unique_ptr<DDLStmt>(
4511  new Parser::AlterTableParamStmt(new std::string(tableName), nv));
4512  }
4513 
4514  } else {
4515  CHECK(payload["options"].IsNull());
4516  }
4517  }
4518  return nullptr;
4519 }
4520 
4521 TruncateTableStmt::TruncateTableStmt(const rapidjson::Value& payload) {
4522  CHECK(payload.HasMember("tableName"));
4523  table_ = std::make_unique<std::string>(json_str(payload["tableName"]));
4524 }
4525 
4527  bool read_only_mode) {
4528  if (read_only_mode) {
4529  throw std::runtime_error("TRUNCATE TABLE invalid in read only mode.");
4530  }
4531  const auto execute_read_lock =
4535  auto& catalog = session.getCatalog();
4536  const auto td_with_lock =
4538  catalog, *table_, true);
4539  const auto td = td_with_lock();
4540  if (!td) {
4541  throw std::runtime_error("Table " + *table_ + " does not exist.");
4542  }
4543 
4544  // check access privileges
4545  std::vector<DBObject> privObjects;
4546  DBObject dbObject(*table_, TableDBObjectType);
4547  dbObject.loadKey(catalog);
4549  privObjects.push_back(dbObject);
4550  if (!SysCatalog::instance().checkPrivileges(session.get_currentUser(), privObjects)) {
4551  throw std::runtime_error("Table " + *table_ + " will not be truncated. User " +
4552  session.get_currentUser().userLoggable() +
4553  " has no proper privileges.");
4554  }
4555 
4556  if (td->isView) {
4557  throw std::runtime_error(*table_ + " is a view. Cannot Truncate.");
4558  }
4560 
4561  // invalidate cached item
4562  {
4563  auto table_data_read_lock =
4565  Executor::clearExternalCaches(false, td, catalog.getCurrentDB().dbId);
4566  }
4567 
4568  auto table_data_write_lock =
4570  catalog.truncateTable(td);
4571 }
4572 
4573 OptimizeTableStmt::OptimizeTableStmt(const rapidjson::Value& payload) {
4574  CHECK(payload.HasMember("tableName"));
4575  table_ = std::make_unique<std::string>(json_str(payload["tableName"]));
4576  parse_options(payload, options_);
4577 }
4578 
4579 namespace {
4581  const TableDescriptor* td,
4582  const AccessPrivileges access_priv) {
4583  CHECK(td);
4584  auto& cat = session_info.getCatalog();
4585  std::vector<DBObject> privObjects;
4586  DBObject dbObject(td->tableName, TableDBObjectType);
4587  dbObject.loadKey(cat);
4588  dbObject.setPrivileges(access_priv);
4589  privObjects.push_back(dbObject);
4590  return SysCatalog::instance().checkPrivileges(session_info.get_currentUser(),
4591  privObjects);
4592 };
4593 } // namespace
4594 
4596  bool read_only_mode) {
4597  if (read_only_mode) {
4598  throw std::runtime_error("OPTIMIZE TABLE invalid in read only mode.");
4599  }
4600  auto& catalog = session.getCatalog();
4601 
4602  const auto execute_read_lock =
4606 
4607  const auto td_with_lock =
4609  catalog, *table_);
4610  const auto td = td_with_lock();
4611 
4612  if (!td || !user_can_access_table(session, td, AccessPrivileges::DELETE_FROM_TABLE)) {
4613  throw std::runtime_error("Table " + *table_ + " does not exist.");
4614  }
4615 
4616  if (td->isView) {
4617  throw std::runtime_error("OPTIMIZE TABLE command is not supported on views.");
4618  }
4619 
4620  // invalidate cached item
4621  std::vector<int> table_key{catalog.getCurrentDB().dbId, td->tableId};
4622  ResultSetCacheInvalidator::invalidateCachesByTable(boost::hash_value(table_key));
4623 
4625  const TableOptimizer optimizer(td, executor, catalog);
4626  if (shouldVacuumDeletedRows()) {
4627  optimizer.vacuumDeletedRows();
4628  }
4629  optimizer.recomputeMetadata();
4630 }
4631 
4632 bool repair_type(std::list<std::unique_ptr<NameValueAssign>>& options) {
4633  for (const auto& opt : options) {
4634  if (boost::iequals(*opt->get_name(), "REPAIR_TYPE")) {
4635  const auto repair_type =
4636  static_cast<const StringLiteral*>(opt->get_value())->get_stringval();
4637  CHECK(repair_type);
4638  if (boost::iequals(*repair_type, "REMOVE")) {
4639  return true;
4640  } else {
4641  throw std::runtime_error("REPAIR_TYPE must be REMOVE.");
4642  }
4643  } else {
4644  throw std::runtime_error("The only VALIDATE WITH options is REPAIR_TYPE.");
4645  }
4646  }
4647  return false;
4648 }
4649 
4650 ValidateStmt::ValidateStmt(std::string* type, std::list<NameValueAssign*>* with_opts)
4651  : type_(type) {
4652  if (!type) {
4653  throw std::runtime_error("Validation Type is required for VALIDATE command.");
4654  }
4655  std::list<std::unique_ptr<NameValueAssign>> options;
4656  if (with_opts) {
4657  for (const auto e : *with_opts) {
4658  options.emplace_back(e);
4659  }
4660  delete with_opts;
4661 
4662  isRepairTypeRemove_ = repair_type(options);
4663  }
4664 }
4665 
4666 ValidateStmt::ValidateStmt(const rapidjson::Value& payload) {
4667  CHECK(payload.HasMember("type"));
4668  type_ = std::make_unique<std::string>(json_str(payload["type"]));
4669 
4670  std::list<std::unique_ptr<NameValueAssign>> options;
4671  parse_options(payload, options);
4672 
4673  isRepairTypeRemove_ = repair_type(options);
4674 }
4675 
4677  const TableDescriptor* td) {
4678  if (session.get_currentUser().isSuper ||
4679  session.get_currentUser().userId == td->userId) {
4680  return;
4681  }
4682  std::vector<DBObject> privObjects;
4683  DBObject dbObject(td->tableName, TableDBObjectType);
4684  dbObject.loadKey(session.getCatalog());
4686  privObjects.push_back(dbObject);
4687  if (!SysCatalog::instance().checkPrivileges(session.get_currentUser(), privObjects)) {
4688  throw std::runtime_error("Current user does not have the privilege to alter table: " +
4689  td->tableName);
4690  }
4691 }
4692 
4693 RenameUserStmt::RenameUserStmt(const rapidjson::Value& payload) {
4694  CHECK(payload.HasMember("name"));
4695  username_ = std::make_unique<std::string>(json_str(payload["name"]));
4696  CHECK(payload.HasMember("newName"));
4697  new_username_ = std::make_unique<std::string>(json_str(payload["newName"]));
4698 }
4699 
4701  bool read_only_mode) {
4702  if (read_only_mode) {
4703  throw std::runtime_error("RENAME TABLE invalid in read only mode.");
4704  }
4705  if (!session.get_currentUser().isSuper) {
4706  throw std::runtime_error("Only a super user can rename users.");
4707  }
4708 
4710  if (!SysCatalog::instance().getMetadataForUser(*username_, user)) {
4711  throw std::runtime_error("User " + *username_ + " does not exist.");
4712  }
4713 
4714  SysCatalog::instance().renameUser(*username_, *new_username_);
4715 }
4716 
4717 RenameDBStmt::RenameDBStmt(const rapidjson::Value& payload) {
4718  CHECK(payload.HasMember("name"));
4719  database_name_ = std::make_unique<std::string>(json_str(payload["name"]));
4720  CHECK(payload.HasMember("newName"));
4721  new_database_name_ = std::make_unique<std::string>(json_str(payload["newName"]));
4722 }
4723 
4725  bool read_only_mode) {
4726  if (read_only_mode) {
4727  throw std::runtime_error("RENAME DATABASE invalid in read only mode.");
4728  }
4730 
4731  // TODO: use database lock instead
4732  const auto execute_write_lock =
4736 
4737  if (!SysCatalog::instance().getMetadataForDB(*database_name_, db)) {
4738  throw std::runtime_error("Database " + *database_name_ + " does not exist.");
4739  }
4740 
4741  if (!session.get_currentUser().isSuper &&
4742  session.get_currentUser().userId != db.dbOwner) {
4743  throw std::runtime_error("Only a super user or the owner can rename the database.");
4744  }
4745 
4746  SysCatalog::instance().renameDatabase(*database_name_, *new_database_name_);
4747 }
4748 
4749 RenameTableStmt::RenameTableStmt(const rapidjson::Value& payload) {
4750  CHECK(payload.HasMember("tableNames"));
4751  CHECK(payload["tableNames"].IsArray());
4752  const auto elements = payload["tableNames"].GetArray();
4753  for (const auto& element : elements) {
4754  CHECK(element.HasMember("name"));
4755  CHECK(element.HasMember("newName"));
4756  tablesToRename_.emplace_back(new std::string(json_str(element["name"])),
4757  new std::string(json_str(element["newName"])));
4758  }
4759 }
4760 
4761 RenameTableStmt::RenameTableStmt(std::string* tab_name, std::string* new_tab_name) {
4762  tablesToRename_.emplace_back(tab_name, new_tab_name);
4763 }
4764 
4766  std::list<std::pair<std::string, std::string>> tableNames) {
4767  for (auto item : tableNames) {
4768  tablesToRename_.emplace_back(new std::string(item.first),
4769  new std::string(item.second));
4770  }
4771 }
4772 
// Bookkeeping for RENAME TABLE validation: maps a table name referenced by the command
// to the name under which that table's data is currently tracked, or to the empty-name
// sentinel when no data is tracked under it.
using SubstituteMap = std::map<std::string, std::string>;
4774 
4775 // Namespace fns used to track a left-to-right execution of RENAME TABLE
4776 // and verify that the command should be (entirely/mostly) valid
4777 //
4778 namespace {
4779 
// SubstituteMap sentinel value: no table's data is held under the key's name
// (the table does not exist, or its data has been moved away by an earlier rename).
static constexpr char const* EMPTY_NAME{""};
4781 
// Produces a scratch table name "<name>_tmp<std::time value>", used to move an occupied
// rename destination out of the way.
// NOTE(review): calls within the same second for the same `name` yield identical
// names, and the catalog is not consulted for an existing table with this name.
std::string generateUniqueTableName(std::string name) {
  const std::time_t timestamp = std::time(nullptr);
  name += "_tmp";
  name += std::to_string(timestamp);
  return name;
}
4787 
4788 void recordRename(SubstituteMap& sMap, std::string oldName, std::string newName) {
4789  sMap[oldName] = newName;
4790 }
4791 
4793  SubstituteMap& sMap,
4794  std::string tableName) {
4795  if (sMap.find(tableName) != sMap.end()) {
4796  if (sMap[tableName] == EMPTY_NAME) {
4797  return tableName;
4798  }
4799  return sMap[tableName];
4800  } else {
4801  // lookup table in src catalog
4802  const TableDescriptor* td = catalog.getMetadataForTable(tableName);
4803  if (td) {
4804  sMap[tableName] = tableName;
4805  } else {
4806  sMap[tableName] = EMPTY_NAME;
4807  }
4808  }
4809  return tableName;
4810 }
4811 
4812 bool hasData(SubstituteMap& sMap, std::string tableName) {
4813  // assumes loadTable has been previously called
4814  return (sMap[tableName] != EMPTY_NAME);
4815 }
4816 
4818  // Substition map should be clean at end of rename:
4819  // all items in map must (map to self) or (map to EMPTY_STRING) by end
4820 
4821  for (auto it : sMap) {
4822  if ((it.second) != EMPTY_NAME && (it.first) != (it.second)) {
4823  throw std::runtime_error(
4824  "Error: Attempted to overwrite and lose data in table: \'" + (it.first) + "\'");
4825  }
4826  }
4827 }
4828 } // namespace
4829 
4830 namespace {
4833  throw std::runtime_error(td->tableName + " is a foreign table. " +
4834  "Use ALTER FOREIGN TABLE.");
4835  }
4836 }
4837 } // namespace
4838 
4840  bool read_only_mode) {
4841  if (read_only_mode) {
4842  throw std::runtime_error("RENAME TABLE invalid in read only mode.");
4843  }
4844  auto& catalog = session.getCatalog();
4845 
4846  // TODO(adb): the catalog should be handling this locking (see AddColumStmt)
4847  const auto execute_write_lock =
4851 
4852  // accumulated vector of table names: oldName->newName
4853  std::vector<std::pair<std::string, std::string>> names;
4854 
4855  SubstituteMap tableSubtituteMap;
4856 
4857  for (auto& item : tablesToRename_) {
4858  std::string curTableName = *(item.first);
4859  std::string newTableName = *(item.second);
4860 
4861  // Note: if rename (a->b, b->a)
4862  // requires a tmp name change (a->tmp, b->a, tmp->a),
4863  // inject that here because
4864  // catalog.renameTable() assumes cleanliness else will fail
4865 
4866  std::string altCurTableName = loadTable(catalog, tableSubtituteMap, curTableName);
4867  std::string altNewTableName = loadTable(catalog, tableSubtituteMap, newTableName);
4868 
4869  if (altCurTableName != curTableName && altCurTableName != EMPTY_NAME) {
4870  // rename is a one-shot deal, reset the mapping once used
4871  recordRename(tableSubtituteMap, curTableName, curTableName);
4872  }
4873 
4874  // Check to see if the command (as-entered) will likely execute cleanly (logic-wise)
4875  // src tables exist before coping from
4876  // destination table collisions
4877  // handled (a->b, b->a)
4878  // or flagged (pre-existing a,b ... "RENAME TABLE a->c, b->c" )
4879  // handle mulitple chained renames, tmp names (a_>tmp, b->a, tmp->a)
4880  // etc.
4881  //
4882  if (hasData(tableSubtituteMap, altCurTableName)) {
4883  const TableDescriptor* td = catalog.getMetadataForTable(altCurTableName);
4884  if (td) {
4885  // Tables *and* views may be renamed here, foreign tables not
4886  // -> just block foreign tables
4888  check_alter_table_privilege(session, td);
4889  }
4890 
4891  if (hasData(tableSubtituteMap, altNewTableName)) {
4892  std::string tmpNewTableName = generateUniqueTableName(altNewTableName);
4893  // rename: newTableName to tmpNewTableName to get it out of the way
4894  // because it was full
4895  recordRename(tableSubtituteMap, altCurTableName, EMPTY_NAME);
4896  recordRename(tableSubtituteMap, altNewTableName, tmpNewTableName);
4897  recordRename(tableSubtituteMap, tmpNewTableName, tmpNewTableName);
4898  names.emplace_back(altNewTableName, tmpNewTableName);
4899  names.emplace_back(altCurTableName, altNewTableName);
4900  } else {
4901  // rename: curNewTableName to newTableName
4902  recordRename(tableSubtituteMap, altCurTableName, EMPTY_NAME);
4903  recordRename(tableSubtituteMap, altNewTableName, altNewTableName);
4904  names.emplace_back(altCurTableName, altNewTableName);
4905  }
4906  } else {
4907  throw std::runtime_error("Source table \'" + curTableName + "\' does not exist.");
4908  }
4909  }
4910  checkNameSubstition(tableSubtituteMap);
4911 
4912  catalog.renameTable(names);
4913 
4914  // just to be explicit, clean out the list, the unique_ptr will delete
4915  while (!tablesToRename_.empty()) {
4916  tablesToRename_.pop_front();
4917  }
4918 } // namespace Parser
4919 
4921  bool not_null;
4922  const ColumnConstraintDef* cc = coldef->get_column_constraint();
4923  if (cc == nullptr) {
4924  not_null = false;
4925  } else {
4926  not_null = cc->get_notnull();
4927  }
4928  std::string default_value;
4929  const std::string* default_value_ptr = nullptr;
4930  if (cc) {
4931  if (auto def_val_literal = cc->get_defaultval()) {
4932  auto defaultsp = dynamic_cast<const StringLiteral*>(def_val_literal);
4933  default_value =
4934  defaultsp ? *defaultsp->get_stringval() : def_val_literal->to_string();
4935  // The preprocessing below is needed because:
4936  // a) TypedImportBuffer expects arrays in the {...} format
4937  // b) TypedImportBuffer expects string literals inside arrays w/o any quotes
4938  if (coldef->get_column_type()->get_is_array()) {
4939  std::regex array_re(R"(^ARRAY\s*\[(.*)\]$)", std::regex_constants::icase);
4940  default_value = std::regex_replace(default_value, array_re, "{$1}");
4941  boost::erase_all(default_value, "\'");
4942  }
4943  default_value_ptr = &default_value;
4944  }
4945  }
4947  cd,
4948  coldef->get_column_type(),
4949  not_null,
4950  coldef->get_compression(),
4951  default_value_ptr);
4952 }
4953 
4955  const TableDescriptor* td) {
4956  auto& catalog = session.getCatalog();
4957  if (!td) {
4958  throw std::runtime_error("Table " + *table_ + " does not exist.");
4959  } else {
4960  if (td->isView) {
4961  throw std::runtime_error("Adding columns to a view is not supported.");
4962  }
4964  if (table_is_temporary(td)) {
4965  throw std::runtime_error(
4966  "Adding columns to temporary tables is not yet supported.");
4967  }
4968  }
4969 
4970  check_alter_table_privilege(session, td);
4971 
4972  if (0 == coldefs_.size()) {
4973  coldefs_.push_back(std::move(coldef_));
4974  }
4975 
4976  for (const auto& coldef : coldefs_) {
4977  auto& new_column_name = *coldef->get_column_name();
4978  if (catalog.getMetadataForColumn(td->tableId, new_column_name) != nullptr) {
4979  throw std::runtime_error("Column " + new_column_name + " already exists.");
4980  }
4981  }
4982 }
4983 
4985  bool read_only_mode) {
4986  if (read_only_mode) {
4987  throw std::runtime_error("ADD COLUMN invalid in read only mode.");
4988  }
4989  // TODO: Review add and drop column implementation
4990  const auto execute_write_lock =
4994  auto& catalog = session.getCatalog();
4995  const auto td_with_lock =
4997  catalog, *table_, true);
4998  const auto td = td_with_lock();
4999 
5000  check_executable(session, td);
5001 
5002  CHECK(td->fragmenter);
5003  if (std::dynamic_pointer_cast<Fragmenter_Namespace::SortedOrderFragmenter>(
5004  td->fragmenter)) {
5005  throw std::runtime_error(
5006  "Adding columns to a table is not supported when using the \"sort_column\" "
5007  "option.");
5008  }
5009 
5010  // invalidate cached item
5011  std::vector<int> table_key{catalog.getCurrentDB().dbId, td->tableId};
5012  ResultSetCacheInvalidator::invalidateCachesByTable(boost::hash_value(table_key));
5013 
5014  // Do not take a data write lock, as the fragmenter may call `deleteFragments`
5015  // during a cap operation. Note that the schema write lock will prevent concurrent
5016  // inserts along with all other queries.
5017 
5018  catalog.getSqliteConnector().query("BEGIN TRANSACTION");
5019  try {
5020  std::map<const std::string, const ColumnDescriptor> cds;
5021  std::map<const int, const ColumnDef*> cid_coldefs;
5022  for (const auto& coldef : coldefs_) {
5023  ColumnDescriptor cd;
5024  setColumnDescriptor(cd, coldef.get());
5025  catalog.addColumn(*td, cd);
5026  cds.emplace(*coldef->get_column_name(), cd);
5027  cid_coldefs.emplace(cd.columnId, coldef.get());
5028 
5029  // expand geo column to phy columns
5030  if (cd.columnType.is_geometry()) {
5031  std::list<ColumnDescriptor> phy_geo_columns;
5032  catalog.expandGeoColumn(cd, phy_geo_columns);
5033  for (auto& cd : phy_geo_columns) {
5034  catalog.addColumn(*td, cd);
5035  cds.emplace(cd.columnName, cd);
5036  cid_coldefs.emplace(cd.columnId, nullptr);
5037  }
5038  }
5039  }
5040 
5041  std::unique_ptr<import_export::Loader> loader(new import_export::Loader(catalog, td));
5042  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
5043  for (const auto& cd : cds) {
5044  import_buffers.emplace_back(std::make_unique<import_export::TypedImportBuffer>(
5045  &cd.second, loader->getStringDict(&cd.second)));
5046  }
5047  loader->setAddingColumns(true);
5048 
5049  // set_geo_physical_import_buffer below needs a sorted import_buffers
5050  std::sort(import_buffers.begin(),
5051  import_buffers.end(),
5052  [](decltype(import_buffers[0])& a, decltype(import_buffers[0])& b) {
5053  return a->getColumnDesc()->columnId < b->getColumnDesc()->columnId;
5054  });
5055 
5056  size_t nrows = td->fragmenter->getNumRows();
5057  // if sharded, get total nrows from all sharded tables
5058  if (td->nShards > 0) {
5059  const auto physical_tds = catalog.getPhysicalTablesDescriptors(td);
5060  nrows = 0;
5061  std::for_each(physical_tds.begin(), physical_tds.end(), [&nrows](const auto& td) {
5062  nrows += td->fragmenter->getNumRows();
5063  });
5064  }
5065  if (nrows > 0) {
5066  int skip_physical_cols = 0;
5067  for (const auto cit : cid_coldefs) {
5068  const auto cd = catalog.getMetadataForColumn(td->tableId, cit.first);
5069  const auto coldef = cit.second;
5070  const bool is_null = !cd->default_value.has_value();
5071 
5072  if (cd->columnType.get_notnull() && is_null) {
5073  throw std::runtime_error("Default value required for column " + cd->columnName +
5074  " because of NOT NULL constraint");
5075  }
5076 
5077  for (auto it = import_buffers.begin(); it < import_buffers.end(); ++it) {
5078  auto& import_buffer = *it;
5079  if (cd->columnId == import_buffer->getColumnDesc()->columnId) {
5080  if (coldef != nullptr ||
5081  skip_physical_cols-- <= 0) { // skip non-null phy col
5082  import_buffer->add_value(cd,
5083  cd->default_value.value_or("NULL"),
5084  is_null,
5086  if (cd->columnType.is_geometry()) {
5087  std::vector<double> coords, bounds;
5088  std::vector<int> ring_sizes, poly_rings;
5089  int render_group = 0;
5090  SQLTypeInfo tinfo{cd->columnType};
5092  cd->default_value.value_or("NULL"),
5093  tinfo,
5094  coords,
5095  bounds,
5096  ring_sizes,
5097  poly_rings,
5098  false)) {
5099  throw std::runtime_error("Bad geometry data: '" +
5100  cd->default_value.value_or("NULL") + "'");
5101  }
5102  size_t col_idx = 1 + std::distance(import_buffers.begin(), it);
5104  cd,
5105  import_buffers,
5106  col_idx,
5107  coords,
5108  bounds,
5109  ring_sizes,
5110  poly_rings,
5111  render_group);
5112  // skip following phy cols
5113  skip_physical_cols = cd->columnType.get_physical_cols();
5114  }
5115  }
5116  break;
5117  }
5118  }
5119  }
5120  }
5121 
5122  if (!loader->loadNoCheckpoint(import_buffers, nrows, &session)) {
5123  throw std::runtime_error("loadNoCheckpoint failed!");
5124  }
5125  catalog.roll(true);
5126  catalog.resetTableEpochFloor(td->tableId);
5127  loader->checkpoint();
5128  catalog.getSqliteConnector().query("END TRANSACTION");
5129  } catch (...) {
5130  catalog.roll(false);
5131  catalog.getSqliteConnector().query("ROLLBACK TRANSACTION");
5132  throw;
5133  }
5134 }
5135 
5137  bool read_only_mode) {
5138  if (read_only_mode) {
5139  throw std::runtime_error("DROP COLUMN invalid in read only mode.");
5140  }
5141  // TODO: Review add and drop column implementation
5142  const auto execute_write_lock =
5146  auto& catalog = session.getCatalog();
5147  const auto td_with_lock =
5149  catalog, *table_, true);
5150  const auto td = td_with_lock();
5151  if (!td) {
5152  throw std::runtime_error("Table " + *table_ + " does not exist.");
5153  }
5155  if (td->isView) {
5156  throw std::runtime_error("Dropping a column from a view is not supported.");
5157  }
5158  if (table_is_temporary(td)) {
5159  throw std::runtime_error(
5160  "Dropping a column from a temporary table is not yet supported.");
5161  }
5162 
5163  check_alter_table_privilege(session, td);
5164 
5165  for (const auto& column : columns_) {
5166  if (nullptr == catalog.getMetadataForColumn(td->tableId, *column)) {
5167  throw std::runtime_error("Column " + *column + " does not exist.");
5168  }
5169  }
5170 
5171  if (td->nColumns <= (td->hasDeletedCol ? 3 : 2)) {
5172  throw std::runtime_error("Table " + *table_ + " has only one column.");
5173  }
5174 
5175  // invalidate cached item
5176  Executor::clearExternalCaches(false, td, catalog.getCurrentDB().dbId);
5177 
5178  catalog.getSqliteConnector().query("BEGIN TRANSACTION");
5179  try {
5180  std::vector<int> columnIds;
5181  for (const auto& column : columns_) {
5182  ColumnDescriptor cd = *catalog.getMetadataForColumn(td->tableId, *column);
5183  if (td->nShards > 0 && td->shardedColumnId == cd.columnId) {
5184  throw std::runtime_error("Dropping sharding column " + cd.columnName +
5185  " is not supported.");
5186  }
5187  catalog.dropColumn(*td, cd);
5188  columnIds.push_back(cd.columnId);
5189  for (int i = 0; i < cd.columnType.get_physical_cols(); i++) {
5190  const auto pcd = catalog.getMetadataForColumn(td->tableId, cd.columnId + i + 1);
5191  CHECK(pcd);
5192  catalog.dropColumn(*td, *pcd);
5193  columnIds.push_back(cd.columnId + i + 1);
5194  }
5195  }
5196 
5197  for (auto shard : catalog.getPhysicalTablesDescriptors(td)) {
5198  shard->fragmenter->dropColumns(columnIds);
5199  }
5200  // if test forces to rollback
5202  throw std::runtime_error("lol!");
5203  }
5204  catalog.roll(true);
5206  catalog.resetTableEpochFloor(td->tableId);
5207  catalog.checkpoint(td->tableId);
5208  }
5209  catalog.getSqliteConnector().query("END TRANSACTION");
5210  } catch (...) {
5211  catalog.setForReload(td->tableId);
5212  catalog.roll(false);
5213  catalog.getSqliteConnector().query("ROLLBACK TRANSACTION");
5214  throw;
5215  }
5216 }
5217 
5219  bool read_only_mode) {
5220  if (read_only_mode) {
5221  throw std::runtime_error("RENAME COLUMN invalid in read only mode.");
5222  }
5223  auto& catalog = session.getCatalog();
5224 
5225  const auto execute_read_lock =
5229 
5230  const auto td_with_lock =
5232  catalog, *table_, false);
5233  const auto td = td_with_lock();
5234  CHECK(td);
5236 
5237  check_alter_table_privilege(session, td);
5238  const ColumnDescriptor* cd = catalog.getMetadataForColumn(td->tableId, *column_);
5239  if (cd == nullptr) {
5240  throw std::runtime_error("Column " + *column_ + " does not exist.");
5241  }
5242  if (catalog.getMetadataForColumn(td->tableId, *new_column_name_) != nullptr) {
5243  throw std::runtime_error("Column " + *new_column_name_ + " already exists.");
5244  }
5245  catalog.renameColumn(td, cd, *new_column_name_);
5246 }
5247 
5249  bool read_only_mode) {
5250  if (read_only_mode) {
5251  throw std::runtime_error("ALTER TABLE invalid in read only mode.");
5252  }
5253  enum TableParamType { MaxRollbackEpochs, Epoch, MaxRows };
5254  static const std::unordered_map<std::string, TableParamType> param_map = {
5255  {"max_rollback_epochs", TableParamType::MaxRollbackEpochs},
5256  {"epoch", TableParamType::Epoch},
5257  {"max_rows", TableParamType::MaxRows}};
5258  const auto execute_read_lock =
5262  auto& catalog = session.getCatalog();
5263  const auto td_with_lock =
5265  catalog, *table_, false);
5266  const auto td = td_with_lock();
5267  if (!td) {
5268  throw std::runtime_error("Table " + *table_ + " does not exist.");
5269  }
5270  if (td->isView) {
5271  throw std::runtime_error("Setting parameters for a view is not supported.");
5272  }
5273  if (table_is_temporary(td)) {
5274  throw std::runtime_error(
5275  "Setting parameters for a temporary table is not yet supported.");
5276  }
5277  check_alter_table_privilege(session, td);
5278 
5279  // invalidate cached item
5280  std::vector<int> table_key{catalog.getCurrentDB().dbId, td->tableId};
5281  ResultSetCacheInvalidator::invalidateCachesByTable(boost::hash_value(table_key));
5282 
5283  std::string param_name(*param_->get_name());
5284  boost::algorithm::to_lower(param_name);
5285  const IntLiteral* val_int_literal =
5286  dynamic_cast<const IntLiteral*>(param_->get_value());
5287  if (val_int_literal == nullptr) {
5288  throw std::runtime_error("Table parameters should be integers.");
5289  }
5290  const int64_t param_val = val_int_literal->get_intval();
5291 
5292  const auto param_it = param_map.find(param_name);
5293  if (param_it == param_map.end()) {
5294  throw std::runtime_error(param_name + " is not a settable table parameter.");
5295  }
5296  switch (param_it->second) {
5297  case MaxRollbackEpochs: {
5298  catalog.setMaxRollbackEpochs(td->tableId, param_val);
5299  break;
5300  }
5301  case Epoch: {
5302  catalog.setTableEpoch(catalog.getDatabaseId(), td->tableId, param_val);
5303  break;
5304  }
5305  case MaxRows: {
5306  catalog.setMaxRows(td->tableId, param_val);
5307  break;
5308  }
5309  default: {
5310  UNREACHABLE() << "Unexpected TableParamType value: " << param_it->second
5311  << ", key: " << param_it->first;
5312  }
5313  }
5314 }
5315 
5317  std::string* f,
5318  std::list<NameValueAssign*>* o)
5319  : table_(t), copy_from_source_pattern_(f), success_(true) {
5320  if (o) {
5321  for (const auto e : *o) {
5322  options_.emplace_back(e);
5323  }
5324  delete o;
5325  }
5326 }
5327 
5328 CopyTableStmt::CopyTableStmt(const rapidjson::Value& payload) : success_(true) {
5329  CHECK(payload.HasMember("table"));
5330  table_ = std::make_unique<std::string>(json_str(payload["table"]));
5331 
5332  CHECK(payload.HasMember("filePath"));
5333  std::string fs = json_str(payload["filePath"]);
5334  // strip leading/trailing spaces/quotes/single quotes
5335  boost::algorithm::trim_if(fs, boost::is_any_of(" \"'`"));
5336  copy_from_source_pattern_ = std::make_unique<std::string>(fs);
5337 
5338  parse_options(payload, options_);
5339 }
5340 
5342  bool read_only_mode) {
5343  if (read_only_mode) {
5344  throw std::runtime_error("IMPORT invalid in read only mode.");
5345  }
5346  auto importer_factory = [](Catalog_Namespace::Catalog& catalog,
5347  const TableDescriptor* td,
5348  const std::string& copy_from_source,
5349  const import_export::CopyParams& copy_params)
5350  -> std::unique_ptr<import_export::AbstractImporter> {
5351  return import_export::create_importer(catalog, td, copy_from_source, copy_params);
5352  };
5353  return execute(session, read_only_mode, importer_factory);
5354 }
5355 
5357  const Catalog_Namespace::SessionInfo& session,
5358  bool read_only_mode,
5359  const std::function<std::unique_ptr<import_export::AbstractImporter>(
5361  const TableDescriptor*,
5362  const std::string&,
5363  const import_export::CopyParams&)>& importer_factory) {
5364  if (read_only_mode) {
5365  throw std::runtime_error("COPY FROM invalid in read only mode.");
5366  }
5367 
5368  size_t total_time = 0;
5369 
5370  // Prevent simultaneous import / truncate (see TruncateTableStmt::execute)
5371  const auto execute_read_lock =
5375 
5376  const TableDescriptor* td{nullptr};
5377  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>> td_with_lock;
5378  std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5379 
5380  auto& catalog = session.getCatalog();
5381 
5382  try {
5383  td_with_lock = std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
5385  catalog, *table_));
5386  td = (*td_with_lock)();
5387  insert_data_lock = std::make_unique<lockmgr::WriteLock>(
5389  } catch (const std::runtime_error& e) {
5390  // noop
5391  // TODO(adb): We're really only interested in whether the table exists or not.
5392  // Create a more refined exception.
5393