OmniSciDB  91042dcc5b
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Importer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file Importer.cpp
19  * @author Wei Hong <wei@mapd.com>
20  * @brief Functions for Importer class
21  */
22 
23 #include "ImportExport/Importer.h"
24 
25 #include <arrow/api.h>
26 #include <arrow/io/api.h>
27 #include <gdal.h>
28 #include <ogrsf_frmts.h>
29 #include <boost/algorithm/string.hpp>
30 #include <boost/dynamic_bitset.hpp>
31 #include <boost/filesystem.hpp>
32 #include <boost/geometry.hpp>
33 #include <boost/variant.hpp>
34 #include <csignal>
35 #include <cstdio>
36 #include <cstdlib>
37 #include <fstream>
38 #include <future>
39 #include <iomanip>
40 #include <list>
41 #include <memory>
42 #include <mutex>
43 #include <numeric>
44 #include <stack>
45 #include <stdexcept>
46 #include <thread>
47 #include <typeinfo>
48 #include <unordered_map>
49 #include <unordered_set>
50 #include <utility>
51 #include <vector>
52 
54 #include "Archive/S3Archive.h"
55 #include "ArrowImporter.h"
56 #include "Geospatial/Compression.h"
57 #include "Geospatial/GDAL.h"
58 #include "Geospatial/Transforms.h"
59 #include "Geospatial/Types.h"
63 #include "Logger/Logger.h"
65 #include "QueryEngine/Execute.h"
67 #include "RenderGroupAnalyzer.h"
68 #include "Shared/DateTimeParser.h"
69 #include "Shared/SqlTypesLayout.h"
70 #include "Shared/file_path_util.h"
71 #include "Shared/import_helpers.h"
72 #include "Shared/likely.h"
73 #include "Shared/measure.h"
74 #include "Shared/misc.h"
75 #include "Shared/scope.h"
76 #include "Shared/shard_key.h"
77 #include "Shared/thread_count.h"
79 
80 #include "gen-cpp/OmniSci.h"
81 
82 #ifdef _WIN32
83 #include <fcntl.h>
84 #include <io.h>
85 #endif
86 
87 #define TIMER_STOP(t) \
88  (float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>( \
89  t)) / \
90  1.0E6f)
91 
93  32; // Max number of default import threads to use (num hardware threads will be used
94  // if lower, and can also be explicitly overriden in copy statement with threads
95  // option)
96 size_t g_archive_read_buf_size = 1 << 20;
97 
98 static constexpr int kMaxRasterScanlinesPerThread = 32;
99 
100 inline auto get_filesize(const std::string& file_path) {
101  boost::filesystem::path boost_file_path{file_path};
102  boost::system::error_code ec;
103  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
104  return ec ? 0 : filesize;
105 }
106 
107 namespace {
108 
109 bool check_session_interrupted(const QuerySessionId& query_session, Executor* executor) {
110  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
111  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor->getSessionLock());
112  return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
113  }
114  return false;
115 }
116 
117 const size_t num_import_threads(const int copy_params_threads) {
118  if (copy_params_threads > 0) {
119  return static_cast<size_t>(copy_params_threads);
120  }
121  return std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
123 }
124 
125 } // namespace
126 
127 // For logging std::vector<std::string> row.
128 namespace boost {
129 namespace log {
130 formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
131  out << '[';
132  for (size_t i = 0; i < row.size(); ++i) {
133  out << (i ? ", " : "") << row[i];
134  }
135  out << ']';
136  return out;
137 }
138 } // namespace log
139 } // namespace boost
140 
141 namespace import_export {
142 
143 using FieldNameToIndexMapType = std::map<std::string, size_t>;
144 using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
146  std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
147 using FeaturePtrVector = std::vector<Geospatial::GDAL::FeatureUqPtr>;
148 
149 #define DEBUG_TIMING false
150 #define DEBUG_RENDER_GROUP_ANALYZER 0
151 #define DEBUG_AWS_AUTHENTICATION 0
152 
153 #define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0
154 
155 static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;
156 
158 static std::map<std::string, ImportStatus> import_status_map;
159 
161  const TableDescriptor* t,
162  const std::string& f,
163  const CopyParams& p)
164  : Importer(new Loader(c, t), f, p) {}
165 
166 Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
167  : DataStreamSink(p, f), loader(providedLoader) {
168  import_id = boost::filesystem::path(file_path).filename().string();
169  file_size = 0;
170  max_threads = 0;
171  p_file = nullptr;
172  buffer[0] = nullptr;
173  buffer[1] = nullptr;
174  // we may be overallocating a little more memory here due to dropping phy cols.
175  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
176  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
177  int i = 0;
178  bool has_array = false;
179  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
180  int skip_physical_cols = 0;
181  for (auto& p : loader->get_column_descs()) {
182  // phy geo columns can't be in input file
183  if (skip_physical_cols-- > 0) {
184  continue;
185  }
186  // neither are rowid or $deleted$
187  // note: columns can be added after rowid/$deleted$
188  if (p->isVirtualCol || p->isDeletedCol) {
189  continue;
190  }
191  skip_physical_cols = p->columnType.get_physical_cols();
192  if (p->columnType.get_type() == kARRAY) {
193  is_array.get()[i] = true;
194  has_array = true;
195  } else {
196  is_array.get()[i] = false;
197  }
198  ++i;
199  }
200  if (has_array) {
201  is_array_a = std::unique_ptr<bool[]>(is_array.release());
202  } else {
203  is_array_a = std::unique_ptr<bool[]>(nullptr);
204  }
205 }
206 
208  if (p_file != nullptr) {
209  fclose(p_file);
210  }
211  if (buffer[0] != nullptr) {
212  free(buffer[0]);
213  }
214  if (buffer[1] != nullptr) {
215  free(buffer[1]);
216  }
217 }
218 
219 ImportStatus Importer::get_import_status(const std::string& import_id) {
220  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
221  return import_status_map.at(import_id);
222 }
223 
224 void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
225  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
226  is.end = std::chrono::steady_clock::now();
227  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
229 }
230 
231 static const std::string trim_space(const char* field, const size_t len) {
232  size_t i = 0;
233  size_t j = len;
234  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
235  i++;
236  }
237  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
238  j--;
239  }
240  return std::string(field + i, j - i);
241 }
242 
243 namespace {
245  SQLTypes type;
246  if (ti.is_decimal()) {
247  type = decimal_to_int_type(ti);
248  } else if (ti.is_dict_encoded_string()) {
249  type = string_dict_to_int_type(ti);
250  } else {
251  type = ti.get_type();
252  }
253  return type;
254 }
255 } // namespace
256 
258  Datum d;
259  const auto type = get_type_for_datum(ti);
260  switch (type) {
261  case kBOOLEAN:
263  break;
264  case kBIGINT:
266  break;
267  case kINT:
269  break;
270  case kSMALLINT:
272  break;
273  case kTINYINT:
275  break;
276  case kFLOAT:
277  d.floatval = NULL_FLOAT;
278  break;
279  case kDOUBLE:
281  break;
282  case kTIME:
283  case kTIMESTAMP:
284  case kDATE:
286  break;
287  case kPOINT:
288  case kLINESTRING:
289  case kPOLYGON:
290  case kMULTIPOLYGON:
291  throw std::runtime_error("Internal error: geometry type in NullDatum.");
292  default:
293  throw std::runtime_error("Internal error: invalid type in NullDatum.");
294  }
295  return d;
296 }
297 
299  Datum d;
300  const auto type = get_type_for_datum(ti);
301  switch (type) {
302  case kBOOLEAN:
304  break;
305  case kBIGINT:
307  break;
308  case kINT:
310  break;
311  case kSMALLINT:
313  break;
314  case kTINYINT:
316  break;
317  case kFLOAT:
319  break;
320  case kDOUBLE:
322  break;
323  case kTIME:
324  case kTIMESTAMP:
325  case kDATE:
327  break;
328  case kPOINT:
329  case kLINESTRING:
330  case kPOLYGON:
331  case kMULTIPOLYGON:
332  throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
333  default:
334  throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
335  }
336  return d;
337 }
338 
339 ArrayDatum StringToArray(const std::string& s,
340  const SQLTypeInfo& ti,
341  const CopyParams& copy_params) {
342  SQLTypeInfo elem_ti = ti.get_elem_type();
343  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
344  return ArrayDatum(0, NULL, true);
345  }
346  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
347  LOG(WARNING) << "Malformed array: " << s;
348  return ArrayDatum(0, NULL, true);
349  }
350  std::vector<std::string> elem_strs;
351  size_t last = 1;
352  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
353  i = s.find(copy_params.array_delim, last)) {
354  elem_strs.push_back(s.substr(last, i - last));
355  last = i + 1;
356  }
357  if (last + 1 <= s.size()) {
358  elem_strs.push_back(s.substr(last, s.size() - 1 - last));
359  }
360  if (elem_strs.size() == 1) {
361  auto str = elem_strs.front();
362  auto str_trimmed = trim_space(str.c_str(), str.length());
363  if (str_trimmed == "") {
364  elem_strs.clear(); // Empty array
365  }
366  }
367  if (!elem_ti.is_string()) {
368  size_t len = elem_strs.size() * elem_ti.get_size();
369  int8_t* buf = (int8_t*)checked_malloc(len);
370  int8_t* p = buf;
371  for (auto& es : elem_strs) {
372  auto e = trim_space(es.c_str(), es.length());
373  bool is_null = (e == copy_params.null_str) || e == "NULL";
374  if (!elem_ti.is_string() && e == "") {
375  is_null = true;
376  }
377  if (elem_ti.is_number() || elem_ti.is_time()) {
378  if (!isdigit(e[0]) && e[0] != '-') {
379  is_null = true;
380  }
381  }
382  Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
383  p = append_datum(p, d, elem_ti);
384  CHECK(p);
385  }
386  return ArrayDatum(len, buf, false);
387  }
388  // must not be called for array of strings
389  CHECK(false);
390  return ArrayDatum(0, NULL, true);
391 }
392 
394  SQLTypeInfo elem_ti = ti.get_elem_type();
395  auto len = ti.get_size();
396 
397  if (len > 0) {
398  // Compose a NULL fixlen array
399  int8_t* buf = (int8_t*)checked_malloc(len);
400  // First scalar is a NULL_ARRAY sentinel
401  Datum d = NullArrayDatum(elem_ti);
402  int8_t* p = append_datum(buf, d, elem_ti);
403  CHECK(p);
404  // Rest is filled with normal NULL sentinels
405  Datum d0 = NullDatum(elem_ti);
406  while ((p - buf) < len) {
407  p = append_datum(p, d0, elem_ti);
408  CHECK(p);
409  }
410  CHECK((p - buf) == len);
411  return ArrayDatum(len, buf, true);
412  }
413  // NULL varlen array
414  return ArrayDatum(0, NULL, true);
415 }
416 
418  return NullArray(ti);
419 }
420 
422  const SQLTypeInfo& geo_ti) {
423  if (geo_ti.get_compression() == kENCODING_GEOINT) {
424  CHECK(geo_ti.get_comp_param() == 32);
425  std::vector<double> null_point_coords = {NULL_ARRAY_DOUBLE, NULL_DOUBLE};
426  auto compressed_null_coords = Geospatial::compress_coords(null_point_coords, geo_ti);
427  const size_t len = compressed_null_coords.size();
428  int8_t* buf = (int8_t*)checked_malloc(len);
429  memcpy(buf, compressed_null_coords.data(), len);
430  return ArrayDatum(len, buf, false);
431  }
432  auto modified_ti = coords_ti;
433  modified_ti.set_subtype(kDOUBLE);
435 }
436 
437 void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
438  const auto& arr = datum.val.arr_val;
439  for (const auto& elem_datum : arr) {
440  string_vec.push_back(elem_datum.val.str_val);
441  }
442 }
443 
444 Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
445  Datum d;
446  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
447  switch (type) {
448  case kBOOLEAN:
449  d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
450  break;
451  case kBIGINT:
452  d.bigintval =
453  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
454  break;
455  case kINT:
456  d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
457  break;
458  case kSMALLINT:
459  d.smallintval =
460  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
461  break;
462  case kTINYINT:
463  d.tinyintval =
464  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
465  break;
466  case kFLOAT:
467  d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
468  break;
469  case kDOUBLE:
470  d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
471  break;
472  case kTIME:
473  case kTIMESTAMP:
474  case kDATE:
475  d.bigintval =
476  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
477  break;
478  case kPOINT:
479  case kLINESTRING:
480  case kPOLYGON:
481  case kMULTIPOLYGON:
482  throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
483  default:
484  throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
485  }
486  return d;
487 }
488 
489 ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
490  SQLTypeInfo elem_ti = ti.get_elem_type();
491 
492  CHECK(!elem_ti.is_string());
493 
494  if (datum.is_null) {
495  return NullArray(ti);
496  }
497 
498  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
499  int8_t* buf = (int8_t*)checked_malloc(len);
500  int8_t* p = buf;
501  for (auto& e : datum.val.arr_val) {
502  p = append_datum(p, TDatumToDatum(e, elem_ti), elem_ti);
503  CHECK(p);
504  }
505 
506  return ArrayDatum(len, buf, false);
507 }
508 
509 void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
511  std::vector<std::string_view> string_view_vec;
512  string_view_vec.reserve(string_vec.size());
513  for (const auto& str : string_vec) {
514  if (str.size() > StringDictionary::MAX_STRLEN) {
515  std::ostringstream oss;
516  oss << "while processing dictionary for column " << getColumnDesc()->columnName
517  << " a string was detected too long for encoding, string length = "
518  << str.size() << ", first 100 characters are '" << str.substr(0, 100) << "'";
519  throw std::runtime_error(oss.str());
520  }
521  string_view_vec.push_back(str);
522  }
523  try {
524  switch (column_desc_->columnType.get_size()) {
525  case 1:
526  string_dict_i8_buffer_->resize(string_view_vec.size());
527  string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
528  break;
529  case 2:
530  string_dict_i16_buffer_->resize(string_view_vec.size());
531  string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
532  break;
533  case 4:
534  string_dict_i32_buffer_->resize(string_view_vec.size());
535  string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
536  break;
537  default:
538  CHECK(false);
539  }
540  } catch (std::exception& e) {
541  std::ostringstream oss;
542  oss << "while processing dictionary for column " << getColumnDesc()->columnName
543  << " : " << e.what();
544  LOG(ERROR) << oss.str();
545  throw std::runtime_error(oss.str());
546  }
547 }
548 
550  const std::string_view val,
551  const bool is_null,
552  const CopyParams& copy_params) {
553  const auto type = cd->columnType.get_type();
554  switch (type) {
555  case kBOOLEAN: {
556  if (is_null) {
557  if (cd->columnType.get_notnull()) {
558  throw std::runtime_error("NULL for column " + cd->columnName);
559  }
561  } else {
562  auto ti = cd->columnType;
563  Datum d = StringToDatum(val, ti);
564  addBoolean(static_cast<int8_t>(d.boolval));
565  }
566  break;
567  }
568  case kTINYINT: {
569  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
570  auto ti = cd->columnType;
571  Datum d = StringToDatum(val, ti);
573  } else {
574  if (cd->columnType.get_notnull()) {
575  throw std::runtime_error("NULL for column " + cd->columnName);
576  }
578  }
579  break;
580  }
581  case kSMALLINT: {
582  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
583  auto ti = cd->columnType;
584  Datum d = StringToDatum(val, ti);
586  } else {
587  if (cd->columnType.get_notnull()) {
588  throw std::runtime_error("NULL for column " + cd->columnName);
589  }
591  }
592  break;
593  }
594  case kINT: {
595  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
596  auto ti = cd->columnType;
597  Datum d = StringToDatum(val, ti);
598  addInt(d.intval);
599  } else {
600  if (cd->columnType.get_notnull()) {
601  throw std::runtime_error("NULL for column " + cd->columnName);
602  }
604  }
605  break;
606  }
607  case kBIGINT: {
608  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
609  auto ti = cd->columnType;
610  Datum d = StringToDatum(val, ti);
611  addBigint(d.bigintval);
612  } else {
613  if (cd->columnType.get_notnull()) {
614  throw std::runtime_error("NULL for column " + cd->columnName);
615  }
617  }
618  break;
619  }
620  case kDECIMAL:
621  case kNUMERIC: {
622  if (!is_null) {
623  auto ti = cd->columnType;
624  Datum d = StringToDatum(val, ti);
625  DecimalOverflowValidator validator(ti);
626  validator.validate(d.bigintval);
627  addBigint(d.bigintval);
628  } else {
629  if (cd->columnType.get_notnull()) {
630  throw std::runtime_error("NULL for column " + cd->columnName);
631  }
633  }
634  break;
635  }
636  case kFLOAT:
637  if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
638  addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
639  } else {
640  if (cd->columnType.get_notnull()) {
641  throw std::runtime_error("NULL for column " + cd->columnName);
642  }
644  }
645  break;
646  case kDOUBLE:
647  if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
648  addDouble(std::atof(std::string(val).c_str()));
649  } else {
650  if (cd->columnType.get_notnull()) {
651  throw std::runtime_error("NULL for column " + cd->columnName);
652  }
654  }
655  break;
656  case kTEXT:
657  case kVARCHAR:
658  case kCHAR: {
659  // @TODO(wei) for now, use empty string for nulls
660  if (is_null) {
661  if (cd->columnType.get_notnull()) {
662  throw std::runtime_error("NULL for column " + cd->columnName);
663  }
664  addString(std::string());
665  } else {
666  if (val.length() > StringDictionary::MAX_STRLEN) {
667  throw std::runtime_error("String too long for column " + cd->columnName +
668  " was " + std::to_string(val.length()) + " max is " +
670  }
671  addString(val);
672  }
673  break;
674  }
675  case kTIME:
676  case kTIMESTAMP:
677  case kDATE:
678  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
679  SQLTypeInfo ti = cd->columnType;
680  Datum d = StringToDatum(val, ti);
681  addBigint(d.bigintval);
682  } else {
683  if (cd->columnType.get_notnull()) {
684  throw std::runtime_error("NULL for column " + cd->columnName);
685  }
687  }
688  break;
689  case kARRAY: {
690  if (is_null && cd->columnType.get_notnull()) {
691  throw std::runtime_error("NULL for column " + cd->columnName);
692  }
693  SQLTypeInfo ti = cd->columnType;
694  if (IS_STRING(ti.get_subtype())) {
695  std::vector<std::string> string_vec;
696  // Just parse string array, don't push it to buffer yet as we might throw
698  std::string(val), copy_params, string_vec);
699  if (!is_null) {
700  if (ti.get_size() > 0) {
701  auto sti = ti.get_elem_type();
702  size_t expected_size = ti.get_size() / sti.get_size();
703  size_t actual_size = string_vec.size();
704  if (actual_size != expected_size) {
705  throw std::runtime_error("Fixed length array column " + cd->columnName +
706  " expects " + std::to_string(expected_size) +
707  " values, received " +
708  std::to_string(actual_size));
709  }
710  }
711  addStringArray(string_vec);
712  } else {
713  addStringArray(std::nullopt);
714  }
715  } else {
716  if (!is_null) {
717  ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
718  if (d.is_null) { // val could be "NULL"
719  addArray(NullArray(ti));
720  } else {
721  if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
722  throw std::runtime_error("Fixed length array for column " + cd->columnName +
723  " has incorrect length: " + std::string(val));
724  }
725  addArray(d);
726  }
727  } else {
728  addArray(NullArray(ti));
729  }
730  }
731  break;
732  }
733  case kPOINT:
734  case kLINESTRING:
735  case kPOLYGON:
736  case kMULTIPOLYGON:
737  addGeoString(val);
738  break;
739  default:
740  CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
741  }
742 }
743 
745  const auto type = column_desc_->columnType.is_decimal()
748  switch (type) {
749  case kBOOLEAN:
750  bool_buffer_->pop_back();
751  break;
752  case kTINYINT:
753  tinyint_buffer_->pop_back();
754  break;
755  case kSMALLINT:
756  smallint_buffer_->pop_back();
757  break;
758  case kINT:
759  int_buffer_->pop_back();
760  break;
761  case kBIGINT:
762  bigint_buffer_->pop_back();
763  break;
764  case kFLOAT:
765  float_buffer_->pop_back();
766  break;
767  case kDOUBLE:
768  double_buffer_->pop_back();
769  break;
770  case kTEXT:
771  case kVARCHAR:
772  case kCHAR:
773  string_buffer_->pop_back();
774  break;
775  case kDATE:
776  case kTIME:
777  case kTIMESTAMP:
778  bigint_buffer_->pop_back();
779  break;
780  case kARRAY:
782  string_array_buffer_->pop_back();
783  } else {
784  array_buffer_->pop_back();
785  }
786  break;
787  case kPOINT:
788  case kLINESTRING:
789  case kPOLYGON:
790  case kMULTIPOLYGON:
791  geo_string_buffer_->pop_back();
792  break;
793  default:
794  CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
795  }
796 }
797 
798 struct GeoImportException : std::runtime_error {
799  using std::runtime_error::runtime_error;
800 };
801 
802 // appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
803 template <typename DATA_TYPE>
805  const ColumnDescriptor* cd,
806  const Array& array,
807  std::vector<DATA_TYPE>& buffer,
808  const ArraySliceRange& slice_range,
809  import_export::BadRowsTracker* const bad_rows_tracker) {
810  auto data =
811  std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
812  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
813  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
814  if (bad_rows_tracker && cd->columnType.is_geometry()) {
815  f_add_geo_phy_cols = [&](const int64_t row) {
816  // Populate physical columns (ref. DBHandler::load_table)
817  std::vector<double> coords, bounds;
818  std::vector<int> ring_sizes, poly_rings;
819  int render_group = 0;
820  SQLTypeInfo ti;
821  // replace any unexpected exception from getGeoColumns or other
822  // on this path with a GeoImportException so that we wont over
823  // push a null to the logical column...
824  try {
825  SQLTypeInfo import_ti{ti};
826  if (array.IsNull(row)) {
828  import_ti, coords, bounds, ring_sizes, poly_rings, false);
829  } else {
830  arrow_throw_if<GeoImportException>(
832  ti,
833  coords,
834  bounds,
835  ring_sizes,
836  poly_rings,
837  false),
838  error_context(cd, bad_rows_tracker) + "Invalid geometry");
839  arrow_throw_if<GeoImportException>(
840  cd->columnType.get_type() != ti.get_type(),
841  error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
842  }
843  auto col_idx_workpad = col_idx; // what a pitfall!!
845  bad_rows_tracker->importer->getCatalog(),
846  cd,
848  col_idx_workpad,
849  coords,
850  bounds,
851  ring_sizes,
852  poly_rings,
853  render_group);
854  } catch (GeoImportException&) {
855  throw;
856  } catch (std::runtime_error& e) {
857  throw GeoImportException(e.what());
858  } catch (const std::exception& e) {
859  throw GeoImportException(e.what());
860  } catch (...) {
861  throw GeoImportException("unknown exception");
862  }
863  };
864  }
865  auto f_mark_a_bad_row = [&](const auto row) {
866  std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
867  bad_rows_tracker->rows.insert(row - slice_range.first);
868  };
869  buffer.reserve(slice_range.second - slice_range.first);
870  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
871  try {
872  *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
873  f_add_geo_phy_cols(row);
874  } catch (GeoImportException&) {
875  f_mark_a_bad_row(row);
876  } catch (ArrowImporterException&) {
877  // trace bad rows of each column; otherwise rethrow.
878  if (bad_rows_tracker) {
879  *data << nullptr;
880  f_mark_a_bad_row(row);
881  } else {
882  throw;
883  }
884  }
885  }
886  return buffer.size();
887 }
888 
890  const Array& col,
891  const bool exact_type_match,
892  const ArraySliceRange& slice_range,
893  BadRowsTracker* const bad_rows_tracker) {
894  const auto type = cd->columnType.get_type();
895  if (cd->columnType.get_notnull()) {
896  // We can't have any null values for this column; to have them is an error
897  arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
898  }
899 
900  switch (type) {
901  case kBOOLEAN:
902  if (exact_type_match) {
903  arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
904  }
906  cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
907  case kTINYINT:
908  if (exact_type_match) {
909  arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
910  }
912  cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
913  case kSMALLINT:
914  if (exact_type_match) {
915  arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
916  }
918  cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
919  case kINT:
920  if (exact_type_match) {
921  arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
922  }
924  cd, col, *int_buffer_, slice_range, bad_rows_tracker);
925  case kBIGINT:
926  case kNUMERIC:
927  case kDECIMAL:
928  if (exact_type_match) {
929  arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
930  }
932  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
933  case kFLOAT:
934  if (exact_type_match) {
935  arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
936  }
938  cd, col, *float_buffer_, slice_range, bad_rows_tracker);
939  case kDOUBLE:
940  if (exact_type_match) {
941  arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
942  }
944  cd, col, *double_buffer_, slice_range, bad_rows_tracker);
945  case kTEXT:
946  case kVARCHAR:
947  case kCHAR:
948  if (exact_type_match) {
949  arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
950  "Expected string type");
951  }
953  cd, col, *string_buffer_, slice_range, bad_rows_tracker);
954  case kTIME:
955  if (exact_type_match) {
956  arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
957  "Expected time32 or time64 type");
958  }
960  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
961  case kTIMESTAMP:
962  if (exact_type_match) {
963  arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
964  }
966  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
967  case kDATE:
968  if (exact_type_match) {
969  arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
970  "Expected date32 or date64 type");
971  }
973  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
974  case kPOINT:
975  case kLINESTRING:
976  case kPOLYGON:
977  case kMULTIPOLYGON:
978  arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
979  "Expected string type");
981  cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
982  case kARRAY:
983  throw std::runtime_error("Arrow array appends not yet supported");
984  default:
985  throw std::runtime_error("Invalid Type");
986  }
987 }
988 
989 // this is exclusively used by load_table_binary_columnar
990 size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
991  size_t dataSize = 0;
992  if (cd->columnType.get_notnull()) {
993  // We can't have any null values for this column; to have them is an error
994  if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
995  throw std::runtime_error("NULL for column " + cd->columnName);
996  }
997  }
998 
999  switch (cd->columnType.get_type()) {
1000  case kBOOLEAN: {
1001  dataSize = col.data.int_col.size();
1002  bool_buffer_->reserve(dataSize);
1003  for (size_t i = 0; i < dataSize; i++) {
1004  if (col.nulls[i]) {
1006  } else {
1007  bool_buffer_->push_back((int8_t)col.data.int_col[i]);
1008  }
1009  }
1010  break;
1011  }
1012  case kTINYINT: {
1013  dataSize = col.data.int_col.size();
1014  tinyint_buffer_->reserve(dataSize);
1015  for (size_t i = 0; i < dataSize; i++) {
1016  if (col.nulls[i]) {
1018  } else {
1019  tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
1020  }
1021  }
1022  break;
1023  }
1024  case kSMALLINT: {
1025  dataSize = col.data.int_col.size();
1026  smallint_buffer_->reserve(dataSize);
1027  for (size_t i = 0; i < dataSize; i++) {
1028  if (col.nulls[i]) {
1030  } else {
1031  smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
1032  }
1033  }
1034  break;
1035  }
1036  case kINT: {
1037  dataSize = col.data.int_col.size();
1038  int_buffer_->reserve(dataSize);
1039  for (size_t i = 0; i < dataSize; i++) {
1040  if (col.nulls[i]) {
1042  } else {
1043  int_buffer_->push_back((int32_t)col.data.int_col[i]);
1044  }
1045  }
1046  break;
1047  }
1048  case kBIGINT:
1049  case kNUMERIC:
1050  case kDECIMAL: {
1051  dataSize = col.data.int_col.size();
1052  bigint_buffer_->reserve(dataSize);
1053  for (size_t i = 0; i < dataSize; i++) {
1054  if (col.nulls[i]) {
1056  } else {
1057  bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
1058  }
1059  }
1060  break;
1061  }
1062  case kFLOAT: {
1063  dataSize = col.data.real_col.size();
1064  float_buffer_->reserve(dataSize);
1065  for (size_t i = 0; i < dataSize; i++) {
1066  if (col.nulls[i]) {
1067  float_buffer_->push_back(NULL_FLOAT);
1068  } else {
1069  float_buffer_->push_back((float)col.data.real_col[i]);
1070  }
1071  }
1072  break;
1073  }
1074  case kDOUBLE: {
1075  dataSize = col.data.real_col.size();
1076  double_buffer_->reserve(dataSize);
1077  for (size_t i = 0; i < dataSize; i++) {
1078  if (col.nulls[i]) {
1079  double_buffer_->push_back(NULL_DOUBLE);
1080  } else {
1081  double_buffer_->push_back((double)col.data.real_col[i]);
1082  }
1083  }
1084  break;
1085  }
1086  case kTEXT:
1087  case kVARCHAR:
1088  case kCHAR: {
1089  // TODO: for now, use empty string for nulls
1090  dataSize = col.data.str_col.size();
1091  string_buffer_->reserve(dataSize);
1092  for (size_t i = 0; i < dataSize; i++) {
1093  if (col.nulls[i]) {
1094  string_buffer_->push_back(std::string());
1095  } else {
1096  string_buffer_->push_back(col.data.str_col[i]);
1097  }
1098  }
1099  break;
1100  }
1101  case kTIME:
1102  case kTIMESTAMP:
1103  case kDATE: {
1104  dataSize = col.data.int_col.size();
1105  bigint_buffer_->reserve(dataSize);
1106  for (size_t i = 0; i < dataSize; i++) {
1107  if (col.nulls[i]) {
1109  } else {
1110  bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
1111  }
1112  }
1113  break;
1114  }
1115  case kPOINT:
1116  case kLINESTRING:
1117  case kPOLYGON:
1118  case kMULTIPOLYGON: {
1119  dataSize = col.data.str_col.size();
1120  geo_string_buffer_->reserve(dataSize);
1121  for (size_t i = 0; i < dataSize; i++) {
1122  if (col.nulls[i]) {
1123  // TODO: add support for NULL geo
1124  geo_string_buffer_->push_back(std::string());
1125  } else {
1126  geo_string_buffer_->push_back(col.data.str_col[i]);
1127  }
1128  }
1129  break;
1130  }
1131  case kARRAY: {
1132  dataSize = col.data.arr_col.size();
1133  if (IS_STRING(cd->columnType.get_subtype())) {
1134  for (size_t i = 0; i < dataSize; i++) {
1135  OptionalStringVector& string_vec = addStringArray();
1136  if (!col.nulls[i]) {
1137  size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
1138  for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
1139  string_vec->push_back(col.data.arr_col[i].data.str_col[str_idx]);
1140  }
1141  }
1142  }
1143  } else {
1144  auto elem_ti = cd->columnType.get_subtype();
1145  switch (elem_ti) {
1146  case kBOOLEAN: {
1147  for (size_t i = 0; i < dataSize; i++) {
1148  if (col.nulls[i]) {
1150  } else {
1151  size_t len = col.data.arr_col[i].data.int_col.size();
1152  size_t byteSize = len * sizeof(int8_t);
1153  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1154  int8_t* p = buf;
1155  for (size_t j = 0; j < len; ++j) {
1156  // Explicitly checking the item for null because
1157  // casting null value (-128) to bool results
1158  // incorrect value 1.
1159  if (col.data.arr_col[i].nulls[j]) {
1160  *p = static_cast<int8_t>(
1162  } else {
1163  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
1164  }
1165  p += sizeof(bool);
1166  }
1167  addArray(ArrayDatum(byteSize, buf, false));
1168  }
1169  }
1170  break;
1171  }
1172  case kTINYINT: {
1173  for (size_t i = 0; i < dataSize; i++) {
1174  if (col.nulls[i]) {
1176  } else {
1177  size_t len = col.data.arr_col[i].data.int_col.size();
1178  size_t byteSize = len * sizeof(int8_t);
1179  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1180  int8_t* p = buf;
1181  for (size_t j = 0; j < len; ++j) {
1182  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
1183  p += sizeof(int8_t);
1184  }
1185  addArray(ArrayDatum(byteSize, buf, false));
1186  }
1187  }
1188  break;
1189  }
1190  case kSMALLINT: {
1191  for (size_t i = 0; i < dataSize; i++) {
1192  if (col.nulls[i]) {
1194  } else {
1195  size_t len = col.data.arr_col[i].data.int_col.size();
1196  size_t byteSize = len * sizeof(int16_t);
1197  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1198  int8_t* p = buf;
1199  for (size_t j = 0; j < len; ++j) {
1200  *(int16_t*)p =
1201  static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
1202  p += sizeof(int16_t);
1203  }
1204  addArray(ArrayDatum(byteSize, buf, false));
1205  }
1206  }
1207  break;
1208  }
1209  case kINT: {
1210  for (size_t i = 0; i < dataSize; i++) {
1211  if (col.nulls[i]) {
1213  } else {
1214  size_t len = col.data.arr_col[i].data.int_col.size();
1215  size_t byteSize = len * sizeof(int32_t);
1216  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1217  int8_t* p = buf;
1218  for (size_t j = 0; j < len; ++j) {
1219  *(int32_t*)p =
1220  static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
1221  p += sizeof(int32_t);
1222  }
1223  addArray(ArrayDatum(byteSize, buf, false));
1224  }
1225  }
1226  break;
1227  }
1228  case kBIGINT:
1229  case kNUMERIC:
1230  case kDECIMAL: {
1231  for (size_t i = 0; i < dataSize; i++) {
1232  if (col.nulls[i]) {
1234  } else {
1235  size_t len = col.data.arr_col[i].data.int_col.size();
1236  size_t byteSize = len * sizeof(int64_t);
1237  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1238  int8_t* p = buf;
1239  for (size_t j = 0; j < len; ++j) {
1240  *(int64_t*)p =
1241  static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1242  p += sizeof(int64_t);
1243  }
1244  addArray(ArrayDatum(byteSize, buf, false));
1245  }
1246  }
1247  break;
1248  }
1249  case kFLOAT: {
1250  for (size_t i = 0; i < dataSize; i++) {
1251  if (col.nulls[i]) {
1253  } else {
1254  size_t len = col.data.arr_col[i].data.real_col.size();
1255  size_t byteSize = len * sizeof(float);
1256  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1257  int8_t* p = buf;
1258  for (size_t j = 0; j < len; ++j) {
1259  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
1260  p += sizeof(float);
1261  }
1262  addArray(ArrayDatum(byteSize, buf, false));
1263  }
1264  }
1265  break;
1266  }
1267  case kDOUBLE: {
1268  for (size_t i = 0; i < dataSize; i++) {
1269  if (col.nulls[i]) {
1271  } else {
1272  size_t len = col.data.arr_col[i].data.real_col.size();
1273  size_t byteSize = len * sizeof(double);
1274  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1275  int8_t* p = buf;
1276  for (size_t j = 0; j < len; ++j) {
1277  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
1278  p += sizeof(double);
1279  }
1280  addArray(ArrayDatum(byteSize, buf, false));
1281  }
1282  }
1283  break;
1284  }
1285  case kTIME:
1286  case kTIMESTAMP:
1287  case kDATE: {
1288  for (size_t i = 0; i < dataSize; i++) {
1289  if (col.nulls[i]) {
1291  } else {
1292  size_t len = col.data.arr_col[i].data.int_col.size();
1293  size_t byteWidth = sizeof(int64_t);
1294  size_t byteSize = len * byteWidth;
1295  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1296  int8_t* p = buf;
1297  for (size_t j = 0; j < len; ++j) {
1298  *reinterpret_cast<int64_t*>(p) =
1299  static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1300  p += sizeof(int64_t);
1301  }
1302  addArray(ArrayDatum(byteSize, buf, false));
1303  }
1304  }
1305  break;
1306  }
1307  default:
1308  throw std::runtime_error("Invalid Array Type");
1309  }
1310  }
1311  break;
1312  }
1313  default:
1314  throw std::runtime_error("Invalid Type");
1315  }
1316  return dataSize;
1317 }
1318 
1320  const TDatum& datum,
1321  const bool is_null) {
1322  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
1323  : cd->columnType.get_type();
1324  switch (type) {
1325  case kBOOLEAN: {
1326  if (is_null) {
1327  if (cd->columnType.get_notnull()) {
1328  throw std::runtime_error("NULL for column " + cd->columnName);
1329  }
1331  } else {
1332  addBoolean((int8_t)datum.val.int_val);
1333  }
1334  break;
1335  }
1336  case kTINYINT:
1337  if (!is_null) {
1338  addTinyint((int8_t)datum.val.int_val);
1339  } else {
1340  if (cd->columnType.get_notnull()) {
1341  throw std::runtime_error("NULL for column " + cd->columnName);
1342  }
1344  }
1345  break;
1346  case kSMALLINT:
1347  if (!is_null) {
1348  addSmallint((int16_t)datum.val.int_val);
1349  } else {
1350  if (cd->columnType.get_notnull()) {
1351  throw std::runtime_error("NULL for column " + cd->columnName);
1352  }
1354  }
1355  break;
1356  case kINT:
1357  if (!is_null) {
1358  addInt((int32_t)datum.val.int_val);
1359  } else {
1360  if (cd->columnType.get_notnull()) {
1361  throw std::runtime_error("NULL for column " + cd->columnName);
1362  }
1364  }
1365  break;
1366  case kBIGINT:
1367  if (!is_null) {
1368  addBigint(datum.val.int_val);
1369  } else {
1370  if (cd->columnType.get_notnull()) {
1371  throw std::runtime_error("NULL for column " + cd->columnName);
1372  }
1374  }
1375  break;
1376  case kFLOAT:
1377  if (!is_null) {
1378  addFloat((float)datum.val.real_val);
1379  } else {
1380  if (cd->columnType.get_notnull()) {
1381  throw std::runtime_error("NULL for column " + cd->columnName);
1382  }
1384  }
1385  break;
1386  case kDOUBLE:
1387  if (!is_null) {
1388  addDouble(datum.val.real_val);
1389  } else {
1390  if (cd->columnType.get_notnull()) {
1391  throw std::runtime_error("NULL for column " + cd->columnName);
1392  }
1394  }
1395  break;
1396  case kTEXT:
1397  case kVARCHAR:
1398  case kCHAR: {
1399  // @TODO(wei) for now, use empty string for nulls
1400  if (is_null) {
1401  if (cd->columnType.get_notnull()) {
1402  throw std::runtime_error("NULL for column " + cd->columnName);
1403  }
1404  addString(std::string());
1405  } else {
1406  addString(datum.val.str_val);
1407  }
1408  break;
1409  }
1410  case kTIME:
1411  case kTIMESTAMP:
1412  case kDATE: {
1413  if (!is_null) {
1414  addBigint(datum.val.int_val);
1415  } else {
1416  if (cd->columnType.get_notnull()) {
1417  throw std::runtime_error("NULL for column " + cd->columnName);
1418  }
1420  }
1421  break;
1422  }
1423  case kARRAY:
1424  if (is_null && cd->columnType.get_notnull()) {
1425  throw std::runtime_error("NULL for column " + cd->columnName);
1426  }
1427  if (IS_STRING(cd->columnType.get_subtype())) {
1428  OptionalStringVector& string_vec = addStringArray();
1429  addBinaryStringArray(datum, *string_vec);
1430  } else {
1431  if (!is_null) {
1432  addArray(TDatumToArrayDatum(datum, cd->columnType));
1433  } else {
1435  }
1436  }
1437  break;
1438  case kPOINT:
1439  case kLINESTRING:
1440  case kPOLYGON:
1441  case kMULTIPOLYGON:
1442  if (is_null) {
1443  if (cd->columnType.get_notnull()) {
1444  throw std::runtime_error("NULL for column " + cd->columnName);
1445  }
1446  addGeoString(std::string());
1447  } else {
1448  addGeoString(datum.val.str_val);
1449  }
1450  break;
1451  default:
1452  CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
1453  }
1454 }
1455 
1456 void TypedImportBuffer::addDefaultValues(const ColumnDescriptor* cd, size_t num_rows) {
1457  bool is_null = !cd->default_value.has_value();
1458  CHECK(!(is_null && cd->columnType.get_notnull()));
1459  const auto type = cd->columnType.get_type();
1460  auto ti = cd->columnType;
1461  auto val = cd->default_value.value_or("NULL");
1462  CopyParams cp;
1463  switch (type) {
1464  case kBOOLEAN: {
1465  if (!is_null) {
1466  bool_buffer_->resize(num_rows, StringToDatum(val, ti).boolval);
1467  } else {
1468  bool_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1469  }
1470  break;
1471  }
1472  case kTINYINT: {
1473  if (!is_null) {
1474  tinyint_buffer_->resize(num_rows, StringToDatum(val, ti).tinyintval);
1475  } else {
1477  }
1478  break;
1479  }
1480  case kSMALLINT: {
1481  if (!is_null) {
1482  smallint_buffer_->resize(num_rows, StringToDatum(val, ti).smallintval);
1483  } else {
1484  smallint_buffer_->resize(num_rows,
1486  }
1487  break;
1488  }
1489  case kINT: {
1490  if (!is_null) {
1491  int_buffer_->resize(num_rows, StringToDatum(val, ti).intval);
1492  } else {
1493  int_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1494  }
1495  break;
1496  }
1497  case kBIGINT: {
1498  if (!is_null) {
1499  bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
1500  } else {
1502  }
1503  break;
1504  }
1505  case kDECIMAL:
1506  case kNUMERIC: {
1507  if (!is_null) {
1508  const auto converted_decimal_value = convert_decimal_value_to_scale(
1509  StringToDatum(val, ti).bigintval, ti, cd->columnType);
1510  bigint_buffer_->resize(num_rows, converted_decimal_value);
1511  } else {
1513  }
1514  break;
1515  }
1516  case kFLOAT:
1517  if (!is_null) {
1518  float_buffer_->resize(num_rows,
1519  static_cast<float>(std::atof(std::string(val).c_str())));
1520  } else {
1521  float_buffer_->resize(num_rows, NULL_FLOAT);
1522  }
1523  break;
1524  case kDOUBLE:
1525  if (!is_null) {
1526  double_buffer_->resize(num_rows, std::atof(std::string(val).c_str()));
1527  } else {
1528  double_buffer_->resize(num_rows, NULL_DOUBLE);
1529  }
1530  break;
1531  case kTEXT:
1532  case kVARCHAR:
1533  case kCHAR: {
1534  if (is_null) {
1535  string_buffer_->resize(num_rows, "");
1536  } else {
1537  if (val.length() > StringDictionary::MAX_STRLEN) {
1538  throw std::runtime_error("String too long for column " + cd->columnName +
1539  " was " + std::to_string(val.length()) + " max is " +
1541  }
1542  string_buffer_->resize(num_rows, val);
1543  }
1544  break;
1545  }
1546  case kTIME:
1547  case kTIMESTAMP:
1548  case kDATE:
1549  if (!is_null) {
1550  bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
1551  } else {
1553  }
1554  break;
1555  case kARRAY: {
1556  if (IS_STRING(ti.get_subtype())) {
1557  std::vector<std::string> string_vec;
1558  // Just parse string array, don't push it to buffer yet as we might throw
1560  std::string(val), cp, string_vec);
1561  if (!is_null) {
1562  // TODO: add support for NULL string arrays
1563  if (ti.get_size() > 0) {
1564  auto sti = ti.get_elem_type();
1565  size_t expected_size = ti.get_size() / sti.get_size();
1566  size_t actual_size = string_vec.size();
1567  if (actual_size != expected_size) {
1568  throw std::runtime_error("Fixed length array column " + cd->columnName +
1569  " expects " + std::to_string(expected_size) +
1570  " values, received " +
1571  std::to_string(actual_size));
1572  }
1573  }
1574  string_array_buffer_->resize(num_rows, string_vec);
1575  } else {
1576  if (ti.get_size() > 0) {
1577  // TODO: remove once NULL fixlen arrays are allowed
1578  throw std::runtime_error("Fixed length array column " + cd->columnName +
1579  " currently cannot accept NULL arrays");
1580  }
1581  // TODO: add support for NULL string arrays, replace with addStringArray(),
1582  // for now add whatever parseStringArray() outputs for NULLs ("NULL")
1583  string_array_buffer_->resize(num_rows, string_vec);
1584  }
1585  } else {
1586  if (!is_null) {
1587  ArrayDatum d = StringToArray(std::string(val), ti, cp);
1588  if (d.is_null) { // val could be "NULL"
1589  array_buffer_->resize(num_rows, NullArray(ti));
1590  } else {
1591  if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
1592  throw std::runtime_error("Fixed length array for column " + cd->columnName +
1593  " has incorrect length: " + std::string(val));
1594  }
1595  array_buffer_->resize(num_rows, d);
1596  }
1597  } else {
1598  array_buffer_->resize(num_rows, NullArray(ti));
1599  }
1600  }
1601  break;
1602  }
1603  case kPOINT:
1604  case kLINESTRING:
1605  case kPOLYGON:
1606  case kMULTIPOLYGON:
1607  geo_string_buffer_->resize(num_rows, val);
1608  break;
1609  default:
1610  CHECK(false) << "TypedImportBuffer::addDefaultValues() does not support type "
1611  << type;
1612  }
1613 }
1614 
1615 bool importGeoFromLonLat(double lon,
1616  double lat,
1617  std::vector<double>& coords,
1618  SQLTypeInfo& ti) {
1619  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
1620  return false;
1621  }
1622  if (ti.transforms()) {
1623  Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
1624  if (!pt.transform(ti)) {
1625  return false;
1626  }
1627  pt.getColumns(coords);
1628  return true;
1629  }
1630  coords.push_back(lon);
1631  coords.push_back(lat);
1632  return true;
1633 }
1634 
1636  const Catalog_Namespace::Catalog& catalog,
1637  const ColumnDescriptor* cd,
1638  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1639  size_t& col_idx,
1640  std::vector<double>& coords,
1641  std::vector<double>& bounds,
1642  std::vector<int>& ring_sizes,
1643  std::vector<int>& poly_rings,
1644  int render_group) {
1645  const auto col_ti = cd->columnType;
1646  const auto col_type = col_ti.get_type();
1647  auto columnId = cd->columnId;
1648  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1649  bool is_null_geo = false;
1650  bool is_null_point = false;
1651  if (!col_ti.get_notnull()) {
1652  // Check for NULL geo
1653  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1654  is_null_point = true;
1655  coords.clear();
1656  }
1657  is_null_geo = coords.empty();
1658  if (is_null_point) {
1659  coords.push_back(NULL_ARRAY_DOUBLE);
1660  coords.push_back(NULL_DOUBLE);
1661  // Treating POINT coords as notnull, need to store actual encoding
1662  // [un]compressed+[not]null
1663  is_null_geo = false;
1664  }
1665  }
1666  TDatum tdd_coords;
1667  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1668  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1669  // in a fixlen array, compressed and uncompressed.
1670  if (!is_null_geo) {
1671  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
1672  tdd_coords.val.arr_val.reserve(compressed_coords.size());
1673  for (auto cc : compressed_coords) {
1674  tdd_coords.val.arr_val.emplace_back();
1675  tdd_coords.val.arr_val.back().val.int_val = cc;
1676  }
1677  }
1678  tdd_coords.is_null = is_null_geo;
1679  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);
1680 
1681  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1682  // Create ring_sizes array value and add it to the physical column
1683  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1684  TDatum tdd_ring_sizes;
1685  tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1686  if (!is_null_geo) {
1687  for (auto ring_size : ring_sizes) {
1688  tdd_ring_sizes.val.arr_val.emplace_back();
1689  tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1690  }
1691  }
1692  tdd_ring_sizes.is_null = is_null_geo;
1693  import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1694  }
1695 
1696  if (col_type == kMULTIPOLYGON) {
1697  // Create poly_rings array value and add it to the physical column
1698  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1699  TDatum tdd_poly_rings;
1700  tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1701  if (!is_null_geo) {
1702  for (auto num_rings : poly_rings) {
1703  tdd_poly_rings.val.arr_val.emplace_back();
1704  tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1705  }
1706  }
1707  tdd_poly_rings.is_null = is_null_geo;
1708  import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
1709  }
1710 
1711  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1712  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1713  TDatum tdd_bounds;
1714  tdd_bounds.val.arr_val.reserve(bounds.size());
1715  if (!is_null_geo) {
1716  for (auto b : bounds) {
1717  tdd_bounds.val.arr_val.emplace_back();
1718  tdd_bounds.val.arr_val.back().val.real_val = b;
1719  }
1720  }
1721  tdd_bounds.is_null = is_null_geo;
1722  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
1723  }
1724 
1725  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1726  // Create render_group value and add it to the physical column
1727  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1728  TDatum td_render_group;
1729  td_render_group.val.int_val = render_group;
1730  td_render_group.is_null = is_null_geo;
1731  import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
1732  }
1733 }
1734 
1736  const Catalog_Namespace::Catalog& catalog,
1737  const ColumnDescriptor* cd,
1738  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1739  size_t& col_idx,
1740  std::vector<std::vector<double>>& coords_column,
1741  std::vector<std::vector<double>>& bounds_column,
1742  std::vector<std::vector<int>>& ring_sizes_column,
1743  std::vector<std::vector<int>>& poly_rings_column,
1744  std::vector<int>& render_groups_column) {
1745  const auto col_ti = cd->columnType;
1746  const auto col_type = col_ti.get_type();
1747  auto columnId = cd->columnId;
1748 
1749  auto coords_row_count = coords_column.size();
1750  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1751  for (auto& coords : coords_column) {
1752  bool is_null_geo = false;
1753  bool is_null_point = false;
1754  if (!col_ti.get_notnull()) {
1755  // Check for NULL geo
1756  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1757  is_null_point = true;
1758  coords.clear();
1759  }
1760  is_null_geo = coords.empty();
1761  if (is_null_point) {
1762  coords.push_back(NULL_ARRAY_DOUBLE);
1763  coords.push_back(NULL_DOUBLE);
1764  // Treating POINT coords as notnull, need to store actual encoding
1765  // [un]compressed+[not]null
1766  is_null_geo = false;
1767  }
1768  }
1769  std::vector<TDatum> td_coords_data;
1770  if (!is_null_geo) {
1771  std::vector<uint8_t> compressed_coords =
1772  Geospatial::compress_coords(coords, col_ti);
1773  for (auto const& cc : compressed_coords) {
1774  TDatum td_byte;
1775  td_byte.val.int_val = cc;
1776  td_coords_data.push_back(td_byte);
1777  }
1778  }
1779  TDatum tdd_coords;
1780  tdd_coords.val.arr_val = td_coords_data;
1781  tdd_coords.is_null = is_null_geo;
1782  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1783  }
1784  col_idx++;
1785 
1786  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1787  if (ring_sizes_column.size() != coords_row_count) {
1788  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1789  }
1790  // Create ring_sizes array value and add it to the physical column
1791  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1792  for (auto const& ring_sizes : ring_sizes_column) {
1793  bool is_null_geo = false;
1794  if (!col_ti.get_notnull()) {
1795  // Check for NULL geo
1796  is_null_geo = ring_sizes.empty();
1797  }
1798  std::vector<TDatum> td_ring_sizes;
1799  for (auto const& ring_size : ring_sizes) {
1800  TDatum td_ring_size;
1801  td_ring_size.val.int_val = ring_size;
1802  td_ring_sizes.push_back(td_ring_size);
1803  }
1804  TDatum tdd_ring_sizes;
1805  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1806  tdd_ring_sizes.is_null = is_null_geo;
1807  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1808  }
1809  col_idx++;
1810  }
1811 
1812  if (col_type == kMULTIPOLYGON) {
1813  if (poly_rings_column.size() != coords_row_count) {
1814  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1815  }
1816  // Create poly_rings array value and add it to the physical column
1817  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1818  for (auto const& poly_rings : poly_rings_column) {
1819  bool is_null_geo = false;
1820  if (!col_ti.get_notnull()) {
1821  // Check for NULL geo
1822  is_null_geo = poly_rings.empty();
1823  }
1824  std::vector<TDatum> td_poly_rings;
1825  for (auto const& num_rings : poly_rings) {
1826  TDatum td_num_rings;
1827  td_num_rings.val.int_val = num_rings;
1828  td_poly_rings.push_back(td_num_rings);
1829  }
1830  TDatum tdd_poly_rings;
1831  tdd_poly_rings.val.arr_val = td_poly_rings;
1832  tdd_poly_rings.is_null = is_null_geo;
1833  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1834  }
1835  col_idx++;
1836  }
1837 
1838  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1839  if (bounds_column.size() != coords_row_count) {
1840  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1841  }
1842  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1843  for (auto const& bounds : bounds_column) {
1844  bool is_null_geo = false;
1845  if (!col_ti.get_notnull()) {
1846  // Check for NULL geo
1847  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1848  }
1849  std::vector<TDatum> td_bounds_data;
1850  for (auto const& b : bounds) {
1851  TDatum td_double;
1852  td_double.val.real_val = b;
1853  td_bounds_data.push_back(td_double);
1854  }
1855  TDatum tdd_bounds;
1856  tdd_bounds.val.arr_val = td_bounds_data;
1857  tdd_bounds.is_null = is_null_geo;
1858  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1859  }
1860  col_idx++;
1861  }
1862 
1863  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1864  // Create render_group value and add it to the physical column
1865  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1866  for (auto const& render_group : render_groups_column) {
1867  TDatum td_render_group;
1868  td_render_group.val.int_val = render_group;
1869  td_render_group.is_null = false;
1870  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
1871  }
1872  col_idx++;
1873  }
1874 }
1875 
1876 namespace {
1877 
1878 std::tuple<int, SQLTypes, std::string> explode_collections_step1(
1879  const std::list<const ColumnDescriptor*>& col_descs) {
1880  // validate the columns
1881  // for now we can only explode into a single destination column
1882  // which must be of the child type (POLYGON, LINESTRING, POINT)
1883  int collection_col_idx = -1;
1884  int col_idx = 0;
1885  std::string collection_col_name;
1886  SQLTypes collection_child_type = kNULLT;
1887  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1888  auto const& cd = *cd_it;
1889  auto const col_type = cd->columnType.get_type();
1890  if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
1891  if (collection_col_idx >= 0) {
1892  throw std::runtime_error(
1893  "Explode Collections: Found more than one destination column");
1894  }
1895  collection_col_idx = col_idx;
1896  collection_child_type = col_type;
1897  collection_col_name = cd->columnName;
1898  }
1899  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1900  ++cd_it;
1901  }
1902  col_idx++;
1903  }
1904  if (collection_col_idx < 0) {
1905  throw std::runtime_error(
1906  "Explode Collections: Failed to find a supported column type to explode "
1907  "into");
1908  }
1909  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
1910 }
1911 
1913  OGRGeometry* ogr_geometry,
1914  SQLTypes collection_child_type,
1915  const std::string& collection_col_name,
1916  size_t row_or_feature_idx,
1917  std::function<void(OGRGeometry*)> execute_import_lambda) {
1918  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1919  bool is_collection = false;
1920  switch (collection_child_type) {
1921  case kPOINT:
1922  switch (ogr_geometry_type) {
1923  case wkbMultiPoint:
1924  is_collection = true;
1925  break;
1926  case wkbPoint:
1927  break;
1928  default:
1929  throw std::runtime_error(
1930  "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1931  }
1932  break;
1933  case kLINESTRING:
1934  switch (ogr_geometry_type) {
1935  case wkbMultiLineString:
1936  is_collection = true;
1937  break;
1938  case wkbLineString:
1939  break;
1940  default:
1941  throw std::runtime_error(
1942  "Explode Collections: Source geo type must be MULTILINESTRING or "
1943  "LINESTRING");
1944  }
1945  break;
1946  case kPOLYGON:
1947  switch (ogr_geometry_type) {
1948  case wkbMultiPolygon:
1949  is_collection = true;
1950  break;
1951  case wkbPolygon:
1952  break;
1953  default:
1954  throw std::runtime_error(
1955  "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1956  }
1957  break;
1958  default:
1959  CHECK(false) << "Unsupported geo child type " << collection_child_type;
1960  }
1961 
1962  int64_t us = 0LL;
1963 
1964  // explode or just import
1965  if (is_collection) {
1966  // cast to collection
1967  OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1968  CHECK(collection_geometry);
1969 
1970 #if LOG_EXPLODE_COLLECTIONS
1971  // log number of children
1972  LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
1973  << explode_col_name << "' into " << collection_geometry->getNumGeometries()
1974  << " child rows";
1975 #endif
1976 
1977  // loop over children
1978  uint32_t child_geometry_count = 0;
1979  auto child_geometry_it = collection_geometry->begin();
1980  while (child_geometry_it != collection_geometry->end()) {
1981  // get and import this child
1982  OGRGeometry* import_geometry = *child_geometry_it;
1984  [&] { execute_import_lambda(import_geometry); });
1985 
1986  // next child
1987  child_geometry_it++;
1988  child_geometry_count++;
1989  }
1990  } else {
1991  // import non-collection row just once
1993  [&] { execute_import_lambda(ogr_geometry); });
1994  }
1995 
1996  // done
1997  return us;
1998 }
1999 
2000 } // namespace
2001 
2003  int thread_id,
2004  Importer* importer,
2005  std::unique_ptr<char[]> scratch_buffer,
2006  size_t begin_pos,
2007  size_t end_pos,
2008  size_t total_size,
2009  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2010  size_t first_row_index_this_buffer,
2011  const Catalog_Namespace::SessionInfo* session_info,
2012  Executor* executor) {
2013  ImportStatus thread_import_status;
2014  int64_t total_get_row_time_us = 0;
2015  int64_t total_str_to_val_time_us = 0;
2016  auto query_session = session_info ? session_info->get_session_id() : "";
2017  CHECK(scratch_buffer);
2018  auto buffer = scratch_buffer.get();
2019  auto load_ms = measure<>::execution([]() {});
2020 
2021  thread_import_status.thread_id = thread_id;
2022 
2023  auto ms = measure<>::execution([&]() {
2024  const CopyParams& copy_params = importer->get_copy_params();
2025  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2026  size_t begin =
2027  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
2028  const char* thread_buf = buffer + begin_pos + begin;
2029  const char* thread_buf_end = buffer + end_pos;
2030  const char* buf_end = buffer + total_size;
2031  bool try_single_thread = false;
2032  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2033  importer->get_import_buffers(thread_id);
2035  int phys_cols = 0;
2036  int point_cols = 0;
2037  for (const auto cd : col_descs) {
2038  const auto& col_ti = cd->columnType;
2039  phys_cols += col_ti.get_physical_cols();
2040  if (cd->columnType.get_type() == kPOINT) {
2041  point_cols++;
2042  }
2043  }
2044  auto num_cols = col_descs.size() - phys_cols;
2045  for (const auto& p : import_buffers) {
2046  p->clear();
2047  }
2048  std::vector<std::string_view> row;
2049  size_t row_index_plus_one = 0;
2050  for (const char* p = thread_buf; p < thread_buf_end; p++) {
2051  row.clear();
2052  std::vector<std::unique_ptr<char[]>>
2053  tmp_buffers; // holds string w/ removed escape chars, etc
2054  if (DEBUG_TIMING) {
2057  thread_buf_end,
2058  buf_end,
2059  copy_params,
2060  importer->get_is_array(),
2061  row,
2062  tmp_buffers,
2063  try_single_thread,
2064  true);
2065  });
2066  total_get_row_time_us += us;
2067  } else {
2069  thread_buf_end,
2070  buf_end,
2071  copy_params,
2072  importer->get_is_array(),
2073  row,
2074  tmp_buffers,
2075  try_single_thread,
2076  true);
2077  }
2078  row_index_plus_one++;
2079  // Each POINT could consume two separate coords instead of a single WKT
2080  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
2081  thread_import_status.rows_rejected++;
2082  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
2083  << row.size() << "): " << shared::printContainer(row);
2084  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2085  break;
2086  }
2087  continue;
2088  }
2089 
2090  //
2091  // lambda for importing a row (perhaps multiple times if exploding a collection)
2092  //
2093 
2094  auto execute_import_row = [&](OGRGeometry* import_geometry) {
2095  size_t import_idx = 0;
2096  size_t col_idx = 0;
2097  try {
2098  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2099  auto cd = *cd_it;
2100  const auto& col_ti = cd->columnType;
2101 
2102  bool is_null =
2103  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
2104  // Note: default copy_params.null_str is "\N", but everyone uses "NULL".
2105  // So initially nullness may be missed and not passed to add_value,
2106  // which then might also check and still decide it's actually a NULL, e.g.
2107  // if kINT doesn't start with a digit or a '-' then it's considered NULL.
2108  // So "NULL" is not recognized as NULL but then it's not recognized as
2109  // a valid kINT, so it's a NULL after all.
2110  // Checking for "NULL" here too, as a widely accepted notation for NULL.
2111 
2112  // Treating empty as NULL
2113  if (!cd->columnType.is_string() && row[import_idx].empty()) {
2114  is_null = true;
2115  }
2116 
2117  if (col_ti.get_physical_cols() == 0) {
2118  // not geo
2119 
2120  import_buffers[col_idx]->add_value(
2121  cd, row[import_idx], is_null, copy_params);
2122 
2123  // next
2124  ++import_idx;
2125  ++col_idx;
2126  } else {
2127  // geo
2128 
2129  // store null string in the base column
2130  import_buffers[col_idx]->add_value(
2131  cd, copy_params.null_str, true, copy_params);
2132 
2133  // WKT from string we're not storing
2134  auto const& geo_string = row[import_idx];
2135 
2136  // next
2137  ++import_idx;
2138  ++col_idx;
2139 
2140  SQLTypes col_type = col_ti.get_type();
2141  CHECK(IS_GEO(col_type));
2142 
2143  std::vector<double> coords;
2144  std::vector<double> bounds;
2145  std::vector<int> ring_sizes;
2146  std::vector<int> poly_rings;
2147  int render_group = 0;
2148 
2149  // if this is a POINT column, and the field is not null, and
2150  // looks like a scalar numeric value (and not a hex blob)
2151  // attempt to import two columns as lon/lat (or lat/lon)
2152  if (col_type == kPOINT && !is_null && geo_string.size() > 0 &&
2153  (geo_string[0] == '.' || isdigit(geo_string[0]) ||
2154  geo_string[0] == '-') &&
2155  geo_string.find_first_of("ABCDEFabcdef") == std::string::npos) {
2156  double lon = std::atof(std::string(geo_string).c_str());
2157  double lat = NAN;
2158  auto lat_str = row[import_idx];
2159  ++import_idx;
2160  if (lat_str.size() > 0 &&
2161  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
2162  lat = std::atof(std::string(lat_str).c_str());
2163  }
2164  // Swap coordinates if this table uses a reverse order: lat/lon
2165  if (!copy_params.lonlat) {
2166  std::swap(lat, lon);
2167  }
2168  // TODO: should check if POINT column should have been declared with
2169  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
2170  // throw std::runtime_error("POINT column " + cd->columnName + " is
2171  // not WGS84, cannot insert lon/lat");
2172  // }
2173  SQLTypeInfo import_ti{col_ti};
2174  if (copy_params.source_type ==
2176  import_ti.get_output_srid() == 4326) {
2177  auto srid0 = copy_params.source_srid;
2178  if (srid0 > 0) {
2179  // srid0 -> 4326 transform is requested on import
2180  import_ti.set_input_srid(srid0);
2181  }
2182  }
2183  if (!importGeoFromLonLat(lon, lat, coords, import_ti)) {
2184  throw std::runtime_error(
2185  "Cannot read lon/lat to insert into POINT column " +
2186  cd->columnName);
2187  }
2188  } else {
2189  // import it
2190  SQLTypeInfo import_ti{col_ti};
2191  if (copy_params.source_type ==
2193  import_ti.get_output_srid() == 4326) {
2194  auto srid0 = copy_params.source_srid;
2195  if (srid0 > 0) {
2196  // srid0 -> 4326 transform is requested on import
2197  import_ti.set_input_srid(srid0);
2198  }
2199  }
2200  if (is_null) {
2201  if (col_ti.get_notnull()) {
2202  throw std::runtime_error("NULL geo for column " + cd->columnName);
2203  }
2205  import_ti,
2206  coords,
2207  bounds,
2208  ring_sizes,
2209  poly_rings,
2211  } else {
2212  if (import_geometry) {
2213  // geometry already exploded
2215  import_geometry,
2216  import_ti,
2217  coords,
2218  bounds,
2219  ring_sizes,
2220  poly_rings,
2222  std::string msg =
2223  "Failed to extract valid geometry from exploded row " +
2224  std::to_string(first_row_index_this_buffer +
2225  row_index_plus_one) +
2226  " for column " + cd->columnName;
2227  throw std::runtime_error(msg);
2228  }
2229  } else {
2230  // extract geometry directly from WKT
2232  std::string(geo_string),
2233  import_ti,
2234  coords,
2235  bounds,
2236  ring_sizes,
2237  poly_rings,
2239  std::string msg = "Failed to extract valid geometry from row " +
2240  std::to_string(first_row_index_this_buffer +
2241  row_index_plus_one) +
2242  " for column " + cd->columnName;
2243  throw std::runtime_error(msg);
2244  }
2245  }
2246 
2247  // validate types
2248  if (col_type != import_ti.get_type()) {
2250  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2251  col_type == SQLTypes::kMULTIPOLYGON)) {
2252  throw std::runtime_error(
2253  "Imported geometry doesn't match the type of column " +
2254  cd->columnName);
2255  }
2256  }
2257  }
2258 
2259  // assign render group?
2260  if (columnIdToRenderGroupAnalyzerMap.size()) {
2261  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2262  if (ring_sizes.size()) {
2263  // get a suitable render group for these poly coords
2264  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2265  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2266  render_group =
2267  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2268  } else {
2269  // empty poly
2270  render_group = -1;
2271  }
2272  }
2273  }
2274  }
2275 
2276  // import extracted geo
2278  cd,
2279  import_buffers,
2280  col_idx,
2281  coords,
2282  bounds,
2283  ring_sizes,
2284  poly_rings,
2285  render_group);
2286 
2287  // skip remaining physical columns
2288  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2289  ++cd_it;
2290  }
2291  }
2292  }
2293  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2294  check_session_interrupted(query_session, executor))) {
2295  thread_import_status.load_failed = true;
2296  thread_import_status.load_msg =
2297  "Table load was cancelled via Query Interrupt";
2298  return;
2299  }
2300  thread_import_status.rows_completed++;
2301  } catch (const std::exception& e) {
2302  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2303  import_buffers[col_idx_to_pop]->pop_value();
2304  }
2305  thread_import_status.rows_rejected++;
2306  LOG(ERROR) << "Input exception thrown: " << e.what()
2307  << ". Row discarded. Data: " << shared::printContainer(row);
2308  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2309  LOG(ERROR) << "Load was cancelled due to max reject rows being reached";
2310  thread_import_status.load_failed = true;
2311  thread_import_status.load_msg =
2312  "Load was cancelled due to max reject rows being reached";
2313  }
2314  }
2315  }; // End of lambda
2316 
2317  if (copy_params.geo_explode_collections) {
2318  // explode and import
2319  auto const [collection_col_idx, collection_child_type, collection_col_name] =
2320  explode_collections_step1(col_descs);
2321  // pull out the collection WKT or WKB hex
2322  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2323  auto const& collection_geo_string = row[collection_col_idx];
2324  // convert to OGR
2325  OGRGeometry* ogr_geometry = nullptr;
2326  ScopeGuard destroy_ogr_geometry = [&] {
2327  if (ogr_geometry) {
2328  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2329  }
2330  };
2332  std::string(collection_geo_string));
2333  // do the explode and import
2334  us = explode_collections_step2(ogr_geometry,
2335  collection_child_type,
2336  collection_col_name,
2337  first_row_index_this_buffer + row_index_plus_one,
2338  execute_import_row);
2339  } else {
2340  // import non-collection row just once
2342  [&] { execute_import_row(nullptr); });
2343  }
2344 
2345  if (thread_import_status.load_failed) {
2346  break;
2347  }
2348  } // end thread
2349  total_str_to_val_time_us += us;
2350  if (!thread_import_status.load_failed && thread_import_status.rows_completed > 0) {
2351  load_ms = measure<>::execution([&]() {
2352  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2353  });
2354  }
2355  }); // end execution
2356 
2357  if (DEBUG_TIMING && !thread_import_status.load_failed &&
2358  thread_import_status.rows_completed > 0) {
2359  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2360  << thread_import_status.rows_completed << " rows inserted in "
2361  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2362  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2363  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2364  << "sec" << std::endl;
2365  }
2366 
2367  return thread_import_status;
2368 }
2369 
2370 class ColumnNotGeoError : public std::runtime_error {
2371  public:
2372  ColumnNotGeoError(const std::string& column_name)
2373  : std::runtime_error("Column '" + column_name + "' is not a geo column") {}
2374 };
2375 
2377  int thread_id,
2378  Importer* importer,
2379  OGRSpatialReference* poGeographicSR,
2380  const FeaturePtrVector& features,
2381  size_t firstFeature,
2382  size_t numFeatures,
2383  const FieldNameToIndexMapType& fieldNameToIndexMap,
2384  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2385  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2386  const Catalog_Namespace::SessionInfo* session_info,
2387  Executor* executor,
2388  const MetadataColumnInfos& metadata_column_infos) {
2389  ImportStatus thread_import_status;
2390  const CopyParams& copy_params = importer->get_copy_params();
2391  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2392  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2393  importer->get_import_buffers(thread_id);
2394  auto query_session = session_info ? session_info->get_session_id() : "";
2395  for (const auto& p : import_buffers) {
2396  p->clear();
2397  }
2398 
2399  auto convert_timer = timer_start();
2400 
2401  // we create this on the fly based on the first feature's SR
2402  std::unique_ptr<OGRCoordinateTransformation> coordinate_transformation;
2403  bool checked_transformation{false};
2404 
2405  // for all the features in this chunk...
2406  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2407  // ignore null features
2408  if (!features[iFeature]) {
2409  continue;
2410  }
2411 
2412  // get this feature's geometry
2413  // for geodatabase, we need to consider features with no geometry
2414  // as we still want to create a table, even if it has no geo column
2415  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2416  if (pGeometry) {
2417  // check if we need to make a CoordinateTransformation
2418  // we assume that all features in this chunk will have
2419  // the same source SR, so the CT will be valid for all
2420  // transforming to a reusable CT is faster than to an SR
2421  if (!checked_transformation) {
2422  // get the SR of the incoming geo
2423  auto geometry_sr = pGeometry->getSpatialReference();
2424  // if the SR is non-null and non-empty and different from what we want
2425  // we need to make a reusable CoordinateTransformation
2426  if (geometry_sr &&
2427 #if GDAL_VERSION_MAJOR >= 3
2428  !geometry_sr->IsEmpty() &&
2429 #endif
2430  !geometry_sr->IsSame(poGeographicSR)) {
2431  // validate the SR before trying to use it
2432  if (geometry_sr->Validate() != OGRERR_NONE) {
2433  throw std::runtime_error("Incoming geo has invalid Spatial Reference");
2434  }
2435  // create the OGRCoordinateTransformation that will be used for
2436  // all the features in this chunk
2437  if (coordinate_transformation == nullptr) {
2438  coordinate_transformation.reset(
2439  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR));
2440  if (coordinate_transformation == nullptr) {
2441  throw std::runtime_error(
2442  "Failed to create a GDAL CoordinateTransformation for incoming geo");
2443  }
2444  }
2445  }
2446  checked_transformation = true;
2447  }
2448 
2449  // if we have a transformation, use it
2450  if (coordinate_transformation) {
2451  pGeometry->transform(coordinate_transformation.get());
2452  }
2453  }
2454 
2455  //
2456  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2457  //
2458 
2459  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2460  size_t col_idx = 0;
2461  try {
2462  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2463  check_session_interrupted(query_session, executor))) {
2464  thread_import_status.load_failed = true;
2465  thread_import_status.load_msg = "Table load was cancelled via Query Interrupt";
2467  }
2468 
2469  uint32_t field_column_count{0u};
2470  uint32_t metadata_column_count{0u};
2471 
2472  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2473  auto cd = *cd_it;
2474 
2475  // is this a geo column?
2476  const auto& col_ti = cd->columnType;
2477  if (col_ti.is_geometry()) {
2478  // Note that this assumes there is one and only one geo column in the
2479  // table. Currently, the importer only supports reading a single
2480  // geospatial feature from an input shapefile / geojson file, but this
2481  // code will need to be modified if that changes
2482  SQLTypes col_type = col_ti.get_type();
2483 
2484  // store null string in the base column
2485  import_buffers[col_idx]->add_value(
2486  cd, copy_params.null_str, true, copy_params);
2487  ++col_idx;
2488 
2489  // the data we now need to extract for the other columns
2490  std::vector<double> coords;
2491  std::vector<double> bounds;
2492  std::vector<int> ring_sizes;
2493  std::vector<int> poly_rings;
2494  int render_group = 0;
2495 
2496  // extract it
2497  SQLTypeInfo import_ti{col_ti};
2498  bool is_null_geo = !import_geometry;
2499  if (is_null_geo) {
2500  if (col_ti.get_notnull()) {
2501  throw std::runtime_error("NULL geo for column " + cd->columnName);
2502  }
2504  import_ti,
2505  coords,
2506  bounds,
2507  ring_sizes,
2508  poly_rings,
2510  } else {
2512  import_geometry,
2513  import_ti,
2514  coords,
2515  bounds,
2516  ring_sizes,
2517  poly_rings,
2519  std::string msg = "Failed to extract valid geometry from feature " +
2520  std::to_string(firstFeature + iFeature + 1) +
2521  " for column " + cd->columnName;
2522  throw std::runtime_error(msg);
2523  }
2524 
2525  // validate types
2526  if (col_type != import_ti.get_type()) {
2528  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2529  col_type == SQLTypes::kMULTIPOLYGON)) {
2530  throw std::runtime_error(
2531  "Imported geometry doesn't match the type of column " +
2532  cd->columnName);
2533  }
2534  }
2535  }
2536 
2537  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2538  if (ring_sizes.size()) {
2539  // get a suitable render group for these poly coords
2540  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2541  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2542  render_group = (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2543  } else {
2544  // empty poly
2545  render_group = -1;
2546  }
2547  }
2548 
2549  // create coords array value and add it to the physical column
2550  ++cd_it;
2551  auto cd_coords = *cd_it;
2552  std::vector<TDatum> td_coord_data;
2553  if (!is_null_geo) {
2554  std::vector<uint8_t> compressed_coords =
2555  Geospatial::compress_coords(coords, col_ti);
2556  for (auto cc : compressed_coords) {
2557  TDatum td_byte;
2558  td_byte.val.int_val = cc;
2559  td_coord_data.push_back(td_byte);
2560  }
2561  }
2562  TDatum tdd_coords;
2563  tdd_coords.val.arr_val = td_coord_data;
2564  tdd_coords.is_null = is_null_geo;
2565  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2566  ++col_idx;
2567 
2568  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2569  // Create ring_sizes array value and add it to the physical column
2570  ++cd_it;
2571  auto cd_ring_sizes = *cd_it;
2572  std::vector<TDatum> td_ring_sizes;
2573  if (!is_null_geo) {
2574  for (auto ring_size : ring_sizes) {
2575  TDatum td_ring_size;
2576  td_ring_size.val.int_val = ring_size;
2577  td_ring_sizes.push_back(td_ring_size);
2578  }
2579  }
2580  TDatum tdd_ring_sizes;
2581  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2582  tdd_ring_sizes.is_null = is_null_geo;
2583  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2584  ++col_idx;
2585  }
2586 
2587  if (col_type == kMULTIPOLYGON) {
2588  // Create poly_rings array value and add it to the physical column
2589  ++cd_it;
2590  auto cd_poly_rings = *cd_it;
2591  std::vector<TDatum> td_poly_rings;
2592  if (!is_null_geo) {
2593  for (auto num_rings : poly_rings) {
2594  TDatum td_num_rings;
2595  td_num_rings.val.int_val = num_rings;
2596  td_poly_rings.push_back(td_num_rings);
2597  }
2598  }
2599  TDatum tdd_poly_rings;
2600  tdd_poly_rings.val.arr_val = td_poly_rings;
2601  tdd_poly_rings.is_null = is_null_geo;
2602  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2603  ++col_idx;
2604  }
2605 
2606  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2607  col_type == kMULTIPOLYGON) {
2608  // Create bounds array value and add it to the physical column
2609  ++cd_it;
2610  auto cd_bounds = *cd_it;
2611  std::vector<TDatum> td_bounds_data;
2612  if (!is_null_geo) {
2613  for (auto b : bounds) {
2614  TDatum td_double;
2615  td_double.val.real_val = b;
2616  td_bounds_data.push_back(td_double);
2617  }
2618  }
2619  TDatum tdd_bounds;
2620  tdd_bounds.val.arr_val = td_bounds_data;
2621  tdd_bounds.is_null = is_null_geo;
2622  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2623  ++col_idx;
2624  }
2625 
2626  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2627  // Create render_group value and add it to the physical column
2628  ++cd_it;
2629  auto cd_render_group = *cd_it;
2630  TDatum td_render_group;
2631  td_render_group.val.int_val = render_group;
2632  td_render_group.is_null = is_null_geo;
2633  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2634  ++col_idx;
2635  }
2636  } else if (field_column_count < fieldNameToIndexMap.size()) {
2637  //
2638  // field column
2639  //
2640  auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2641  CHECK(cit != columnNameToSourceNameMap.end());
2642  auto const& field_name = cit->second;
2643 
2644  auto const fit = fieldNameToIndexMap.find(field_name);
2645  if (fit == fieldNameToIndexMap.end()) {
2646  throw ColumnNotGeoError(cd->columnName);
2647  }
2648 
2649  auto const& field_index = fit->second;
2650  CHECK(field_index < fieldNameToIndexMap.size());
2651 
2652  auto const& feature = features[iFeature];
2653 
2654  auto field_defn = feature->GetFieldDefnRef(field_index);
2655  CHECK(field_defn);
2656 
2657  // OGRFeature::GetFieldAsString() can only return 80 characters
2658  // so for array columns, we are obliged to fetch the actual values
2659  // and construct the concatenated string ourselves
2660 
2661  std::string value_string;
2662  int array_index = 0, array_size = 0;
2663 
2664  auto stringify_numeric_list = [&](auto* values) {
2665  value_string = "{";
2666  while (array_index < array_size) {
2667  auto separator = (array_index > 0) ? "," : "";
2668  value_string += separator + std::to_string(values[array_index]);
2669  array_index++;
2670  }
2671  value_string += "}";
2672  };
2673 
2674  auto field_type = field_defn->GetType();
2675  switch (field_type) {
2676  case OFTInteger:
2677  case OFTInteger64:
2678  case OFTReal:
2679  case OFTString:
2680  case OFTBinary:
2681  case OFTDate:
2682  case OFTTime:
2683  case OFTDateTime: {
2684  value_string = feature->GetFieldAsString(field_index);
2685  } break;
2686  case OFTIntegerList: {
2687  auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2688  stringify_numeric_list(values);
2689  } break;
2690  case OFTInteger64List: {
2691  auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2692  stringify_numeric_list(values);
2693  } break;
2694  case OFTRealList: {
2695  auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2696  stringify_numeric_list(values);
2697  } break;
2698  case OFTStringList: {
2699  auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2700  value_string = "{";
2701  if (array_of_strings) {
2702  while (auto* this_string = array_of_strings[array_index]) {
2703  auto separator = (array_index > 0) ? "," : "";
2704  value_string += separator + std::string(this_string);
2705  array_index++;
2706  }
2707  }
2708  value_string += "}";
2709  } break;
2710  default:
2711  throw std::runtime_error("Unsupported geo file field type (" +
2712  std::to_string(static_cast<int>(field_type)) +
2713  ")");
2714  }
2715 
2716  import_buffers[col_idx]->add_value(cd, value_string, false, copy_params);
2717  ++col_idx;
2718  field_column_count++;
2719  } else if (metadata_column_count < metadata_column_infos.size()) {
2720  //
2721  // metadata column
2722  //
2723  auto const& mci = metadata_column_infos[metadata_column_count];
2724  if (mci.column_descriptor.columnName != cd->columnName) {
2725  throw std::runtime_error("Metadata column name mismatch");
2726  }
2727  import_buffers[col_idx]->add_value(cd, mci.value, false, copy_params);
2728  ++col_idx;
2729  metadata_column_count++;
2730  } else {
2731  throw std::runtime_error("Column count mismatch");
2732  }
2733  }
2734  thread_import_status.rows_completed++;
2735  } catch (QueryExecutionError& e) {
2737  throw e;
2738  }
2739  } catch (ColumnNotGeoError& e) {
2740  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Aborting import.";
2741  throw std::runtime_error(e.what());
2742  } catch (const std::exception& e) {
2743  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2744  import_buffers[col_idx_to_pop]->pop_value();
2745  }
2746  thread_import_status.rows_rejected++;
2747  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2748  }
2749  };
2750 
2751  if (pGeometry && copy_params.geo_explode_collections) {
2752  // explode and import
2753  auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2754  explode_collections_step1(col_descs);
2755  explode_collections_step2(pGeometry,
2756  collection_child_type,
2757  collection_col_name,
2758  firstFeature + iFeature + 1,
2759  execute_import_feature);
2760  } else {
2761  // import non-collection or null feature just once
2762  execute_import_feature(pGeometry);
2763  }
2764  } // end features
2765 
2766  float convert_s = TIMER_STOP(convert_timer);
2767 
2768  float load_s = 0.0f;
2769  if (thread_import_status.rows_completed > 0) {
2770  auto load_timer = timer_start();
2771  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2772  load_s = TIMER_STOP(load_timer);
2773  }
2774 
2775  if (DEBUG_TIMING && thread_import_status.rows_completed > 0) {
2776  LOG(INFO) << "DEBUG: Process " << convert_s << "s";
2777  LOG(INFO) << "DEBUG: Load " << load_s << "s";
2778  LOG(INFO) << "DEBUG: Total " << (convert_s + load_s) << "s";
2779  }
2780 
2781  thread_import_status.thread_id = thread_id;
2782 
2783  return thread_import_status;
2784 }
2785 
2787  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2788  size_t row_count,
2789  const Catalog_Namespace::SessionInfo* session_info) {
2790  return loadImpl(import_buffers, row_count, false, session_info);
2791 }
2792 
2793 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2794  size_t row_count,
2795  const Catalog_Namespace::SessionInfo* session_info) {
2796  return loadImpl(import_buffers, row_count, true, session_info);
2797 }
2798 
2799 namespace {
2800 
2801 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2802  const auto& ti = import_buffer.getTypeInfo();
2803  const int8_t* values_buffer{nullptr};
2804  if (ti.is_string()) {
2805  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2806  values_buffer = import_buffer.getStringDictBuffer();
2807  } else {
2808  values_buffer = import_buffer.getAsBytes();
2809  }
2810  CHECK(values_buffer);
2811  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2812  switch (logical_size) {
2813  case 1: {
2814  return values_buffer[index];
2815  }
2816  case 2: {
2817  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2818  }
2819  case 4: {
2820  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2821  }
2822  case 8: {
2823  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2824  }
2825  default:
2826  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2827  }
2828  UNREACHABLE();
2829  return 0;
2830 }
2831 
2832 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2833  const auto& ti = import_buffer.getTypeInfo();
2834  CHECK_EQ(kFLOAT, ti.get_type());
2835  const auto values_buffer = import_buffer.getAsBytes();
2836  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2837 }
2838 
2839 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2840  const auto& ti = import_buffer.getTypeInfo();
2841  CHECK_EQ(kDOUBLE, ti.get_type());
2842  const auto values_buffer = import_buffer.getAsBytes();
2843  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2844 }
2845 
2846 } // namespace
2847 
2848 void Loader::fillShardRow(const size_t row_index,
2849  OneShardBuffers& shard_output_buffers,
2850  const OneShardBuffers& import_buffers) {
2851  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2852  const auto& input_buffer = import_buffers[col_idx];
2853  const auto& col_ti = input_buffer->getTypeInfo();
2854  const auto type =
2855  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2856 
2857  switch (type) {
2858  case kBOOLEAN:
2859  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2860  break;
2861  case kTINYINT:
2862  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2863  break;
2864  case kSMALLINT:
2865  shard_output_buffers[col_idx]->addSmallint(
2866  int_value_at(*input_buffer, row_index));
2867  break;
2868  case kINT:
2869  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2870  break;
2871  case kBIGINT:
2872  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2873  break;
2874  case kFLOAT:
2875  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2876  break;
2877  case kDOUBLE:
2878  shard_output_buffers[col_idx]->addDouble(
2879  double_value_at(*input_buffer, row_index));
2880  break;
2881  case kTEXT:
2882  case kVARCHAR:
2883  case kCHAR: {
2884  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2885  shard_output_buffers[col_idx]->addString(
2886  (*input_buffer->getStringBuffer())[row_index]);
2887  break;
2888  }
2889  case kTIME:
2890  case kTIMESTAMP:
2891  case kDATE:
2892  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2893  break;
2894  case kARRAY:
2895  if (IS_STRING(col_ti.get_subtype())) {
2896  CHECK(input_buffer->getStringArrayBuffer());
2897  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2898  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2899  shard_output_buffers[col_idx]->addStringArray(input_arr);
2900  } else {
2901  shard_output_buffers[col_idx]->addArray(
2902  (*input_buffer->getArrayBuffer())[row_index]);
2903  }
2904  break;
2905  case kPOINT:
2906  case kLINESTRING:
2907  case kPOLYGON:
2908  case kMULTIPOLYGON: {
2909  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2910  shard_output_buffers[col_idx]->addGeoString(
2911  (*input_buffer->getGeoStringBuffer())[row_index]);
2912  break;
2913  }
2914  default:
2915  CHECK(false);
2916  }
2917  }
2918 }
2919 
2921  std::vector<OneShardBuffers>& all_shard_import_buffers,
2922  std::vector<size_t>& all_shard_row_counts,
2923  const OneShardBuffers& import_buffers,
2924  const size_t row_count,
2925  const size_t shard_count,
2926  const Catalog_Namespace::SessionInfo* session_info) {
2927  int col_idx{0};
2928  const ColumnDescriptor* shard_col_desc{nullptr};
2929  for (const auto col_desc : column_descs_) {
2930  ++col_idx;
2931  if (col_idx == table_desc_->shardedColumnId) {
2932  shard_col_desc = col_desc;
2933  break;
2934  }
2935  }
2936  CHECK(shard_col_desc);
2937  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2938  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2939  const auto& shard_col_ti = shard_col_desc->columnType;
2940  CHECK(shard_col_ti.is_integer() ||
2941  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2942  shard_col_ti.is_time());
2943  if (shard_col_ti.is_string()) {
2944  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2945  CHECK(payloads_ptr);
2946  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2947  }
2948 
2949  for (size_t i = 0; i < row_count; ++i) {
2950  const size_t shard =
2951  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2952  auto& shard_output_buffers = all_shard_import_buffers[shard];
2953  fillShardRow(i, shard_output_buffers, import_buffers);
2954  ++all_shard_row_counts[shard];
2955  }
2956 }
2957 
2959  std::vector<OneShardBuffers>& all_shard_import_buffers,
2960  std::vector<size_t>& all_shard_row_counts,
2961  const OneShardBuffers& import_buffers,
2962  const size_t row_count,
2963  const size_t shard_count,
2964  const Catalog_Namespace::SessionInfo* session_info) {
2965  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2966  CHECK(shard_tds.size() == shard_count);
2967 
2968  for (size_t shard = 0; shard < shard_count; ++shard) {
2969  auto& shard_output_buffers = all_shard_import_buffers[shard];
2970  if (row_count != 0) {
2971  fillShardRow(0, shard_output_buffers, import_buffers);
2972  }
2973  // when replicating a column, row count of a shard == replicate count of the column
2974  // on the shard
2975  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2976  }
2977 }
2978 
2979 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2980  std::vector<size_t>& all_shard_row_counts,
2981  const OneShardBuffers& import_buffers,
2982  const size_t row_count,
2983  const size_t shard_count,
2984  const Catalog_Namespace::SessionInfo* session_info) {
2985  all_shard_row_counts.resize(shard_count);
2986  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2987  all_shard_import_buffers.emplace_back();
2988  for (const auto& typed_import_buffer : import_buffers) {
2989  all_shard_import_buffers.back().emplace_back(
2990  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2991  typed_import_buffer->getStringDictionary()));
2992  }
2993  }
2995  if (isAddingColumns()) {
2996  distributeToShardsNewColumns(all_shard_import_buffers,
2997  all_shard_row_counts,
2998  import_buffers,
2999  row_count,
3000  shard_count,
3001  session_info);
3002  } else {
3003  distributeToShardsExistingColumns(all_shard_import_buffers,
3004  all_shard_row_counts,
3005  import_buffers,
3006  row_count,
3007  shard_count,
3008  session_info);
3009  }
3010 }
3011 
3013  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3014  size_t row_count,
3015  bool checkpoint,
3016  const Catalog_Namespace::SessionInfo* session_info) {
3017  if (load_callback_) {
3018  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
3019  return load_callback_(import_buffers, data_blocks, row_count);
3020  }
3021  if (table_desc_->nShards) {
3022  std::vector<OneShardBuffers> all_shard_import_buffers;
3023  std::vector<size_t> all_shard_row_counts;
3024  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
3025  distributeToShards(all_shard_import_buffers,
3026  all_shard_row_counts,
3027  import_buffers,
3028  row_count,
3029  shard_tables.size(),
3030  session_info);
3031  bool success = true;
3032  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
3033  success = success && loadToShard(all_shard_import_buffers[shard_idx],
3034  all_shard_row_counts[shard_idx],
3035  shard_tables[shard_idx],
3036  checkpoint,
3037  session_info);
3038  }
3039  return success;
3040  }
3041  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
3042 }
3043 
3045  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
3046  std::vector<DataBlockPtr> result(import_buffers.size());
3047  std::vector<std::pair<const size_t, std::future<int8_t*>>>
3048  encoded_data_block_ptrs_futures;
3049  // make all async calls to string dictionary here and then continue execution
3050  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
3051  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
3052  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
3053  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
3054  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
3055 
3056  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
3057  buf_idx,
3058  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
3059  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
3060  return import_buffers[buf_idx]->getStringDictBuffer();
3061  })));
3062  }
3063  }
3064 
3065  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
3066  DataBlockPtr p;
3067  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
3068  import_buffers[buf_idx]->getTypeInfo().is_time() ||
3069  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
3070  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
3071  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
3072  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
3073  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
3074  p.stringsPtr = string_payload_ptr;
3075  } else {
3076  // This condition means we have column which is ENCODED string. We already made
3077  // Async request to gain the encoded integer values above so we should skip this
3078  // iteration and continue.
3079  continue;
3080  }
3081  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
3082  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
3083  p.stringsPtr = geo_payload_ptr;
3084  } else {
3085  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
3086  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
3087  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
3088  import_buffers[buf_idx]->addDictEncodedStringArray(
3089  *import_buffers[buf_idx]->getStringArrayBuffer());
3090  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
3091  } else {
3092  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
3093  }
3094  }
3095  result[buf_idx] = p;
3096  }
3097 
3098  // wait for the async requests we made for string dictionary
3099  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
3100  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
3101  }
3102  return result;
3103 }
3104 
3106  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3107  size_t row_count,
3108  const TableDescriptor* shard_table,
3109  bool checkpoint,
3110  const Catalog_Namespace::SessionInfo* session_info) {
3111  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
3113  ins_data.numRows = row_count;
3114  bool success = false;
3115  try {
3116  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
3117  } catch (std::exception& e) {
3118  std::ostringstream oss;
3119  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
3120  << e.what();
3121 
3122  LOG(ERROR) << oss.str();
3123  error_msg_ = oss.str();
3124  return success;
3125  }
3126  if (isAddingColumns()) {
3127  // when Adding columns we omit any columns except the ones being added
3128  ins_data.columnIds.clear();
3129  ins_data.is_default.clear();
3130  for (auto& buffer : import_buffers) {
3131  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
3132  ins_data.is_default.push_back(true);
3133  }
3134  } else {
3135  ins_data.is_default.resize(ins_data.columnIds.size(), false);
3136  }
3137  // release loader_lock so that in InsertOrderFragmenter::insertDat
3138  // we can have multiple threads sort/shuffle InsertData
3139  loader_lock.unlock();
3140  success = true;
3141  {
3142  try {
3143  if (checkpoint) {
3144  shard_table->fragmenter->insertData(ins_data);
3145  } else {
3146  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
3147  }
3148  } catch (std::exception& e) {
3149  std::ostringstream oss;
3150  oss << "Fragmenter Insert Exception when processing Table "
3151  << shard_table->tableName << " issue was " << e.what();
3152 
3153  LOG(ERROR) << oss.str();
3154  loader_lock.lock();
3155  error_msg_ = oss.str();
3156  success = false;
3157  }
3158  }
3159  return success;
3160 }
3161 
3162 void Loader::dropColumns(const std::vector<int>& columnIds) {
3163  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
3164  if (table_desc_->nShards) {
3166  }
3167  for (auto table_desc : table_descs) {
3168  table_desc->fragmenter->dropColumns(columnIds);
3169  }
3170 }
3171 
3175  for (auto cd : column_descs_) {
3176  insert_data_.columnIds.push_back(cd->columnId);
3177  if (cd->columnType.get_compression() == kENCODING_DICT) {
3178  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3179  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
3180  CHECK(dd);
3181  dict_map_[cd->columnId] = dd->stringDict.get();
3182  }
3183  }
3184  insert_data_.numRows = 0;
3185 }
3186 
3189  split_raw_data();
3191 }
3192 
3194  const std::string& file_path,
3195  const bool decompressed,
3196  const Catalog_Namespace::SessionInfo* session_info) {
3197  // we do not check interrupt status for this detection
3198  if (!p_file) {
3199  p_file = fopen(file_path.c_str(), "rb");
3200  }
3201  if (!p_file) {
3202  throw std::runtime_error("failed to open file '" + file_path +
3203  "': " + strerror(errno));
3204  }
3205 
3206  // somehow clang does not support ext/stdio_filebuf.h, so
3207  // need to diy readline with customized copy_params.line_delim...
3208  std::string line;
3209  line.reserve(1 * 1024 * 1024);
3210  auto end_time = std::chrono::steady_clock::now() +
3211  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
3212  try {
3213  while (!feof(p_file)) {
3214  int c;
3215  size_t n = 0;
3216  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
3217  if (n++ >= line.capacity()) {
3218  break;
3219  }
3220  line += c;
3221  }
3222  if (0 == n) {
3223  break;
3224  }
3225  // remember the first line, which is possibly a header line, to
3226  // ignore identical header line(s) in 2nd+ files of a archive;
3227  // otherwise, 2nd+ header may be mistaken as an all-string row
3228  // and so be final column types.
3229  if (line1.empty()) {
3230  line1 = line;
3231  } else if (line == line1) {
3232  line.clear();
3233  continue;
3234  }
3235 
3236  raw_data += line;
3238  line.clear();
3240  if (std::chrono::steady_clock::now() > end_time) {
3241  if (import_status_.rows_completed > 10000) {
3242  break;
3243  }
3244  }
3245  }
3246  } catch (std::exception& e) {
3247  }
3248 
3249  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3250  import_status_.load_failed = true;
3251 
3252  fclose(p_file);
3253  p_file = nullptr;
3254  return import_status_;
3255 }
3256 
3258  // this becomes analogous to Importer::import()
3259  (void)DataStreamSink::archivePlumber(nullptr);
3260 }
3261 
3263  if (copy_params.delimiter == '\0') {
3264  copy_params.delimiter = ',';
3265  if (boost::filesystem::extension(file_path) == ".tsv") {
3266  copy_params.delimiter = '\t';
3267  }
3268  }
3269 }
3270 
3272  const char* buf = raw_data.c_str();
3273  const char* buf_end = buf + raw_data.size();
3274  bool try_single_thread = false;
3275  for (const char* p = buf; p < buf_end; p++) {
3276  std::vector<std::string> row;
3277  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3279  buf_end,
3280  buf_end,
3281  copy_params,
3282  nullptr,
3283  row,
3284  tmp_buffers,
3285  try_single_thread,
3286  true);
3287  raw_rows.push_back(row);
3288  if (try_single_thread) {
3289  break;
3290  }
3291  }
3292  if (try_single_thread) {
3293  copy_params.threads = 1;
3294  raw_rows.clear();
3295  for (const char* p = buf; p < buf_end; p++) {
3296  std::vector<std::string> row;
3297  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3299  buf_end,
3300  buf_end,
3301  copy_params,
3302  nullptr,
3303  row,
3304  tmp_buffers,
3305  try_single_thread,
3306  true);
3307  raw_rows.push_back(row);
3308  }
3309  }
3310 }
3311 
3312 template <class T>
3313 bool try_cast(const std::string& str) {
3314  try {
3315  boost::lexical_cast<T>(str);
3316  } catch (const boost::bad_lexical_cast& e) {
3317  return false;
3318  }
3319  return true;
3320 }
3321 
3322 SQLTypes Detector::detect_sqltype(const std::string& str) {
3323  SQLTypes type = kTEXT;
3324  if (try_cast<double>(str)) {
3325  type = kDOUBLE;
3326  /*if (try_cast<bool>(str)) {
3327  type = kBOOLEAN;
3328  }*/
3329  if (try_cast<int16_t>(str)) {
3330  type = kSMALLINT;
3331  } else if (try_cast<int32_t>(str)) {
3332  type = kINT;
3333  } else if (try_cast<int64_t>(str)) {
3334  type = kBIGINT;
3335  } else if (try_cast<float>(str)) {
3336  type = kFLOAT;
3337  }
3338  }
3339 
3340  // check for geo types
3341  if (type == kTEXT) {
3342  // convert to upper case
3343  std::string str_upper_case = str;
3345  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3346 
3347  // then test for leading words
3348  if (str_upper_case.find("POINT") == 0) {
3349  type = kPOINT;
3350  } else if (str_upper_case.find("LINESTRING") == 0) {
3351  type = kLINESTRING;
3352  } else if (str_upper_case.find("POLYGON") == 0) {
3354  type = kMULTIPOLYGON;
3355  } else {
3356  type = kPOLYGON;
3357  }
3358  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
3359  type = kMULTIPOLYGON;
3360  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
3361  std::string::npos &&
3362  (str_upper_case.size() % 2) == 0) {
3363  // simple hex blob (two characters per byte, not uu-encode or base64)
3364  if (str_upper_case.size() >= 10) {
3365  // match WKB blobs for supported geometry types
3366  // the first byte specifies if the data is big-endian or little-endian
3367  // the next four bytes are the geometry type (1 = POINT etc.)
3368  // @TODO support eWKB, which has extra bits set in the geometry type
3369  auto first_five_bytes = str_upper_case.substr(0, 10);
3370  if (first_five_bytes == "0000000001" || first_five_bytes == "0101000000") {
3371  type = kPOINT;
3372  } else if (first_five_bytes == "0000000002" || first_five_bytes == "0102000000") {
3373  type = kLINESTRING;
3374  } else if (first_five_bytes == "0000000003" || first_five_bytes == "0103000000") {
3375  type = kPOLYGON;
3376  } else if (first_five_bytes == "0000000006" || first_five_bytes == "0106000000") {
3377  type = kMULTIPOLYGON;
3378  } else {
3379  // unsupported WKB type
3380  return type;
3381  }
3382  } else {
3383  // too short to be WKB
3384  return type;
3385  }
3386  }
3387  }
3388 
3389  // check for time types
3390  if (type == kTEXT) {
3391  // This won't match unix timestamp, since floats and ints were checked above.
3392  if (dateTimeParseOptional<kTIME>(str, 0)) {
3393  type = kTIME;
3394  } else if (dateTimeParseOptional<kTIMESTAMP>(str, 0)) {
3395  type = kTIMESTAMP;
3396  } else if (dateTimeParseOptional<kDATE>(str, 0)) {
3397  type = kDATE;
3398  }
3399  }
3400 
3401  return type;
3402 }
3403 
3404 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3405  std::vector<SQLTypes> types(row.size());
3406  for (size_t i = 0; i < row.size(); i++) {
3407  types[i] = detect_sqltype(row[i]);
3408  }
3409  return types;
3410 }
3411 
3413  static std::array<int, kSQLTYPE_LAST> typeorder;
3414  typeorder[kCHAR] = 0;
3415  typeorder[kBOOLEAN] = 2;
3416  typeorder[kSMALLINT] = 3;
3417  typeorder[kINT] = 4;
3418  typeorder[kBIGINT] = 5;
3419  typeorder[kFLOAT] = 6;
3420  typeorder[kDOUBLE] = 7;
3421  typeorder[kTIMESTAMP] = 8;
3422  typeorder[kTIME] = 9;
3423  typeorder[kDATE] = 10;
3424  typeorder[kPOINT] = 11;
3425  typeorder[kLINESTRING] = 11;
3426  typeorder[kPOLYGON] = 11;
3427  typeorder[kMULTIPOLYGON] = 11;
3428  typeorder[kTEXT] = 12;
3429 
3430  // note: b < a instead of a < b because the map is ordered most to least restrictive
3431  return typeorder[b] < typeorder[a];
3432 }
3433 
3436  best_encodings =
3437  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3438  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3439  switch (copy_params.has_header) {
3441  has_headers = detect_headers(head_types, best_sqltypes);
3442  if (has_headers) {
3444  } else {
3446  }
3447  break;
3449  has_headers = false;
3450  break;
3452  has_headers = true;
3453  break;
3454  }
3455 }
3456 
3459 }
3460 
3461 std::vector<SQLTypes> Detector::find_best_sqltypes(
3462  const std::vector<std::vector<std::string>>& raw_rows,
3463  const CopyParams& copy_params) {
3464  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3465 }
3466 
3467 std::vector<SQLTypes> Detector::find_best_sqltypes(
3468  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3469  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3470  const CopyParams& copy_params) {
3471  if (raw_rows.size() < 1) {
3472  throw std::runtime_error("No rows found in: " +
3473  boost::filesystem::basename(file_path));
3474  }
3475  auto end_time = std::chrono::steady_clock::now() + timeout;
3476  size_t num_cols = raw_rows.front().size();
3477  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3478  std::vector<size_t> non_null_col_counts(num_cols, 0);
3479  for (auto row = row_begin; row != row_end; row++) {
3480  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3481  best_types.push_back(kCHAR);
3482  non_null_col_counts.push_back(0);
3483  }
3484  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3485  // do not count nulls
3486  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3487  continue;
3488  }
3489  SQLTypes t = detect_sqltype(row->at(col_idx));
3490  non_null_col_counts[col_idx]++;
3491  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3492  best_types[col_idx] = t;
3493  }
3494  }
3495  if (std::chrono::steady_clock::now() > end_time) {
3496  break;
3497  }
3498  }
3499  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3500  // if we don't have any non-null values for this column make it text to be
3501  // safe b/c that is least restrictive type
3502  if (non_null_col_counts[col_idx] == 0) {
3503  best_types[col_idx] = kTEXT;
3504  }
3505  }
3506 
3507  return best_types;
3508 }
3509 
3510 std::vector<EncodingType> Detector::find_best_encodings(
3511  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3512  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3513  const std::vector<SQLTypes>& best_types) {
3514  if (raw_rows.size() < 1) {
3515  throw std::runtime_error("No rows found in: " +
3516  boost::filesystem::basename(file_path));
3517  }
3518  size_t num_cols = best_types.size();
3519  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3520  std::vector<size_t> num_rows_per_col(num_cols, 1);
3521  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3522  for (auto row = row_begin; row != row_end; row++) {
3523  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3524  if (IS_STRING(best_types[col_idx])) {
3525  count_set[col_idx].insert(row->at(col_idx));
3526  num_rows_per_col[col_idx]++;
3527  }
3528  }
3529  }
3530  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3531  if (IS_STRING(best_types[col_idx])) {
3532  float uniqueRatio =
3533  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3534  if (uniqueRatio < 0.75) {
3535  best_encodes[col_idx] = kENCODING_DICT;
3536  }
3537  }
3538  }
3539  return best_encodes;
3540 }
3541 
3542 // detect_headers returns true if:
3543 // - all elements of the first argument are kTEXT
3544 // - there is at least one instance where tail_types is more restrictive than head_types
3545 // (ie, not kTEXT)
3546 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3547  const std::vector<SQLTypes>& tail_types) {
3548  if (head_types.size() != tail_types.size()) {
3549  return false;
3550  }
3551  bool has_headers = false;
3552  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3553  if (head_types[col_idx] != kTEXT) {
3554  return false;
3555  }
3556  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3557  }
3558  return has_headers;
3559 }
3560 
3561 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3562  {
3563  n = std::min(n, raw_rows.size());
3564  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3565  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3566  raw_rows.begin() + n);
3567  return sample_rows;
3568  }
3569 }
3570 
3571 std::vector<std::string> Detector::get_headers() {
3572  {
3573  std::vector<std::string> headers(best_sqltypes.size());
3574  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3575  if (has_headers && i < raw_rows[0].size()) {
3576  headers[i] = raw_rows[0][i];
3577  } else {
3578  headers[i] = "column_" + std::to_string(i + 1);
3579  }
3580  }
3581  return headers;
3582  }
3583 }
3584 
3585 std::vector<SQLTypeInfo> Detector::getBestColumnTypes() const {
3586  {
3587  std::vector<SQLTypeInfo> types;
3588  CHECK_EQ(best_sqltypes.size(), best_encodings.size());
3589  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3590  types.emplace_back(best_sqltypes[i], false, best_encodings[i]);
3591  }
3592  return types;
3593  }
3594 }
3595 
3596 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3597  size_t row_count,
3598  const Catalog_Namespace::SessionInfo* session_info) {
3599  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3600  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3601  import_status_.load_failed = true;
3602  import_status_.load_msg = loader->getErrorMessage();
3603  }
3604 }
3605 
3607  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3608  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3609  mapd_lock_guard<mapd_shared_mutex> read_lock(import_mutex_);
3611  // rollback to starting epoch - undo all the added records
3612  loader->setTableEpochs(table_epochs);
3613  } else {
3614  loader->checkpoint();
3615  }
3616  }
3617 
3618  if (loader->getTableDesc()->persistenceLevel ==
3619  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3620  // tables
3621  auto ms = measure<>::execution([&]() {
3622  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3623  if (!import_status_.load_failed) {
3624  for (auto& p : import_buffers_vec[0]) {
3625  if (!p->stringDictCheckpoint()) {
3626  LOG(ERROR) << "Checkpointing Dictionary for Column "
3627  << p->getColumnDesc()->columnName << " failed.";
3628  import_status_.load_failed = true;
3629  import_status_.load_msg = "Dictionary checkpoint failed";
3630  break;
3631  }
3632  }
3633  }
3634  });
3635  if (DEBUG_TIMING) {
3636  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3637  << std::endl;
3638  }
3639  }
3640 }
3641 
3643  const Catalog_Namespace::SessionInfo* session_info) {
3644  // in generalized importing scheme, reaching here file_path may
3645  // contain a file path, a url or a wildcard of file paths.
3646  // see CopyTableStmt::execute.
3647 
3648  std::vector<std::string> file_paths;
3649  try {
3656  } catch (const shared::FileNotFoundException& e) {
3657  // After finding no matching files locally, file_path may still be an s3 url
3658  file_paths.push_back(file_path);
3659  }
3660 
3661  // sum up sizes of all local files -- only for local files. if
3662  // file_path is a s3 url, sizes will be obtained via S3Archive.
3663  for (const auto& file_path : file_paths) {
3665  }
3666 
3667  // s3 parquet goes different route because the files do not use libarchive
3668  // but parquet api, and they need to landed like .7z files.
3669  //
3670  // note: parquet must be explicitly specified by a WITH parameter
3671  // "source_type='parquet_file'", because for example spark sql users may specify a
3672  // output url w/o file extension like this:
3673  // df.write
3674  // .mode("overwrite")
3675  // .parquet("s3://bucket/folder/parquet/mydata")
3676  // without the parameter, it means plain or compressed csv files.
3677  // note: .ORC and AVRO files should follow a similar path to Parquet?
3679 #ifdef ENABLE_IMPORT_PARQUET
3680  import_parquet(file_paths, session_info);
3681 #else
3682  throw std::runtime_error("Parquet not supported!");
3683 #endif
3684  } else {
3685  import_compressed(file_paths, session_info);
3686  }
3687 
3688  return import_status_;
3689 }
3690 
3691 Detector::Detector(const boost::filesystem::path& fp, CopyParams& cp)
3692  : DataStreamSink(cp, fp.string()), file_path(fp) {
3693  {
3694  read_file();
3695  init();
3696  }
3697 }
3698 
3699 #ifdef ENABLE_IMPORT_PARQUET
3700 inline auto open_parquet_table(const std::string& file_path,
3701  std::shared_ptr<arrow::io::ReadableFile>& infile,
3702  std::unique_ptr<parquet::arrow::FileReader>& reader,
3703  std::shared_ptr<arrow::Table>& table) {
3704  using namespace parquet::arrow;
3705  auto file_result = arrow::io::ReadableFile::Open(file_path);
3706  PARQUET_THROW_NOT_OK(file_result.status());
3707  infile = file_result.ValueOrDie();
3708 
3709  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3710  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3711  const auto num_row_groups = reader->num_row_groups();
3712  const auto num_columns = table->num_columns();
3713  const auto num_rows = table->num_rows();
3714  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3715  << " columns in " << num_row_groups << " groups.";
3716  return std::make_tuple(num_row_groups, num_columns, num_rows);
3717 } // namespace import_export
3718 
3719 void Detector::import_local_parquet(const std::string& file_path,
3720  const Catalog_Namespace::SessionInfo* session_info) {
3721  /*Skip interrupt checking in detector*/
3722  std::shared_ptr<arrow::io::ReadableFile> infile;
3723  std::unique_ptr<parquet::arrow::FileReader> reader;
3724  std::shared_ptr<arrow::Table> table;
3725  int num_row_groups, num_columns;
3726  int64_t num_rows;
3727  std::tie(num_row_groups, num_columns, num_rows) =
3728  open_parquet_table(file_path, infile, reader, table);
3729  // make up header line if not yet
3730  if (0 == raw_data.size()) {
3732  copy_params.line_delim = '\n';
3733  copy_params.delimiter = ',';
3734  // must quote values to skip any embedded delimiter
3735  copy_params.quoted = true;
3736  copy_params.quote = '"';
3737  copy_params.escape = '"';
3738  for (int c = 0; c < num_columns; ++c) {
3739  if (c) {
3741  }
3742  raw_data += table->ColumnNames().at(c);
3743  }
3745  }
3746  // make up raw data... rowwize...
3747  const ColumnDescriptor cd;
3748  for (int g = 0; g < num_row_groups; ++g) {
3749  // data is columnwise
3750  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3751  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3752  arrays.resize(num_columns);
3753  for (int c = 0; c < num_columns; ++c) {
3754  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3755  for (auto chunk : arrays[c]->chunks()) {
3756  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3757  }
3758  }
3759  for (int r = 0; r < num_rows; ++r) {
3760  for (int c = 0; c < num_columns; ++c) {
3761  std::vector<std::string> buffer;
3762  for (auto chunk : arrays[c]->chunks()) {
3763  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3764  if (c) {
3766  }
3767  if (!chunk->IsNull(r)) {
3769  raw_data += boost::replace_all_copy(
3770  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3772  }
3773  }
3774  }
3776  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3777  if (++import_status_.rows_completed >= 10000) {
3778  // as if load truncated
3779  import_status_.load_failed = true;
3780  import_status_.load_msg = "Detector processed 10000 records";
3781  return;
3782  }
3783  }
3784  }
3785 }
3786 
3787 template <typename DATA_TYPE>
3789  std::vector<DATA_TYPE>& buffer,
3790  import_export::BadRowsTracker* const bad_rows_tracker) {
3791  const auto old_size = buffer.size();
3792  // erase backward to minimize memory movement overhead
3793  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3794  ++rit) {
3795  buffer.erase(buffer.begin() + *rit);
3796  }
3797  return std::make_tuple(old_size, buffer.size());
3798 }
3799 
3801  BadRowsTracker* const bad_rows_tracker) {
3802  switch (type) {
3803  case kBOOLEAN:
3804  return del_values(*bool_buffer_, bad_rows_tracker);
3805  case kTINYINT:
3806  return del_values(*tinyint_buffer_, bad_rows_tracker);
3807  case kSMALLINT:
3808  return del_values(*smallint_buffer_, bad_rows_tracker);
3809  case kINT:
3810  return del_values(*int_buffer_, bad_rows_tracker);
3811  case kBIGINT:
3812  case kNUMERIC:
3813  case kDECIMAL:
3814  case kTIME:
3815  case kTIMESTAMP:
3816  case kDATE:
3817  return del_values(*bigint_buffer_, bad_rows_tracker);
3818  case kFLOAT:
3819  return del_values(*float_buffer_, bad_rows_tracker);
3820  case kDOUBLE:
3821  return del_values(*double_buffer_, bad_rows_tracker);
3822  case kTEXT:
3823  case kVARCHAR:
3824  case kCHAR:
3825  return del_values(*string_buffer_, bad_rows_tracker);
3826  case kPOINT:
3827  case kLINESTRING:
3828  case kPOLYGON:
3829  case kMULTIPOLYGON:
3830  return del_values(*geo_string_buffer_, bad_rows_tracker);
3831  case kARRAY:
3832  return del_values(*array_buffer_, bad_rows_tracker);
3833  default:
3834  throw std::runtime_error("Invalid Type");
3835  }
3836 }
3837 
3838 void Importer::import_local_parquet(const std::string& file_path,
3839  const Catalog_Namespace::SessionInfo* session_info) {
3840  std::shared_ptr<arrow::io::ReadableFile> infile;
3841  std::unique_ptr<parquet::arrow::FileReader> reader;
3842  std::shared_ptr<arrow::Table> table;
3843  int num_row_groups, num_columns;
3844  int64_t nrow_in_file;
3845  std::tie(num_row_groups, num_columns, nrow_in_file) =
3846  open_parquet_table(file_path, infile, reader, table);
3847  // column_list has no $deleted
3848  const auto& column_list = get_column_descs();
3849  // for now geo columns expect a wkt or wkb hex string
3850  std::vector<const ColumnDescriptor*> cds;
3851  Executor* executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID).get();
3852  int num_physical_cols = 0;
3853  for (auto& cd : column_list) {
3854  cds.push_back(cd);
3855  num_physical_cols += cd->columnType.get_physical_cols();
3856  }
3857  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3858  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3859  std::to_string(num_columns) + " columns in file vs " +
3860  std::to_string(column_list.size() - num_physical_cols) +
3861  " columns in table.");
3862  // slice each group to import slower columns faster, eg. geo or string
3864  VLOG(1) << "Parquet import # threads: " << max_threads;
3865  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3866  // init row estimate for this file
3867  const auto filesize = get_filesize(file_path);
3868  size_t nrow_completed{0};
3869  file_offsets.push_back(0);
3870  // map logic column index to physical column index
3871  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3872  int physical_col_idx = 0;
3873  for (int i = 0; i < logic_col_idx; ++i) {
3874  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3875  }
3876  return physical_col_idx;
3877  };
3878  // load a file = nested iteration of row groups, row slices and logical columns
3879  auto query_session = session_info ? session_info->get_session_id() : "";
3880  auto ms_load_a_file = measure<>::execution([&]() {
3881  for (int row_group = 0; row_group < num_row_groups; ++row_group) {
3882  {
3883  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
3885  break;
3886  }
3887  }
3888  // a sliced row group will be handled like a (logic) parquet file, with
3889  // a entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
3890  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3891  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3892  import_status_.load_failed = true;
3893  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3894  }
3895  import_buffers_vec.resize(num_slices);
3896  for (int slice = 0; slice < num_slices; slice++) {
3897  import_buffers_vec[slice].clear();
3898  for (const auto cd : cds) {
3899  import_buffers_vec[slice].emplace_back(
3900  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3901  }
3902  }
3903  /*
3904  * A caveat here is: Parquet files or arrow data is imported column wise.
3905  * Unlike importing row-wise csv files, a error on any row of any column
3906  * forces to give up entire row group of all columns, unless there is a
3907  * sophisticated method to trace erroneous rows in individual columns so
3908  * that we can union bad rows and drop them from corresponding
3909  * import_buffers_vec; otherwise, we may exceed maximum number of
3910  * truncated rows easily even with very sparse errors in the files.
3911  */
3912  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3913  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3914  auto& bad_rows_tracker = bad_rows_trackers[slice];
3915  bad_rows_tracker.file_name = file_path;
3916  bad_rows_tracker.row_group = slice;
3917  bad_rows_tracker.importer = this;
3918  }
3919  // process arrow arrays to import buffers
3920  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3921  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3922  const auto cd = cds[physical_col_idx];
3923  std::shared_ptr<arrow::ChunkedArray> array;
3924  PARQUET_THROW_NOT_OK(
3925  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3926  const size_t array_size = array->length();
3927  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3928  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3929  for (int slice = 0; slice < num_slices; ++slice) {
3930  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3931  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3932  import_status_.load_failed = true;
3933  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3934  }
3935  thread_controller.startThread([&, slice] {
3936  const auto slice_offset = slice % num_slices;
3937  ArraySliceRange slice_range(
3938  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3939  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3940  auto& bad_rows_tracker = bad_rows_trackers[slice];
3941  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3942  import_buffer->import_buffers = &import_buffers_vec[slice];
3943  import_buffer->col_idx = physical_col_idx + 1;
3944  for (auto chunk : array->chunks()) {
3945  import_buffer->add_arrow_values(
3946  cd, *chunk, false, slice_range, &bad_rows_tracker);
3947  }
3948  });
3949  }
3950  thread_controller.finish();
3951  }
3952  std::vector<size_t> nrow_in_slice_raw(num_slices);
3953  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3954  // trim bad rows from import buffers
3955  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3956  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3957  const auto cd = cds[physical_col_idx];
3958  for (int slice = 0; slice < num_slices; ++slice) {
3959  auto& bad_rows_tracker = bad_rows_trackers[slice];
3960  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3961  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3962  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3963  }
3964  }
3965  // flush slices of this row group to chunks
3966  for (int slice = 0; slice < num_slices; ++slice) {
3967  load(import_buffers_vec[slice],
3968  nrow_in_slice_successfully_loaded[slice],
3969  session_info);
3970  }
3971  // update import stats
3972  const auto nrow_original =
3973  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3974  const auto nrow_imported =
3975  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3976  nrow_in_slice_successfully_loaded.end(),
3977  0);
3978  const auto nrow_dropped = nrow_original - nrow_imported;
3979  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3980  << " rows, drop " << nrow_dropped << " rows.";
3981  {
3982  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3983  import_status_.rows_completed += nrow_imported;
3984  import_status_.rows_rejected += nrow_dropped;
3986  import_status_.load_failed = true;
3988  ") rows rejected exceeded. Halting load.";
3989  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3990  << ") rows rejected exceeded. Halting load.";
3991  }
3992  }
3993  // row estimate
3994  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3995  nrow_completed += nrow_imported;
3996  file_offsets.back() =
3997  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3998  // sum up current total file offsets
3999  const auto total_file_offset =
4000  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
4001  // estimate number of rows per current total file offset
4002  if (total_file_offset) {
4004  (float)total_file_size / total_file_offset * import_status_.rows_completed;
4005  VLOG(3) << "rows_completed " << import_status_.rows_completed
4006  << ", rows_estimated " << import_status_.rows_estimated
4007  << ", total_file_size " << total_file_size << ", total_file_offset "
4008  << total_file_offset;
4009  }
4010  }
4011  });
4012  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
4013  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
4014 }
4015 
4016 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
4017  const Catalog_Namespace::SessionInfo* session_info) {
4018  auto importer = dynamic_cast<Importer*>(this);
4019  auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
4020  : std::vector<Catalog_Namespace::TableEpochInfo>{};
4021  try {
4022  std::exception_ptr teptr;
4023  // file_paths may contain one local file path, a list of local file paths
4024  // or a s3/hdfs/... url that may translate to 1 or 1+ remote object keys.
4025  for (auto const& file_path : file_paths) {
4026  std::map<int, std::string> url_parts;
4027  Archive::parse_url(file_path, url_parts);
4028 
4029  // for a s3 url we need to know the obj keys that it comprises
4030  std::vector<std::string> objkeys;
4031  std::unique_ptr<S3ParquetArchive> us3arch;
4032  if ("s3" == url_parts[2]) {
4033 #ifdef HAVE_AWS_S3
4034  us3arch.reset(new S3ParquetArchive(file_path,
4044  us3arch->init_for_read();
4045  total_file_size += us3arch->get_total_file_size();
4046  objkeys = us3arch->get_objkeys();
4047 #else
4048  throw std::runtime_error("AWS S3 support not available");
4049 #endif // HAVE_AWS_S3
4050  } else {
4051  objkeys.emplace_back(file_path);
4052  }
4053 
4054  // for each obj key of a s3 url we need to land it before
4055  // importing it like doing with a 'local file'.
4056  for (auto const& objkey : objkeys) {
4057  try {
4058  auto file_path =
4059  us3arch
4060  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
4061  : objkey;
4062  import_local_parquet(file_path, session_info);
4063  if (us3arch) {
4064  us3arch->vacuum(objkey);
4065  }
4066  } catch (...) {
4067  if (us3arch) {
4068  us3arch->vacuum(objkey);
4069  }
4070  throw;
4071  }
4072  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4074  break;
4075  }
4076  }
4077  }
4078  // rethrow any exception happened herebefore
4079  if (teptr) {
4080  std::rethrow_exception(teptr);
4081  }
4082  } catch (const shared::NoRegexFilterMatchException& e) {
4083  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
4084  import_status_.load_failed = true;
4085  import_status_.load_msg = e.what();
4086  throw e;
4087  } catch (const std::exception& e) {
4088  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
4089  import_status_.load_failed = true;
4090  import_status_.load_msg = e.what();
4091  }
4092 
4093  if (importer) {
4094  importer->checkpoint(table_epochs);
4095  }
4096 }
4097 #endif // ENABLE_IMPORT_PARQUET
4098 
4100  std::vector<std::string>& file_paths,
4101  const Catalog_Namespace::SessionInfo* session_info) {
4102  // a new requirement is to have one single input stream into
4103  // Importer::importDelimited, so need to move pipe related
4104  // stuff to the outmost block.
4105  int fd[2];
4106 #ifdef _WIN32
4107  // For some reason when folly is used to create the pipe, reader can
4108  // read nothing.
4109  auto pipe_res =
4110  _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
4111 #else
4112  auto pipe_res = pipe(fd);
4113 #endif
4114  if (pipe_res < 0) {
4115  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
4116  }
4117 #ifndef _WIN32
4118  signal(SIGPIPE, SIG_IGN);
4119 #endif
4120 
4121  std::exception_ptr teptr;
4122  // create a thread to read uncompressed byte stream out of pipe and
4123  // feed into importDelimited()
4124  ImportStatus ret1;
4125  auto th_pipe_reader = std::thread([&]() {
4126  try {
4127  // importDelimited will read from FILE* p_file
4128  if (0 == (p_file = fdopen(fd[0], "r"))) {
4129  throw std::runtime_error(std::string("failed to open a pipe: ") +
4130  strerror(errno));
4131  }
4132 
4133  // in future, depending on data types of this uncompressed stream
4134  // it can be feed into other function such like importParquet, etc
4135  ret1 = importDelimited(file_path, true, session_info);
4136 
4137  } catch (...) {
4138  if (!teptr) { // no replace
4139  teptr = std::current_exception();
4140  }
4141  }
4142 
4143  if (p_file) {
4144  fclose(p_file);
4145  }
4146  p_file = 0;
4147  });
4148 
4149  // create a thread to iterate all files (in all archives) and
4150  // forward the uncompressed byte stream to fd[1] which is
4151  // then feed into importDelimited, importParquet, and etc.
4152  auto th_pipe_writer = std::thread([&]() {
4153  std::unique_ptr<S3Archive> us3arch;
4154  bool stop = false;
4155  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
4156  try {
4157  auto file_path = file_paths[fi];
4158  std::unique_ptr<Archive> uarch;
4159  std::map<int, std::string> url_parts;
4160  Archive::parse_url(file_path, url_parts);
4161  const std::string S3_objkey_url_scheme = "s3ok";
4162  if ("file" == url_parts[2] || "" == url_parts[2]) {
4163  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4164  } else if ("s3" == url_parts[2]) {
4165 #ifdef HAVE_AWS_S3
4166  // new a S3Archive with a shared s3client.
4167  // should be safe b/c no wildcard with s3 url
4168  us3arch.reset(new S3Archive(file_path,
4178  us3arch->init_for_read();
4179  total_file_size += us3arch->get_total_file_size();
4180  // not land all files here but one by one in following iterations
4181  for (const auto& objkey : us3arch->get_objkeys()) {
4182  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
4183  }
4184  continue;
4185 #else
4186  throw std::runtime_error("AWS S3 support not available");
4187 #endif // HAVE_AWS_S3
4188  } else if (S3_objkey_url_scheme == url_parts[2]) {
4189 #ifdef HAVE_AWS_S3
4190  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
4191  auto file_path =
4192  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
4193  if (0 == file_path.size()) {
4194  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
4195  }
4196  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4197  // file not removed until file closed
4198  us3arch->vacuum(objkey);
4199 #else
4200  throw std::runtime_error("AWS S3 support not available");
4201 #endif // HAVE_AWS_S3
4202  }
4203 #if 0 // TODO(ppan): implement and enable any other archive class
4204  else
4205  if ("hdfs" == url_parts[2])
4206  uarch.reset(new HdfsArchive(file_path));
4207 #endif
4208  else {
4209  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
4210  }
4211 
4212  // init the archive for read
4213  auto& arch = *uarch;
4214 
4215  // coming here, the archive of url should be ready to be read, unarchived
4216  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
4217  const void* buf;
4218  size_t size;
4219  bool just_saw_archive_header;
4220  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
4221  bool first_text_header_skipped = false;
4222  // start reading uncompressed bytes of this archive from libarchive
4223  // note! this archive may contain more than one files!
4224  file_offsets.push_back(0);
4225  size_t num_block_read = 0;
4226  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
4227  bool insert_line_delim_after_this_file = false;
4228  while (!stop) {
4229  int64_t offset{-1};
4230  auto ok = arch.read_data_block(&buf, &size, &offset);
4231  // can't use (uncompressed) size, so track (max) file offset.
4232  // also we want to capture offset even on e.o.f.
4233  if (offset > 0) {
4234  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4235  file_offsets.back() = offset;
4236  }
4237  if (!ok) {
4238  break;
4239  }
4240  // one subtle point here is now we concatenate all files
4241  // to a single FILE stream with which we call importDelimited
4242  // only once. this would make it misunderstand that only one
4243  // header line is with this 'single' stream, while actually
4244  // we may have one header line for each of the files.
4245  // so we need to skip header lines here instead in importDelimited.
4246  const char* buf2 = (const char*)buf;
4247  int size2 = size;
4249  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
4250  while (size2-- > 0) {
4251  if (*buf2++ == copy_params.line_delim) {
4252  break;
4253  }
4254  }
4255  if (size2 <= 0) {
4256  LOG(WARNING) << "No line delimiter in block." << std::endl;
4257  } else {
4258  just_saw_archive_header = false;
4259  first_text_header_skipped = true;
4260  }
4261  }
4262  // In very rare occasions the write pipe somehow operates in a mode similar
4263  // to non-blocking while pipe(fds) should behave like pipe2(fds, 0) which
4264  // means blocking mode. On such a unreliable blocking mode, a possible fix
4265  // is to loop reading till no bytes left, otherwise the annoying `failed to
4266  // write pipe: Success`...
4267  if (size2 > 0) {
4268  int nremaining = size2;
4269  while (nremaining > 0) {
4270  // try to write the entire remainder of the buffer to the pipe
4271  int nwritten = write(fd[1], buf2, nremaining);
4272  // how did we do?
4273  if (nwritten < 0) {
4274  // something bad happened
4275  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4276  // ignore these, assume nothing written, try again
4277  nwritten = 0;
4278  } else {
4279  // a real error
4280  throw std::runtime_error(
4281  std::string("failed or interrupted write to pipe: ") +
4282  strerror(errno));
4283  }
4284  } else if (nwritten == nremaining) {
4285  // we wrote everything; we're done
4286  break;
4287  }
4288  // only wrote some (or nothing), try again
4289  nremaining -= nwritten;
4290  buf2 += nwritten;
4291  // no exception when too many rejected
4292  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4294  stop = true;
4295  break;
4296  }
4297  }
4298  // check that this file (buf for size) ended with a line delim
4299  if (size > 0) {
4300  const char* plast = static_cast<const char*>(buf) + (size - 1);
4301  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4302  }
4303  }
4304  ++num_block_read;
4305  }
4306 
4307  // if that file didn't end with a line delim, we insert one here to terminate
4308  // that file's stream use a loop for the same reason as above
4309  if (insert_line_delim_after_this_file) {
4310  while (true) {
4311  // write the delim char to the pipe
4312  int nwritten = write(fd[1], &copy_params.line_delim, 1);
4313  // how did we do?
4314  if (nwritten < 0) {
4315  // something bad happened
4316  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4317  // ignore these, assume nothing written, try again
4318  nwritten = 0;
4319  } else {
4320  // a real error
4321  throw std::runtime_error(
4322  std::string("failed or interrupted write to pipe: ") +
4323  strerror(errno));
4324  }
4325  } else if (nwritten == 1) {
4326  // we wrote it; we're done
4327  break;
4328  }
4329  }
4330  }
4331  }
4332  } catch (...) {
4333  // when import is aborted because too many data errors or because end of a
4334  // detection, any exception thrown by s3 sdk or libarchive is okay and should be
4335  // suppressed.
4336  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4338  break;
4339  }
4340  if (import_status_.rows_completed > 0) {
4341  if (nullptr != dynamic_cast<Detector*>(this)) {
4342  break;
4343  }
4344  }
4345  if (!teptr) { // no replace
4346  teptr = std::current_exception();
4347  }
4348  break;
4349  }
4350  }
4351  // close writer end
4352  close(fd[1]);
4353  });
4354 
4355  th_pipe_reader.join();
4356  th_pipe_writer.join();
4357 
4358  // rethrow any exception happened herebefore
4359  if (teptr) {
4360  std::rethrow_exception(teptr);
4361  }
4362 }
4363 
4365  return DataStreamSink::archivePlumber(session_info);
4366 }
4367 
4369  const std::string& file_path,
4370  const bool decompressed,
4371  const Catalog_Namespace::SessionInfo* session_info) {
4373  auto query_session = session_info ? session_info->get_session_id() : "";
4374 
4375  if (!p_file) {
4376  p_file = fopen(file_path.c_str(), "rb");
4377  }
4378  if (!p_file) {
4379  throw std::runtime_error("failed to open file '" + file_path +
4380  "': " + strerror(errno));
4381  }
4382 
4383  if (!decompressed) {
4384  (void)fseek(p_file, 0, SEEK_END);
4385  file_size = ftell(p_file);
4386  }
4387 
4388  max_threads = num_import_threads(copy_params.threads);
4389  VLOG(1) << "Delimited import # threads: " << max_threads;
4390 
4391  // deal with small files
4392  size_t alloc_size = copy_params.buffer_size;
4393  if (!decompressed && file_size < alloc_size) {
4394  alloc_size = file_size;
4395  }
4396 
4397  for (size_t i = 0; i < max_threads; i++) {
4398  import_buffers_vec.emplace_back();
4399  for (const auto cd : loader->get_column_descs()) {
4400  import_buffers_vec[i].emplace_back(
4401  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4402  }
4403  }
4404 
4405  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4406  size_t current_pos = 0;
4407  size_t end_pos;
4408  size_t begin_pos = 0;
4409 
4410  (void)fseek(p_file, current_pos, SEEK_SET);
4411  size_t size =
4412  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4413 
4414  // make render group analyzers for each poly column
4415  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4417  auto& cat = loader->getCatalog();
4418  auto* td = loader->getTableDesc();
4419  CHECK(td);
4420  auto column_descriptors =
4421  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4422  for (auto const& cd : column_descriptors) {
4423  if (IS_GEO_POLY(cd->columnType.get_type())) {
4424  auto rga = std::make_shared<RenderGroupAnalyzer>();
4425  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
4426  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4427  }
4428  }
4429  }
4430 
4431  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4432  loader->getTableDesc()->tableId};
4433  auto table_epochs = loader->getTableEpochs();
4435  {
4436  std::list<std::future<ImportStatus>> threads;
4437 
4438  // use a stack to track thread_ids which must not overlap among threads
4439  // because thread_id is used to index import_buffers_vec[]
4440  std::stack<size_t> stack_thread_ids;
4441  for (size_t i = 0; i < max_threads; i++) {
4442  stack_thread_ids.push(i);
4443  }
4444  // added for true row index on error
4445  size_t first_row_index_this_buffer = 0;
4446 
4447  while (size > 0) {
4448  unsigned int num_rows_this_buffer = 0;
4449  CHECK(scratch_buffer);
4450  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4451  scratch_buffer,
4452  size,
4453  copy_params,
4454  first_row_index_this_buffer,
4455  num_rows_this_buffer,
4456  p_file);
4457 
4458  // unput residual
4459  int nresidual = size - end_pos;
4460  std::unique_ptr<char[]> unbuf;
4461  if (nresidual > 0) {
4462  unbuf = std::make_unique<char[]>(nresidual);
4463  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4464  }
4465 
4466  // get a thread_id not in use
4467  auto thread_id = stack_thread_ids.top();
4468  stack_thread_ids.pop();
4469  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4470 
4471  threads.push_back(std::async(std::launch::async,
4473  thread_id,
4474  this,
4475  std::move(scratch_buffer),
4476  begin_pos,
4477  end_pos,
4478  end_pos,
4479  columnIdToRenderGroupAnalyzerMap,
4480  first_row_index_this_buffer,
4481  session_info,
4482  executor));
4483 
4484  first_row_index_this_buffer += num_rows_this_buffer;
4485 
4486  current_pos += end_pos;
4487  scratch_buffer = std::make_unique<char[]>(alloc_size);
4488  CHECK(scratch_buffer);
4489  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4490  size = nresidual +
4491  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4492 
4493  begin_pos = 0;
4494  while (threads.size() > 0) {
4495  int nready = 0;
4496  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4497  it != threads.end();) {
4498  auto& p = *it;
4499  std::chrono::milliseconds span(0);
4500  if (p.wait_for(span) == std::future_status::ready) {
4501  auto ret_import_status = p.get();
4502  {
4503  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
4504  import_status_ += ret_import_status;
4505  if (ret_import_status.load_failed) {
4507  }
4508  }
4509  // sum up current total file offsets
4510  size_t total_file_offset{0};
4511  if (decompressed) {
4512  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4513  for (const auto file_offset : file_offsets) {
4514  total_file_offset += file_offset;
4515  }
4516  }
4517  // estimate number of rows per current total file offset
4518  if (decompressed ? total_file_offset : current_pos) {
4520  (decompressed ? (float)total_file_size / total_file_offset
4521  : (float)file_size / current_pos) *
4523  }
4524  VLOG(3) << "rows_completed " << import_status_.rows_completed
4525  << ", rows_estimated " << import_status_.rows_estimated
4526  << ", total_file_size " << total_file_size << ", total_file_offset "
4527  << total_file_offset;
4529  // recall thread_id for reuse
4530  stack_thread_ids.push(ret_import_status.thread_id);
4531  threads.erase(it++);
4532  ++nready;
4533  } else {
4534  ++it;
4535  }
4536  }
4537 
4538  if (nready == 0) {
4539  std::this_thread::yield();
4540  }
4541 
4542  // on eof, wait all threads to finish
4543  if (0 == size) {
4544  continue;
4545  }
4546 
4547  // keep reading if any free thread slot
4548  // this is one of the major difference from old threading model !!
4549  if (threads.size() < max_threads) {
4550  break;
4551  }
4552  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4554  break;
4555  }
4556  }
4557  mapd_unique_lock<mapd_shared_mutex> write_lock(import_mutex_);
4559  import_status_.load_failed = true;
4560  // todo use better message
4561  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4562  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4563  break;
4564  }
4566  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4567  break;
4568  }
4569  }
4570 
4571  // join dangling threads in case of LOG(ERROR) above
4572  for (auto& p : threads) {
4573  p.wait();
4574  }
4575  }
4576 
4577  checkpoint(table_epochs);
4578 
4579  fclose(p_file);
4580  p_file = nullptr;
4581  return import_status_;
4582 }
4583 
4585  if (getTableDesc()->persistenceLevel ==
4586  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4587  // tables
4589  }
4590 }
4591 
4592 std::vector<Catalog_Namespace::TableEpochInfo> Loader::getTableEpochs() const {
4593  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4594  getTableDesc()->tableId);
4595 }
4596 
4598  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4599  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4600 }
4601 
4602 /* static */
4604  const std::string& file_name,
4605  const CopyParams& copy_params) {
4608  copy_params.s3_endpoint,
4609  copy_params.s3_access_key,
4610  copy_params.s3_secret_key,
4611  copy_params.s3_session_token);
4612  if (copy_params.source_type != import_export::SourceType::kGeoFile) {
4613  throw std::runtime_error("Unexpected CopyParams.source_type (" +
4614  std::to_string(static_cast<int>(copy_params.source_type)) +
4615  ")");
4616  }
4618 }
4619 
4620 namespace {
4621 
4622 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4624  const std::string& file_name) {
4625  // get layer with specified name, or default to first layer
4626  OGRLayer* poLayer = nullptr;
4627  if (geo_layer_name.size()) {
4628  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4629  if (poLayer == nullptr) {
4630  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4631  file_name);
4632  }
4633  } else {
4634  poLayer = poDS->GetLayer(0);
4635  if (poLayer == nullptr) {
4636  throw std::runtime_error("No layers found in " + file_name);
4637  }
4638  }
4639  return *poLayer;
4640 }
4641 
4642 } // namespace
4643 
4644 /* static */
4646  const std::string& file_name,
4647  const std::string& geo_column_name,
4648  std::map<std::string, std::vector<std::string>>& sample_data,
4649  int row_limit,
4650  const CopyParams& copy_params) {
4652  openGDALDataSource(file_name, copy_params));
4653  if (datasource == nullptr) {
4654  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4655  file_name);
4656  }
4657 
4658  OGRLayer& layer =
4659  getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);
4660 
4661  auto const* feature_defn = layer.GetLayerDefn();
4662  CHECK(feature_defn);
4663 
4664  // metadata columns?
4665  auto const metadata_column_infos =
4666  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4667 
4668  // get limited feature count
4669  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4670  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
4671  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4672 
4673  // prepare sample data map
4674  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4675  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4676  CHECK(column_name);
4677  sample_data[column_name] = {};
4678  }
4679  sample_data[geo_column_name] = {};
4680  for (auto const& mci : metadata_column_infos) {
4681  sample_data[mci.column_descriptor.columnName] = {};
4682  }
4683 
4684  // prepare to read
4685  layer.ResetReading();
4686 
4687  // read features (up to limited count)
4688  uint64_t feature_index{0u};
4689  while (feature_index < num_features) {
4690  // get (and take ownership of) feature
4691  Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
4692  if (!feature) {
4693  break;
4694  }
4695 
4696  // get feature geometry
4697  auto const* geometry = feature->GetGeometryRef();
4698  if (geometry == nullptr) {
4699  break;
4700  }
4701 
4702  // validate geom type (again?)
4703  switch (wkbFlatten(geometry->getGeometryType())) {
4704  case wkbPoint:
4705  case wkbLineString:
4706  case wkbPolygon:
4707  case wkbMultiPolygon:
4708  break;
4709  case wkbMultiPoint:
4710  case wkbMultiLineString:
4711  // supported if geo_explode_collections is specified
4712  if (!copy_params.geo_explode_collections) {
4713  throw std::runtime_error("Unsupported geometry type: " +
4714  std::string(geometry->getGeometryName()));
4715  }
4716  break;
4717  default:
4718  throw std::runtime_error("Unsupported geometry type: " +
4719  std::string(geometry->getGeometryName()));
4720  }
4721 
4722  // populate sample data for regular field columns
4723  for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4724  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4725  sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4726  }
4727 
4728  // populate sample data for metadata columns?
4729  for (auto const& mci : metadata_column_infos) {
4730  sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4731  }
4732 
4733  // populate sample data for geo column with WKT string
4734  char* wkts = nullptr;
4735  geometry->exportToWkt(&wkts);
4736  CHECK(wkts);
4737  sample_data[geo_column_name].push_back(wkts);
4738  CPLFree(wkts);
4739 
4740  // next feature
4741  feature_index++;
4742  }
4743 }
4744 
4745 namespace {
4746 
4747 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4748  switch (ogr_type) {
4749  case OFTInteger:
4750  return std::make_pair(kINT, false);
4751  case OFTIntegerList:
4752  return std::make_pair(kINT, true);
4753 #if GDAL_VERSION_MAJOR > 1
4754  case OFTInteger64:
4755  return std::make_pair(kBIGINT, false);
4756  case OFTInteger64List:
4757  return std::make_pair(kBIGINT, true);
4758 #endif
4759  case OFTReal:
4760  return std::make_pair(kDOUBLE, false);
4761  case OFTRealList:
4762  return std::make_pair(kDOUBLE, true);
4763  case OFTString:
4764  return std::make_pair(kTEXT, false);
4765  case OFTStringList:
4766  return std::make_pair(kTEXT, true);
4767  case OFTDate:
4768  return std::make_pair(kDATE, false);
4769  case OFTTime:
4770  return std::make_pair(kTIME, false);
4771  case OFTDateTime:
4772  return std::make_pair(kTIMESTAMP, false);
4773  case OFTBinary:
4774  // Interpret binary blobs as byte arrays here
4775  // but actual import will store NULL as GDAL will not
4776  // extract the blob (OGRFeature::GetFieldAsString will
4777  // result in the import buffers having an empty string)
4778  return std::make_pair(kTINYINT, true);
4779  default:
4780  break;
4781  }
4782  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4783 }
4784 
4785 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4786  switch (ogr_type) {
4787  case wkbPoint:
4788  return kPOINT;
4789  case wkbLineString:
4790  return kLINESTRING;
4791  case wkbPolygon:
4792  return kPOLYGON;
4793  case wkbMultiPolygon:
4794  return kMULTIPOLYGON;
4795  default:
4796  break;
4797  }
4798  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4799 }
4800 
4802  const import_export::RasterPointType raster_point_type) {
4803  switch (raster_point_type) {
4818  }
4819  UNREACHABLE();
4821 }
4822 
4824  const import_export::RasterPointTransform raster_point_transform) {
4825  switch (raster_point_transform) {
4834  }
4835  UNREACHABLE();
4837 }
4838 
4839 } // namespace
4840 
4841 /* static */
4842 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4843  const std::string& file_name,
4844  const bool is_raster,
4845  const std::string& geo_column_name,
4846  const CopyParams& copy_params) {
4847  if (is_raster) {
4848  return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
4849  }
4850  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
4851 }
4852 
4853 /* static */
4854 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsRaster(
4855  const std::string& file_name,
4856  const std::string& geo_column_name,
4857  const CopyParams& copy_params) {
4858  // lazy init GDAL
4861  copy_params.s3_endpoint,
4862  copy_params.s3_access_key,
4863  copy_params.s3_secret_key,
4864  copy_params.s3_session_token);
4865 
4866  // prepare for metadata column
4867  auto metadata_column_infos =
4868  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4869 
4870  // create a raster importer and do the detect
4871  RasterImporter raster_importer;
4872  raster_importer.detect(
4873  file_name,
4874  copy_params.raster_import_bands,
4875  copy_params.raster_import_dimensions,
4878  copy_params.raster_point_compute_angle,
4879  false,
4880  metadata_column_infos);
4881 
4882  // prepare to capture column descriptors
4883  std::list<ColumnDescriptor> cds;
4884 
4885  // get the point column info
4886  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
4887 
4888  // create the columns for the point in the specified type
4889  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
4890  ColumnDescriptor cd;
4891  cd.columnName = cd.sourceName = col_name;
4892  cd.columnType.set_type(sql_type);
4893  // hardwire other POINT attributes for now
4894  if (sql_type == kPOINT) {
4896  cd.columnType.set_input_srid(4326);
4897  cd.columnType.set_output_srid(4326);
4899  cd.columnType.set_comp_param(32);
4900  }
4901  cds.push_back(cd);
4902  }
4903 
4904  // get the names and types for the band column(s)
4905  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
4906 
4907  // add column descriptors for each band
4908  for (auto const& [band_name, sql_type] : band_names_and_types) {
4909  ColumnDescriptor cd;
4910  cd.columnName = cd.sourceName = band_name;
4911  cd.columnType.set_type(sql_type);
4913  cds.push_back(cd);
4914  }
4915 
4916  // metadata columns?
4917  for (auto& mci : metadata_column_infos) {
4918  cds.push_back(std::move(mci.column_descriptor));
4919  }
4920 
4921  // return the results
4922  return cds;
4923 }
4924 
4925 /* static */
4926 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsGeo(
4927  const std::string& file_name,
4928  const std::string& geo_column_name,
4929  const CopyParams& copy_params) {
4930  std::list<ColumnDescriptor> cds;
4931 
4932  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
4933  if (poDS == nullptr) {
4934  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4935  file_name);
4936  }
4937  if (poDS->GetLayerCount() == 0) {
4938  throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
4939  " has no layers");
4940  }
4941 
4942  OGRLayer& layer =
4943  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4944 
4945  layer.ResetReading();
4946  // TODO(andrewseidl): support multiple features
4947  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
4948  if (poFeature == nullptr) {
4949  throw std::runtime_error("No features found in " + file_name);
4950  }
4951  // get fields as regular columns
4952  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4953  CHECK(poFDefn);
4954  int iField;
4955  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4956  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4957  auto typePair = ogr_to_type(poFieldDefn->GetType());
4958  ColumnDescriptor cd;
4959  cd.columnName = poFieldDefn->GetNameRef();
4960  cd.sourceName = poFieldDefn->GetNameRef();
4961  SQLTypeInfo ti;
4962  if (typePair.second) {
4963  ti.set_type(kARRAY);
4964  ti.set_subtype(typePair.first);
4965  } else {
4966  ti.set_type(typePair.first);
4967  }
4968  if (typePair.first == kTEXT) {
4970  ti.set_comp_param(32);
4971  }
4972  ti.set_fixed_size();
4973  cd.columnType = ti;
4974  cds.push_back(cd);
4975  }
4976  // get geo column, if any
4977  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4978  if (poGeometry) {
4979  ColumnDescriptor cd;
4980  cd.columnName = geo_column_name;
4981  cd.sourceName = geo_column_name;
4982 
4983  // get GDAL type
4984  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4985 
4986  // if exploding, override any collection type to child type
4987  if (copy_params.geo_explode_collections) {
4988  if (ogr_type == wkbMultiPolygon) {
4989  ogr_type = wkbPolygon;
4990  } else if (ogr_type == wkbMultiLineString) {
4991  ogr_type = wkbLineString;
4992  } else if (ogr_type == wkbMultiPoint) {
4993  ogr_type = wkbPoint;
4994  }
4995  }
4996 
4997  // convert to internal type
4998  SQLTypes geoType = ogr_to_type(ogr_type);
4999 
5000  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
5002  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
5003  }
5004 
5005  // build full internal type
5006  SQLTypeInfo ti;
5007  ti.set_type(geoType);
5008  ti.set_subtype(copy_params.geo_coords_type);
5009  ti.set_input_srid(copy_params.geo_coords_srid);
5010  ti.set_output_srid(copy_params.geo_coords_srid);
5011  ti.set_compression(copy_params.geo_coords_encoding);
5012  ti.set_comp_param(copy_params.geo_coords_comp_param);
5013  cd.columnType = ti;
5014 
5015  cds.push_back(cd);
5016  }
5017 
5018  // metadata columns?
5019  auto metadata_column_infos =
5020  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
5021  for (auto& mci : metadata_column_infos) {
5022  cds.push_back(std::move(mci.column_descriptor));
5023  }
5024 
5025  return cds;
5026 }
5027 
5028 bool Importer::gdalStatInternal(const std::string& path,
5029  const CopyParams& copy_params,
5030  bool also_dir) {
5031  // lazy init GDAL
5034  copy_params.s3_endpoint,
5035  copy_params.s3_access_key,
5036  copy_params.s3_secret_key,
5037  copy_params.s3_session_token);
5038 
5039 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5040  // clear GDAL stat cache
5041  // without this, file existence will be cached, even if authentication changes
5042  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
5043  VSICurlClearCache();
5044 #endif
5045 
5046  // stat path
5047  VSIStatBufL sb;
5048  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5049  if (result < 0) {
5050  return false;
5051  }
5052 
5053  // exists?
5054  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5055  return true;
5056  } else if (VSI_ISREG(sb.st_mode)) {
5057  return true;
5058  }
5059  return false;
5060 }
5061 
5062 /* static */
5063 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
5064  return gdalStatInternal(path, copy_params, false);
5065 }
5066 
5067 /* static */
5068 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
5069  const CopyParams& copy_params) {
5070  return gdalStatInternal(path, copy_params, true);
5071 }
5072 
5073 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
5074  std::vector<std::string>& files) {
5075  // prepare to gather subdirectories
5076  std::vector<std::string> subdirectories;
5077 
5078  // get entries
5079  char** entries = VSIReadDir(archive_path.c_str());
5080  if (!entries) {
5081  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
5082  return;
5083  }
5084 
5085  // force scope
5086  {
5087  // request clean-up
5088  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
5089 
5090  // check all the entries
5091  int index = 0;
5092  while (true) {
5093  // get next entry, or drop out if there isn't one
5094  char* entry_c = entries[index++];
5095  if (!entry_c) {
5096  break;
5097  }
5098  std::string entry(entry_c);
5099 
5100  // ignore '.' and '..'
5101  if (entry == "." || entry == "..") {
5102  continue;
5103  }
5104 
5105  // build the full path
5106  std::string entry_path = archive_path + std::string("/") + entry;
5107 
5108  // is it a file or a sub-folder
5109  VSIStatBufL sb;
5110  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
5111  if (result < 0) {
5112  break;
5113  }
5114 
5115  if (VSI_ISDIR(sb.st_mode)) {
5116  // a directory that ends with .gdb could be a Geodatabase bundle
5117  // arguably dangerous to decide this purely by name, but any further
5118  // validation would be very complex especially at this scope
5119  if (boost::iends_with(entry_path, ".gdb")) {
5120  // add the directory as if it was a file and don't recurse into it
5121  files.push_back(entry_path);
5122  } else {
5123  // add subdirectory to be recursed into
5124  subdirectories.push_back(entry_path);
5125  }
5126  } else {
5127  // add this file
5128  files.push_back(entry_path);
5129  }
5130  }
5131  }
5132 
5133  // recurse into each subdirectories we found
5134  for (const auto& subdirectory : subdirectories) {
5135  gdalGatherFilesInArchiveRecursive(subdirectory, files);
5136  }
5137 }
5138 
5139 /* static */
5140 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
5141  const std::string& archive_path,
5142  const CopyParams& copy_params) {
5143  // lazy init GDAL
5146  copy_params.s3_endpoint,
5147  copy_params.s3_access_key,
5148  copy_params.s3_secret_key,
5149  copy_params.s3_session_token);
5150 
5151  // prepare to gather files
5152  std::vector<std::string> files;
5153 
5154  // gather the files recursively
5155  gdalGatherFilesInArchiveRecursive(archive_path, files);
5156 
5157  // convert to relative paths inside archive
5158  for (auto& file : files) {
5159  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5160  }
5161 
5162  // done
5163  return files;
5164 }
5165 
5166 /* static */
5167 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
5168  const std::string& file_name,
5169  const CopyParams& copy_params) {
5170  // lazy init GDAL
5173  copy_params.s3_endpoint,
5174  copy_params.s3_access_key,
5175  copy_params.s3_secret_key,
5176  copy_params.s3_session_token);
5177 
5178  // prepare to gather layer info
5179  std::vector<GeoFileLayerInfo> layer_info;
5180 
5181  // open the data set
5182  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5183  if (poDS == nullptr) {
5184  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5185  file_name);
5186  }
5187 
5188  // enumerate the layers
5189  for (auto&& poLayer : poDS->GetLayers()) {
5191  // prepare to read this layer
5192  poLayer->ResetReading();
5193  // skip layer if empty
5194  if (poLayer->GetFeatureCount() > 0) {
5195  // get first feature
5196  Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
5197  CHECK(first_feature);
5198  // check feature for geometry
5199  const OGRGeometry* geometry = first_feature->GetGeometryRef();
5200  if (!geometry) {
5201  // layer has no geometry
5202  contents = GeoFileLayerContents::NON_GEO;
5203  } else {
5204  // check the geometry type
5205  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
5206  switch (wkbFlatten(geometry_type)) {
5207  case wkbPoint:
5208  case wkbLineString:
5209  case wkbPolygon:
5210  case wkbMultiPolygon:
5211  // layer has supported geo
5212  contents = GeoFileLayerContents::GEO;
5213  break;
5214  case wkbMultiPoint:
5215  case wkbMultiLineString:
5216  // supported if geo_explode_collections is specified
5217  contents = copy_params.geo_explode_collections
5220  break;
5221  default:
5222  // layer has unsupported geometry
5224  break;
5225  }
5226  }
5227  }
5228  // store info for this layer
5229  layer_info.emplace_back(poLayer->GetName(), contents);
5230  }
5231 
5232  // done
5233  return layer_info;
5234 }
5235 
5237  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
5238  const Catalog_Namespace::SessionInfo* session_info,
5239  const bool is_raster) {
5240  if (is_raster) {
5241  return importGDALRaster(session_info);
5242  }
5243  return importGDALGeo(columnNameToSourceNameMap, session_info);
5244 }
5245 
5247  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
5248  const Catalog_Namespace::SessionInfo* session_info) {
5249  // initial status
5252  if (poDS == nullptr) {
5253  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5254  file_path);
5255  }
5256 
5257  OGRLayer& layer =
5259 
5260  // get the number of features in this layer
5261  size_t numFeatures = layer.GetFeatureCount();
5262 
5263  // build map of metadata field (additional columns) name to index
5264  // use shared_ptr since we need to pass it to the worker
5265  FieldNameToIndexMapType fieldNameToIndexMap;
5266  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5267  CHECK(poFDefn);
5268  size_t numFields = poFDefn->GetFieldCount();
5269  for (size_t iField = 0; iField < numFields; iField++) {
5270  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5271  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5272  }
5273 
5274  // the geographic spatial reference we want to put everything in
5275  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5276  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5277 
5278 #if GDAL_VERSION_MAJOR >= 3
5279  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5280  // this results in X and Y being transposed for angle-based
5281  // coordinate systems. This restores the previous behavior.
5282  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5283 #endif
5284 
5285 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5286  // just one "thread"
5287  max_threads = 1;
5288 #else
5289  // how many threads to use
5290  max_threads = num_import_threads(copy_params.threads);
5291 #endif
5292  VLOG(1) << "GDAL import # threads: " << max_threads;
5293 
5294  // metadata columns?
5295  auto const metadata_column_infos =
5297 
5298  // import geo table is specifically handled in both DBHandler and QueryRunner
5299  // that is separate path against a normal SQL execution
5300  // so we here explicitly enroll the import session to allow interruption
5301  // while importing geo table
5302  auto query_session = session_info ? session_info->get_session_id() : "";
5303  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5305  auto is_session_already_registered = false;
5306  {
5307  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor->getSessionLock());
5308  is_session_already_registered =
5309  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5310  }
5311  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5312  !is_session_already_registered) {
5313  executor->enrollQuerySession(query_session,
5314  "IMPORT_GEO_TABLE",
5315  query_submitted_time,
5317  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5318  }
5319  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5320  // reset the runtime query interrupt status
5321  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5322  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5323  }
5324  };
5325 
5326  // make an import buffer for each thread
5327  CHECK_EQ(import_buffers_vec.size(), 0u);
5328  import_buffers_vec.resize(max_threads);
5329  for (size_t i = 0; i < max_threads; i++) {
5330  for (const auto cd : loader->get_column_descs()) {
5331  import_buffers_vec[i].emplace_back(
5332  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5333  }
5334  }
5335 
5336  // make render group analyzers for each poly column
5337  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
5339  auto& cat = loader->getCatalog();
5340  auto* td = loader->getTableDesc();
5341  CHECK(td);
5342  auto column_descriptors =
5343  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5344  for (auto const& cd : column_descriptors) {
5345  if (IS_GEO_POLY(cd->columnType.get_type())) {
5346  auto rga = std::make_shared<RenderGroupAnalyzer>();
5347  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
5348  columnIdToRenderGroupAnalyzerMap[cd->