Importer.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * @file Importer.cpp
 * @brief Functions for Importer class
 *
 */

#include "ImportExport/Importer.h"

#include <arrow/api.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/io/api.h>
#include <gdal.h>
#include <ogrsf_frmts.h>
#include <boost/algorithm/string.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/filesystem.hpp>
#include <boost/geometry.hpp>
#include <boost/variant.hpp>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <future>
#include <iomanip>
#include <list>
#include <memory>
#include <mutex>
#include <numeric>
#include <stack>
#include <stdexcept>
#include <thread>
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "Archive/S3Archive.h"
#include "ArrowImporter.h"
#include "Catalog/os/UserMapping.h"
#ifdef ENABLE_IMPORT_PARQUET
#include "Catalog/ForeignTable.h"
#endif
#include "Geospatial/Compression.h"
#include "Geospatial/GDAL.h"
#include "Geospatial/Transforms.h"
#include "Geospatial/Types.h"
#include "Logger/Logger.h"
#include "QueryEngine/Execute.h"
#include "RenderGroupAnalyzer.h"
#include "Shared/DateTimeParser.h"
#include "Shared/SqlTypesLayout.h"
#include "Shared/file_path_util.h"
#include "Shared/import_helpers.h"
#include "Shared/likely.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/scope.h"
#include "Shared/shard_key.h"
#include "Shared/thread_count.h"

#include "gen-cpp/Heavy.h"

#ifdef _WIN32
#include <fcntl.h>
#include <io.h>
#endif

#define TIMER_STOP(t) \
  (float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>( \
      t)) / \
   1.0E6f)
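
// Usage sketch (illustrative, not part of the original source): TIMER_STOP is
// intended to pair with timer_start() from Shared/measure.h and yields elapsed
// wall-clock seconds as a float, e.g.
//
//   auto clock_begin = timer_start();
//   // ... perform some import work ...
//   float elapsed_sec = TIMER_STOP(clock_begin);  // microseconds / 1.0E6f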

size_t g_max_import_threads =
    32;  // Max number of default import threads to use (num hardware threads will be
         // used if lower, and can also be explicitly overridden in the copy statement
         // with the threads option)
size_t g_archive_read_buf_size = 1 << 20;

std::optional<size_t> g_detect_test_sample_size = std::nullopt;

static constexpr int kMaxRasterScanlinesPerThread = 32;

inline auto get_filesize(const std::string& file_path) {
  boost::filesystem::path boost_file_path{file_path};
  boost::system::error_code ec;
  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
  return ec ? 0 : filesize;
}
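
// Usage sketch (illustrative, not part of the original source): the error_code
// overload of file_size() never throws, so a missing or unreadable file simply
// reports size 0 and callers can treat "absent" like "empty":
//
//   if (auto sz = get_filesize("/tmp/data.csv"); sz > 0) {
//     // size known; e.g. use it to pick buffer sizes or thread counts
//   }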

namespace {

bool check_session_interrupted(const QuerySessionId& query_session, Executor* executor) {
  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
    mapd_shared_lock<mapd_shared_mutex> session_read_lock(
        executor->getSessionLock());
    return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
  }
  return false;
}

const size_t num_import_threads(const int copy_params_threads) {
  if (copy_params_threads > 0) {
    return static_cast<size_t>(copy_params_threads);
  }
  return std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
                  g_max_import_threads);
}
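
// Behavior sketch (illustrative, not part of the original source): an explicit
// thread count from the COPY statement wins; otherwise hardware concurrency is
// capped by g_max_import_threads, e.g.
//
//   num_import_threads(8);  // -> 8, regardless of core count
//   num_import_threads(0);  // -> min(hardware_concurrency, g_max_import_threads)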

}  // namespace

// For logging std::vector<std::string> row.
namespace boost {
namespace log {
formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
  out << '[';
  for (size_t i = 0; i < row.size(); ++i) {
    out << (i ? ", " : "") << row[i];
  }
  out << ']';
  return out;
}
}  // namespace log
}  // namespace boost
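
// Example (illustrative, not part of the original source): with this overload
// in scope, logging a row {"1", "foo", "2.5"} prints "[1, foo, 2.5]".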

namespace import_export {

using FieldNameToIndexMapType = std::map<std::string, size_t>;
using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
using ColumnIdToRenderGroupAnalyzerMapType =
    std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
using FeaturePtrVector = std::vector<Geospatial::GDAL::FeatureUqPtr>;

#define DEBUG_TIMING false
#define DEBUG_RENDER_GROUP_ANALYZER 0
#define DEBUG_AWS_AUTHENTICATION 0

#define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0

static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;

static mapd_shared_mutex status_mutex;
static std::map<std::string, ImportStatus> import_status_map;

Importer::Importer(Catalog_Namespace::Catalog& c,
                   const TableDescriptor* t,
                   const std::string& f,
                   const CopyParams& p)
    : Importer(new Loader(c, t), f, p) {}

Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
    : DataStreamSink(p, f), loader(providedLoader) {
  import_id = boost::filesystem::path(file_path).filename().string();
  file_size = 0;
  max_threads = 0;
  p_file = nullptr;
  buffer[0] = nullptr;
  buffer[1] = nullptr;
  // we may be overallocating a little memory here due to dropping phy cols.
  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
  int i = 0;
  bool has_array = false;
  // TODO: replace this ugly way of skipping phy cols once isPhyGeo is defined
  int skip_physical_cols = 0;
  for (auto& p : loader->get_column_descs()) {
    // phy geo columns can't be in input file
    if (skip_physical_cols-- > 0) {
      continue;
    }
    // neither are rowid or $deleted$
    // note: columns can be added after rowid/$deleted$
    if (p->isVirtualCol || p->isDeletedCol) {
      continue;
    }
    skip_physical_cols = p->columnType.get_physical_cols();
    if (p->columnType.get_type() == kARRAY) {
      is_array.get()[i] = true;
      has_array = true;
    } else {
      is_array.get()[i] = false;
    }
    ++i;
  }
  if (has_array) {
    is_array_a = std::unique_ptr<bool[]>(is_array.release());
  } else {
    is_array_a = std::unique_ptr<bool[]>(nullptr);
  }
}

Importer::~Importer() {
  if (p_file != nullptr) {
    fclose(p_file);
  }
  if (buffer[0] != nullptr) {
    free(buffer[0]);
  }
  if (buffer[1] != nullptr) {
    free(buffer[1]);
  }
}

ImportStatus Importer::get_import_status(const std::string& import_id) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
  return import_status_map.at(import_id);
}

void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
  is.end = std::chrono::steady_clock::now();
  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
  import_status_map[import_id] = is;
}

static const std::string trim_space(const char* field, const size_t len) {
  size_t i = 0;
  size_t j = len;
  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
    i++;
  }
  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
    j--;
  }
  return std::string(field + i, j - i);
}
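
// Usage sketch (illustrative, not part of the original source): only spaces
// and carriage returns are trimmed, from both ends:
//
//   trim_space("  a b \r", 7);  // -> "a b"
//   trim_space("\r\r", 2);      // -> "" (empty string)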

namespace {
SQLTypes get_type_for_datum(const SQLTypeInfo& ti) {
  SQLTypes type;
  if (ti.is_decimal()) {
    type = decimal_to_int_type(ti);
  } else if (ti.is_dict_encoded_string()) {
    type = string_dict_to_int_type(ti);
  } else {
    type = ti.get_type();
  }
  return type;
}
}  // namespace

Datum NullDatum(const SQLTypeInfo& ti) {
  Datum d;
  const auto type = get_type_for_datum(ti);
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullDatum.");
  }
  return d;
}

Datum NullArrayDatum(const SQLTypeInfo& ti) {
  Datum d;
  const auto type = get_type_for_datum(ti);
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_ARRAY_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_ARRAY_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
  }
  return d;
}

ArrayDatum StringToArray(const std::string& s,
                         const SQLTypeInfo& ti,
                         const CopyParams& copy_params) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
    return ArrayDatum(0, NULL, true);
  }
  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
    LOG(WARNING) << "Malformed array: " << s;
    return ArrayDatum(0, NULL, true);
  }
  std::vector<std::string> elem_strs;
  size_t last = 1;
  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
       i = s.find(copy_params.array_delim, last)) {
    elem_strs.push_back(s.substr(last, i - last));
    last = i + 1;
  }
  if (last + 1 <= s.size()) {
    elem_strs.push_back(s.substr(last, s.size() - 1 - last));
  }
  if (elem_strs.size() == 1) {
    auto str = elem_strs.front();
    auto str_trimmed = trim_space(str.c_str(), str.length());
    if (str_trimmed == "") {
      elem_strs.clear();  // Empty array
    }
  }
  if (!elem_ti.is_string()) {
    size_t len = elem_strs.size() * elem_ti.get_size();
    std::unique_ptr<int8_t, FreeDeleter> buf(
        reinterpret_cast<int8_t*>(checked_malloc(len)));
    int8_t* p = buf.get();
    for (auto& es : elem_strs) {
      auto e = trim_space(es.c_str(), es.length());
      bool is_null = (e == copy_params.null_str) || e == "NULL";
      if (!elem_ti.is_string() && e == "") {
        is_null = true;
      }
      if (elem_ti.is_number() || elem_ti.is_time()) {
        if (!isdigit(e[0]) && e[0] != '-') {
          is_null = true;
        }
      }
      Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
      p = append_datum(p, d, elem_ti);
      CHECK(p);
    }
    return ArrayDatum(len, buf.release(), false);
  }
  // must not be called for array of strings
  CHECK(false);
  return ArrayDatum(0, NULL, true);
}
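
// Usage sketch (illustrative, not part of the original source; assumes the
// default CopyParams array markers '{', '}' and delimiter ','): for an INT
// array type ti, StringToArray("{1,2,3}", ti, CopyParams()) yields a 12-byte
// ArrayDatum (3 * sizeof(int32_t)); "{}" yields an empty (zero-length) array,
// while "NULL" or the configured null_str yields a null ArrayDatum.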

ArrayDatum NullArray(const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  auto len = ti.get_size();

  if (len > 0) {
    // Compose a NULL fixlen array
    int8_t* buf = (int8_t*)checked_malloc(len);
    // First scalar is a NULL_ARRAY sentinel
    Datum d = NullArrayDatum(elem_ti);
    int8_t* p = append_datum(buf, d, elem_ti);
    CHECK(p);
    // Rest is filled with normal NULL sentinels
    Datum d0 = NullDatum(elem_ti);
    while ((p - buf) < len) {
      p = append_datum(p, d0, elem_ti);
      CHECK(p);
    }
    CHECK((p - buf) == len);
    return ArrayDatum(len, buf, true);
  }
  // NULL varlen array
  return ArrayDatum(0, NULL, true);
}
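
// Layout sketch (illustrative, not part of the original source): a NULL
// fixed-length INT[3] array is encoded conceptually as
//
//   [ NULL_ARRAY_INT | NULL_INT | NULL_INT ]
//
// i.e. the first slot carries the "whole array is null" sentinel and the
// remaining slots carry ordinary null sentinels.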

ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
  return NullArray(ti);
}

ArrayDatum ImporterUtils::composeNullPointCoords(const SQLTypeInfo& coords_ti,
                                                 const SQLTypeInfo& geo_ti) {
  if (geo_ti.get_compression() == kENCODING_GEOINT) {
    CHECK(geo_ti.get_comp_param() == 32);
    std::vector<double> null_point_coords = {NULL_ARRAY_DOUBLE, NULL_DOUBLE};
    auto compressed_null_coords = Geospatial::compress_coords(null_point_coords, geo_ti);
    const size_t len = compressed_null_coords.size();
    int8_t* buf = (int8_t*)checked_malloc(len);
    memcpy(buf, compressed_null_coords.data(), len);
    return ArrayDatum(len, buf, false);
  }
  auto modified_ti = coords_ti;
  modified_ti.set_subtype(kDOUBLE);
  return composeNullArray(modified_ti);
}

void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
  const auto& arr = datum.val.arr_val;
  for (const auto& elem_datum : arr) {
    string_vec.push_back(elem_datum.val.str_val);
  }
}

Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kBIGINT:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kINT:
      d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kSMALLINT:
      d.smallintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kTINYINT:
      d.tinyintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kFLOAT:
      d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
      break;
    case kDOUBLE:
      d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
  }
  return d;
}

ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();

  CHECK(!elem_ti.is_string());

  if (datum.is_null) {
    return NullArray(ti);
  }

  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
  int8_t* buf = (int8_t*)checked_malloc(len);
  int8_t* p = buf;
  for (auto& e : datum.val.arr_val) {
    p = append_datum(p, TDatumToDatum(e, elem_ti), elem_ti);
    CHECK(p);
  }

  return ArrayDatum(len, buf, false);
}
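
// Note (illustrative, not part of the original source): string arrays are
// rejected here (CHECK) because their elements are variable-width; they are
// routed through addStringArray()/addBinaryStringArray() instead. For an INT
// array TDatum holding {10, 20}, the returned ArrayDatum has length
// 2 * sizeof(int32_t) == 8 bytes.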

void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
  CHECK(string_dict_);
  std::vector<std::string_view> string_view_vec;
  string_view_vec.reserve(string_vec.size());
  for (const auto& str : string_vec) {
    if (str.size() > StringDictionary::MAX_STRLEN) {
      std::ostringstream oss;
      oss << "while processing dictionary for column " << getColumnDesc()->columnName
          << " a string was detected too long for encoding, string length = "
          << str.size() << ", first 100 characters are '" << str.substr(0, 100) << "'";
      throw std::runtime_error(oss.str());
    }
    string_view_vec.push_back(str);
  }
  try {
    switch (column_desc_->columnType.get_size()) {
      case 1:
        string_dict_i8_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
        break;
      case 2:
        string_dict_i16_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
        break;
      case 4:
        string_dict_i32_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
        break;
      default:
        CHECK(false);
    }
  } catch (std::exception& e) {
    std::ostringstream oss;
    oss << "while processing dictionary for column " << getColumnDesc()->columnName
        << " : " << e.what();
    LOG(ERROR) << oss.str();
    throw std::runtime_error(oss.str());
  }
}
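
// Design note (illustrative, not part of the original source): the id-buffer
// width follows the dictionary encoding declared for the column; e.g. a
// TEXT ENCODING DICT(8) column stores one int8_t id per row, so
// getOrAddBulk() writes ids into string_dict_i8_buffer_ rather than storing
// the strings themselves.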

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const std::string_view val,
                                  const bool is_null,
                                  const CopyParams& copy_params,
                                  const bool check_not_null) {
  const auto type = cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBoolean(static_cast<int8_t>(d.boolval));
      }
      break;
    }
    case kTINYINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addTinyint(d.tinyintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addSmallint(d.smallintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addInt(d.intval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        DecimalOverflowValidator validator(ti);
        validator.validate(d.bigintval);
        addBigint(d.bigintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addDouble(std::atof(std::string(val).c_str()));
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        if (val.length() > StringDictionary::MAX_STRLEN) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(StringDictionary::MAX_STRLEN));
        }
        addString(val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (check_not_null && is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      SQLTypeInfo ti = cd->columnType;
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImportHelpers::parseStringArray(std::string(val), copy_params, string_vec);
        if (!is_null) {
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          addStringArray(string_vec);
        } else {
          addStringArray(std::nullopt);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
          if (d.is_null) {  // val could be "NULL"
            addArray(NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + std::string(val));
            }
            addArray(d);
          }
        } else {
          addArray(NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      addGeoString(val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}
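
// Usage sketch (illustrative, not part of the original source): this overload
// is the per-field entry point for delimited import; e.g. for an INT column
// descriptor cd and the CSV field "42":
//
//   buffer.add_value(cd, "42", /*is_null=*/false, copy_params, /*check_not_null=*/true);
//
// A NULL in a NOT NULL column throws; a NULL in a nullable column appends the
// type's inline null sentinel instead.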

void TypedImportBuffer::pop_value() {
  const auto type = column_desc_->columnType.is_decimal()
                        ? decimal_to_int_type(column_desc_->columnType)
                        : column_desc_->columnType.get_type();
  switch (type) {
    case kBOOLEAN:
      bool_buffer_->pop_back();
      break;
    case kTINYINT:
      tinyint_buffer_->pop_back();
      break;
    case kSMALLINT:
      smallint_buffer_->pop_back();
      break;
    case kINT:
      int_buffer_->pop_back();
      break;
    case kBIGINT:
      bigint_buffer_->pop_back();
      break;
    case kFLOAT:
      float_buffer_->pop_back();
      break;
    case kDOUBLE:
      double_buffer_->pop_back();
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      string_buffer_->pop_back();
      break;
    case kDATE:
    case kTIME:
    case kTIMESTAMP:
      bigint_buffer_->pop_back();
      break;
    case kARRAY:
      if (IS_STRING(column_desc_->columnType.get_subtype())) {
        string_array_buffer_->pop_back();
      } else {
        array_buffer_->pop_back();
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->pop_back();
      break;
    default:
      CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
  }
}

struct GeoImportException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
template <typename DATA_TYPE>
size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
    const ColumnDescriptor* cd,
    const Array& array,
    std::vector<DATA_TYPE>& buffer,
    const ArraySliceRange& slice_range,
    import_export::BadRowsTracker* const bad_rows_tracker) {
  auto data =
      std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
  if (bad_rows_tracker && cd->columnType.is_geometry()) {
    f_add_geo_phy_cols = [&](const int64_t row) {
      // Populate physical columns (ref. DBHandler::load_table)
      std::vector<double> coords, bounds;
      std::vector<int> ring_sizes, poly_rings;
      int render_group = 0;
      SQLTypeInfo ti;
      // replace any unexpected exception from getGeoColumns or other code
      // on this path with a GeoImportException so that we won't push an
      // extra null to the logical column...
      try {
        SQLTypeInfo import_ti{ti};
        if (array.IsNull(row)) {
          Geospatial::GeoTypesFactory::getNullGeoColumns(
              import_ti, coords, bounds, ring_sizes, poly_rings, false);
        } else {
          arrow_throw_if<GeoImportException>(
              !Geospatial::GeoTypesFactory::getGeoColumns(geo_string_buffer_->back(),
                                                          ti,
                                                          coords,
                                                          bounds,
                                                          ring_sizes,
                                                          poly_rings,
                                                          false),
              error_context(cd, bad_rows_tracker) + "Invalid geometry");
          arrow_throw_if<GeoImportException>(
              cd->columnType.get_type() != ti.get_type(),
              error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
        }
        auto col_idx_workpad = col_idx;  // what a pitfall!!
        Importer::set_geo_physical_import_buffer(
            bad_rows_tracker->importer->getCatalog(),
            cd,
            *import_buffers,
            col_idx_workpad,
            coords,
            bounds,
            ring_sizes,
            poly_rings,
            render_group);
      } catch (GeoImportException&) {
        throw;
      } catch (std::runtime_error& e) {
        throw GeoImportException(e.what());
      } catch (const std::exception& e) {
        throw GeoImportException(e.what());
      } catch (...) {
        throw GeoImportException("unknown exception");
      }
    };
  }
  auto f_mark_a_bad_row = [&](const auto row) {
    std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
    bad_rows_tracker->rows.insert(row - slice_range.first);
  };
  buffer.reserve(slice_range.second - slice_range.first);
  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
    try {
      *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
      f_add_geo_phy_cols(row);
    } catch (GeoImportException&) {
      f_mark_a_bad_row(row);
    } catch (ArrowImporterException&) {
      // trace bad rows of each column; otherwise rethrow.
      if (bad_rows_tracker) {
        *data << nullptr;
        f_mark_a_bad_row(row);
      } else {
        throw;
      }
    }
  }
  return buffer.size();
}

size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
                                           const Array& col,
                                           const bool exact_type_match,
                                           const ArraySliceRange& slice_range,
                                           BadRowsTracker* const bad_rows_tracker) {
  const auto type = cd->columnType.get_type();
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
  }

  switch (type) {
    case kBOOLEAN:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
      }
      return convert_arrow_val_to_import_buffer<int8_t>(
          cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
    case kTINYINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
      }
      return convert_arrow_val_to_import_buffer<int8_t>(
          cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
    case kSMALLINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
      }
      return convert_arrow_val_to_import_buffer<int16_t>(
          cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
    case kINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
      }
      return convert_arrow_val_to_import_buffer<int32_t>(
          cd, col, *int_buffer_, slice_range, bad_rows_tracker);
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kFLOAT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
      }
      return convert_arrow_val_to_import_buffer<float>(
          cd, col, *float_buffer_, slice_range, bad_rows_tracker);
    case kDOUBLE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
      }
      return convert_arrow_val_to_import_buffer<double>(
          cd, col, *double_buffer_, slice_range, bad_rows_tracker);
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                       "Expected string type");
      }
      return convert_arrow_val_to_import_buffer<std::string>(
          cd, col, *string_buffer_, slice_range, bad_rows_tracker);
    case kTIME:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
                       "Expected time32 or time64 type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kTIMESTAMP:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kDATE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
                       "Expected date32 or date64 type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                     "Expected string type");
      return convert_arrow_val_to_import_buffer<std::string>(
          cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
    case kARRAY:
      throw std::runtime_error("Arrow array appends not yet supported");
    default:
      throw std::runtime_error("Invalid Type");
  }
}
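
// Usage sketch (illustrative, not part of the original source): callers append
// one Arrow chunk at a time; exact_type_match toggles strict physical-type
// checking against the Arrow column's type_id. Assuming ArraySliceRange is a
// (begin, end) pair:
//
//   ArraySliceRange whole(0, chunk->length());
//   buffer.add_arrow_values(cd, *chunk, /*exact_type_match=*/true, whole, &tracker);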

// this is exclusively used by load_table_binary_columnar
size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
  size_t dataSize = 0;
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
      throw std::runtime_error("NULL for column " + cd->columnName);
    }
  }

  switch (cd->columnType.get_type()) {
    case kBOOLEAN: {
      dataSize = col.data.int_col.size();
      bool_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bool_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kTINYINT: {
      dataSize = col.data.int_col.size();
      tinyint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kSMALLINT: {
      dataSize = col.data.int_col.size();
      smallint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kINT: {
      dataSize = col.data.int_col.size();
      int_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          int_buffer_->push_back((int32_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kFLOAT: {
      dataSize = col.data.real_col.size();
      float_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          float_buffer_->push_back(NULL_FLOAT);
        } else {
          float_buffer_->push_back((float)col.data.real_col[i]);
        }
      }
      break;
    }
    case kDOUBLE: {
      dataSize = col.data.real_col.size();
      double_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          double_buffer_->push_back(NULL_DOUBLE);
        } else {
          double_buffer_->push_back((double)col.data.real_col[i]);
        }
      }
      break;
    }
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // TODO: for now, use empty string for nulls
      dataSize = col.data.str_col.size();
      string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          string_buffer_->push_back(std::string());
        } else {
          string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON: {
      dataSize = col.data.str_col.size();
      geo_string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          // TODO: add support for NULL geo
          geo_string_buffer_->push_back(std::string());
        } else {
          geo_string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kARRAY: {
      dataSize = col.data.arr_col.size();
      if (IS_STRING(cd->columnType.get_subtype())) {
        for (size_t i = 0; i < dataSize; i++) {
          OptionalStringVector& string_vec = addStringArray();
          if (!col.nulls[i]) {
            size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
            for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
              string_vec->push_back(col.data.arr_col[i].data.str_col[str_idx]);
            }
          }
        }
      } else {
        auto elem_ti = cd->columnType.get_subtype();
        switch (elem_ti) {
          case kBOOLEAN: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  // Explicitly checking the item for null because
                  // casting null value (-128) to bool results
                  // in an incorrect value of 1.
                  if (col.data.arr_col[i].nulls[j]) {
                    *p = static_cast<int8_t>(
                        inline_fixed_encoding_null_val(cd->columnType.get_elem_type()));
                  } else {
                    *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
                  }
                  p += sizeof(bool);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTINYINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int8_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kSMALLINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int16_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int16_t*)p =
                      static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int16_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int32_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int32_t*)p =
                      static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int32_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kBIGINT:
          case kNUMERIC:
          case kDECIMAL: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int64_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int64_t*)p =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kFLOAT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(float);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(float);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kDOUBLE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(double);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(double);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTIME:
          case kTIMESTAMP:
          case kDATE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteWidth = sizeof(int64_t);
                size_t byteSize = len * byteWidth;
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *reinterpret_cast<int64_t*>(p) =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          default:
            throw std::runtime_error("Invalid Array Type");
        }
      }
      break;
    }
    default:
      throw std::runtime_error("Invalid Type");
  }
  return dataSize;
}

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const TDatum& datum,
                                  const bool is_null) {
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        addBoolean((int8_t)datum.val.int_val);
      }
      break;
    }
    case kTINYINT:
      if (!is_null) {
        addTinyint((int8_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kSMALLINT:
      if (!is_null) {
        addSmallint((int16_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kINT:
      if (!is_null) {
        addInt((int32_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kBIGINT:
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kFLOAT:
      if (!is_null) {
        addFloat((float)datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        addDouble(datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        addString(datum.val.str_val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kARRAY:
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      if (IS_STRING(cd->columnType.get_subtype())) {
        OptionalStringVector& string_vec = addStringArray();
        addBinaryStringArray(datum, *string_vec);
      } else {
        if (!is_null) {
          addArray(TDatumToArrayDatum(datum, cd->columnType));
        } else {
          addArray(NullArray(cd->columnType));
        }
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addGeoString(std::string());
      } else {
        addGeoString(datum.val.str_val);
      }
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

void TypedImportBuffer::addDefaultValues(const ColumnDescriptor* cd, size_t num_rows) {
  bool is_null = !cd->default_value.has_value();
  CHECK(!(is_null && cd->columnType.get_notnull()));
  const auto type = cd->columnType.get_type();
  auto ti = cd->columnType;
  auto val = cd->default_value.value_or("NULL");
  CopyParams cp;
  switch (type) {
    case kBOOLEAN: {
      if (!is_null) {
        bool_buffer_->resize(num_rows, StringToDatum(val, ti).boolval);
      } else {
        bool_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kTINYINT: {
      if (!is_null) {
        tinyint_buffer_->resize(num_rows, StringToDatum(val, ti).tinyintval);
      } else {
        tinyint_buffer_->resize(num_rows,
                                inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null) {
        smallint_buffer_->resize(num_rows, StringToDatum(val, ti).smallintval);
      } else {
        smallint_buffer_->resize(num_rows,
                                 inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null) {
        int_buffer_->resize(num_rows, StringToDatum(val, ti).intval);
      } else {
        int_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null) {
        bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
      } else {
        bigint_buffer_->resize(num_rows,
                               inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        const auto converted_decimal_value = convert_decimal_value_to_scale(
            StringToDatum(val, ti).bigintval, ti, cd->columnType);
        bigint_buffer_->resize(num_rows, converted_decimal_value);
      } else {
        bigint_buffer_->resize(num_rows,
                               inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null) {
        float_buffer_->resize(num_rows,
                              static_cast<float>(std::atof(std::string(val).c_str())));
      } else {
        float_buffer_->resize(num_rows, NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        double_buffer_->resize(num_rows, std::atof(std::string(val).c_str()));
      } else {
        double_buffer_->resize(num_rows, NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      if (is_null) {
        string_buffer_->resize(num_rows, "");
      } else {
        if (val.length() > StringDictionary::MAX_STRLEN) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(StringDictionary::MAX_STRLEN));
        }
        string_buffer_->resize(num_rows, val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null) {
        bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
      } else {
        bigint_buffer_->resize(num_rows,
                               inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImportHelpers::parseStringArray(std::string(val), cp, string_vec);
        if (!is_null) {
          // TODO: add support for NULL string arrays
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          string_array_buffer_->resize(num_rows, string_vec);
        } else {
          if (ti.get_size() > 0) {
            // TODO: remove once NULL fixlen arrays are allowed
            throw std::runtime_error("Fixed length array column " + cd->columnName +
                                     " currently cannot accept NULL arrays");
          }
          // TODO: add support for NULL string arrays, replace with addStringArray(),
          // for now add whatever parseStringArray() outputs for NULLs ("NULL")
          string_array_buffer_->resize(num_rows, string_vec);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(std::string(val), ti, cp);
          if (d.is_null) {  // val could be "NULL"
            array_buffer_->resize(num_rows, NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + std::string(val));
            }
            array_buffer_->resize(num_rows, d);
          }
        } else {
          array_buffer_->resize(num_rows, NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->resize(num_rows, val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::addDefaultValues() does not support type "
                   << type;
  }
}

bool importGeoFromLonLat(double lon,
                         double lat,
                         std::vector<double>& coords,
                         SQLTypeInfo& ti) {
  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
    return false;
  }
  if (ti.transforms()) {
    Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
    if (!pt.transform(ti)) {
      return false;
    }
    pt.getColumns(coords);
    return true;
  }
  coords.push_back(lon);
  coords.push_back(lat);
  return true;
}
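
// Usage sketch (illustrative, not part of the original source): for a plain
// (non-transforming) POINT column, the lon/lat pair is appended verbatim:
//
//   std::vector<double> coords;
//   if (importGeoFromLonLat(-122.4, 37.8, coords, ti)) {
//     // coords == {-122.4, 37.8}; NaN/Inf inputs would have returned false
//   }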

void Importer::set_geo_physical_import_buffer(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<double>& coords,
    std::vector<double>& bounds,
    std::vector<int>& ring_sizes,
    std::vector<int>& poly_rings,
    int render_group,
    const bool force_null) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  bool is_null_geo = false;
  bool is_null_point = false;
  if (!col_ti.get_notnull()) {
    // Check for NULL geo
    if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
      is_null_point = true;
      coords.clear();
    }
    is_null_geo = coords.empty();
    if (is_null_point) {
      coords.push_back(NULL_ARRAY_DOUBLE);
      coords.push_back(NULL_DOUBLE);
      // Treating POINT coords as notnull, need to store actual encoding
      // [un]compressed+[not]null
      is_null_geo = false;
    }
  }
  if (force_null) {
    is_null_geo = true;
  }
  TDatum tdd_coords;
  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
  // One exception - NULL POINT geo: coords need to be processed to encode nullness
  // in a fixlen array, compressed and uncompressed.
  if (!is_null_geo) {
    std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
    tdd_coords.val.arr_val.reserve(compressed_coords.size());
    for (auto cc : compressed_coords) {
      tdd_coords.val.arr_val.emplace_back();
      tdd_coords.val.arr_val.back().val.int_val = cc;
    }
  }
  tdd_coords.is_null = is_null_geo;
  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_ring_sizes;
    tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
    if (!is_null_geo) {
      for (auto ring_size : ring_sizes) {
        tdd_ring_sizes.val.arr_val.emplace_back();
        tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
      }
    }
    tdd_ring_sizes.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
  }

  if (col_type == kMULTIPOLYGON) {
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_poly_rings;
    tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
    if (!is_null_geo) {
      for (auto num_rings : poly_rings) {
        tdd_poly_rings.val.arr_val.emplace_back();
        tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
      }
    }
    tdd_poly_rings.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_bounds;
    tdd_bounds.val.arr_val.reserve(bounds.size());
    if (!is_null_geo) {
      for (auto b : bounds) {
        tdd_bounds.val.arr_val.emplace_back();
        tdd_bounds.val.arr_val.back().val.real_val = b;
      }
    }
    tdd_bounds.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
  }
}
1757 
1759  const Catalog_Namespace::Catalog& catalog,
1760  const ColumnDescriptor* cd,
1761  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1762  size_t& col_idx,
1763  std::vector<std::vector<double>>& coords_column,
1764  std::vector<std::vector<double>>& bounds_column,
1765  std::vector<std::vector<int>>& ring_sizes_column,
1766  std::vector<std::vector<int>>& poly_rings_column,
1767  std::vector<int>& render_groups_column) {
1768  const auto col_ti = cd->columnType;
1769  const auto col_type = col_ti.get_type();
1770  auto columnId = cd->columnId;
1771 
1772  auto coords_row_count = coords_column.size();
1773  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1774  for (auto& coords : coords_column) {
1775  bool is_null_geo = false;
1776  bool is_null_point = false;
1777  if (!col_ti.get_notnull()) {
1778  // Check for NULL geo
1779  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1780  is_null_point = true;
1781  coords.clear();
1782  }
1783  is_null_geo = coords.empty();
1784  if (is_null_point) {
1785  coords.push_back(NULL_ARRAY_DOUBLE);
1786  coords.push_back(NULL_DOUBLE);
1787  // Treating POINT coords as notnull, need to store actual encoding
1788  // [un]compressed+[not]null
1789  is_null_geo = false;
1790  }
1791  }
1792  std::vector<TDatum> td_coords_data;
1793  if (!is_null_geo) {
1794  std::vector<uint8_t> compressed_coords =
1795  Geospatial::compress_coords(coords, col_ti);
1796  for (auto const& cc : compressed_coords) {
1797  TDatum td_byte;
1798  td_byte.val.int_val = cc;
1799  td_coords_data.push_back(td_byte);
1800  }
1801  }
1802  TDatum tdd_coords;
1803  tdd_coords.val.arr_val = td_coords_data;
1804  tdd_coords.is_null = is_null_geo;
1805  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1806  }
1807  col_idx++;
1808 
1809  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1810  if (ring_sizes_column.size() != coords_row_count) {
1811  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1812  }
1813  // Create ring_sizes array value and add it to the physical column
1814  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1815  for (auto const& ring_sizes : ring_sizes_column) {
1816  bool is_null_geo = false;
1817  if (!col_ti.get_notnull()) {
1818  // Check for NULL geo
1819  is_null_geo = ring_sizes.empty();
1820  }
1821  std::vector<TDatum> td_ring_sizes;
1822  for (auto const& ring_size : ring_sizes) {
1823  TDatum td_ring_size;
1824  td_ring_size.val.int_val = ring_size;
1825  td_ring_sizes.push_back(td_ring_size);
1826  }
1827  TDatum tdd_ring_sizes;
1828  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1829  tdd_ring_sizes.is_null = is_null_geo;
1830  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1831  }
1832  col_idx++;
1833  }
1834 
1835  if (col_type == kMULTIPOLYGON) {
1836  if (poly_rings_column.size() != coords_row_count) {
1837  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1838  }
1839  // Create poly_rings array value and add it to the physical column
1840  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1841  for (auto const& poly_rings : poly_rings_column) {
1842  bool is_null_geo = false;
1843  if (!col_ti.get_notnull()) {
1844  // Check for NULL geo
1845  is_null_geo = poly_rings.empty();
1846  }
1847  std::vector<TDatum> td_poly_rings;
1848  for (auto const& num_rings : poly_rings) {
1849  TDatum td_num_rings;
1850  td_num_rings.val.int_val = num_rings;
1851  td_poly_rings.push_back(td_num_rings);
1852  }
1853  TDatum tdd_poly_rings;
1854  tdd_poly_rings.val.arr_val = td_poly_rings;
1855  tdd_poly_rings.is_null = is_null_geo;
1856  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1857  }
1858  col_idx++;
1859  }
1860 
1861  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1862  if (bounds_column.size() != coords_row_count) {
1863  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1864  }
1865  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1866  for (auto const& bounds : bounds_column) {
1867  bool is_null_geo = false;
1868  if (!col_ti.get_notnull()) {
1869  // Check for NULL geo
1870  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1871  }
1872  std::vector<TDatum> td_bounds_data;
1873  for (auto const& b : bounds) {
1874  TDatum td_double;
1875  td_double.val.real_val = b;
1876  td_bounds_data.push_back(td_double);
1877  }
1878  TDatum tdd_bounds;
1879  tdd_bounds.val.arr_val = td_bounds_data;
1880  tdd_bounds.is_null = is_null_geo;
1881  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1882  }
1883  col_idx++;
1884  }
1885 
1886  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1887  // Create render_group value and add it to the physical column
1888  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1889  for (auto const& render_group : render_groups_column) {
1890  TDatum td_render_group;
1891  td_render_group.val.int_val = render_group;
1892  td_render_group.is_null = false;
1893  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
1894  }
1895  col_idx++;
1896  }
1897 }
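// Layout note (summary of the code above): a geo column expands into physical
// columns imported in a fixed order immediately after the base column:
// coords (always), ring_sizes (POLYGON/MULTIPOLYGON), poly_rings
// (MULTIPOLYGON only), bounds (LINESTRING/POLYGON/MULTIPOLYGON), and
// render_group (POLYGON/MULTIPOLYGON). The physical columns are addressed by
// columnId offset, not by name.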
1898 
1899 namespace {
1900 
1901 std::tuple<int, SQLTypes, std::string> explode_collections_step1(
1902  const std::list<const ColumnDescriptor*>& col_descs) {
1903  // validate the columns
1904  // for now we can only explode into a single destination column
1905  // which must be of the child type (POLYGON, LINESTRING, POINT)
1906  int collection_col_idx = -1;
1907  int col_idx = 0;
1908  std::string collection_col_name;
1909  SQLTypes collection_child_type = kNULLT;
1910  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1911  auto const& cd = *cd_it;
1912  auto const col_type = cd->columnType.get_type();
1913  if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
1914  if (collection_col_idx >= 0) {
1915  throw std::runtime_error(
1916  "Explode Collections: Found more than one destination column");
1917  }
1918  collection_col_idx = col_idx;
1919  collection_child_type = col_type;
1920  collection_col_name = cd->columnName;
1921  }
1922  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1923  ++cd_it;
1924  }
1925  col_idx++;
1926  }
1927  if (collection_col_idx < 0) {
1928  throw std::runtime_error(
1929  "Explode Collections: Failed to find a supported column type to explode "
1930  "into");
1931  }
1932  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
1933 }
1934 
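// explode_collections_step2() runs the supplied import lambda once per child
// geometry when the source is a MULTI* collection matching the child column
// type (or exactly once for a plain geometry), and returns the accumulated
// import time in microseconds.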
1935 int64_t explode_collections_step2(
1936  OGRGeometry* ogr_geometry,
1937  SQLTypes collection_child_type,
1938  const std::string& collection_col_name,
1939  size_t row_or_feature_idx,
1940  std::function<void(OGRGeometry*)> execute_import_lambda) {
1941  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1942  bool is_collection = false;
1943  switch (collection_child_type) {
1944  case kPOINT:
1945  switch (ogr_geometry_type) {
1946  case wkbMultiPoint:
1947  is_collection = true;
1948  break;
1949  case wkbPoint:
1950  break;
1951  default:
1952  throw std::runtime_error(
1953  "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1954  }
1955  break;
1956  case kLINESTRING:
1957  switch (ogr_geometry_type) {
1958  case wkbMultiLineString:
1959  is_collection = true;
1960  break;
1961  case wkbLineString:
1962  break;
1963  default:
1964  throw std::runtime_error(
1965  "Explode Collections: Source geo type must be MULTILINESTRING or "
1966  "LINESTRING");
1967  }
1968  break;
1969  case kPOLYGON:
1970  switch (ogr_geometry_type) {
1971  case wkbMultiPolygon:
1972  is_collection = true;
1973  break;
1974  case wkbPolygon:
1975  break;
1976  default:
1977  throw std::runtime_error(
1978  "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1979  }
1980  break;
1981  default:
1982  CHECK(false) << "Unsupported geo child type " << collection_child_type;
1983  }
1984 
1985  int64_t us = 0LL;
1986 
1987  // explode or just import
1988  if (is_collection) {
1989  // cast to collection
1990  OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1991  CHECK(collection_geometry);
1992 
1993 #if LOG_EXPLODE_COLLECTIONS
1994  // log number of children
1995  LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
1996  << collection_col_name << "' into " << collection_geometry->getNumGeometries()
1997  << " child rows";
1998 #endif
1999 
2000  // loop over children
2001  uint32_t child_geometry_count = 0;
2002  auto child_geometry_it = collection_geometry->begin();
2003  while (child_geometry_it != collection_geometry->end()) {
2004  // get and import this child
2005  OGRGeometry* import_geometry = *child_geometry_it;
2006  us += measure<std::chrono::microseconds>::execution(
2007  [&] { execute_import_lambda(import_geometry); });
2008 
2009  // next child
2010  child_geometry_it++;
2011  child_geometry_count++;
2012  }
2013  } else {
2014  // import non-collection row just once
2015  us = measure<std::chrono::microseconds>::execution(
2016  [&] { execute_import_lambda(ogr_geometry); });
2017  }
2018 
2019  // done
2020  return us;
2021 }
2022 
2023 } // namespace
2024 
2025 static ImportStatus import_thread_delimited(
2026  int thread_id,
2027  Importer* importer,
2028  std::unique_ptr<char[]> scratch_buffer,
2029  size_t begin_pos,
2030  size_t end_pos,
2031  size_t total_size,
2032  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2033  size_t first_row_index_this_buffer,
2034  const Catalog_Namespace::SessionInfo* session_info,
2035  Executor* executor) {
2036  ImportStatus thread_import_status;
2037  int64_t total_get_row_time_us = 0;
2038  int64_t total_str_to_val_time_us = 0;
2039  auto query_session = session_info ? session_info->get_session_id() : "";
2040  CHECK(scratch_buffer);
2041  auto buffer = scratch_buffer.get();
2042  auto load_ms = measure<>::execution([]() {});
2043 
2044  thread_import_status.thread_id = thread_id;
2045 
2046  auto ms = measure<>::execution([&]() {
2047  const CopyParams& copy_params = importer->get_copy_params();
2048  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2049  size_t begin =
2050  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
2051  const char* thread_buf = buffer + begin_pos + begin;
2052  const char* thread_buf_end = buffer + end_pos;
2053  const char* buf_end = buffer + total_size;
2054  bool try_single_thread = false;
2055  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2056  importer->get_import_buffers(thread_id);
2057  auto us = 0ULL;
2058  int phys_cols = 0;
2059  int point_cols = 0;
2060  for (const auto cd : col_descs) {
2061  const auto& col_ti = cd->columnType;
2062  phys_cols += col_ti.get_physical_cols();
2063  if (cd->columnType.get_type() == kPOINT) {
2064  point_cols++;
2065  }
2066  }
2067  auto num_cols = col_descs.size() - phys_cols;
2068  for (const auto& p : import_buffers) {
2069  p->clear();
2070  }
2071  std::vector<std::string_view> row;
2072  size_t row_index_plus_one = 0;
2073  for (const char* p = thread_buf; p < thread_buf_end; p++) {
2074  row.clear();
2075  std::vector<std::unique_ptr<char[]>>
2076  tmp_buffers; // holds string w/ removed escape chars, etc
2077  if (DEBUG_TIMING) {
2078  us = measure<std::chrono::microseconds>::execution([&]() {
2079  p = import_export::delimited_parser::get_row(p,
2080  thread_buf_end,
2081  buf_end,
2082  copy_params,
2083  importer->get_is_array(),
2084  row,
2085  tmp_buffers,
2086  try_single_thread,
2087  true);
2088  });
2089  total_get_row_time_us += us;
2090  } else {
2091  p = import_export::delimited_parser::get_row(p,
2092  thread_buf_end,
2093  buf_end,
2094  copy_params,
2095  importer->get_is_array(),
2096  row,
2097  tmp_buffers,
2098  try_single_thread,
2099  true);
2100  }
2101  row_index_plus_one++;
2102  // Each POINT could consume two separate coords instead of a single WKT
2103  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
2104  thread_import_status.rows_rejected++;
2105  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
2106  << row.size() << "): " << shared::printContainer(row);
2107  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2108  break;
2109  }
2110  continue;
2111  }
2112 
2113  //
2114  // lambda for importing a row (perhaps multiple times if exploding a collection)
2115  //
2116 
2117  auto execute_import_row = [&](OGRGeometry* import_geometry) {
2118  size_t import_idx = 0;
2119  size_t col_idx = 0;
2120  try {
2121  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2122  auto cd = *cd_it;
2123  const auto& col_ti = cd->columnType;
2124 
2125  bool is_null =
2126  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
2127  // Note: default copy_params.null_str is "\N", but everyone uses "NULL".
2128  // So initially nullness may be missed and not passed to add_value,
2129  // which then might also check and still decide it's actually a NULL, e.g.
2130  // if a kINT field doesn't start with a digit or a '-' it's considered NULL.
2131  // So "NULL" is not recognized as NULL but then it's not recognized as
2132  // a valid kINT, so it's a NULL after all.
2133  // Checking for "NULL" here too, as a widely accepted notation for NULL.
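  // Example: with the default null_str, "\N" and "NULL" are both recognized
  // as NULL here, while e.g. "null" is passed through to add_value and may
  // still end up NULL there for non-string types, per the note above.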
2134 
2135  // Treating empty as NULL
2136  if (!cd->columnType.is_string() && row[import_idx].empty()) {
2137  is_null = true;
2138  }
2139  if (!cd->columnType.is_string() && !copy_params.trim_spaces) {
2140  // everything but strings should always be trimmed
2141  row[import_idx] = sv_strip(row[import_idx]);
2142  }
2143 
2144  if (col_ti.get_physical_cols() == 0) {
2145  // not geo
2146 
2147  import_buffers[col_idx]->add_value(
2148  cd, row[import_idx], is_null, copy_params);
2149 
2150  // next
2151  ++import_idx;
2152  ++col_idx;
2153  } else {
2154  // geo
2155 
2156  // store null string in the base column
2157  import_buffers[col_idx]->add_value(
2158  cd, copy_params.null_str, true, copy_params);
2159 
2160  // the WKT (or WKB hex) string from the row; not stored directly
2161  auto const& geo_string = row[import_idx];
2162 
2163  // next
2164  ++import_idx;
2165  ++col_idx;
2166 
2167  SQLTypes col_type = col_ti.get_type();
2168  CHECK(IS_GEO(col_type));
2169 
2170  std::vector<double> coords;
2171  std::vector<double> bounds;
2172  std::vector<int> ring_sizes;
2173  std::vector<int> poly_rings;
2174  int render_group = 0;
2175 
2176  // if this is a POINT column, and the field is not null, and
2177  // looks like a scalar numeric value (and not a hex blob)
2178  // attempt to import two columns as lon/lat (or lat/lon)
2179  if (col_type == kPOINT && !is_null && geo_string.size() > 0 &&
2180  (geo_string[0] == '.' || isdigit(geo_string[0]) ||
2181  geo_string[0] == '-') &&
2182  geo_string.find_first_of("ABCDEFabcdef") == std::string::npos) {
2183  double lon = std::atof(std::string(geo_string).c_str());
2184  double lat = NAN;
2185  auto lat_str = row[import_idx];
2186  ++import_idx;
2187  if (lat_str.size() > 0 &&
2188  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
2189  lat = std::atof(std::string(lat_str).c_str());
2190  }
2191  // Swap coordinates if this table uses a reverse order: lat/lon
2192  if (!copy_params.lonlat) {
2193  std::swap(lat, lon);
2194  }
2195  // TODO: should check if POINT column should have been declared with
2196  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
2197  // throw std::runtime_error("POINT column " + cd->columnName + " is
2198  // not WGS84, cannot insert lon/lat");
2199  // }
2200  SQLTypeInfo import_ti{col_ti};
2201  if (copy_params.source_type ==
2202  import_export::SourceType::kDelimitedFile &&
2203  import_ti.get_output_srid() == 4326) {
2204  auto srid0 = copy_params.source_srid;
2205  if (srid0 > 0) {
2206  // srid0 -> 4326 transform is requested on import
2207  import_ti.set_input_srid(srid0);
2208  }
2209  }
2210  if (!importGeoFromLonLat(lon, lat, coords, import_ti)) {
2211  throw std::runtime_error(
2212  "Cannot read lon/lat to insert into POINT column " +
2213  cd->columnName);
2214  }
2215  } else {
2216  // import it
2217  SQLTypeInfo import_ti{col_ti};
2218  if (copy_params.source_type ==
2219  import_export::SourceType::kDelimitedFile &&
2220  import_ti.get_output_srid() == 4326) {
2221  auto srid0 = copy_params.source_srid;
2222  if (srid0 > 0) {
2223  // srid0 -> 4326 transform is requested on import
2224  import_ti.set_input_srid(srid0);
2225  }
2226  }
2227  if (is_null) {
2228  if (col_ti.get_notnull()) {
2229  throw std::runtime_error("NULL geo for column " + cd->columnName);
2230  }
2231  Geospatial::GeoTypesFactory::getNullGeoColumns(
2232  import_ti,
2233  coords,
2234  bounds,
2235  ring_sizes,
2236  poly_rings,
2237  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2238  } else {
2239  if (import_geometry) {
2240  // geometry already exploded
2241  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2242  import_geometry,
2243  import_ti,
2244  coords,
2245  bounds,
2246  ring_sizes,
2247  poly_rings,
2248  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2249  std::string msg =
2250  "Failed to extract valid geometry from exploded row " +
2251  std::to_string(first_row_index_this_buffer +
2252  row_index_plus_one) +
2253  " for column " + cd->columnName;
2254  throw std::runtime_error(msg);
2255  }
2256  } else {
2257  // extract geometry directly from WKT
2258  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2259  std::string(geo_string),
2260  import_ti,
2261  coords,
2262  bounds,
2263  ring_sizes,
2264  poly_rings,
2265  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2266  std::string msg = "Failed to extract valid geometry from row " +
2267  std::to_string(first_row_index_this_buffer +
2268  row_index_plus_one) +
2269  " for column " + cd->columnName;
2270  throw std::runtime_error(msg);
2271  }
2272  }
2273 
2274  // validate types
2275  if (col_type != import_ti.get_type()) {
2276  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2277  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2278  col_type == SQLTypes::kMULTIPOLYGON)) {
2279  throw std::runtime_error(
2280  "Imported geometry doesn't match the type of column " +
2281  cd->columnName);
2282  }
2283  }
2284  }
2285 
2286  // assign render group?
2287  if (columnIdToRenderGroupAnalyzerMap.size()) {
2288  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2289  if (ring_sizes.size()) {
2290  // get a suitable render group for these poly coords
2291  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2292  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2293  render_group =
2294  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2295  } else {
2296  // empty poly
2297  render_group = -1;
2298  }
2299  }
2300  }
2301  }
2302 
2303  // import extracted geo
2304  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2305  cd,
2306  import_buffers,
2307  col_idx,
2308  coords,
2309  bounds,
2310  ring_sizes,
2311  poly_rings,
2312  render_group);
2313 
2314  // skip remaining physical columns
2315  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2316  ++cd_it;
2317  }
2318  }
2319  }
2320  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2321  check_session_interrupted(query_session, executor))) {
2322  thread_import_status.load_failed = true;
2323  thread_import_status.load_msg =
2324  "Table load was cancelled via Query Interrupt";
2325  return;
2326  }
2327  thread_import_status.rows_completed++;
2328  } catch (const std::exception& e) {
2329  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2330  import_buffers[col_idx_to_pop]->pop_value();
2331  }
2332  thread_import_status.rows_rejected++;
2333  LOG(ERROR) << "Input exception thrown: " << e.what()
2334  << ". Row discarded. Data: " << shared::printContainer(row);
2335  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2336  LOG(ERROR) << "Load was cancelled due to max reject rows being reached";
2337  thread_import_status.load_failed = true;
2338  thread_import_status.load_msg =
2339  "Load was cancelled due to max reject rows being reached";
2340  }
2341  }
2342  }; // End of lambda
2343 
2344  if (copy_params.geo_explode_collections) {
2345  // explode and import
2346  auto const [collection_col_idx, collection_child_type, collection_col_name] =
2347  explode_collections_step1(col_descs);
2348  // pull out the collection WKT or WKB hex
2349  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2350  auto const& collection_geo_string = row[collection_col_idx];
2351  // convert to OGR
2352  OGRGeometry* ogr_geometry = nullptr;
2353  ScopeGuard destroy_ogr_geometry = [&] {
2354  if (ogr_geometry) {
2355  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2356  }
2357  };
2358  ogr_geometry = Geospatial::GeoTypesFactory::createOGRGeometry(
2359  std::string(collection_geo_string));
2360  // do the explode and import
2361  us = explode_collections_step2(ogr_geometry,
2362  collection_child_type,
2363  collection_col_name,
2364  first_row_index_this_buffer + row_index_plus_one,
2365  execute_import_row);
2366  } else {
2367  // import non-collection row just once
2368  us = measure<std::chrono::microseconds>::execution(
2369  [&] { execute_import_row(nullptr); });
2370  }
2371 
2372  if (thread_import_status.load_failed) {
2373  break;
2374  }
2375  } // end thread
2376  total_str_to_val_time_us += us;
2377  if (!thread_import_status.load_failed && thread_import_status.rows_completed > 0) {
2378  load_ms = measure<>::execution([&]() {
2379  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2380  });
2381  }
2382  }); // end execution
2383 
2384  if (DEBUG_TIMING && !thread_import_status.load_failed &&
2385  thread_import_status.rows_completed > 0) {
2386  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2387  << thread_import_status.rows_completed << " rows inserted in "
2388  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2389  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2390  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2391  << "sec" << std::endl;
2392  }
2393 
2394  return thread_import_status;
2395 }
2396 
2397 class ColumnNotGeoError : public std::runtime_error {
2398  public:
2399  ColumnNotGeoError(const std::string& column_name)
2400  : std::runtime_error("Column '" + column_name + "' is not a geo column") {}
2401 };
2402 
2403 static ImportStatus import_thread_shapefile(
2404  int thread_id,
2405  Importer* importer,
2406  OGRCoordinateTransformation* coordinate_transformation,
2407  const FeaturePtrVector& features,
2408  size_t firstFeature,
2409  size_t numFeatures,
2410  const FieldNameToIndexMapType& fieldNameToIndexMap,
2411  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2412  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2413  const Catalog_Namespace::SessionInfo* session_info,
2414  Executor* executor,
2415  const MetadataColumnInfos& metadata_column_infos) {
2416  ImportStatus thread_import_status;
2417  const CopyParams& copy_params = importer->get_copy_params();
2418  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2419  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2420  importer->get_import_buffers(thread_id);
2421  auto query_session = session_info ? session_info->get_session_id() : "";
2422  for (const auto& p : import_buffers) {
2423  p->clear();
2424  }
2425 
2426  auto convert_timer = timer_start();
2427 
2428  // for all the features in this chunk...
2429  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2430  // ignore null features
2431  if (!features[iFeature]) {
2432  continue;
2433  }
2434 
2435  // get this feature's geometry
2436  // for geodatabase, we need to consider features with no geometry
2437  // as we still want to create a table, even if it has no geo column
2438  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2439  if (pGeometry && coordinate_transformation) {
2440  pGeometry->transform(coordinate_transformation);
2441  }
2442 
2443  //
2444  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2445  //
2446 
2447  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2448  size_t col_idx = 0;
2449  try {
2450  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2451  check_session_interrupted(query_session, executor))) {
2452  thread_import_status.load_failed = true;
2453  thread_import_status.load_msg = "Table load was cancelled via Query Interrupt";
2454  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
2455  }
2456 
2457  uint32_t field_column_count{0u};
2458  uint32_t metadata_column_count{0u};
2459 
2460  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2461  auto cd = *cd_it;
2462 
2463  // is this a geo column?
2464  const auto& col_ti = cd->columnType;
2465  if (col_ti.is_geometry()) {
2466  // Note that this assumes there is one and only one geo column in the
2467  // table. Currently, the importer only supports reading a single
2468  // geospatial feature from an input shapefile / geojson file, but this
2469  // code will need to be modified if that changes
2470  SQLTypes col_type = col_ti.get_type();
2471 
2472  // store null string in the base column
2473  import_buffers[col_idx]->add_value(
2474  cd, copy_params.null_str, true, copy_params);
2475  ++col_idx;
2476 
2477  // the data we now need to extract for the other columns
2478  std::vector<double> coords;
2479  std::vector<double> bounds;
2480  std::vector<int> ring_sizes;
2481  std::vector<int> poly_rings;
2482  int render_group = 0;
2483 
2484  // extract it
2485  SQLTypeInfo import_ti{col_ti};
2486  bool is_null_geo = !import_geometry;
2487  if (is_null_geo) {
2488  if (col_ti.get_notnull()) {
2489  throw std::runtime_error("NULL geo for column " + cd->columnName);
2490  }
2491  Geospatial::GeoTypesFactory::getNullGeoColumns(
2492  import_ti,
2493  coords,
2494  bounds,
2495  ring_sizes,
2496  poly_rings,
2497  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2498  } else {
2499  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2500  import_geometry,
2501  import_ti,
2502  coords,
2503  bounds,
2504  ring_sizes,
2505  poly_rings,
2506  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2507  std::string msg = "Failed to extract valid geometry from feature " +
2508  std::to_string(firstFeature + iFeature + 1) +
2509  " for column " + cd->columnName;
2510  throw std::runtime_error(msg);
2511  }
2512 
2513  // validate types
2514  if (col_type != import_ti.get_type()) {
2515  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2516  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2517  col_type == SQLTypes::kMULTIPOLYGON)) {
2518  throw std::runtime_error(
2519  "Imported geometry doesn't match the type of column " +
2520  cd->columnName);
2521  }
2522  }
2523  }
2524 
2525  if (columnIdToRenderGroupAnalyzerMap.size()) {
2526  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2527  if (ring_sizes.size()) {
2528  // get a suitable render group for these poly coords
2529  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2530  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2531  render_group =
2532  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2533  } else {
2534  // empty poly
2535  render_group = -1;
2536  }
2537  }
2538  }
2539 
2540  // create coords array value and add it to the physical column
2541  ++cd_it;
2542  auto cd_coords = *cd_it;
2543  std::vector<TDatum> td_coord_data;
2544  if (!is_null_geo) {
2545  std::vector<uint8_t> compressed_coords =
2546  Geospatial::compress_coords(coords, col_ti);
2547  for (auto cc : compressed_coords) {
2548  TDatum td_byte;
2549  td_byte.val.int_val = cc;
2550  td_coord_data.push_back(td_byte);
2551  }
2552  }
2553  TDatum tdd_coords;
2554  tdd_coords.val.arr_val = td_coord_data;
2555  tdd_coords.is_null = is_null_geo;
2556  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2557  ++col_idx;
2558 
2559  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2560  // Create ring_sizes array value and add it to the physical column
2561  ++cd_it;
2562  auto cd_ring_sizes = *cd_it;
2563  std::vector<TDatum> td_ring_sizes;
2564  if (!is_null_geo) {
2565  for (auto ring_size : ring_sizes) {
2566  TDatum td_ring_size;
2567  td_ring_size.val.int_val = ring_size;
2568  td_ring_sizes.push_back(td_ring_size);
2569  }
2570  }
2571  TDatum tdd_ring_sizes;
2572  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2573  tdd_ring_sizes.is_null = is_null_geo;
2574  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2575  ++col_idx;
2576  }
2577 
2578  if (col_type == kMULTIPOLYGON) {
2579  // Create poly_rings array value and add it to the physical column
2580  ++cd_it;
2581  auto cd_poly_rings = *cd_it;
2582  std::vector<TDatum> td_poly_rings;
2583  if (!is_null_geo) {
2584  for (auto num_rings : poly_rings) {
2585  TDatum td_num_rings;
2586  td_num_rings.val.int_val = num_rings;
2587  td_poly_rings.push_back(td_num_rings);
2588  }
2589  }
2590  TDatum tdd_poly_rings;
2591  tdd_poly_rings.val.arr_val = td_poly_rings;
2592  tdd_poly_rings.is_null = is_null_geo;
2593  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2594  ++col_idx;
2595  }
2596 
2597  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2598  col_type == kMULTIPOLYGON) {
2599  // Create bounds array value and add it to the physical column
2600  ++cd_it;
2601  auto cd_bounds = *cd_it;
2602  std::vector<TDatum> td_bounds_data;
2603  if (!is_null_geo) {
2604  for (auto b : bounds) {
2605  TDatum td_double;
2606  td_double.val.real_val = b;
2607  td_bounds_data.push_back(td_double);
2608  }
2609  }
2610  TDatum tdd_bounds;
2611  tdd_bounds.val.arr_val = td_bounds_data;
2612  tdd_bounds.is_null = is_null_geo;
2613  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2614  ++col_idx;
2615  }
2616 
2617  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2618  // Create render_group value and add it to the physical column
2619  ++cd_it;
2620  auto cd_render_group = *cd_it;
2621  TDatum td_render_group;
2622  td_render_group.val.int_val = render_group;
2623  td_render_group.is_null = is_null_geo;
2624  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2625  ++col_idx;
2626  }
2627  } else if (field_column_count < fieldNameToIndexMap.size()) {
2628  //
2629  // field column
2630  //
2631  auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2632  CHECK(cit != columnNameToSourceNameMap.end());
2633  auto const& field_name = cit->second;
2634 
2635  auto const fit = fieldNameToIndexMap.find(field_name);
2636  if (fit == fieldNameToIndexMap.end()) {
2637  throw ColumnNotGeoError(cd->columnName);
2638  }
2639 
2640  auto const& field_index = fit->second;
2641  CHECK(field_index < fieldNameToIndexMap.size());
2642 
2643  auto const& feature = features[iFeature];
2644 
2645  auto field_defn = feature->GetFieldDefnRef(field_index);
2646  CHECK(field_defn);
2647 
2648  // OGRFeature::GetFieldAsString() can only return 80 characters
2649  // so for array columns, we are obliged to fetch the actual values
2650  // and construct the concatenated string ourselves
2651 
2652  std::string value_string;
2653  int array_index = 0, array_size = 0;
2654 
2655  auto stringify_numeric_list = [&](auto* values) {
2656  value_string = "{";
2657  while (array_index < array_size) {
2658  auto separator = (array_index > 0) ? "," : "";
2659  value_string += separator + std::to_string(values[array_index]);
2660  array_index++;
2661  }
2662  value_string += "}";
2663  };
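 // e.g. an OFTIntegerList value {1, 2, 3} is rendered as the string
 // "{1,2,3}", which add_value() below parses back into an array column.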
2664 
2665  auto field_type = field_defn->GetType();
2666  switch (field_type) {
2667  case OFTInteger:
2668  case OFTInteger64:
2669  case OFTReal:
2670  case OFTString:
2671  case OFTBinary:
2672  case OFTDate:
2673  case OFTTime:
2674  case OFTDateTime: {
2675  value_string = feature->GetFieldAsString(field_index);
2676  } break;
2677  case OFTIntegerList: {
2678  auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2679  stringify_numeric_list(values);
2680  } break;
2681  case OFTInteger64List: {
2682  auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2683  stringify_numeric_list(values);
2684  } break;
2685  case OFTRealList: {
2686  auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2687  stringify_numeric_list(values);
2688  } break;
2689  case OFTStringList: {
2690  auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2691  value_string = "{";
2692  if (array_of_strings) {
2693  while (auto* this_string = array_of_strings[array_index]) {
2694  auto separator = (array_index > 0) ? "," : "";
2695  value_string += separator + std::string(this_string);
2696  array_index++;
2697  }
2698  }
2699  value_string += "}";
2700  } break;
2701  default:
2702  throw std::runtime_error("Unsupported geo file field type (" +
2703  std::to_string(static_cast<int>(field_type)) +
2704  ")");
2705  }
2706 
2707  import_buffers[col_idx]->add_value(cd, value_string, false, copy_params);
2708  ++col_idx;
2709  field_column_count++;
2710  } else if (metadata_column_count < metadata_column_infos.size()) {
2711  //
2712  // metadata column
2713  //
2714  auto const& mci = metadata_column_infos[metadata_column_count];
2715  if (mci.column_descriptor.columnName != cd->columnName) {
2716  throw std::runtime_error("Metadata column name mismatch");
2717  }
2718  import_buffers[col_idx]->add_value(cd, mci.value, false, copy_params);
2719  ++col_idx;
2720  metadata_column_count++;
2721  } else {
2722  throw std::runtime_error("Column count mismatch");
2723  }
2724  }
2725  thread_import_status.rows_completed++;
2726  } catch (QueryExecutionError& e) {
2727  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
2728  throw e;
2729  }
2730  } catch (ColumnNotGeoError& e) {
2731  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Aborting import.";
2732  throw std::runtime_error(e.what());
2733  } catch (const std::exception& e) {
2734  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2735  import_buffers[col_idx_to_pop]->pop_value();
2736  }
2737  thread_import_status.rows_rejected++;
2738  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2739  }
2740  };
2741 
2742  if (pGeometry && copy_params.geo_explode_collections) {
2743  // explode and import
2744  auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2745  explode_collections_step1(col_descs);
2746  explode_collections_step2(pGeometry,
2747  collection_child_type,
2748  collection_col_name,
2749  firstFeature + iFeature + 1,
2750  execute_import_feature);
2751  } else {
2752  // import non-collection or null feature just once
2753  execute_import_feature(pGeometry);
2754  }
2755  } // end features
2756 
2757  float convert_s = TIMER_STOP(convert_timer);
2758 
2759  float load_s = 0.0f;
2760  if (thread_import_status.rows_completed > 0) {
2761  auto load_timer = timer_start();
2762  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2763  load_s = TIMER_STOP(load_timer);
2764  }
2765 
2766  if (DEBUG_TIMING && thread_import_status.rows_completed > 0) {
2767  LOG(INFO) << "DEBUG: Process " << convert_s << "s";
2768  LOG(INFO) << "DEBUG: Load " << load_s << "s";
2769  LOG(INFO) << "DEBUG: Total " << (convert_s + load_s) << "s";
2770  }
2771 
2772  thread_import_status.thread_id = thread_id;
2773 
2774  return thread_import_status;
2775 }
2776 
2777 bool Loader::loadNoCheckpoint(
2778  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2779  size_t row_count,
2780  const Catalog_Namespace::SessionInfo* session_info) {
2781  return loadImpl(import_buffers, row_count, false, session_info);
2782 }
2783 
2784 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2785  size_t row_count,
2786  const Catalog_Namespace::SessionInfo* session_info) {
2787  return loadImpl(import_buffers, row_count, true, session_info);
2788 }
2789 
2790 namespace {
2791 
2792 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2793  const auto& ti = import_buffer.getTypeInfo();
2794  const int8_t* values_buffer{nullptr};
2795  if (ti.is_string()) {
2796  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2797  values_buffer = import_buffer.getStringDictBuffer();
2798  } else {
2799  values_buffer = import_buffer.getAsBytes();
2800  }
2801  CHECK(values_buffer);
2802  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2803  switch (logical_size) {
2804  case 1: {
2805  return values_buffer[index];
2806  }
2807  case 2: {
2808  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2809  }
2810  case 4: {
2811  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2812  }
2813  case 8: {
2814  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2815  }
2816  default:
2817  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2818  }
2819  UNREACHABLE();
2820  return 0;
2821 }
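// int_value_at() widens any integer-like column to int64_t for use as a
// shard key; dictionary-encoded strings are read from their already-encoded
// ID buffer rather than the raw string payload.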
2822 
2823 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2824  const auto& ti = import_buffer.getTypeInfo();
2825  CHECK_EQ(kFLOAT, ti.get_type());
2826  const auto values_buffer = import_buffer.getAsBytes();
2827  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2828 }
2829 
2830 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2831  const auto& ti = import_buffer.getTypeInfo();
2832  CHECK_EQ(kDOUBLE, ti.get_type());
2833  const auto values_buffer = import_buffer.getAsBytes();
2834  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2835 }
2836 
2837 } // namespace
2838 
2839 void Loader::fillShardRow(const size_t row_index,
2840  OneShardBuffers& shard_output_buffers,
2841  const OneShardBuffers& import_buffers) {
2842  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2843  const auto& input_buffer = import_buffers[col_idx];
2844  const auto& col_ti = input_buffer->getTypeInfo();
2845  const auto type =
2846  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2847 
2848  switch (type) {
2849  case kBOOLEAN:
2850  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2851  break;
2852  case kTINYINT:
2853  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2854  break;
2855  case kSMALLINT:
2856  shard_output_buffers[col_idx]->addSmallint(
2857  int_value_at(*input_buffer, row_index));
2858  break;
2859  case kINT:
2860  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2861  break;
2862  case kBIGINT:
2863  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2864  break;
2865  case kFLOAT:
2866  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2867  break;
2868  case kDOUBLE:
2869  shard_output_buffers[col_idx]->addDouble(
2870  double_value_at(*input_buffer, row_index));
2871  break;
2872  case kTEXT:
2873  case kVARCHAR:
2874  case kCHAR: {
2875  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2876  shard_output_buffers[col_idx]->addString(
2877  (*input_buffer->getStringBuffer())[row_index]);
2878  break;
2879  }
2880  case kTIME:
2881  case kTIMESTAMP:
2882  case kDATE:
2883  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2884  break;
2885  case kARRAY:
2886  if (IS_STRING(col_ti.get_subtype())) {
2887  CHECK(input_buffer->getStringArrayBuffer());
2888  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2889  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2890  shard_output_buffers[col_idx]->addStringArray(input_arr);
2891  } else {
2892  shard_output_buffers[col_idx]->addArray(
2893  (*input_buffer->getArrayBuffer())[row_index]);
2894  }
2895  break;
2896  case kPOINT:
2897  case kLINESTRING:
2898  case kPOLYGON:
2899  case kMULTIPOLYGON: {
2900  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2901  shard_output_buffers[col_idx]->addGeoString(
2902  (*input_buffer->getGeoStringBuffer())[row_index]);
2903  break;
2904  }
2905  default:
2906  CHECK(false);
2907  }
2908  }
2909 }
2910 
2911 void Loader::distributeToShardsExistingColumns(
2912  std::vector<OneShardBuffers>& all_shard_import_buffers,
2913  std::vector<size_t>& all_shard_row_counts,
2914  const OneShardBuffers& import_buffers,
2915  const size_t row_count,
2916  const size_t shard_count,
2917  const Catalog_Namespace::SessionInfo* session_info) {
2918  int col_idx{0};
2919  const ColumnDescriptor* shard_col_desc{nullptr};
2920  for (const auto col_desc : column_descs_) {
2921  ++col_idx;
2922  if (col_idx == table_desc_->shardedColumnId) {
2923  shard_col_desc = col_desc;
2924  break;
2925  }
2926  }
2927  CHECK(shard_col_desc);
2928  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2929  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2930  const auto& shard_col_ti = shard_col_desc->columnType;
2931  CHECK(shard_col_ti.is_integer() ||
2932  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2933  shard_col_ti.is_time());
2934  if (shard_col_ti.is_string()) {
2935  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2936  CHECK(payloads_ptr);
2937  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2938  }
2939 
2940  for (size_t i = 0; i < row_count; ++i) {
2941  const size_t shard =
2942  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2943  auto& shard_output_buffers = all_shard_import_buffers[shard];
2944  fillShardRow(i, shard_output_buffers, import_buffers);
2945  ++all_shard_row_counts[shard];
2946  }
2947 }
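// SHARD_FOR_KEY (Shared/shard_key.h) maps the key value onto [0, shard_count),
// so rows with equal shard keys always land in the same shard's buffers.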
2948 
2949 void Loader::distributeToShardsNewColumns(
2950  std::vector<OneShardBuffers>& all_shard_import_buffers,
2951  std::vector<size_t>& all_shard_row_counts,
2952  const OneShardBuffers& import_buffers,
2953  const size_t row_count,
2954  const size_t shard_count,
2955  const Catalog_Namespace::SessionInfo* session_info) {
2956  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2957  CHECK(shard_tds.size() == shard_count);
2958 
2959  for (size_t shard = 0; shard < shard_count; ++shard) {
2960  auto& shard_output_buffers = all_shard_import_buffers[shard];
2961  if (row_count != 0) {
2962  fillShardRow(0, shard_output_buffers, import_buffers);
2963  }
2964  // when replicating a column, a shard's row count equals the replicate
2965  // count of the column on that shard
2966  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2967  }
2968 }
2969 
2970 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2971  std::vector<size_t>& all_shard_row_counts,
2972  const OneShardBuffers& import_buffers,
2973  const size_t row_count,
2974  const size_t shard_count,
2975  const Catalog_Namespace::SessionInfo* session_info) {
2976  all_shard_row_counts.resize(shard_count);
2977  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2978  all_shard_import_buffers.emplace_back();
2979  for (const auto& typed_import_buffer : import_buffers) {
2980  all_shard_import_buffers.back().emplace_back(
2981  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2982  typed_import_buffer->getStringDictionary()));
2983  }
2984  }
2984  }
2985 
2986  if (isAddingColumns()) {
2987  distributeToShardsNewColumns(all_shard_import_buffers,
2988  all_shard_row_counts,
2989  import_buffers,
2990  row_count,
2991  shard_count,
2992  session_info);
2993  } else {
2994  distributeToShardsExistingColumns(all_shard_import_buffers,
2995  all_shard_row_counts,
2996  import_buffers,
2997  row_count,
2998  shard_count,
2999  session_info);
3000  }
3001 }
3002 
3003 bool Loader::loadImpl(
3004  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3005  size_t row_count,
3006  bool checkpoint,
3007  const Catalog_Namespace::SessionInfo* session_info) {
3008  if (load_callback_) {
3009  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
3010  return load_callback_(import_buffers, data_blocks, row_count);
3011  }
3012  if (table_desc_->nShards) {
3013  std::vector<OneShardBuffers> all_shard_import_buffers;
3014  std::vector<size_t> all_shard_row_counts;
3015  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
3016  distributeToShards(all_shard_import_buffers,
3017  all_shard_row_counts,
3018  import_buffers,
3019  row_count,
3020  shard_tables.size(),
3021  session_info);
3022  bool success = true;
3023  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
3024  success = success && loadToShard(all_shard_import_buffers[shard_idx],
3025  all_shard_row_counts[shard_idx],
3026  shard_tables[shard_idx],
3027  checkpoint,
3028  session_info);
3029  }
3030  return success;
3031  }
3032  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
3033 }
3034 
3035 std::vector<DataBlockPtr> TypedImportBuffer::get_data_block_pointers(
3036  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
3037  std::vector<DataBlockPtr> result(import_buffers.size());
3038  std::vector<std::pair<const size_t, std::future<int8_t*>>>
3039  encoded_data_block_ptrs_futures;
3040  // make all async calls to the string dictionary here, then continue execution
3041  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
3042  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
3043  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
3044  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
3045  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
3046 
3047  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
3048  buf_idx,
3049  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
3050  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
3051  return import_buffers[buf_idx]->getStringDictBuffer();
3052  })));
3053  }
3054  }
3055 
3056  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
3057  DataBlockPtr p;
3058  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
3059  import_buffers[buf_idx]->getTypeInfo().is_time() ||
3060  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
3061  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
3062  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
3063  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
3064  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
3065  p.stringsPtr = string_payload_ptr;
3066  } else {
3067  // This condition means we have a dictionary-encoded string column. We already
3068  // made an async request to obtain the encoded integer values above, so we skip
3069  // this iteration and continue.
3070  continue;
3071  }
3072  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
3073  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
3074  p.stringsPtr = geo_payload_ptr;
3075  } else {
3076  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
3077  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
3078  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
3079  import_buffers[buf_idx]->addDictEncodedStringArray(
3080  *import_buffers[buf_idx]->getStringArrayBuffer());
3081  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
3082  } else {
3083  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
3084  }
3085  }
3086  result[buf_idx] = p;
3087  }
3088 
3089  // wait for the async requests we made for string dictionary
3090  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
3091  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
3092  }
3093  return result;
3094 }
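// Dictionary encoding dominates the cost here, so encoded string columns are
// dispatched to std::async first and their ID buffers collected at the end,
// overlapping encoding with the wiring of the remaining columns.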
3095 
3096 bool Loader::loadToShard(
3097  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3098  size_t row_count,
3099  const TableDescriptor* shard_table,
3100  bool checkpoint,
3101  const Catalog_Namespace::SessionInfo* session_info) {
3102  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
3103  Fragmenter_Namespace::InsertData ins_data(insert_data_);
3104  ins_data.numRows = row_count;
3105  bool success = false;
3106  try {
3107  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
3108  } catch (std::exception& e) {
3109  std::ostringstream oss;
3110  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
3111  << e.what();
3112 
3113  LOG(ERROR) << oss.str();
3114  error_msg_ = oss.str();
3115  return success;
3116  }
3117  if (isAddingColumns()) {
3118  // when Adding columns we omit any columns except the ones being added
3119  ins_data.columnIds.clear();
3120  ins_data.is_default.clear();
3121  for (auto& buffer : import_buffers) {
3122  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
3123  ins_data.is_default.push_back(true);
3124  }
3125  } else {
3126  ins_data.is_default.resize(ins_data.columnIds.size(), false);
3127  }
3128  // release loader_lock so that in InsertOrderFragmenter::insertData
3129  // we can have multiple threads sort/shuffle InsertData
3130  loader_lock.unlock();
3131  success = true;
3132  {
3133  try {
3134  if (checkpoint) {
3135  shard_table->fragmenter->insertData(ins_data);
3136  } else {
3137  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
3138  }
3139  } catch (std::exception& e) {
3140  std::ostringstream oss;
3141  oss << "Fragmenter Insert Exception when processing Table "
3142  << shard_table->tableName << " issue was " << e.what();
3143 
3144  LOG(ERROR) << oss.str();
3145  loader_lock.lock();
3146  error_msg_ = oss.str();
3147  success = false;
3148  }
3149  }
3150  return success;
3151 }
3152 
3153 void Loader::dropColumns(const std::vector<int>& columnIds) {
3154  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
3155  if (table_desc_->nShards) {
3156  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
3157  }
3158  for (auto table_desc : table_descs) {
3159  table_desc->fragmenter->dropColumns(columnIds);
3160  }
3161 }
3162 
3163 void Loader::init() {
3164  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
3165  insert_data_.tableId = table_desc_->tableId;
3166  for (auto cd : column_descs_) {
3167  insert_data_.columnIds.push_back(cd->columnId);
3168  if (cd->columnType.get_compression() == kENCODING_DICT) {
3169  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3170  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
3171  CHECK(dd);
3172  dict_map_[cd->columnId] = dd->stringDict.get();
3173  }
3174  }
3175  insert_data_.numRows = 0;
3176 }
3177 
3178 void Detector::init() {
3179  detect_row_delimiter();
3180  split_raw_data();
3181  find_best_sqltypes_and_headers();
3182 }
3183 
3184 ImportStatus Detector::importDelimited(
3185  const std::string& file_path,
3186  const bool decompressed,
3187  const Catalog_Namespace::SessionInfo* session_info) {
3188  // we do not check interrupt status for this detection
3189  if (!p_file) {
3190  p_file = fopen(file_path.c_str(), "rb");
3191  }
3192  if (!p_file) {
3193  throw std::runtime_error("failed to open file '" + file_path +
3194  "': " + strerror(errno));
3195  }
3196 
3197  // somehow clang does not support ext/stdio_filebuf.h, so we
3198  // need to roll our own readline using the custom copy_params.line_delim...
3199  std::string line;
3200  line.reserve(1 * 1024 * 1024);
3201  auto end_time = std::chrono::steady_clock::now() +
3202  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
3203  try {
3204  while (!feof(p_file)) {
3205  int c;
3206  size_t n = 0;
3207  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
3208  if (n++ >= line.capacity()) {
3209  break;
3210  }
3211  line += c;
3212  }
3213  if (0 == n) {
3214  break;
3215  }
3216  // remember the first line, which is possibly a header line, to
3217  // ignore identical header line(s) in 2nd+ files of an archive;
3218  // otherwise, a 2nd+ header may be mistaken for an all-string row
3219  // and end up skewing the final column types.
3220  if (line1.empty()) {
3221  line1 = line;
3222  } else if (line == line1) {
3223  line.clear();
3224  continue;
3225  }
3226 
3227  raw_data += line;
3228  raw_data += copy_params.line_delim;
3229  line.clear();
3230  ++import_status_.rows_completed;
3231  if (std::chrono::steady_clock::now() > end_time) {
3232  if (import_status_.rows_completed > 10000) {
3233  break;
3234  }
3235  }
3236  }
3237  } catch (std::exception& e) {
3238  }
3239 
3241  import_status_.load_failed = true;
3242 
3243  fclose(p_file);
3244  p_file = nullptr;
3245  return import_status_;
3246 }
3247 
3248 void Detector::read_file() {
3249  // this becomes analogous to Importer::import()
3250  (void)DataStreamSink::archivePlumber(nullptr);
3251 }
3252 
3253 void Detector::detect_row_delimiter() {
3254  if (copy_params.delimiter == '\0') {
3255  copy_params.delimiter = ',';
3256  if (boost::filesystem::extension(file_path) == ".tsv") {
3257  copy_params.delimiter = '\t';
3258  }
3259  }
3260 }
3261 
3262 void Detector::split_raw_data() {
3263  const char* buf = raw_data.c_str();
3264  const char* buf_end = buf + raw_data.size();
3265  bool try_single_thread = false;
3266  for (const char* p = buf; p < buf_end; p++) {
3267  std::vector<std::string> row;
3268  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3269  p = import_export::delimited_parser::get_row(p,
3270  buf_end,
3271  buf_end,
3272  copy_params,
3273  nullptr,
3274  row,
3275  tmp_buffers,
3276  try_single_thread,
3277  true);
3278  raw_rows.push_back(row);
3279  if (try_single_thread) {
3280  break;
3281  }
3282  }
3283  if (try_single_thread) {
3284  copy_params.threads = 1;
3285  raw_rows.clear();
3286  for (const char* p = buf; p < buf_end; p++) {
3287  std::vector<std::string> row;
3288  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3289  p = import_export::delimited_parser::get_row(p,
3290  buf_end,
3291  buf_end,
3292  copy_params,
3293  nullptr,
3294  row,
3295  tmp_buffers,
3296  try_single_thread,
3297  true);
3298  raw_rows.push_back(row);
3299  }
3300  }
3301 }
3302 
3303 template <class T>
3304 bool try_cast(const std::string& str) {
3305  try {
3306  boost::lexical_cast<T>(str);
3307  } catch (const boost::bad_lexical_cast& e) {
3308  return false;
3309  }
3310  return true;
3311 }
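// e.g. try_cast<int16_t>("42") == true, try_cast<int16_t>("70000") == false
// (overflow), try_cast<double>("abc") == false.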
3312 
3313 SQLTypes Detector::detect_sqltype(const std::string& str) {
3314  SQLTypes type = kTEXT;
3315  if (try_cast<double>(str)) {
3316  type = kDOUBLE;
3317  /*if (try_cast<bool>(str)) {
3318  type = kBOOLEAN;
3319  }*/
3320  if (try_cast<int16_t>(str)) {
3321  type = kSMALLINT;
3322  } else if (try_cast<int32_t>(str)) {
3323  type = kINT;
3324  } else if (try_cast<int64_t>(str)) {
3325  type = kBIGINT;
3326  } else if (try_cast<float>(str)) {
3327  type = kFLOAT;
3328  }
3329  }
3330 
3331  // check for geo types
3332  if (type == kTEXT) {
3333  // convert to upper case
3334  std::string str_upper_case = str;
3335  std::transform(
3336  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3337 
3338  // then test for leading words
3339  if (str_upper_case.find("POINT") == 0) {
3340  type = kPOINT;
3341  } else if (str_upper_case.find("LINESTRING") == 0) {
3342  type = kLINESTRING;
3343  } else if (str_upper_case.find("POLYGON") == 0) {
3344  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
3345  type = kMULTIPOLYGON;
3346  } else {
3347  type = kPOLYGON;
3348  }
3349  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
3350  type = kMULTIPOLYGON;
3351  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
3352  std::string::npos &&
3353  (str_upper_case.size() % 2) == 0) {
3354  // simple hex blob (two characters per byte, not uu-encode or base64)
3355  if (str_upper_case.size() >= 10) {
3356  // match WKB blobs for supported geometry types
3357  // the first byte specifies if the data is big-endian or little-endian
3358  // the next four bytes are the geometry type (1 = POINT etc.)
3359  // @TODO support eWKB, which has extra bits set in the geometry type
3360  auto first_five_bytes = str_upper_case.substr(0, 10);
3361  if (first_five_bytes == "0000000001" || first_five_bytes == "0101000000") {
3362  type = kPOINT;
3363  } else if (first_five_bytes == "0000000002" || first_five_bytes == "0102000000") {
3364  type = kLINESTRING;
3365  } else if (first_five_bytes == "0000000003" || first_five_bytes == "0103000000") {
3366  type = kPOLYGON;
3367  } else if (first_five_bytes == "0000000006" || first_five_bytes == "0106000000") {
3368  type = kMULTIPOLYGON;
3369  } else {
3370  // unsupported WKB type
3371  return type;
3372  }
3373  } else {
3374  // too short to be WKB
3375  return type;
3376  }
3377  }
3378  }
3379 
3380  // check for time types
3381  if (type == kTEXT) {
3382  // This won't match unix timestamp, since floats and ints were checked above.
3383  if (dateTimeParseOptional<kTIME>(str, 0)) {
3384  type = kTIME;
3385  } else if (dateTimeParseOptional<kTIMESTAMP>(str, 0)) {
3386  type = kTIMESTAMP;
3387  } else if (dateTimeParseOptional<kDATE>(str, 0)) {
3388  type = kDATE;
3389  }
3390  }
3391 
3392  return type;
3393 }
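// Detection examples: "7" -> kSMALLINT, "70000" -> kINT, "1.5" -> kFLOAT,
// "1e300" -> kDOUBLE, "POINT (1 2)" -> kPOINT, "2020-01-01" -> kDATE;
// anything unrecognized stays kTEXT.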
3394 
3395 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3396  std::vector<SQLTypes> types(row.size());
3397  for (size_t i = 0; i < row.size(); i++) {
3398  types[i] = detect_sqltype(row[i]);
3399  }
3400  return types;
3401 }
3402 
3403 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
3404  static std::array<int, kSQLTYPE_LAST> typeorder;
3405  typeorder[kCHAR] = 0;
3406  typeorder[kBOOLEAN] = 2;
3407  typeorder[kSMALLINT] = 3;
3408  typeorder[kINT] = 4;
3409  typeorder[kBIGINT] = 5;
3410  typeorder[kFLOAT] = 6;
3411  typeorder[kDOUBLE] = 7;
3412  typeorder[kTIMESTAMP] = 8;
3413  typeorder[kTIME] = 9;
3414  typeorder[kDATE] = 10;
3415  typeorder[kPOINT] = 11;
3416  typeorder[kLINESTRING] = 11;
3417  typeorder[kPOLYGON] = 11;
3418  typeorder[kMULTIPOLYGON] = 11;
3419  typeorder[kTEXT] = 12;
3420 
3421  // note: b < a instead of a < b because the map is ordered most to least restrictive
3422  return typeorder[b] < typeorder[a];
3423 }
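// e.g. more_restrictive_sqltype(kTEXT, kINT) is true (kINT sits lower in
// typeorder), so find_best_sqltypes() keeps the least restrictive type seen
// across the sampled rows for each column.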
3424 
3425 void Detector::find_best_sqltypes_and_headers() {
3426  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
3427  best_encodings =
3428  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3429  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3430  switch (copy_params.has_header) {
3431  case import_export::ImportHeaderRow::kAutoDetect:
3432  has_headers = detect_headers(head_types, best_sqltypes);
3433  if (has_headers) {
3434  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
3435  } else {
3436  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
3437  }
3438  break;
3439  case import_export::ImportHeaderRow::kNoHeader:
3440  has_headers = false;
3441  break;
3442  case import_export::ImportHeaderRow::kHasHeader:
3443  has_headers = true;
3444  break;
3445  }
3446 }
3447 
3448 void Detector::find_best_sqltypes() {
3449  best_sqltypes = find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3450 }
3451 
3452 std::vector<SQLTypes> Detector::find_best_sqltypes(
3453  const std::vector<std::vector<std::string>>& raw_rows,
3454  const CopyParams& copy_params) {
3455  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3456 }
3457 
3458 std::vector<SQLTypes> Detector::find_best_sqltypes(
3459  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3460  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3461  const CopyParams& copy_params) {
3462  if (raw_rows.size() < 1) {
3463  throw std::runtime_error("No rows found in: " +
3464  boost::filesystem::basename(file_path));
3465  }
3466  auto end_time = std::chrono::steady_clock::now() + timeout;
3467  size_t num_cols = raw_rows.front().size();
3468  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3469  std::vector<size_t> non_null_col_counts(num_cols, 0);
3470  for (auto row = row_begin; row != row_end; row++) {
3471  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3472  best_types.push_back(kCHAR);
3473  non_null_col_counts.push_back(0);
3474  }
3475  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3476  // do not count nulls
3477  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3478  continue;
3479  }
3480  SQLTypes t = detect_sqltype(row->at(col_idx));
3481  non_null_col_counts[col_idx]++;
3482  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3483  best_types[col_idx] = t;
3484  }
3485  }
3486  if (std::chrono::steady_clock::now() > end_time) {
3487  break;
3488  }
3489  }
3490  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3491  // if we don't have any non-null values for this column make it text to be
3492  // safe b/c that is the least restrictive type
3493  if (non_null_col_counts[col_idx] == 0) {
3494  best_types[col_idx] = kTEXT;
3495  }
3496  }
3497 
3498  return best_types;
3499 }
3500 
3501 std::vector<EncodingType> Detector::find_best_encodings(
3502  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3503  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3504  const std::vector<SQLTypes>& best_types) {
3505  if (raw_rows.size() < 1) {
3506  throw std::runtime_error("No rows found in: " +
3507  boost::filesystem::basename(file_path));
3508  }
3509  size_t num_cols = best_types.size();
3510  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3511  std::vector<size_t> num_rows_per_col(num_cols, 1);
3512  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3513  for (auto row = row_begin; row != row_end; row++) {
3514  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3515  if (IS_STRING(best_types[col_idx])) {
3516  count_set[col_idx].insert(row->at(col_idx));
3517  num_rows_per_col[col_idx]++;
3518  }
3519  }
3520  }
3521  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3522  if (IS_STRING(best_types[col_idx])) {
3523  float uniqueRatio =
3524  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3525  if (uniqueRatio < 0.75) {
3526  best_encodes[col_idx] = kENCODING_DICT;
3527  }
3528  }
3529  }
3530  return best_encodes;
3531 }
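// e.g. a 1000-value string sample with 10 distinct values has a unique ratio
// of ~0.01 and is assigned kENCODING_DICT; a near-unique ID column
// (ratio ~1.0) keeps kENCODING_NONE.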
3532 
3533 // detect_headers returns true if:
3534 // - all elements of the first argument are kTEXT
3535 // - there is at least one instance where tail_types is more restrictive than head_types
3536 // (ie, not kTEXT)
3537 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3538  const std::vector<SQLTypes>& tail_types) {
3539  if (head_types.size() != tail_types.size()) {
3540  return false;
3541  }
3542  bool has_headers = false;
3543  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3544  if (head_types[col_idx] != kTEXT) {
3545  return false;
3546  }
3547  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3548  }
3549  return has_headers;
3550 }
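// e.g. head_types {kTEXT, kTEXT} with tail_types {kINT, kTEXT} -> true;
// head_types {kINT, kTEXT} -> false, since a row with a parsed numeric
// cannot be a header.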
3551 
3552 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3553 #if defined(ENABLE_IMPORT_PARQUET)
3554  if (data_preview_.has_value()) {
3555  return data_preview_.value().sample_rows;
3556  } else
3557 #endif
3558  {
3559  n = std::min(n, raw_rows.size());
3560  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3561  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3562  raw_rows.begin() + n);
3563  return sample_rows;
3564  }
3565 }
3566 
3567 std::vector<std::string> Detector::get_headers() {
3568 #if defined(ENABLE_IMPORT_PARQUET)
3569  if (data_preview_.has_value()) {
3570  return data_preview_.value().column_names;
3571  } else
3572 #endif
3573  {
3574  std::vector<std::string> headers(best_sqltypes.size());
3575  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3576  if (has_headers && i < raw_rows[0].size()) {
3577  headers[i] = raw_rows[0][i];
3578  } else {
3579  headers[i] = "column_" + std::to_string(i + 1);
3580  }
3581  }
3582  return headers;
3583  }
3584 }
3585 
3586 std::vector<SQLTypeInfo> Detector::getBestColumnTypes() const {
3587 #if defined(ENABLE_IMPORT_PARQUET)
3588  if (data_preview_.has_value()) {
3589  return data_preview_.value().column_types;
3590  } else
3591 #endif
3592  {
3593  std::vector<SQLTypeInfo> types;
3594  CHECK_EQ(best_sqltypes.size(), best_encodings.size());
3595  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3596  types.emplace_back(best_sqltypes[i], false, best_encodings[i]);
3597  }
3598  return types;
3599  }
3600 }
3601 
3602 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3603  size_t row_count,
3604  const Catalog_Namespace::SessionInfo* session_info) {
3605  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3607  import_status_.load_failed = true;
3608  import_status_.load_msg = loader->getErrorMessage();
3609  }
3610 }
3611 
3612 void Importer::checkpoint(
3613  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3614  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3616  if (import_status_.load_failed) {
3617  // rollback to starting epoch - undo all the added records
3618  loader->setTableEpochs(table_epochs);
3619  } else {
3620  loader->checkpoint();
3621  }
3622  }
3623 
3624  if (loader->getTableDesc()->persistenceLevel ==
3625  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3626  // tables
3627  auto ms = measure<>::execution([&]() {
3629  if (!import_status_.load_failed) {
3630  for (auto& p : import_buffers_vec[0]) {
3631  if (!p->stringDictCheckpoint()) {
3632  LOG(ERROR) << "Checkpointing Dictionary for Column "
3633  << p->getColumnDesc()->columnName << " failed.";
3634  import_status_.load_failed = true;
3635  import_status_.load_msg = "Dictionary checkpoint failed";
3636  break;
3637  }
3638  }
3639  }
3640  });
3641  if (DEBUG_TIMING) {
3642  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3643  << std::endl;
3644  }
3645  }
3646 }
3647 
3648 ImportStatus DataStreamSink::archivePlumber(
3649  const Catalog_Namespace::SessionInfo* session_info) {
3650  // in the generalized importing scheme, by the time we get here file_path
3651  // may contain a single file path, a url, or a wildcard of file paths.
3652  // see CopyTableStmt::execute.
3653 
3654  std::vector<std::string> file_paths;
3655  try {
3656  const shared::FilePathOptions options{copy_params.regex_path_filter,
3657  copy_params.file_sort_order_by,
3658  copy_params.file_sort_regex};
3659  shared::validate_sort_options(options);
3660  file_paths = shared::local_glob_filter_sort_files(file_path, options);
3661  } catch (const shared::FileNotFoundException& e) {
3662  // After finding no matching files locally, file_path may still be an s3 url
3663  file_paths.push_back(file_path);
3664  }
3665 
3666  // sum up sizes of all local files -- only for local files. if
3667  // file_path is a s3 url, sizes will be obtained via S3Archive.
3668  for (const auto& file_path : file_paths) {
3669  total_file_size += get_filesize(file_path);
3670  }
3671 
3672  // s3 parquet goes a different route because the files do not use libarchive
3673  // but the parquet api, and they need to be landed like .7z files.
3674  //
3675  // note: parquet must be explicitly specified by a WITH parameter
3676  // "source_type='parquet_file'", because for example spark sql users may specify an
3677  // output url w/o a file extension like this:
3678  // df.write
3679  // .mode("overwrite")
3680  // .parquet("s3://bucket/folder/parquet/mydata")
3681  // without the parameter, it means plain or compressed csv files.
3682  // note: .ORC and AVRO files should follow a similar path to Parquet?
3683  if (copy_params.source_type == import_export::SourceType::kParquetFile) {
3684 #ifdef ENABLE_IMPORT_PARQUET
3685  import_parquet(file_paths, session_info);
3686 #else
3687  throw std::runtime_error("Parquet not supported!");
3688 #endif
3689  } else {
3690  import_compressed(file_paths, session_info);
3691  }
3692 
3693  return import_status_;
3694 }
3695 
3696 namespace {
3697 #ifdef ENABLE_IMPORT_PARQUET
3698 
3699 #ifdef HAVE_AWS_S3
3700 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3701 create_parquet_s3_detect_filesystem_config(const foreign_storage::ForeignServer* server,
3702  const CopyParams& copy_params) {
3703  foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3704  config;
3705 
3706  if (!copy_params.s3_access_key.empty()) {
3707  config.s3_access_key = copy_params.s3_access_key;
3708  }
3709  if (!copy_params.s3_secret_key.empty()) {
3710  config.s3_secret_key = copy_params.s3_secret_key;
3711  }
3712  if (!copy_params.s3_session_token.empty()) {
3713  config.s3_session_token = copy_params.s3_session_token;
3714  }
3715 
3716  return config;
3717 }
3718 #endif
3719 
3720 foreign_storage::DataPreview get_parquet_data_preview(const std::string& file_name,
3721  const CopyParams& copy_params) {
3722  TableDescriptor td;
3723  td.tableName = "parquet-detect-table";
3724  td.tableId = -1;
3726  auto [foreign_server, user_mapping, foreign_table] =
3727  foreign_storage::create_proxy_fsi_objects(file_name, copy_params, &td);
3728 
3729  std::shared_ptr<arrow::fs::FileSystem> file_system;
3730  auto& server_options = foreign_server->options;
3731  if (server_options
3732  .find(foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
3733  ->second ==
3734  foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE) {
3735  file_system = std::make_shared<arrow::fs::LocalFileSystem>();
3736 #ifdef HAVE_AWS_S3
3737  } else if (server_options
3738  .find(foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
3739  ->second ==
3740  foreign_storage::AbstractFileStorageDataWrapper::S3_STORAGE_TYPE) {
3741  file_system = foreign_storage::ParquetS3DetectFileSystem::create(
3742  create_parquet_s3_detect_filesystem_config(foreign_server.get(), copy_params));
3743 #endif
3744  } else {
3745  UNREACHABLE();
3746  }
3747 
3748  auto parquet_data_wrapper = std::make_unique<foreign_storage::ParquetDataWrapper>(
3749  foreign_table.get(), file_system);
3750  return parquet_data_wrapper->getDataPreview(g_detect_test_sample_size.has_value()
3751  ? g_detect_test_sample_size.value()
3753 }
3754 #endif
3755 
3756 } // namespace
3757 
3758 Detector::Detector(const boost::filesystem::path& fp, CopyParams& cp)
3759  : DataStreamSink(cp, fp.string()), file_path(fp) {
3760 #ifdef ENABLE_IMPORT_PARQUET
3761  if (cp.source_type == import_export::SourceType::kParquetFile &&
3762  !g_enable_legacy_parquet_import) {
3763  data_preview_ = get_parquet_data_preview(fp.string(), cp);
3764  } else
3765 #endif
3766  {
3767  read_file();
3768  init();
3769  }
3770 }
3771 
3772 #ifdef ENABLE_IMPORT_PARQUET
3773 inline auto open_parquet_table(const std::string& file_path,
3774  std::shared_ptr<arrow::io::ReadableFile>& infile,
3775  std::unique_ptr<parquet::arrow::FileReader>& reader,
3776  std::shared_ptr<arrow::Table>& table) {
3777  using namespace parquet::arrow;
3778  auto file_result = arrow::io::ReadableFile::Open(file_path);
3779  PARQUET_THROW_NOT_OK(file_result.status());
3780  infile = file_result.ValueOrDie();
3781 
3782  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3783  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3784  const auto num_row_groups = reader->num_row_groups();
3785  const auto num_columns = table->num_columns();
3786  const auto num_rows = table->num_rows();
3787  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3788  << " columns in " << num_row_groups << " groups.";
3789  return std::make_tuple(num_row_groups, num_columns, num_rows);
3790 }
3791 
3792 void Detector::import_local_parquet(const std::string& file_path,
3793  const Catalog_Namespace::SessionInfo* session_info) {
3794  /*Skip interrupt checking in detector*/
3795  std::shared_ptr<arrow::io::ReadableFile> infile;
3796  std::unique_ptr<parquet::arrow::FileReader> reader;
3797  std::shared_ptr<arrow::Table> table;
3798  int num_row_groups, num_columns;
3799  int64_t num_rows;
3800  std::tie(num_row_groups, num_columns, num_rows) =
3801  open_parquet_table(file_path, infile, reader, table);
3802  // make up a header line if we don't have one yet
3803  if (0 == raw_data.size()) {
3804  copy_params.has_header = ImportHeaderRow::kHasHeader;
3805  copy_params.line_delim = '\n';
3806  copy_params.delimiter = ',';
3807  // must quote values to skip any embedded delimiter
3808  copy_params.quoted = true;
3809  copy_params.quote = '"';
3810  copy_params.escape = '"';
3811  for (int c = 0; c < num_columns; ++c) {
3812  if (c) {
3813  raw_data += copy_params.delimiter;
3814  }
3815  raw_data += table->ColumnNames().at(c);
3816  }
3817  raw_data += copy_params.line_delim;
3818  }
3819  // make up raw data... rowwise...
3820  const ColumnDescriptor cd;
3821  for (int g = 0; g < num_row_groups; ++g) {
3822  // data is columnwise
3823  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3824  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3825  arrays.resize(num_columns);
3826  for (int c = 0; c < num_columns; ++c) {
3827  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3828  for (auto chunk : arrays[c]->chunks()) {
3829  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3830  }
3831  }
3832  for (int r = 0; r < num_rows; ++r) {
3833  for (int c = 0; c < num_columns; ++c) {
3834  std::vector<std::string> buffer;
3835  for (auto chunk : arrays[c]->chunks()) {
3836  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3837  if (c) {
3838  raw_data += copy_params.delimiter;
3839  }
3840  if (!chunk->IsNull(r)) {
3841  raw_data += copy_params.quote;
3842  raw_data += boost::replace_all_copy(
3843  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3844  raw_data += copy_params.quote;
3845  }
3846  }
3847  }
3848  raw_data += copy_params.line_delim;
3850  if (++import_status_.rows_completed >= 10000) {
3851  // as if load truncated
3852  import_status_.load_failed = true;
3853  import_status_.load_msg = "Detector processed 10000 records";
3854  return;
3855  }
3856  }
3857  }
3858 }
3859 
3860 template <typename DATA_TYPE>
3861 auto TypedImportBuffer::del_values(
3862  std::vector<DATA_TYPE>& buffer,
3863  import_export::BadRowsTracker* const bad_rows_tracker) {
3864  const auto old_size = buffer.size();
3865  // erase backward to minimize memory movement overhead
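 // e.g. for bad rows {2, 5}: erasing index 5 first leaves index 2 valid,
 // whereas erasing index 2 first would shift the element formerly at index 5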
3866  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3867  ++rit) {
3868  buffer.erase(buffer.begin() + *rit);
3869  }
3870  return std::make_tuple(old_size, buffer.size());
3871 }
3872 
3873 auto TypedImportBuffer::del_values(const SQLTypes type,
3874  BadRowsTracker* const bad_rows_tracker) {
3875  switch (type) {
3876  case kBOOLEAN:
3877  return del_values(*bool_buffer_, bad_rows_tracker);
3878  case kTINYINT:
3879  return del_values(*tinyint_buffer_, bad_rows_tracker);
3880  case kSMALLINT:
3881  return del_values(*smallint_buffer_, bad_rows_tracker);
3882  case kINT:
3883  return del_values(*int_buffer_, bad_rows_tracker);
3884  case kBIGINT:
3885  case kNUMERIC:
3886  case kDECIMAL:
3887  case kTIME:
3888  case kTIMESTAMP:
3889  case kDATE:
3890  return del_values(*bigint_buffer_, bad_rows_tracker);
3891  case kFLOAT:
3892  return del_values(*float_buffer_, bad_rows_tracker);
3893  case kDOUBLE:
3894  return del_values(*double_buffer_, bad_rows_tracker);
3895  case kTEXT:
3896  case kVARCHAR:
3897  case kCHAR:
3898  return del_values(*string_buffer_, bad_rows_tracker);
3899  case kPOINT:
3900  case kLINESTRING:
3901  case kPOLYGON:
3902  case kMULTIPOLYGON:
3903  return del_values(*geo_string_buffer_, bad_rows_tracker);
3904  case kARRAY:
3905  return del_values(*array_buffer_, bad_rows_tracker);
3906  default:
3907  throw std::runtime_error("Invalid Type");
3908  }
3909 }
3910 
3911 void Importer::import_local_parquet(const std::string& file_path,
3912  const Catalog_Namespace::SessionInfo* session_info) {
3913  std::shared_ptr<arrow::io::ReadableFile> infile;
3914  std::unique_ptr<parquet::arrow::FileReader> reader;
3915  std::shared_ptr<arrow::Table> table;
3916  int num_row_groups, num_columns;
3917  int64_t nrow_in_file;
3918  std::tie(num_row_groups, num_columns, nrow_in_file) =
3919  open_parquet_table(file_path, infile, reader, table);
3920  // column_list has no $deleted
3921  const auto& column_list = get_column_descs();
3922  // for now geo columns expect a wkt or wkb hex string
3923  std::vector<const ColumnDescriptor*> cds;
3924  Executor* executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID).get();
3925  int num_physical_cols = 0;
3926  for (auto& cd : column_list) {
3927  cds.push_back(cd);
3928  num_physical_cols += cd->columnType.get_physical_cols();
3929  }
3930  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3931  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3932  std::to_string(num_columns) + " columns in file vs " +
3933  std::to_string(column_list.size() - num_physical_cols) +
3934  " columns in table.");
3935  // slice each group to import slower columns faster, eg. geo or string
3936  max_threads = num_import_threads(copy_params.threads);
3937  VLOG(1) << "Parquet import # threads: " << max_threads;
3938  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3939  // init row estimate for this file
3940  const auto filesize = get_filesize(file_path);
3941  size_t nrow_completed{0};
3942  file_offsets.push_back(0);
3943  // map logic column index to physical column index
3944  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3945  int physical_col_idx = 0;
3946  for (int i = 0; i < logic_col_idx; ++i) {
3947  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3948  }
3949  return physical_col_idx;
3950  };
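 // e.g. if column 0 is a POINT (one physical coords column), logical
 // column 1 maps to physical column 2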
3951  // load a file = nested iteration of row groups, row slices and logical columns
3952  auto query_session = session_info ? session_info->get_session_id() : "";
3953  auto ms_load_a_file = measure<>::execution([&]() {
3954  for (int row_group = 0; row_group < num_row_groups; ++row_group) {
3955  {
3957  if (import_status_.load_failed) {
3958  break;
3959  }
3960  }
3961  // a sliced row group will be handled like a (logical) parquet file, with
3962  // an entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
3963  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3965  import_status_.load_failed = true;
3966  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3967  }
3968  import_buffers_vec.resize(num_slices);
3969  for (int slice = 0; slice < num_slices; slice++) {
3970  import_buffers_vec[slice].clear();
3971  for (const auto cd : cds) {
3972  import_buffers_vec[slice].emplace_back(
3973  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3974  }
3975  }
3976  /*
3977  * A caveat here is: Parquet files or arrow data are imported column wise.
3978  * Unlike importing row-wise csv files, an error on any row of any column
3979  * forces us to give up the entire row group of all columns, unless there is
3980  * a sophisticated method to trace erroneous rows in individual columns so
3981  * that we can union bad rows and drop them from the corresponding
3982  * import_buffers_vec; otherwise, we may exceed the maximum number of
3983  * truncated rows easily even with very sparse errors in the files.
3984  */
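 // in practice: a row marked bad in any column of a slice is dropped from
 // every column's buffer of that slice (see the del_values calls below)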
3985  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3986  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3987  auto& bad_rows_tracker = bad_rows_trackers[slice];
3988  bad_rows_tracker.file_name = file_path;
3989  bad_rows_tracker.row_group = slice;
3990  bad_rows_tracker.importer = this;
3991  }
3992  // process arrow arrays to import buffers
3993  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3994  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3995  const auto cd = cds[physical_col_idx];
3996  std::shared_ptr<arrow::ChunkedArray> array;
3997  PARQUET_THROW_NOT_OK(
3998  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3999  const size_t array_size = array->length();
4000  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
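 // ceiling division: every row lands in exactly one slice; only the last
 // slice may come up short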
4001  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
4002  for (int slice = 0; slice < num_slices; ++slice) {
4003  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
4005  import_status_.load_failed = true;
4006  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
4007  }
4008  thread_controller.startThread([&, slice] {
4009  const auto slice_offset = slice % num_slices;
4010  ArraySliceRange slice_range(
4011  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
4012  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
4013  auto& bad_rows_tracker = bad_rows_trackers[slice];
4014  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
4015  import_buffer->import_buffers = &import_buffers_vec[slice];
4016  import_buffer->col_idx = physical_col_idx + 1;
4017  for (auto chunk : array->chunks()) {
4018  import_buffer->add_arrow_values(
4019  cd, *chunk, false, slice_range, &bad_rows_tracker);
4020  }
4021  });
4022  }
4023  thread_controller.finish();
4024  }
4025  std::vector<size_t> nrow_in_slice_raw(num_slices);
4026  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
4027  // trim bad rows from import buffers
4028  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
4029  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
4030  const auto cd = cds[physical_col_idx];
4031  for (int slice = 0; slice < num_slices; ++slice) {
4032  auto& bad_rows_tracker = bad_rows_trackers[slice];
4033  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
4034  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
4035  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
4036  }
4037  }
4038  // flush slices of this row group to chunks
4039  for (int slice = 0; slice < num_slices; ++slice) {
4040  load(import_buffers_vec[slice],
4041  nrow_in_slice_successfully_loaded[slice],
4042  session_info);
4043  }
4044  // update import stats
4045  const auto nrow_original =
4046  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
4047  const auto nrow_imported =
4048  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
4049  nrow_in_slice_successfully_loaded.end(),
4050  0);
4051  const auto nrow_dropped = nrow_original - nrow_imported;
4052  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
4053  << " rows, drop " << nrow_dropped << " rows.";
4054  {
4056  import_status_.rows_completed += nrow_imported;
4057  import_status_.rows_rejected += nrow_dropped;
4058  if (import_status_.rows_rejected > copy_params.max_reject) {
4059  import_status_.load_failed = true;
4060  import_status_.load_msg = "Maximum (" + std::to_string(copy_params.max_reject) +
4061  ") rows rejected exceeded. Halting load.";
4062  LOG(ERROR) << "Maximum (" << copy_params.max_reject
4063  << ") rows rejected exceeded. Halting load.";
4064  }
4065  }
4066  // row estimate
4067  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4068  nrow_completed += nrow_imported;
4069  file_offsets.back() =
4070  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
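 // progress estimate: e.g. with 40% of rows imported from a 100MB file,
 // report 40MB of this file as consumed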
4071  // sum up current total file offsets
4072  const auto total_file_offset =
4073  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
4074  // estimate number of rows per current total file offset
4075  if (total_file_offset) {
4076  import_status_.rows_estimated =
4077  (float)total_file_size / total_file_offset * import_status_.rows_completed;
4078  VLOG(3) << "rows_completed " << import_status_.rows_completed
4079  << ", rows_estimated " << import_status_.rows_estimated
4080  << ", total_file_size " << total_file_size << ", total_file_offset "
4081  << total_file_offset;
4082  }
4083  }
4084  });
4085  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
4086  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
4087 }
4088 
4089 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
4090  const Catalog_Namespace::SessionInfo* session_info) {
4091  auto importer = dynamic_cast<Importer*>(this);
4092  auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
4093  : std::vector<Catalog_Namespace::TableEpochInfo>{};
4094  try {
4095  std::exception_ptr teptr;
4096  // file_paths may contain one local file path, a list of local file paths
4097  // or a s3/hdfs/... url that may translate to 1 or 1+ remote object keys.
4098  for (auto const& file_path : file_paths) {
4099  std::map<int, std::string> url_parts;
4100  Archive::parse_url(file_path, url_parts);
4101 
4102  // for a s3 url we need to know the obj keys that it comprises
4103  std::vector<std::string> objkeys;
4104  std::unique_ptr<S3ParquetArchive> us3arch;
4105  if ("s3" == url_parts[2]) {
4106 #ifdef HAVE_AWS_S3
4107  us3arch.reset(new S3ParquetArchive(file_path,
4108  copy_params.s3_access_key,
4109  copy_params.s3_secret_key,
4110  copy_params.s3_session_token,
4111  copy_params.s3_region,
4112  copy_params.s3_endpoint,
4113  copy_params.plain_text,
4114  copy_params.regex_path_filter,
4115  copy_params.file_sort_order_by,
4116  copy_params.file_sort_regex));
4117  us3arch->init_for_read();
4118  total_file_size += us3arch->get_total_file_size();
4119  objkeys = us3arch->get_objkeys();
4120 #else
4121  throw std::runtime_error("AWS S3 support not available");
4122 #endif // HAVE_AWS_S3
4123  } else {
4124  objkeys.emplace_back(file_path);
4125  }
4126 
4127  // for each obj key of a s3 url we need to land it before
4128  // importing it like doing with a 'local file'.
4129  for (auto const& objkey : objkeys) {
4130  try {
4131  auto file_path =
4132  us3arch
4133  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
4134  : objkey;
4135  import_local_parquet(file_path, session_info);
4136  if (us3arch) {
4137  us3arch->vacuum(objkey);
4138  }
4139  } catch (...) {
4140  if (us3arch) {
4141  us3arch->vacuum(objkey);
4142  }
4143  throw;
4144  }
4146  if (import_status_.load_failed) {
4147  break;
4148  }
4149  }
4150  }
4151  // rethrow any exception that happened before this point
4152  if (teptr) {
4153  std::rethrow_exception(teptr);
4154  }
4155  } catch (const shared::NoRegexFilterMatchException& e) {
4157  import_status_.load_failed = true;
4158  import_status_.load_msg = e.what();
4159  throw e;
4160  } catch (const std::exception& e) {
4162  import_status_.load_failed = true;
4163  import_status_.load_msg = e.what();
4164  }
4165 
4166  if (importer) {
4167  importer->checkpoint(table_epochs);
4168  }
4169 }
4170 #endif // ENABLE_IMPORT_PARQUET
4171 
4173  std::vector<std::string>& file_paths,
4174  const Catalog_Namespace::SessionInfo* session_info) {
4175  // a new requirement is to have one single input stream into
4176  // Importer::importDelimited, so the pipe-related stuff needs to
4177  // move to the outermost block.
4178  int fd[2];
4179 #ifdef _WIN32
4180  // For some reason when folly is used to create the pipe, the reader
4181  // can read nothing.
4182  auto pipe_res =
4183  _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
4184 #else
4185  auto pipe_res = pipe(fd);
4186 #endif
4187  if (pipe_res < 0) {
4188  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
4189  }
4190 #ifndef _WIN32
4191  signal(SIGPIPE, SIG_IGN);
4192 #endif
4193 
4194  std::exception_ptr teptr;
4195  // create a thread to read uncompressed byte stream out of pipe and
4196  // feed into importDelimited()
4197  ImportStatus ret1;
4198  auto th_pipe_reader = std::thread([&]() {
4199  try {
4200  // importDelimited will read from FILE* p_file
4201  if (0 == (p_file = fdopen(fd[0], "r"))) {
4202  throw std::runtime_error(std::string("failed to open a pipe: ") +
4203  strerror(errno));
4204  }
4205 
4206  // in the future, depending on the data types of this uncompressed stream,
4207  // it can be fed into other functions such as importParquet, etc.
4208  ret1 = importDelimited(file_path, true, session_info);
4209 
4210  } catch (...) {
4211  if (!teptr) { // no replace
4212  teptr = std::current_exception();
4213  }
4214  }
4215 
4216  if (p_file) {
4217  fclose(p_file);
4218  }
4219  p_file = 0;
4220  });
4221 
4222  // create a thread to iterate all files (in all archives) and
4223  // forward the uncompressed byte stream to fd[1], which is
4224  // then fed into importDelimited, importParquet, etc.
4225  auto th_pipe_writer = std::thread([&]() {
4226  std::unique_ptr<S3Archive> us3arch;
4227  bool stop = false;
4228  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
4229  try {
4230  auto file_path = file_paths[fi];
4231  std::unique_ptr<Archive> uarch;
4232  std::map<int, std::string> url_parts;
4233  Archive::parse_url(file_path, url_parts);
4234  const std::string S3_objkey_url_scheme = "s3ok";
4235  if ("file" == url_parts[2] || "" == url_parts[2]) {
4236  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4237  } else if ("s3" == url_parts[2]) {
4238 #ifdef HAVE_AWS_S3
4239  // create a new S3Archive with a shared s3 client.
4240  // should be safe b/c there is no wildcard with an s3 url
4241  us3arch.reset(new S3Archive(file_path,
4242  copy_params.s3_access_key,
4243  copy_params.s3_secret_key,
4244  copy_params.s3_session_token,
4245  copy_params.s3_region,
4246  copy_params.s3_endpoint,
4247  copy_params.plain_text,
4248  copy_params.regex_path_filter,
4249  copy_params.file_sort_order_by,
4250  copy_params.file_sort_regex));
4251  us3arch->init_for_read();
4252  total_file_size += us3arch->get_total_file_size();
4253  // don't land all files here; land them one by one in the following iterations
4254  for (const auto& objkey : us3arch->get_objkeys()) {
4255  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
4256  }
4257  continue;
4258 #else
4259  throw std::runtime_error("AWS S3 support not available");
4260 #endif // HAVE_AWS_S3
4261  } else if (S3_objkey_url_scheme == url_parts[2]) {
4262 #ifdef HAVE_AWS_S3
4263  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
4264  auto file_path =
4265  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
4266  if (0 == file_path.size()) {
4267  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
4268  }
4269  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4270  // file not removed until file closed
4271  us3arch->vacuum(objkey);
4272 #else
4273  throw std::runtime_error("AWS S3 support not available");
4274 #endif // HAVE_AWS_S3
4275  }
4276 #if 0 // TODO(ppan): implement and enable any other archive class
4277  else
4278  if ("hdfs" == url_parts[2])
4279  uarch.reset(new HdfsArchive(file_path));
4280 #endif
4281  else {
4282  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
4283  }
4284 
4285  // init the archive for read
4286  auto& arch = *uarch;
4287 
4288  // by this point, the archive at the url should be ready to be read, unarchived
4289  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
4290  const void* buf;
4291  size_t size;
4292  bool just_saw_archive_header;
4293  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
4294  bool first_text_header_skipped = false;
4295  // start reading uncompressed bytes of this archive from libarchive
4296  // note! this archive may contain more than one file!
4297  file_offsets.push_back(0);
4298  size_t num_block_read = 0;
4299  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
4300  bool insert_line_delim_after_this_file = false;
4301  while (!stop) {
4302  int64_t offset{-1};
4303  auto ok = arch.read_data_block(&buf, &size, &offset);
4304  // can't use (uncompressed) size, so track (max) file offset.
4305  // also we want to capture offset even on e.o.f.
4306  if (offset > 0) {
4307  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4308  file_offsets.back() = offset;
4309  }
4310  if (!ok) {
4311  break;
4312  }
4313  // one subtle point here is that we now concatenate all files
4314  // into a single FILE stream and call importDelimited only once,
4315  // which would make it assume that only one header line comes with
4316  // this 'single' stream, while actually we may have one header line
4317  // for each of the files. so we need to skip header lines here
4318  // instead of in importDelimited.
4319  const char* buf2 = (const char*)buf;
4320  int size2 = size;
4321  if (copy_params.has_header != import_export::ImportHeaderRow::kNoHeader &&
4322  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
4323  while (size2-- > 0) {
4324  if (*buf2++ == copy_params.line_delim) {
4325  break;
4326  }
4327  }
4328  if (size2 <= 0) {
4329  LOG(WARNING) << "No line delimiter in block." << std::endl;
4330  } else {
4331  just_saw_archive_header = false;
4332  first_text_header_skipped = true;
4333  }
4334  }
4335  // On very rare occasions the write pipe somehow operates in a mode similar
4336  // to non-blocking while pipe(fds) should behave like pipe2(fds, 0), which
4337  // means blocking mode. In such an unreliable blocking mode, a possible fix
4338  // is to loop writing till no bytes are left, otherwise we get the annoying
4339  // `failed to write pipe: Success`...
4340  if (size2 > 0) {
4341  int nremaining = size2;
4342  while (nremaining > 0) {
4343  // try to write the entire remainder of the buffer to the pipe
4344  int nwritten = write(fd[1], buf2, nremaining);
4345  // how did we do?
4346  if (nwritten < 0) {
4347  // something bad happened
4348  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4349  // ignore these, assume nothing written, try again
4350  nwritten = 0;
4351  } else {
4352  // a real error
4353  throw std::runtime_error(
4354  std::string("failed or interrupted write to pipe: ") +
4355  strerror(errno));
4356  }
4357  } else if (nwritten == nremaining) {
4358  // we wrote everything; we're done
4359  break;
4360  }
4361  // only wrote some (or nothing), try again
4362  nremaining -= nwritten;
4363  buf2 += nwritten;
4364  // no exception when too many rejected
4366  if (import_status_.load_failed) {
4367  stop = true;
4368  break;
4369  }
4370  }
4371  // check that this file (buf for size) ended with a line delim
4372  if (size > 0) {
4373  const char* plast = static_cast<const char*>(buf) + (size - 1);
4374  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4375  }
4376  }
4377  ++num_block_read;
4378  }
4379 
4380  // if that file didn't end with a line delim, we insert one here to
4381  // terminate that file's stream. use a loop for the same reason as above.
4382  if (insert_line_delim_after_this_file) {
4383  while (true) {
4384  // write the delim char to the pipe
4385  int nwritten = write(fd[1], &copy_params.line_delim, 1);
4386  // how did we do?
4387  if (nwritten < 0) {
4388  // something bad happened
4389  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4390  // ignore these, assume nothing written, try again
4391  nwritten = 0;
4392  } else {
4393  // a real error
4394  throw std::runtime_error(
4395  std::string("failed or interrupted write to pipe: ") +
4396  strerror(errno));
4397  }
4398  } else if (nwritten == 1) {
4399  // we wrote it; we're done
4400  break;
4401  }
4402  }
4403  }
4404  }
4405  } catch (...) {
4406  // when import is aborted because of too many data errors or because a
4407  // detection has ended, any exception thrown by the s3 sdk or libarchive
4408  // is okay and should be suppressed.
4410  if (import_status_.load_failed) {
4411  break;
4412  }
4413  if (import_status_.rows_completed > 0) {
4414  if (nullptr != dynamic_cast<Detector*>(this)) {
4415  break;
4416  }
4417  }
4418  if (!teptr) { // no replace
4419  teptr = std::current_exception();
4420  }
4421  break;
4422  }
4423  }
4424  // close writer end
4425  close(fd[1]);
4426  });
4427 
4428  th_pipe_reader.join();
4429  th_pipe_writer.join();
4430 
4431  // rethrow any exception that happened before this point
4432  if (teptr) {
4433  std::rethrow_exception(teptr);
4434  }
4435 }
4436 
4437 ImportStatus Importer::import(const Catalog_Namespace::SessionInfo* session_info) {
4438  return DataStreamSink::archivePlumber(session_info);
4439 }
4440 
4441 ImportStatus Importer::importDelimited(
4442  const std::string& file_path,
4443  const bool decompressed,
4444  const Catalog_Namespace::SessionInfo* session_info) {
4446  auto query_session = session_info ? session_info->get_session_id() : "";
4447 
4448  if (!p_file) {
4449  p_file = fopen(file_path.c_str(), "rb");
4450  }
4451  if (!p_file) {
4452  throw std::runtime_error("failed to open file '" + file_path +
4453  "': " + strerror(errno));
4454  }
4455 
4456  if (!decompressed) {
4457  (void)fseek(p_file, 0, SEEK_END);
4458  file_size = ftell(p_file);
4459  }
4460 
4461  max_threads = num_import_threads(copy_params.threads);
4462  VLOG(1) << "Delimited import # threads: " << max_threads;
4463 
4464  // deal with small files
4465  size_t alloc_size = copy_params.buffer_size;
4466  if (!decompressed && file_size < alloc_size) {
4467  alloc_size = file_size;
4468  }
4469 
4470  for (size_t i = 0; i < max_threads; i++) {
4471  import_buffers_vec.emplace_back();
4472  for (const auto cd : loader->get_column_descs()) {
4473  import_buffers_vec[i].emplace_back(
4474  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4475  }
4476  }
4477 
4478  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4479  size_t current_pos = 0;
4480  size_t end_pos;
4481  size_t begin_pos = 0;
4482 
4483  (void)fseek(p_file, current_pos, SEEK_SET);
4484  size_t size =
4485  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4486 
4487  // make render group analyzers for each poly column
4488  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4489  if (g_enable_assign_render_groups && copy_params.geo_assign_render_groups) {
4490  auto& cat = loader->getCatalog();
4491  auto* td = loader->getTableDesc();
4492  CHECK(td);
4493  auto column_descriptors =
4494  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4495  for (auto const& cd : column_descriptors) {
4496  if (IS_GEO_POLY(cd->columnType.get_type())) {
4497  auto rga = std::make_shared<RenderGroupAnalyzer>();
4498  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
4499  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4500  }
4501  }
4502  }
4503 
4504  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4505  loader->getTableDesc()->tableId};
4506  auto table_epochs = loader->getTableEpochs();
4508  {
4509  std::list<std::future<ImportStatus>> threads;
4510 
4511  // use a stack to track thread_ids which must not overlap among threads
4512  // because thread_id is used to index import_buffers_vec[]
4513  std::stack<size_t> stack_thread_ids;
4514  for (size_t i = 0; i < max_threads; i++) {
4515  stack_thread_ids.push(i);
4516  }
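 // each worker thread thus owns import_buffers_vec[thread_id] exclusively
 // for the duration of its run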
4517  // added for true row index on error
4518  size_t first_row_index_this_buffer = 0;
4519 
4520  while (size > 0) {
4521  unsigned int num_rows_this_buffer = 0;
4522  CHECK(scratch_buffer);
4523  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4524  scratch_buffer,
4525  size,
4526  copy_params,
4527  first_row_index_this_buffer,
4528  num_rows_this_buffer,
4529  p_file);
4530 
4531  // unput residual: carry bytes after the last complete row into the next buffer
4532  int nresidual = size - end_pos;
4533  std::unique_ptr<char[]> unbuf;
4534  if (nresidual > 0) {
4535  unbuf = std::make_unique<char[]>(nresidual);
4536  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4537  }
4538 
4539  // get a thread_id not in use
4540  auto thread_id = stack_thread_ids.top();
4541  stack_thread_ids.pop();
4542  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4543 
4544  threads.push_back(std::async(std::launch::async,
4545  import_thread_delimited,
4546  thread_id,
4547  this,
4548  std::move(scratch_buffer),
4549  begin_pos,
4550  end_pos,
4551  end_pos,
4552  columnIdToRenderGroupAnalyzerMap,
4553  first_row_index_this_buffer,
4554  session_info,
4555  executor));
4556 
4557  first_row_index_this_buffer += num_rows_this_buffer;
4558 
4559  current_pos += end_pos;
4560  scratch_buffer = std::make_unique<char[]>(alloc_size);
4561  CHECK(scratch_buffer);
4562  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4563  size = nresidual +
4564  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4565 
4566  begin_pos = 0;
4567  while (threads.size() > 0) {
4568  int nready = 0;
4569  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4570  it != threads.end();) {
4571  auto& p = *it;
4572  std::chrono::milliseconds span(0);
4573  if (p.wait_for(span) == std::future_status::ready) {
4574  auto ret_import_status = p.get();
4575  {
4577  import_status_ += ret_import_status;
4578  if (ret_import_status.load_failed) {
4580  }
4581  }
4582  // sum up current total file offsets
4583  size_t total_file_offset{0};
4584  if (decompressed) {
4585  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4586  for (const auto file_offset : file_offsets) {
4587  total_file_offset += file_offset;
4588  }
4589  }
4590  // estimate number of rows per current total file offset
4591  if (decompressed ? total_file_offset : current_pos) {
4592  import_status_.rows_estimated =
4593  (decompressed ? (float)total_file_size / total_file_offset
4594  : (float)file_size / current_pos) *
4595  import_status_.rows_completed;
4596  }
4597  VLOG(3) << "rows_completed " << import_status_.rows_completed
4598  << ", rows_estimated " << import_status_.rows_estimated
4599  << ", total_file_size " << total_file_size << ", total_file_offset "
4600  << total_file_offset;
4602  // recall thread_id for reuse
4603  stack_thread_ids.push(ret_import_status.thread_id);
4604  threads.erase(it++);
4605  ++nready;
4606  } else {
4607  ++it;
4608  }
4609  }
4610 
4611  if (nready == 0) {
4612  std::this_thread::yield();
4613  }
4614 
4615  // on eof, wait all threads to finish
4616  if (0 == size) {
4617  continue;
4618  }
4619 
4620  // keep reading if any free thread slot
4621  // this is one of the major difference from old threading model !!
4622  if (threads.size() < max_threads) {
4623  break;
4624  }
4626  if (import_status_.load_failed) {
4627  break;
4628  }
4629  }
4631  if (import_status_.rows_rejected > copy_params.max_reject) {
4632  import_status_.load_failed = true;
4633  // todo use better message
4634  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4635  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4636  break;
4637  }
4639  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4640  break;
4641  }
4642  }
4643 
4644  // join dangling threads in case of LOG(ERROR) above
4645  for (auto& p : threads) {
4646  p.wait();
4647  }
4648  }
4649 
4650  checkpoint(table_epochs);
4651 
4652  fclose(p_file);
4653  p_file = nullptr;
4654  return import_status_;
4655 }
4656 
4657 void Loader::checkpoint() {
4658  if (getTableDesc()->persistenceLevel ==
4659  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4660  // tables
4661  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4662  }
4663 }
4664 
4665 std::vector<Catalog_Namespace::TableEpochInfo> Loader::getTableEpochs() const {
4666  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4667  getTableDesc()->tableId);
4668 }
4669 
4670 void Loader::setTableEpochs(
4671  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4672  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4673 }
4674 
4675 /* static */
4676 Geospatial::GDAL::DataSourceUqPtr Importer::openGDALDataSource(
4677  const std::string& file_name,
4678  const CopyParams& copy_params) {
4679  Geospatial::GDAL::init();
4680  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
4681  copy_params.s3_endpoint,
4682  copy_params.s3_access_key,
4683  copy_params.s3_secret_key,
4684  copy_params.s3_session_token);
4685  if (copy_params.source_type != import_export::SourceType::kGeoFile) {
4686  throw std::runtime_error("Unexpected CopyParams.source_type (" +
4687  std::to_string(static_cast<int>(copy_params.source_type)) +
4688  ")");
4689  }
4691 }
4692 
4693 namespace {
4694 
4695 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4696  const Geospatial::GDAL::DataSourceUqPtr& poDS,
4697  const std::string& file_name) {
4698  // get layer with specified name, or default to first layer
4699  OGRLayer* poLayer = nullptr;
4700  if (geo_layer_name.size()) {
4701  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4702  if (poLayer == nullptr) {
4703  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4704  file_name);
4705  }
4706  } else {
4707  poLayer = poDS->GetLayer(0);
4708  if (poLayer == nullptr) {
4709  throw std::runtime_error("No layers found in " + file_name);
4710  }
4711  }
4712  return *poLayer;
4713 }
4714 
4715 } // namespace
4716 
4717 /* static */
4718 void Importer::readMetadataSampleGDAL(
4719  const std::string& file_name,
4720  const std::string& geo_column_name,
4721  std::map<std::string, std::vector<std::string>>& sample_data,
4722  int row_limit,
4723  const CopyParams& copy_params) {
4724  Geospatial::GDAL::DataSourceUqPtr datasource(
4725  openGDALDataSource(file_name, copy_params));
4726  if (datasource == nullptr) {
4727  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4728  file_name);
4729  }
4730 
4731  OGRLayer& layer =
4732  getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);
4733 
4734  auto const* feature_defn = layer.GetLayerDefn();
4735  CHECK(feature_defn);
4736 
4737  // metadata columns?
4738  auto const metadata_column_infos =
4739  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4740 
4741  // get limited feature count
4742  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4743  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
4744  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4745 
4746  // prepare sample data map
4747  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4748  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4749  CHECK(column_name);
4750  sample_data[column_name] = {};
4751  }
4752  sample_data[geo_column_name] = {};
4753  for (auto const& mci : metadata_column_infos) {
4754  sample_data[mci.column_descriptor.columnName] = {};
4755  }
4756 
4757  // prepare to read
4758  layer.ResetReading();
4759 
4760  // read features (up to limited count)
4761  uint64_t feature_index{0u};
4762  while (feature_index < num_features) {
4763  // get (and take ownership of) feature
4764  Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
4765  if (!feature) {
4766  break;
4767  }
4768 
4769  // get feature geometry
4770  auto const* geometry = feature->GetGeometryRef();
4771  if (geometry == nullptr) {
4772  break;
4773  }
4774 
4775  // validate geom type (again?)
4776  switch (wkbFlatten(geometry->getGeometryType())) {
4777  case wkbPoint:
4778  case wkbLineString:
4779  case wkbPolygon:
4780  case wkbMultiPolygon:
4781  break;
4782  case wkbMultiPoint:
4783  case wkbMultiLineString:
4784  // supported if geo_explode_collections is specified
4785  if (!copy_params.geo_explode_collections) {
4786  throw std::runtime_error("Unsupported geometry type: " +
4787  std::string(geometry->getGeometryName()));
4788  }
4789  break;
4790  default:
4791  throw std::runtime_error("Unsupported geometry type: " +
4792  std::string(geometry->getGeometryName()));
4793  }
4794 
4795  // populate sample data for regular field columns
4796  for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4797  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4798  sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4799  }
4800 
4801  // populate sample data for metadata columns?
4802  for (auto const& mci : metadata_column_infos) {
4803  sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4804  }
4805 
4806  // populate sample data for geo column with WKT string
4807  char* wkts = nullptr;
4808  geometry->exportToWkt(&wkts);
4809  CHECK(wkts);
4810  sample_data[geo_column_name].push_back(wkts);
4811  CPLFree(wkts);
4812 
4813  // next feature
4814  feature_index++;
4815  }
4816 }
4817 
4818 namespace {
4819 
4820 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4821  switch (ogr_type) {
4822  case OFTInteger:
4823  return std::make_pair(kINT, false);
4824  case OFTIntegerList:
4825  return std::make_pair(kINT, true);
4826 #if GDAL_VERSION_MAJOR > 1
4827  case OFTInteger64:
4828  return std::make_pair(kBIGINT, false);
4829  case OFTInteger64List:
4830  return std::make_pair(kBIGINT, true);
4831 #endif
4832  case OFTReal:
4833  return std::make_pair(kDOUBLE, false);
4834  case OFTRealList:
4835  return std::make_pair(kDOUBLE, true);
4836  case OFTString:
4837  return std::make_pair(kTEXT, false);
4838  case OFTStringList:
4839  return std::make_pair(kTEXT, true);
4840  case OFTDate:
4841  return std::make_pair(kDATE, false);
4842  case OFTTime:
4843  return std::make_pair(kTIME, false);
4844  case OFTDateTime:
4845  return std::make_pair(kTIMESTAMP, false);
4846  case OFTBinary:
4847  // Interpret binary blobs as byte arrays here
4848  // but actual import will store NULL as GDAL will not
4849  // extract the blob (OGRFeature::GetFieldAsString will
4850  // result in the import buffers having an empty string)
4851  return std::make_pair(kTINYINT, true);
4852  default:
4853  break;
4854  }
4855  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4856 }
4857 
4858 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4859  switch (ogr_type) {
4860  case wkbPoint:
4861  return kPOINT;
4862  case wkbLineString:
4863  return kLINESTRING;
4864  case wkbPolygon:
4865  return kPOLYGON;
4866  case wkbMultiPolygon:
4867  return kMULTIPOLYGON;
4868  default:
4869  break;
4870  }
4871  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4872 }
4873 
4874 RasterImporter::PointType convert_raster_point_type(
4875  const import_export::RasterPointType raster_point_type) {
4876  switch (raster_point_type) {
4877  case import_export::RasterPointType::kNone:
4878  return RasterImporter::PointType::kNone;
4879  case import_export::RasterPointType::kAuto:
4880  return RasterImporter::PointType::kAuto;
4881  case import_export::RasterPointType::kSmallInt:
4882  return RasterImporter::PointType::kSmallInt;
4883  case import_export::RasterPointType::kInt:
4884  return RasterImporter::PointType::kInt;
4885  case import_export::RasterPointType::kFloat:
4886  return RasterImporter::PointType::kFloat;
4887  case import_export::RasterPointType::kDouble:
4888  return RasterImporter::PointType::kDouble;
4889  case import_export::RasterPointType::kPoint:
4890  return RasterImporter::PointType::kPoint;
4891  }
4892  UNREACHABLE();
4893  return RasterImporter::PointType::kNone;
4894 }
4895 
4896 RasterImporter::PointTransform convert_raster_point_transform(
4897  const import_export::RasterPointTransform raster_point_transform) {
4898  switch (raster_point_transform) {
4899  case import_export::RasterPointTransform::kNone:
4900  return RasterImporter::PointTransform::kNone;
4901  case import_export::RasterPointTransform::kAuto:
4902  return RasterImporter::PointTransform::kAuto;
4903  case import_export::RasterPointTransform::kFile:
4904  return RasterImporter::PointTransform::kFile;
4905  case import_export::RasterPointTransform::kWorld:
4906  return RasterImporter::PointTransform::kWorld;
4907  }
4908  UNREACHABLE();
4909  return RasterImporter::PointTransform::kNone;
4910 }
4911 
4912 } // namespace
4913 
4914 /* static */
4915 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4916  const std::string& file_name,
4917  const bool is_raster,
4918  const std::string& geo_column_name,
4919  const CopyParams& copy_params) {
4920  if (is_raster) {
4921  return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
4922  }
4923  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
4924 }
4925 
4926 /* static */
4927 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsRaster(
4928  const std::string& file_name,
4929  const std::string& geo_column_name,
4930  const CopyParams& copy_params) {
4931  // lazy init GDAL
4932  Geospatial::GDAL::init();
4933  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
4934  copy_params.s3_endpoint,
4935  copy_params.s3_access_key,
4936  copy_params.s3_secret_key,
4937  copy_params.s3_session_token);
4938 
4939  // prepare for metadata column
4940  auto metadata_column_infos =
4941  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4942 
4943  // create a raster importer and do the detect
4944  RasterImporter raster_importer;
4945  raster_importer.detect(
4946  file_name,
4947  copy_params.raster_import_bands,
4948  copy_params.raster_import_dimensions,
4949  convert_raster_point_type(copy_params.raster_point_type),
4950  convert_raster_point_transform(copy_params.raster_point_transform),
4951  copy_params.raster_point_compute_angle,
4952  false,
4953  metadata_column_infos);
4954 
4955  // prepare to capture column descriptors
4956  std::list<ColumnDescriptor> cds;
4957 
4958  // get the point column info
4959  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
4960 
4961  // create the columns for the point in the specified type
4962  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
4963  ColumnDescriptor cd;
4964  cd.columnName = cd.sourceName = col_name;
4965  cd.columnType.set_type(sql_type);
4966  // hardwire other POINT attributes for now
4967  if (sql_type == kPOINT) {
4968  cd.columnType.set_subtype(kGEOMETRY);
4969  cd.columnType.set_input_srid(4326);
4970  cd.columnType.set_output_srid(4326);
4971  cd.columnType.set_compression(kENCODING_GEOINT);
4972  cd.columnType.set_comp_param(32);
4973  }
4974  cds.push_back(cd);
4975  }
4976 
4977  // get the names and types for the band column(s)
4978  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
4979 
4980  // add column descriptors for each band
4981  for (auto const& [band_name, sql_type] : band_names_and_types) {
4982  ColumnDescriptor cd;
4983  cd.columnName = cd.sourceName = band_name;
4984  cd.columnType.set_type(sql_type);
4986  cds.push_back(cd);
4987  }
4988 
4989  // metadata columns?
4990  for (auto& mci : metadata_column_infos) {
4991  cds.push_back(std::move(mci.column_descriptor));
4992  }
4993 
4994  // return the results
4995  return cds;
4996 }
4997 
4998 /* static */
4999 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsGeo(
5000  const std::string& file_name,
5001  const std::string& geo_column_name,
5002  const CopyParams& copy_params) {
5003  std::list<ColumnDescriptor> cds;
5004 
5005  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5006  if (poDS == nullptr) {
5007  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5008  file_name);
5009  }
5010  if (poDS->GetLayerCount() == 0) {
5011  throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
5012  " has no layers");
5013  }
5014 
5015  OGRLayer& layer =
5016  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
5017 
5018  layer.ResetReading();
5019  // TODO(andrewseidl): support multiple features
5020  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
5021  if (poFeature == nullptr) {
5022  throw std::runtime_error("No features found in " + file_name);
5023  }
5024  // get fields as regular columns
5025  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5026  CHECK(poFDefn);
5027  int iField;
5028  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
5029  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5030  auto typePair = ogr_to_type(poFieldDefn->GetType());
5031  ColumnDescriptor cd;
5032  cd.columnName = poFieldDefn->GetNameRef();
5033  cd.sourceName = poFieldDefn->GetNameRef();
5034  SQLTypeInfo ti;
5035  if (typePair.second) {
5036  ti.set_type(kARRAY);
5037  ti.set_subtype(typePair.first);
5038  } else {
5039  ti.set_type(typePair.first);
5040  }
5041  if (typePair.first == kTEXT) {
5042  ti.set_compression(kENCODING_DICT);
5043  ti.set_comp_param(32);
5044  }
5045  ti.set_fixed_size();
5046  cd.columnType = ti;
5047  cds.push_back(cd);
5048  }
5049  // get geo column, if any
5050  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
5051  if (poGeometry) {
5052  ColumnDescriptor cd;
5053  cd.columnName = geo_column_name;
5054  cd.sourceName = geo_column_name;
5055 
5056  // get GDAL type
5057  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
5058 
5059  // if exploding, override any collection type to child type
5060  if (copy_params.geo_explode_collections) {
5061  if (ogr_type == wkbMultiPolygon) {
5062  ogr_type = wkbPolygon;
5063  } else if (ogr_type == wkbMultiLineString) {
5064  ogr_type = wkbLineString;
5065  } else if (ogr_type == wkbMultiPoint) {
5066  ogr_type = wkbPoint;
5067  }
5068  }
5069 
5070  // convert to internal type
5071  SQLTypes geoType = ogr_to_type(ogr_type);
5072 
5073  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
5074  if (!copy_params.geo_explode_collections) {
5075  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
5076  }
5077 
5078  // build full internal type
5079  SQLTypeInfo ti;
5080  ti.set_type(geoType);
5081  ti.set_subtype(copy_params.geo_coords_type);
5082  ti.set_input_srid(copy_params.geo_coords_srid);
5083  ti.set_output_srid(copy_params.geo_coords_srid);
5084  ti.set_compression(copy_params.geo_coords_encoding);
5085  ti.set_comp_param(copy_params.geo_coords_comp_param);
5086  cd.columnType = ti;
5087 
5088  cds.push_back(cd);
5089  }
5090 
5091  // metadata columns?
5092  auto metadata_column_infos =
5093  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
5094  for (auto& mci : metadata_column_infos) {
5095  cds.push_back(std::move(mci.column_descriptor));
5096  }
5097 
5098  return cds;
5099 }
5100 
5101 bool Importer::gdalStatInternal(const std::string& path,
5102  const CopyParams& copy_params,
5103  bool also_dir) {
5104  // lazy init GDAL
5105  Geospatial::GDAL::init();
5106  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5107  copy_params.s3_endpoint,
5108  copy_params.s3_access_key,
5109  copy_params.s3_secret_key,
5110  copy_params.s3_session_token);
5111 
5112 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5113  // clear GDAL stat cache
5114  // without this, file existence will be cached, even if authentication changes
5115  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
5116  VSICurlClearCache();
5117 #endif
5118 
5119  // stat path
5120  VSIStatBufL sb;
5121  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5122  if (result < 0) {
5123  return false;
5124  }
5125 
5126  // exists?
5127  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5128  return true;
5129  } else if (VSI_ISREG(sb.st_mode)) {
5130  return true;
5131  }
5132  return false;
5133 }
5134 
5135 /* static */
5136 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
5137  return gdalStatInternal(path, copy_params, false);
5138 }
5139 
5140 /* static */
5141 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
5142  const CopyParams& copy_params) {
5143  return gdalStatInternal(path, copy_params, true);
5144 }
5145 
5146 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
5147  std::vector<std::string>& files) {
5148  // prepare to gather subdirectories
5149  std::vector<std::string> subdirectories;
5150 
5151  // get entries
5152  char** entries = VSIReadDir(archive_path.c_str());
5153  if (!entries) {
5154  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
5155  return;
5156  }
5157 
5158  // force scope
5159  {
5160  // request clean-up
5161  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
5162 
5163  // check all the entries
5164  int index = 0;
5165  while (true) {
5166  // get next entry, or drop out if there isn't one
5167  char* entry_c = entries[index++];
5168  if (!entry_c) {
5169  break;
5170  }
5171  std::string entry(entry_c);
5172 
5173  // ignore '.' and '..'
5174  if (entry == "." || entry == "..") {
5175  continue;
5176  }
5177 
5178  // build the full path
5179  std::string entry_path = archive_path + std::string("/") + entry;
5180 
5181  // is it a file or a sub-folder
5182  VSIStatBufL sb;
5183  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
5184  if (result < 0) {
5185  break;
5186  }
5187 
5188  if (VSI_ISDIR(sb.st_mode)) {
5189  // a directory that ends with .gdb could be a Geodatabase bundle
5190  // arguably dangerous to decide this purely by name, but any further
5191  // validation would be very complex especially at this scope
5192  if (boost::iends_with(entry_path, ".gdb")) {
5193  // add the directory as if it was a file and don't recurse into it
5194  files.push_back(entry_path);
5195  } else {
5196  // add subdirectory to be recursed into
5197  subdirectories.push_back(entry_path);
5198  }
5199  } else {
5200  // add this file
5201  files.push_back(entry_path);
5202  }
5203  }
5204  }
5205 
5206  // recurse into each subdirectories we found
5207  for (const auto& subdirectory : subdirectories) {
5208  gdalGatherFilesInArchiveRecursive(subdirectory, files);
5209  }
5210 }
5211 
5212 /* static */
5213 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
5214  const std::string& archive_path,
5215  const CopyParams& copy_params) {
5216  // lazy init GDAL
5217  Geospatial::GDAL::init();
5218  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5219  copy_params.s3_endpoint,
5220  copy_params.s3_access_key,
5221  copy_params.s3_secret_key,
5222  copy_params.s3_session_token);
5223 
5224  // prepare to gather files
5225  std::vector<std::string> files;
5226 
5227  // gather the files recursively
5228  gdalGatherFilesInArchiveRecursive(archive_path, files);
5229 
5230  // convert to relative paths inside archive
5231  for (auto& file : files) {
5232  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5233  }
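 // e.g. with archive_path "/vsizip/foo.zip", the gathered entry
 // "/vsizip/foo.zip/dir/a.shp" becomes "dir/a.shp"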
5234 
5235  // done
5236  return files;
5237 }
5238 
5239 /* static */
5240 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
5241  const std::string& file_name,
5242  const CopyParams& copy_params) {
5243  // lazy init GDAL
5244  Geospatial::GDAL::init();
5245  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5246  copy_params.s3_endpoint,
5247  copy_params.s3_access_key,
5248  copy_params.s3_secret_key,
5249  copy_params.s3_session_token);
5250 
5251  // prepare to gather layer info
5252  std::vector<GeoFileLayerInfo> layer_info;
5253 
5254  // open the data set
5255  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5256  if (poDS == nullptr) {
5257  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5258  file_name);
5259  }
5260 
5261  // enumerate the layers
5262  for (auto&& poLayer : poDS->GetLayers()) {
5263  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
5264  // prepare to read this layer
5265  poLayer->ResetReading();
5266  // skip layer if empty
5267  if (poLayer->GetFeatureCount() > 0) {
5268  // get first feature
5269  Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
5270  CHECK(first_feature);
5271  // check feature for geometry
5272  const OGRGeometry* geometry = first_feature->GetGeometryRef();
5273  if (!geometry) {
5274  // layer has no geometry
5275  contents = GeoFileLayerContents::NON_GEO;
5276  } else {
5277  // check the geometry type
5278  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
5279  switch (wkbFlatten(geometry_type)) {
5280  case wkbPoint:
5281  case wkbLineString:
5282  case wkbPolygon:
5283  case wkbMultiPolygon:
5284  // layer has supported geo
5285  contents = GeoFileLayerContents::GEO;
5286  break;
5287  case wkbMultiPoint:
5288  case wkbMultiLineString:
5289  // supported if geo_explode_collections is specified
5290  contents = copy_params.geo_explode_collections
5291  ? GeoFileLayerContents::GEO
5292  : GeoFileLayerContents::UNSUPPORTED_GEO;
5293  break;
5294  default:
5295  // layer has unsupported geometry
5296  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
5297  break;
5298  }
5299  }
5300  }
5301  // store info for this layer
5302  layer_info.emplace_back(poLayer->GetName(), contents);
5303  }
5304 
5305  // done
5306  return layer_info;
5307 }
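// Example (sketch; variable names are illustrative): a front end can use the
// returned list to offer only importable layers, matching the contents
// classification performed above:
//
//   for (const auto& layer : Importer::gdalGetLayersInGeoFile(file_name,
//                                                             copy_params)) {
//     if (layer.contents == GeoFileLayerContents::GEO ||
//         layer.contents == GeoFileLayerContents::NON_GEO) {
//       // importable; pass layer.name via CopyParams::geo_layer_name
//     }
//   }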
5308 
5310  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
5311  const Catalog_Namespace::SessionInfo* session_info,
5312  const bool is_raster) {
5313  if (is_raster) {
5314  return importGDALRaster(session_info);
5315  }
5316  return importGDALGeo(columnNameToSourceNameMap, session_info);
5317 }
5318 
5320  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
5321  const Catalog_Namespace::SessionInfo* session_info) {
5322  // initial status
5323  set_import_status(import_id, import_status_);
5324  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_path, copy_params));
5325  if (poDS == nullptr) {
5326  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5327  file_path);
5328  }
5329 
5330  OGRLayer& layer =
5331  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
5332 
5333  // get the number of features in this layer
5334  size_t numFeatures = layer.GetFeatureCount();
5335 
5336  // build map of metadata field (additional columns) name to index
5337  // (the map is passed by reference to each worker thread)
5338  FieldNameToIndexMapType fieldNameToIndexMap;
5339  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5340  CHECK(poFDefn);
5341  size_t numFields = poFDefn->GetFieldCount();
5342  for (size_t iField = 0; iField < numFields; iField++) {
5343  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5344  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5345  }
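  // (sketch) a worker thread can later resolve a column's source name to an
  // OGR field index through this map, along these lines:
  //   auto it = fieldNameToIndexMap.find(source_name);
  //   if (it != fieldNameToIndexMap.end()) {
  //     const char* value = feature->GetFieldAsString(it->second);
  //   }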
5346 
5347  // the geographic spatial reference we want to put everything in
5348  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5349  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5350 
5351 #if GDAL_VERSION_MAJOR >= 3
5352  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5353  // this results in X and Y being transposed for angle-based
5354  // coordinate systems. This restores the previous behavior.
5355  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5356 #endif
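  // (sketch) the effect of the strategy above: with GDAL 3.x, an EPSG:4326
  // SRS defaults to authority-compliant (lat, lon) axis order, so without it
  // transformed points would come back transposed:
  //   OGRSpatialReference sr;
  //   sr.importFromEPSG(4326);
  //   sr.SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);  // x=lon, y=lat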
5357 
5358 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5359  // just one "thread"
5360  max_threads = 1;
5361 #else
5362  // how many threads to use
5363  max_threads = num_import_threads(copy_params.threads);
5364 #endif
5365  VLOG(1) << "GDAL import # threads: " << max_threads;
5366 
5367  // metadata columns?
5368  auto const metadata_column_infos =
5369  parse_add_metadata_columns(copy_params.add_metadata_columns, file_path);
5370 
5371  // geo table import is handled separately in both DBHandler and QueryRunner;
5372  // it does not go through the normal SQL execution path, so we explicitly
5373  // enroll the import session here to allow interruption while importing the
5374  // geo table
5375  auto query_session = session_info ? session_info->get_session_id() : "";
5376  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5377  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
5378  auto is_session_already_registered = false;
5379  {
5380  mapd_shared_lock<mapd_shared_mutex> session_read_lock(
5381  executor->getSessionLock());
5382  is_session_already_registered =
5383  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5384  }
5385  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5386  !is_session_already_registered) {
5387  executor->enrollQuerySession(query_session,
5388  "IMPORT_GEO_TABLE",
5389  query_submitted_time,
5390  Executor::UNITARY_EXECUTOR_ID,
5391  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5392  }
5393  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5394  // reset the runtime query interrupt status
5395  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5396  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5397  }
5398  };
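  // (sketch) with the session enrolled, long-running import loops can poll for
  // interruption via the helper defined at the top of this file, e.g.:
  //   if (check_session_interrupted(query_session, executor.get())) {
  //     // mark the thread's ImportStatus as failed/cancelled and stop
  //   }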
5399 
5400  // make an import buffer for each thread
5401  CHECK_EQ(import_buffers_vec.size(), 0u);
5402  import_buffers_vec.resize(max_threads);
5403  for (size_t i = 0; i < max_threads; i++) {
5404  for (const auto cd : loader->get_column_descs()) {
5405  import_buffers_vec[i].emplace_back(
5406  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5407  }
5408  }
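  // (sketch) import_buffers_vec is indexed as [thread_id][column]; each worker
  // writes only to its own row, so parsing needs no locking, e.g.:
  //   import_buffers_vec[thread_id][col_idx]->add_value(
  //       cd, value, is_null, copy_params);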
5409 
5410  // make render group analyzers for each poly column
5411  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
5412  if (g_enable_assign_render_groups && copy_params.geo_assign_render_groups) {
5413  auto& cat = loader->getCatalog();
5414  auto* td = loader->getTableDesc();
5415  CHECK(td);
5416  auto column_descriptors =
5417  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5418  for (auto const& cd : column_descriptors) {
5419  if (IS_GEO_POLY(cd->columnType.get_type())) {
5420  auto rga = std::make_shared<RenderGroupAnalyzer>();
5421  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
5422  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
5423  }
5424  }
5425  }
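  // (sketch) during the import each polygon's bounds are fed to the analyzer
  // for its column, which hands back a render group id that avoids collisions
  // with the existing table contents seeded above, roughly:
  //   auto rga = columnIdToRenderGroupAnalyzerMap.at(cd->columnId);
  //   int render_group = rga->insertBoundsAndReturnRenderGroup(bounds);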
5426 
5427 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5428  // threads
5429  std::list<std::future<ImportStatus>> threads;
5430 
5431  // use a stack to hand out thread_ids, which must be unique across threads
5432  // because thread_id is used to index import_buffers_vec[]
5433  std::stack<size_t> stack_thread_ids;
5434  for (size_t i = 0; i < max_threads; i++) {
5435  stack_thread_ids.push(i);
5436  }
5437 #endif
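  // (sketch) the stack behaves as a small id pool: each chunk pops an id
  // before launching, and the id is pushed back once that thread's future
  // completes, bounding concurrency at max_threads and keeping buffer slots
  // unique:
  //   auto thread_id = stack_thread_ids.top();  // acquire
  //   stack_thread_ids.pop();
  //   // ... launch worker bound to import_buffers_vec[thread_id] ...
  //   stack_thread_ids.push(thread_id);         // release on completion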
5438 
5439  // record the table epochs (for rollback if the import fails)
5440  auto table_epochs = loader->getTableEpochs();
5441 
5442  // reset the layer
5443  layer.ResetReading();
5444 
5445  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5446 
5447  // make a features buffer for each thread
5448  std::vector<FeaturePtrVector> features(max_threads);
5449 
5450  // make a coordinate transformation for each thread, based on the first feature's SR
5451  std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5452  max_threads);
5453 
5454  // for each feature...
5455  size_t firstFeatureThisChunk = 0;
5456  while (firstFeatureThisChunk < numFeatures) {
5457  // how many features in this chunk
5458  size_t numFeaturesThisChunk =
5459  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5460 
5461 // get a thread_id not in use
5462 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5463  size_t thread_id = 0;
5464 #else
5465  auto thread_id = stack_thread_ids.top();
5466  stack_thread_ids.pop();
5467  CHECK(thread_id < max_threads);
5468 #endif
5469 
5470  // fill features buffer for new thread
5471  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5472  features[thread_id].emplace_back(layer.GetNextFeature());
5473  }
5474 
5475  // construct a coordinate transformation for each thread, if needed
5476  // some features may not have geometry, so look for the first one that does
5477  if (coordinate_transformations[thread_id] == nullptr) {
5478  for (auto const& feature : features[thread_id]) {
5479  auto const* geometry = feature->GetGeometryRef();
5480  if (geometry) {
5481  auto const* geometry_sr = geometry->getSpatialReference();
5482  // if the SR is non-null and non-empty and different from what we want
5483  // we need to make a reusable CoordinateTransformation
5484  if (geometry_sr &&
5485 #if GDAL_VERSION_MAJOR >= 3
5486  !geometry_sr->IsEmpty() &&
5487 #endif
5488  !geometry_sr->IsSame(poGeographicSR.get())) {