OmniSciDB ca0c39ec8f
Importer.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * @file Importer.cpp
 * @brief Functions for Importer class
 */

#include "ImportExport/Importer.h"

#include <arrow/api.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/io/api.h>
#include <gdal.h>
#include <ogrsf_frmts.h>
#include <boost/algorithm/string.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/filesystem.hpp>
#include <boost/geometry.hpp>
#include <boost/variant.hpp>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <future>
#include <iomanip>
#include <list>
#include <memory>
#include <mutex>
#include <numeric>
#include <stack>
#include <stdexcept>
#include <thread>
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "Archive/S3Archive.h"
#include "ArrowImporter.h"
#include "Catalog/os/UserMapping.h"
#ifdef ENABLE_IMPORT_PARQUET
#endif
#if defined(ENABLE_IMPORT_PARQUET)
#include "Catalog/ForeignTable.h"
#endif
#ifdef ENABLE_IMPORT_PARQUET
#endif
#include "Geospatial/Compression.h"
#include "Geospatial/GDAL.h"
#include "Geospatial/Transforms.h"
#include "Geospatial/Types.h"
#include "Logger/Logger.h"
#include "QueryEngine/Execute.h"
#include "RenderGroupAnalyzer.h"
#include "Shared/DateTimeParser.h"
#include "Shared/SqlTypesLayout.h"
#include "Shared/file_path_util.h"
#include "Shared/import_helpers.h"
#include "Shared/likely.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/scope.h"
#include "Shared/shard_key.h"
#include "Shared/thread_count.h"

#include "gen-cpp/Heavy.h"

#ifdef _WIN32
#include <fcntl.h>
#include <io.h>
#endif

#define TIMER_STOP(t)                                                                  \
  (float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>( \
       t)) /                                                                           \
   1.0E6f)
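
// Illustrative usage (not from the original source): timer_stop() returns the
// elapsed microseconds since the given start point, and the macro scales that
// to seconds as a float:
//   auto clock_begin = timer_start();
//   /* ... do some work ... */
//   float elapsed_seconds = TIMER_STOP(clock_begin);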

size_t g_max_import_threads =
    32;  // Max number of default import threads to use (num hardware threads will be
         // used if lower, and can also be explicitly overridden in copy statement with
         // threads option)
size_t g_archive_read_buf_size = 1 << 20;

std::optional<size_t> g_detect_test_sample_size = std::nullopt;

static constexpr int kMaxRasterScanlinesPerThread = 32;

inline auto get_filesize(const std::string& file_path) {
  boost::filesystem::path boost_file_path{file_path};
  boost::system::error_code ec;
  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
  return ec ? 0 : filesize;
}

namespace {

bool check_session_interrupted(const QuerySessionId& query_session, Executor* executor) {
  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
    heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(
        executor->getSessionLock());
    return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
  }
  return false;
}

}  // namespace

// For logging std::vector<std::string> row.
namespace boost {
namespace log {
formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
  out << '[';
  for (size_t i = 0; i < row.size(); ++i) {
    out << (i ? ", " : "") << row[i];
  }
  out << ']';
  return out;
}
}  // namespace log
}  // namespace boost
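
// Example: with this overload in scope, logging a row such as {"a", "b", "c"}
// via LOG(INFO) << row; renders as "[a, b, c]".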

namespace import_export {

using FieldNameToIndexMapType = std::map<std::string, size_t>;
using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
using RenderGroupAnalyzerMapType =
    std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
using FeaturePtrVector = std::vector<Geospatial::GDAL::FeatureUqPtr>;

#define DEBUG_TIMING false
#define DEBUG_RENDER_GROUP_ANALYZER 0
#define DEBUG_AWS_AUTHENTICATION 0

#define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0

static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;

static heavyai::shared_mutex status_mutex;
static std::map<std::string, ImportStatus> import_status_map;

// max # rows to import
static const size_t kImportRowLimit = 10000;

Importer::Importer(Catalog_Namespace::Catalog& c,
                   const TableDescriptor* t,
                   const std::string& f,
                   const CopyParams& p)
    : Importer(new Loader(c, t), f, p) {}

Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
    : DataStreamSink(p, f), loader(providedLoader) {
  import_id = boost::filesystem::path(file_path).filename().string();
  file_size = 0;
  max_threads = 0;
  p_file = nullptr;
  buffer[0] = nullptr;
  buffer[1] = nullptr;
  // we may be overallocating a little more memory here due to dropping phy cols.
  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
  int i = 0;
  bool has_array = false;
  // TODO: replace this ugly way of skipping phy cols once isPhyGeo is defined
  int skip_physical_cols = 0;
  for (auto& p : loader->get_column_descs()) {
    // phy geo columns can't be in input file
    if (skip_physical_cols-- > 0) {
      continue;
    }
    // neither are rowid or $deleted$
    // note: columns can be added after rowid/$deleted$
    if (p->isVirtualCol || p->isDeletedCol) {
      continue;
    }
    skip_physical_cols = p->columnType.get_physical_cols();
    if (p->columnType.get_type() == kARRAY) {
      is_array.get()[i] = true;
      has_array = true;
    } else {
      is_array.get()[i] = false;
    }
    ++i;
  }
  if (has_array) {
    is_array_a = std::unique_ptr<bool[]>(is_array.release());
  } else {
    is_array_a = std::unique_ptr<bool[]>(nullptr);
  }
}

Importer::~Importer() {
  if (p_file != nullptr) {
    fclose(p_file);
  }
  if (buffer[0] != nullptr) {
    free(buffer[0]);
  }
  if (buffer[1] != nullptr) {
    free(buffer[1]);
  }
}

ImportStatus Importer::get_import_status(const std::string& import_id) {
  heavyai::shared_lock<heavyai::shared_mutex> read_lock(status_mutex);
  return import_status_map.at(import_id);
}

void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
  heavyai::lock_guard<heavyai::shared_mutex> write_lock(status_mutex);
  is.end = std::chrono::steady_clock::now();
  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
  import_status_map[import_id] = is;
}

static const std::string trim_space(const char* field, const size_t len) {
  size_t i = 0;
  size_t j = len;
  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
    i++;
  }
  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
    j--;
  }
  return std::string(field + i, j - i);
}
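
// Example: only spaces and carriage returns are stripped, and only at the
// ends, so trim_space("  foo bar\r", 10) yields "foo bar" with the interior
// whitespace preserved.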

namespace {

SQLTypes get_type_for_datum(const SQLTypeInfo& ti) {
  SQLTypes type;
  if (ti.is_decimal()) {
    type = decimal_to_int_type(ti);
  } else if (ti.is_dict_encoded_string()) {
    type = string_dict_to_int_type(ti);
  } else {
    type = ti.get_type();
  }
  return type;
}

}  // namespace

Datum NullArrayDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = get_type_for_datum(ti);
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_ARRAY_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_ARRAY_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
  }
  return d;
}

ArrayDatum StringToArray(const std::string& s,
                         const SQLTypeInfo& ti,
                         const CopyParams& copy_params) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
    return ArrayDatum(0, NULL, true);
  }
  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
    LOG(WARNING) << "Malformed array: " << s;
    return ArrayDatum(0, NULL, true);
  }
  std::vector<std::string> elem_strs;
  size_t last = 1;
  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
       i = s.find(copy_params.array_delim, last)) {
    elem_strs.push_back(s.substr(last, i - last));
    last = i + 1;
  }
  if (last + 1 <= s.size()) {
    elem_strs.push_back(s.substr(last, s.size() - 1 - last));
  }
  if (elem_strs.size() == 1) {
    auto str = elem_strs.front();
    auto str_trimmed = trim_space(str.c_str(), str.length());
    if (str_trimmed == "") {
      elem_strs.clear();  // Empty array
    }
  }
  if (!elem_ti.is_string()) {
    size_t len = elem_strs.size() * elem_ti.get_size();
    std::unique_ptr<int8_t, FreeDeleter> buf(
        reinterpret_cast<int8_t*>(checked_malloc(len)));
    int8_t* p = buf.get();
    for (auto& es : elem_strs) {
      auto e = trim_space(es.c_str(), es.length());
      bool is_null = (e == copy_params.null_str) || e == "NULL";
      if (!elem_ti.is_string() && e == "") {
        is_null = true;
      }
      if (elem_ti.is_number() || elem_ti.is_time()) {
        if (!isdigit(e[0]) && e[0] != '-') {
          is_null = true;
        }
      }
      Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
      p = append_datum(p, d, elem_ti);
      CHECK(p);
    }
    return ArrayDatum(len, buf.release(), false);
  }
  // must not be called for array of strings
  CHECK(false);
  return ArrayDatum(0, NULL, true);
}
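
// Illustrative example (assuming the default CopyParams delimiters '{', '}'
// and ','): an INTEGER[] field written as "{1, 2, 3}" splits into the element
// strings "1", " 2" and " 3", which are trimmed and converted by
// StringToDatum before being packed into the returned ArrayDatum.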

ArrayDatum NullArray(const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  auto len = ti.get_size();

  if (len > 0) {
    // Compose a NULL fixlen array
    int8_t* buf = (int8_t*)checked_malloc(len);
    // First scalar is a NULL_ARRAY sentinel
    Datum d = NullArrayDatum(elem_ti);
    int8_t* p = append_datum(buf, d, elem_ti);
    CHECK(p);
    // Rest is filled with normal NULL sentinels
    Datum d0 = NullDatum(elem_ti);
    while ((p - buf) < len) {
      p = append_datum(p, d0, elem_ti);
      CHECK(p);
    }
    CHECK((p - buf) == len);
    return ArrayDatum(len, buf, true);
  }
  // NULL varlen array
  return ArrayDatum(0, NULL, true);
}
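
// Note: a NULL fixed-length array is therefore distinguished from an array of
// NULL elements only by its first slot, which carries the NULL_ARRAY sentinel
// instead of the ordinary NULL sentinel for the element type.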

ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
  return NullArray(ti);
}

ArrayDatum ImporterUtils::composeNullPointCoords(const SQLTypeInfo& coords_ti,
                                                 const SQLTypeInfo& geo_ti) {
  if (geo_ti.get_compression() == kENCODING_GEOINT) {
    CHECK(geo_ti.get_comp_param() == 32);
    std::vector<double> null_point_coords = {NULL_ARRAY_DOUBLE, NULL_DOUBLE};
    auto compressed_null_coords = Geospatial::compress_coords(null_point_coords, geo_ti);
    const size_t len = compressed_null_coords.size();
    int8_t* buf = (int8_t*)checked_malloc(len);
    memcpy(buf, compressed_null_coords.data(), len);
    return ArrayDatum(len, buf, false);
  }
  auto modified_ti = coords_ti;
  modified_ti.set_subtype(kDOUBLE);
  return composeNullArray(modified_ti);
}

void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
  const auto& arr = datum.val.arr_val;
  for (const auto& elem_datum : arr) {
    string_vec.push_back(elem_datum.val.str_val);
  }
}

Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kBIGINT:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kINT:
      d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kSMALLINT:
      d.smallintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kTINYINT:
      d.tinyintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kFLOAT:
      d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
      break;
    case kDOUBLE:
      d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
  }
  return d;
}

ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();

  CHECK(!elem_ti.is_string());

  if (datum.is_null) {
    return NullArray(ti);
  }

  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
  int8_t* buf = (int8_t*)checked_malloc(len);
  int8_t* p = buf;
  for (auto& e : datum.val.arr_val) {
    p = append_datum(p, TDatumToDatum(e, elem_ti), elem_ti);
    CHECK(p);
  }

  return ArrayDatum(len, buf, false);
}

void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
  CHECK(string_dict_);
  std::vector<std::string_view> string_view_vec;
  string_view_vec.reserve(string_vec.size());
  for (const auto& str : string_vec) {
    if (str.size() > StringDictionary::MAX_STRLEN) {
      std::ostringstream oss;
      oss << "while processing dictionary for column " << getColumnDesc()->columnName
          << " a string was detected too long for encoding, string length = "
          << str.size() << ", first 100 characters are '" << str.substr(0, 100) << "'";
      throw std::runtime_error(oss.str());
    }
    string_view_vec.push_back(str);
  }
  try {
    switch (column_desc_->columnType.get_size()) {
      case 1:
        string_dict_i8_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
        break;
      case 2:
        string_dict_i16_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
        break;
      case 4:
        string_dict_i32_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
        break;
      default:
        CHECK(false);
    }
  } catch (std::exception& e) {
    std::ostringstream oss;
    oss << "while processing dictionary for column " << getColumnDesc()->columnName
        << " : " << e.what();
    LOG(ERROR) << oss.str();
    throw std::runtime_error(oss.str());
  }
}
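
// The id buffer width mirrors the dictionary-encoded column's storage size:
// 1-byte ids for a 1-byte encoding, 2 bytes for a 2-byte encoding and 4 bytes
// for the full 32-bit encoding, letting getOrAddBulk() scatter the string ids
// directly into the target buffer.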

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const std::string_view val,
                                  const bool is_null,
                                  const CopyParams& copy_params,
                                  const bool check_not_null) {
  const auto type = cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBoolean(static_cast<int8_t>(d.boolval));
      }
      break;
    }
    case kTINYINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addTinyint(d.tinyintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addSmallint(d.smallintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addInt(d.intval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        DecimalOverflowValidator validator(ti);
        validator.validate(d.bigintval);
        addBigint(d.bigintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addDouble(std::atof(std::string(val).c_str()));
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        if (val.length() > StringDictionary::MAX_STRLEN) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(StringDictionary::MAX_STRLEN));
        }
        addString(val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (check_not_null && is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      SQLTypeInfo ti = cd->columnType;
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImportHelpers::parseStringArray(std::string(val), copy_params, string_vec);
        if (!is_null) {
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          addStringArray(string_vec);
        } else {
          addStringArray(std::nullopt);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
          if (d.is_null) {  // val could be "NULL"
            addArray(NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + std::string(val));
            }
            addArray(d);
          }
        } else {
          addArray(NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      addGeoString(val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}
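
// Note the parsing asymmetry above: numeric, date and time fields are only
// parsed when they begin with a digit, '-' or (for floating point) '.', so an
// unparseable token is treated as NULL; a NOT NULL violation is then raised
// only through the check_not_null path.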

void TypedImportBuffer::pop_value() {
  const auto type = column_desc_->columnType.is_decimal()
                        ? decimal_to_int_type(column_desc_->columnType)
                        : column_desc_->columnType.get_type();
  switch (type) {
    case kBOOLEAN:
      bool_buffer_->pop_back();
      break;
    case kTINYINT:
      tinyint_buffer_->pop_back();
      break;
    case kSMALLINT:
      smallint_buffer_->pop_back();
      break;
    case kINT:
      int_buffer_->pop_back();
      break;
    case kBIGINT:
      bigint_buffer_->pop_back();
      break;
    case kFLOAT:
      float_buffer_->pop_back();
      break;
    case kDOUBLE:
      double_buffer_->pop_back();
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      string_buffer_->pop_back();
      break;
    case kDATE:
    case kTIME:
    case kTIMESTAMP:
      bigint_buffer_->pop_back();
      break;
    case kARRAY:
      if (IS_STRING(column_desc_->columnType.get_subtype())) {
        string_array_buffer_->pop_back();
      } else {
        array_buffer_->pop_back();
      }
      break;
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->pop_back();
      break;
    default:
      CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
  }
}

struct GeoImportException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
template <typename DATA_TYPE>
size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
    const ColumnDescriptor* cd,
    const Array& array,
    std::vector<DATA_TYPE>& buffer,
    const ArraySliceRange& slice_range,
    import_export::BadRowsTracker* const bad_rows_tracker) {
  auto data =
      std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
  if (bad_rows_tracker && cd->columnType.is_geometry()) {
    f_add_geo_phy_cols = [&](const int64_t row) {
      // Populate physical columns (ref. DBHandler::load_table)
      std::vector<double> coords, bounds;
      std::vector<int> ring_sizes, poly_rings;
      int render_group = 0;
      SQLTypeInfo ti;
      // replace any unexpected exception from getGeoColumns or other
      // on this path with a GeoImportException so that we won't over
      // push a null to the logical column...
      try {
        SQLTypeInfo import_ti{ti};
        if (array.IsNull(row)) {
          Geospatial::GeoTypesFactory::getNullGeoColumns(
              import_ti, coords, bounds, ring_sizes, poly_rings, false);
        } else {
          arrow_throw_if<GeoImportException>(
              !Geospatial::GeoTypesFactory::getGeoColumns(geo_string_buffer_->back(),
                                                          ti,
                                                          coords,
                                                          bounds,
                                                          ring_sizes,
                                                          poly_rings,
                                                          false),
              error_context(cd, bad_rows_tracker) + "Invalid geometry");
          arrow_throw_if<GeoImportException>(
              cd->columnType.get_type() != ti.get_type(),
              error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
        }
        auto col_idx_workpad = col_idx;  // what a pitfall!!
        Importer::set_geo_physical_import_buffer(
            bad_rows_tracker->importer->getCatalog(),
            cd,
            bad_rows_tracker->importer->get_import_buffers(0),
            col_idx_workpad,
            coords,
            bounds,
            ring_sizes,
            poly_rings,
            render_group);
      } catch (GeoImportException&) {
        throw;
      } catch (std::runtime_error& e) {
        throw GeoImportException(e.what());
      } catch (const std::exception& e) {
        throw GeoImportException(e.what());
      } catch (...) {
        throw GeoImportException("unknown exception");
      }
    };
  }
  auto f_mark_a_bad_row = [&](const auto row) {
    std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
    bad_rows_tracker->rows.insert(row - slice_range.first);
  };
  buffer.reserve(slice_range.second - slice_range.first);
  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
    try {
      *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
      f_add_geo_phy_cols(row);
    } catch (GeoImportException&) {
      f_mark_a_bad_row(row);
    } catch (ArrowImporterException&) {
      // trace bad rows of each column; otherwise rethrow.
      if (bad_rows_tracker) {
        *data << nullptr;
        f_mark_a_bad_row(row);
      } else {
        throw;
      }
    }
  }
  return buffer.size();
}
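
// When a BadRowsTracker is supplied, a conversion failure does not abort the
// import: the offending row index (relative to the slice start) is recorded
// for the caller to discard later, and a NULL placeholder keeps the column
// buffers aligned across the concurrently converted columns.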

size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
                                           const Array& col,
                                           const bool exact_type_match,
                                           const ArraySliceRange& slice_range,
                                           BadRowsTracker* const bad_rows_tracker) {
  const auto type = cd->columnType.get_type();
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
  }

  switch (type) {
    case kBOOLEAN:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
      }
      return convert_arrow_val_to_import_buffer<int8_t>(
          cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
    case kTINYINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
      }
      return convert_arrow_val_to_import_buffer<int8_t>(
          cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
    case kSMALLINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
      }
      return convert_arrow_val_to_import_buffer<int16_t>(
          cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
    case kINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
      }
      return convert_arrow_val_to_import_buffer<int32_t>(
          cd, col, *int_buffer_, slice_range, bad_rows_tracker);
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kFLOAT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
      }
      return convert_arrow_val_to_import_buffer<float>(
          cd, col, *float_buffer_, slice_range, bad_rows_tracker);
    case kDOUBLE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
      }
      return convert_arrow_val_to_import_buffer<double>(
          cd, col, *double_buffer_, slice_range, bad_rows_tracker);
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                       "Expected string type");
      }
      return convert_arrow_val_to_import_buffer<std::string>(
          cd, col, *string_buffer_, slice_range, bad_rows_tracker);
    case kTIME:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
                       "Expected time32 or time64 type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kTIMESTAMP:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kDATE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
                       "Expected date32 or date64 type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                     "Expected string type");
      return convert_arrow_val_to_import_buffer<std::string>(
          cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
    case kARRAY:
      throw std::runtime_error("Arrow array appends not yet supported");
    default:
      throw std::runtime_error("Invalid Type");
  }
}
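
// With exact_type_match set, the Arrow column's type id must already match the
// table schema; when unset, narrowing and parsing conversions are delegated to
// the DataBuffer/value_getter machinery defined in ArrowImporter.h.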

// this is exclusively used by load_table_binary_columnar
size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
  size_t dataSize = 0;
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
      throw std::runtime_error("NULL for column " + cd->columnName);
    }
  }

  switch (cd->columnType.get_type()) {
    case kBOOLEAN: {
      dataSize = col.data.int_col.size();
      bool_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bool_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kTINYINT: {
      dataSize = col.data.int_col.size();
      tinyint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kSMALLINT: {
      dataSize = col.data.int_col.size();
      smallint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kINT: {
      dataSize = col.data.int_col.size();
      int_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          int_buffer_->push_back((int32_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kFLOAT: {
      dataSize = col.data.real_col.size();
      float_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          float_buffer_->push_back(NULL_FLOAT);
        } else {
          float_buffer_->push_back((float)col.data.real_col[i]);
        }
      }
      break;
    }
    case kDOUBLE: {
      dataSize = col.data.real_col.size();
      double_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          double_buffer_->push_back(NULL_DOUBLE);
        } else {
          double_buffer_->push_back((double)col.data.real_col[i]);
        }
      }
      break;
    }
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // TODO: for now, use empty string for nulls
      dataSize = col.data.str_col.size();
      string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          string_buffer_->push_back(std::string());
        } else {
          string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
        }
      }
      break;
    }
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON: {
      dataSize = col.data.str_col.size();
      geo_string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          // TODO: add support for NULL geo
          geo_string_buffer_->push_back(std::string());
        } else {
          geo_string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kARRAY: {
      dataSize = col.data.arr_col.size();
      if (IS_STRING(cd->columnType.get_subtype())) {
        for (size_t i = 0; i < dataSize; i++) {
          OptionalStringVector& string_vec = addStringArray();
          if (!col.nulls[i]) {
            size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
            for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
              string_vec->push_back(col.data.arr_col[i].data.str_col[str_idx]);
            }
          }
        }
      } else {
        auto elem_ti = cd->columnType.get_subtype();
        switch (elem_ti) {
          case kBOOLEAN: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  // Explicitly checking the item for null because
                  // casting the null value (-128) to bool results in
                  // an incorrect value of 1.
                  if (col.data.arr_col[i].nulls[j]) {
                    *p = static_cast<int8_t>(
                        inline_fixed_encoding_null_val(cd->columnType.get_elem_type()));
                  } else {
                    *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
                  }
                  p += sizeof(bool);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTINYINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int8_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kSMALLINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int16_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int16_t*)p =
                      static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int16_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int32_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int32_t*)p =
                      static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int32_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kBIGINT:
          case kNUMERIC:
          case kDECIMAL: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int64_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int64_t*)p =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kFLOAT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(float);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(float);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kDOUBLE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(double);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(double);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTIME:
          case kTIMESTAMP:
          case kDATE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteWidth = sizeof(int64_t);
                size_t byteSize = len * byteWidth;
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *reinterpret_cast<int64_t*>(p) =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          default:
            throw std::runtime_error("Invalid Array Type");
        }
      }
      break;
    }
    default:
      throw std::runtime_error("Invalid Type");
  }
  return dataSize;
}

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const TDatum& datum,
                                  const bool is_null) {
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        addBoolean((int8_t)datum.val.int_val);
      }
      break;
    }
    case kTINYINT:
      if (!is_null) {
        addTinyint((int8_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kSMALLINT:
      if (!is_null) {
        addSmallint((int16_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kINT:
      if (!is_null) {
        addInt((int32_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kBIGINT:
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kFLOAT:
      if (!is_null) {
        addFloat((float)datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        addDouble(datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        addString(datum.val.str_val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kARRAY:
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      if (IS_STRING(cd->columnType.get_subtype())) {
        OptionalStringVector& string_vec = addStringArray();
        addBinaryStringArray(datum, *string_vec);
      } else {
        if (!is_null) {
          addArray(TDatumToArrayDatum(datum, cd->columnType));
        } else {
          addArray(NullArray(cd->columnType));
        }
      }
      break;
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addGeoString(std::string());
      } else {
        addGeoString(datum.val.str_val);
      }
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

void TypedImportBuffer::addDefaultValues(const ColumnDescriptor* cd, size_t num_rows) {
  bool is_null = !cd->default_value.has_value();
  CHECK(!(is_null && cd->columnType.get_notnull()));
  const auto type = cd->columnType.get_type();
  auto ti = cd->columnType;
  auto val = cd->default_value.value_or("NULL");
  CopyParams cp;
  switch (type) {
    case kBOOLEAN: {
      if (!is_null) {
        bool_buffer_->resize(num_rows, StringToDatum(val, ti).boolval);
      } else {
        bool_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kTINYINT: {
      if (!is_null) {
        tinyint_buffer_->resize(num_rows, StringToDatum(val, ti).tinyintval);
      } else {
        tinyint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null) {
        smallint_buffer_->resize(num_rows, StringToDatum(val, ti).smallintval);
      } else {
        smallint_buffer_->resize(num_rows,
                                 inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null) {
        int_buffer_->resize(num_rows, StringToDatum(val, ti).intval);
      } else {
        int_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null) {
        bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
      } else {
        bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        const auto converted_decimal_value = convert_decimal_value_to_scale(
            StringToDatum(val, ti).bigintval, ti, cd->columnType);
        bigint_buffer_->resize(num_rows, converted_decimal_value);
      } else {
        bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null) {
        float_buffer_->resize(num_rows,
                              static_cast<float>(std::atof(std::string(val).c_str())));
      } else {
        float_buffer_->resize(num_rows, NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        double_buffer_->resize(num_rows, std::atof(std::string(val).c_str()));
      } else {
        double_buffer_->resize(num_rows, NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      if (is_null) {
        string_buffer_->resize(num_rows, "");
      } else {
        if (val.length() > StringDictionary::MAX_STRLEN) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(StringDictionary::MAX_STRLEN));
        }
        string_buffer_->resize(num_rows, val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null) {
        bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
      } else {
        bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImportHelpers::parseStringArray(std::string(val), cp, string_vec);
        if (!is_null) {
          // TODO: add support for NULL string arrays
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          string_array_buffer_->resize(num_rows, string_vec);
        } else {
          if (ti.get_size() > 0) {
            // TODO: remove once NULL fixlen arrays are allowed
            throw std::runtime_error("Fixed length array column " + cd->columnName +
                                     " currently cannot accept NULL arrays");
          }
          // TODO: add support for NULL string arrays, replace with addStringArray(),
          // for now add whatever parseStringArray() outputs for NULLs ("NULL")
          string_array_buffer_->resize(num_rows, string_vec);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(std::string(val), ti, cp);
          if (d.is_null) {  // val could be "NULL"
            array_buffer_->resize(num_rows, NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + std::string(val));
            }
            array_buffer_->resize(num_rows, d);
          }
        } else {
          array_buffer_->resize(num_rows, NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->resize(num_rows, val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::addDefaultValues() does not support type "
                   << type;
  }
}

bool importGeoFromLonLat(double lon,
                         double lat,
                         std::vector<double>& coords,
                         SQLTypeInfo& ti) {
  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
    return false;
  }
  if (ti.transforms()) {
    Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
    if (!pt.transform(ti)) {
      return false;
    }
    pt.getColumns(coords);
    return true;
  }
  coords.push_back(lon);
  coords.push_back(lat);
  return true;
}
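
// Example: when the column type carries an SRID transform (ti.transforms()),
// e.g. importing WGS84 lon/lat into a projected output SRID, the point is run
// through Geospatial::GeoPoint::transform() before being appended to coords;
// otherwise the raw lon/lat pair is stored unchanged.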

void Importer::set_geo_physical_import_buffer(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<double>& coords,
    std::vector<double>& bounds,
    std::vector<int>& ring_sizes,
    std::vector<int>& poly_rings,
    int render_group,
    const bool force_null) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  bool is_null_geo = false;
  bool is_null_point = false;
  if (!col_ti.get_notnull()) {
    // Check for NULL geo
    if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
      is_null_point = true;
      coords.clear();
    }
    is_null_geo = coords.empty();
    if (is_null_point) {
      coords.push_back(NULL_ARRAY_DOUBLE);
      coords.push_back(NULL_DOUBLE);
      // Treating POINT coords as notnull, need to store actual encoding
      // [un]compressed+[not]null
      is_null_geo = false;
    }
  }
  if (force_null) {
    is_null_geo = true;
  }
  TDatum tdd_coords;
  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
  // One exception - NULL POINT geo: coords need to be processed to encode nullness
  // in a fixlen array, compressed and uncompressed.
  if (!is_null_geo) {
    std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
    tdd_coords.val.arr_val.reserve(compressed_coords.size());
    for (auto cc : compressed_coords) {
      tdd_coords.val.arr_val.emplace_back();
      tdd_coords.val.arr_val.back().val.int_val = cc;
    }
  }
  tdd_coords.is_null = is_null_geo;
  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);

  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create [linest]ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_ring_sizes;
    tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
    if (!is_null_geo) {
      for (auto ring_size : ring_sizes) {
        tdd_ring_sizes.val.arr_val.emplace_back();
        tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
      }
    }
    tdd_ring_sizes.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
  }

  if (col_type == kMULTIPOLYGON) {
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_poly_rings;
    tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
    if (!is_null_geo) {
      for (auto num_rings : poly_rings) {
        tdd_poly_rings.val.arr_val.emplace_back();
        tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
      }
    }
    tdd_poly_rings.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
  }

  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
      col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_bounds;
    tdd_bounds.val.arr_val.reserve(bounds.size());
    if (!is_null_geo) {
      for (auto b : bounds) {
        tdd_bounds.val.arr_val.emplace_back();
        tdd_bounds.val.arr_val.back().val.real_val = b;
      }
    }
    tdd_bounds.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
  }
}
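
// Physical columns appended here, immediately following the logical geo
// column: coords always; ring_sizes for MULTILINESTRING/POLYGON/MULTIPOLYGON;
// poly_rings for MULTIPOLYGON only; bounds for every geo type except POINT;
// and a render_group value for POLYGON/MULTIPOLYGON.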

void Importer::set_geo_physical_import_buffer_columnar(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<std::vector<double>>& coords_column,
    std::vector<std::vector<double>>& bounds_column,
    std::vector<std::vector<int>>& ring_sizes_column,
    std::vector<std::vector<int>>& poly_rings_column,
    std::vector<int>& render_groups_column) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;

  auto coords_row_count = coords_column.size();
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  for (auto& coords : coords_column) {
    bool is_null_geo = false;
    bool is_null_point = false;
    if (!col_ti.get_notnull()) {
      // Check for NULL geo
      if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
        is_null_point = true;
        coords.clear();
      }
      is_null_geo = coords.empty();
      if (is_null_point) {
        coords.push_back(NULL_ARRAY_DOUBLE);
        coords.push_back(NULL_DOUBLE);
        // Treating POINT coords as notnull, need to store actual encoding
        // [un]compressed+[not]null
        is_null_geo = false;
      }
    }
    std::vector<TDatum> td_coords_data;
    if (!is_null_geo) {
      std::vector<uint8_t> compressed_coords =
          Geospatial::compress_coords(coords, col_ti);
      for (auto const& cc : compressed_coords) {
        TDatum td_byte;
        td_byte.val.int_val = cc;
        td_coords_data.push_back(td_byte);
      }
    }
    TDatum tdd_coords;
    tdd_coords.val.arr_val = td_coords_data;
    tdd_coords.is_null = is_null_geo;
    import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
  }
  col_idx++;

  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (ring_sizes_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
    }
    // Create [linest]ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& ring_sizes : ring_sizes_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = ring_sizes.empty();
      }
      std::vector<TDatum> td_ring_sizes;
      for (auto const& ring_size : ring_sizes) {
        TDatum td_ring_size;
        td_ring_size.val.int_val = ring_size;
        td_ring_sizes.push_back(td_ring_size);
      }
      TDatum tdd_ring_sizes;
      tdd_ring_sizes.val.arr_val = td_ring_sizes;
      tdd_ring_sizes.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
    }
    col_idx++;
  }

  if (col_type == kMULTIPOLYGON) {
    if (poly_rings_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
    }
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& poly_rings : poly_rings_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = poly_rings.empty();
      }
      std::vector<TDatum> td_poly_rings;
      for (auto const& num_rings : poly_rings) {
        TDatum td_num_rings;
        td_num_rings.val.int_val = num_rings;
        td_poly_rings.push_back(td_num_rings);
      }
      TDatum tdd_poly_rings;
      tdd_poly_rings.val.arr_val = td_poly_rings;
      tdd_poly_rings.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
    }
    col_idx++;
  }

  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
      col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
    if (bounds_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: bounds column size mismatch";
    }
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& bounds : bounds_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
      }
      std::vector<TDatum> td_bounds_data;
      for (auto const& b : bounds) {
        TDatum td_double;
        td_double.val.real_val = b;
        td_bounds_data.push_back(td_double);
      }
      TDatum tdd_bounds;
      tdd_bounds.val.arr_val = td_bounds_data;
      tdd_bounds.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
    }
    col_idx++;
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& render_group : render_groups_column) {
      TDatum td_render_group;
      td_render_group.val.int_val = render_group;
      td_render_group.is_null = false;
      import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
    }
    col_idx++;
  }
}

namespace {

std::tuple<int, SQLTypes, std::string> explode_collections_step1(
    const std::list<const ColumnDescriptor*>& col_descs) {
  // validate the columns
  // for now we can only explode into a single destination column
  // which must be of the child type (POLYGON, LINESTRING, POINT)
  int collection_col_idx = -1;
  int col_idx = 0;
  std::string collection_col_name;
  SQLTypes collection_child_type = kNULLT;
  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
    auto const& cd = *cd_it;
    auto const col_type = cd->columnType.get_type();
    if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
      if (collection_col_idx >= 0) {
        throw std::runtime_error(
            "Explode Collections: Found more than one destination column");
      }
      collection_col_idx = col_idx;
      collection_child_type = col_type;
      collection_col_name = cd->columnName;
    }
    for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
      ++cd_it;
    }
    col_idx++;
  }
  if (collection_col_idx < 0) {
    throw std::runtime_error(
        "Explode Collections: Failed to find a supported column type to explode "
        "into");
  }
  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
}
1905 
1906 int64_t explode_collections_step2(
1907  OGRGeometry* ogr_geometry,
1908  SQLTypes collection_child_type,
1909  const std::string& collection_col_name,
1910  size_t row_or_feature_idx,
1911  std::function<void(OGRGeometry*)> execute_import_lambda) {
1912  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1913  bool is_collection = false;
1914  switch (collection_child_type) {
1915  case kPOINT:
1916  switch (ogr_geometry_type) {
1917  case wkbMultiPoint:
1918  is_collection = true;
1919  break;
1920  case wkbPoint:
1921  break;
1922  default:
1923  throw std::runtime_error(
1924  "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1925  }
1926  break;
1927  case kLINESTRING:
1928  switch (ogr_geometry_type) {
1929  case wkbMultiLineString:
1930  is_collection = true;
1931  break;
1932  case wkbLineString:
1933  break;
1934  default:
1935  throw std::runtime_error(
1936  "Explode Collections: Source geo type must be MULTILINESTRING or "
1937  "LINESTRING");
1938  }
1939  break;
1940  case kPOLYGON:
1941  switch (ogr_geometry_type) {
1942  case wkbMultiPolygon:
1943  is_collection = true;
1944  break;
1945  case wkbPolygon:
1946  break;
1947  default:
1948  throw std::runtime_error(
1949  "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1950  }
1951  break;
1952  default:
1953  CHECK(false) << "Unsupported geo child type " << collection_child_type;
1954  }
1955 
1956  int64_t us = 0LL;
1957 
1958  // explode or just import
1959  if (is_collection) {
1960  // cast to collection
1961  OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1962  CHECK(collection_geometry);
1963 
1964 #if LOG_EXPLODE_COLLECTIONS
1965  // log number of children
1966  LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
1967  << collection_col_name << "' into " << collection_geometry->getNumGeometries()
1968  << " child rows";
1969 #endif
1970 
1971  // loop over children
1972  uint32_t child_geometry_count = 0;
1973  auto child_geometry_it = collection_geometry->begin();
1974  while (child_geometry_it != collection_geometry->end()) {
1975  // get and import this child
1976  OGRGeometry* import_geometry = *child_geometry_it;
1977  us += measure<std::chrono::microseconds>::execution(
1978  [&] { execute_import_lambda(import_geometry); });
1979 
1980  // next child
1981  child_geometry_it++;
1982  child_geometry_count++;
1983  }
1984  } else {
1985  // import non-collection row just once
1986  us = measure<std::chrono::microseconds>::execution(
1987  [&] { execute_import_lambda(ogr_geometry); });
1988  }
1989 
1990  // done
1991  return us;
1992 }
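// Illustrative usage of the two helpers above (a sketch only; 'geo', 'row_idx'
// and the per-row import callback are hypothetical stand-ins):
//
//   auto const [col_idx, child_type, col_name] =
//       explode_collections_step1(col_descs);
//   int64_t us = explode_collections_step2(
//       geo, child_type, col_name, row_idx,
//       [&](OGRGeometry* child) { /* import one child row */ });
//
// A MULTIPOLYGON source value thus yields one imported row per POLYGON child.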
1993 
1994 } // namespace
1995 
1996 static ImportStatus import_thread_delimited(
1997  int thread_id,
1998  Importer* importer,
1999  std::unique_ptr<char[]> scratch_buffer,
2000  size_t begin_pos,
2001  size_t end_pos,
2002  size_t total_size,
2003  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2004  size_t first_row_index_this_buffer,
2005  const Catalog_Namespace::SessionInfo* session_info,
2006  Executor* executor) {
2007  ImportStatus thread_import_status;
2008  int64_t total_get_row_time_us = 0;
2009  int64_t total_str_to_val_time_us = 0;
2010  auto query_session = session_info ? session_info->get_session_id() : "";
2011  CHECK(scratch_buffer);
2012  auto buffer = scratch_buffer.get();
2013  auto load_ms = measure<>::execution([]() {});
2014 
2015  thread_import_status.thread_id = thread_id;
2016 
2017  auto ms = measure<>::execution([&]() {
2018  const CopyParams& copy_params = importer->get_copy_params();
2019  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2020  size_t begin =
2021  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
2022  const char* thread_buf = buffer + begin_pos + begin;
2023  const char* thread_buf_end = buffer + end_pos;
2024  const char* buf_end = buffer + total_size;
2025  bool try_single_thread = false;
2026  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2027  importer->get_import_buffers(thread_id);
2028  auto us = 0ULL;
2029  int phys_cols = 0;
2030  int point_cols = 0;
2031  for (const auto cd : col_descs) {
2032  const auto& col_ti = cd->columnType;
2033  phys_cols += col_ti.get_physical_cols();
2034  if (cd->columnType.get_type() == kPOINT) {
2035  point_cols++;
2036  }
2037  }
2038  auto num_cols = col_descs.size() - phys_cols;
2039  for (const auto& p : import_buffers) {
2040  p->clear();
2041  }
2042  std::vector<std::string_view> row;
2043  size_t row_index_plus_one = 0;
2044  for (const char* p = thread_buf; p < thread_buf_end; p++) {
2045  row.clear();
2046  std::vector<std::unique_ptr<char[]>>
2047  tmp_buffers; // holds string w/ removed escape chars, etc
2048  if (DEBUG_TIMING) {
2049  us = measure<std::chrono::microseconds>::execution([&]() {
2050  p = import_export::delimited_parser::get_row(p,
2051  thread_buf_end,
2052  buf_end,
2053  copy_params,
2054  importer->get_is_array(),
2055  row,
2056  tmp_buffers,
2057  try_single_thread,
2058  true);
2059  });
2060  total_get_row_time_us += us;
2061  } else {
2062  p = import_export::delimited_parser::get_row(p,
2063  thread_buf_end,
2064  buf_end,
2065  copy_params,
2066  importer->get_is_array(),
2067  row,
2068  tmp_buffers,
2069  try_single_thread,
2070  true);
2071  }
2072  row_index_plus_one++;
2073  // Each POINT could consume two separate coords instead of a single WKT
2074  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
2075  thread_import_status.rows_rejected++;
2076  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
2077  << row.size() << "): " << shared::printContainer(row);
2078  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2079  break;
2080  }
2081  continue;
2082  }
2083 
2084  //
2085  // lambda for importing a row (perhaps multiple times if exploding a collection)
2086  //
2087 
2088  auto execute_import_row = [&](OGRGeometry* import_geometry) {
2089  size_t import_idx = 0;
2090  size_t col_idx = 0;
2091  try {
2092  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2093  auto cd = *cd_it;
2094  const auto& col_ti = cd->columnType;
2095 
2096  bool is_null =
2097  ImportHelpers::is_null_datum(row[import_idx], copy_params.null_str);
2098  // Note: the default copy_params.null_str is "\N", but many users write "NULL".
2099  // So nullness may initially be missed here and not passed to add_value(),
2100  // which then runs its own checks and may still decide the value is NULL,
2101  // e.g. a kINT field that doesn't start with a digit or a '-' is treated as NULL.
2102  // So "NULL" is not recognized as NULL here, but since it also fails to parse
2103  // as a valid kINT, it ends up NULL after all.
2104  // We also check for "NULL" here, as a widely accepted notation for NULL.
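  // For example: with the default null_str "\N", an INT field containing
  // "NULL" fails the is_null_datum() test above, then fails integer parsing
  // inside add_value(), and so still ends up stored as NULL.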
2105 
2106  // Treating empty as NULL
2107  if (!cd->columnType.is_string() && row[import_idx].empty()) {
2108  is_null = true;
2109  }
2110  if (!cd->columnType.is_string() && !copy_params.trim_spaces) {
2111  // everything but strings should be always trimmed
2112  row[import_idx] = sv_strip(row[import_idx]);
2113  }
2114 
2115  if (col_ti.get_physical_cols() == 0) {
2116  // not geo
2117 
2118  import_buffers[col_idx]->add_value(
2119  cd, row[import_idx], is_null, copy_params);
2120 
2121  // next
2122  ++import_idx;
2123  ++col_idx;
2124  } else {
2125  // geo
2126 
2127  // store null string in the base column
2128  import_buffers[col_idx]->add_value(
2129  cd, copy_params.null_str, true, copy_params);
2130 
2131  // WKT from string we're not storing
2132  auto const& geo_string = row[import_idx];
2133 
2134  // next
2135  ++import_idx;
2136  ++col_idx;
2137 
2138  SQLTypes col_type = col_ti.get_type();
2139  CHECK(IS_GEO(col_type));
2140 
2141  std::vector<double> coords;
2142  std::vector<double> bounds;
2143  std::vector<int> ring_sizes;
2144  std::vector<int> poly_rings;
2145  int render_group = 0;
2146 
2147  // if this is a POINT column, and the field is not null, and
2148  // looks like a scalar numeric value (and not a hex blob)
2149  // attempt to import two columns as lon/lat (or lat/lon)
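  // e.g. a field pair "-122.42","37.77" destined for a POINT column consumes
  // both fields and, with copy_params.lonlat == true, imports as
  // POINT(-122.42 37.77); with lonlat == false the two values are swapped.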
2150  if (col_type == kPOINT && !is_null && geo_string.size() > 0 &&
2151  (geo_string[0] == '.' || isdigit(geo_string[0]) ||
2152  geo_string[0] == '-') &&
2153  geo_string.find_first_of("ABCDEFabcdef") == std::string::npos) {
2154  double lon = std::atof(std::string(geo_string).c_str());
2155  double lat = NAN;
2156  auto lat_str = row[import_idx];
2157  ++import_idx;
2158  if (lat_str.size() > 0 &&
2159  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
2160  lat = std::atof(std::string(lat_str).c_str());
2161  }
2162  // Swap coordinates if this table uses a reverse order: lat/lon
2163  if (!copy_params.lonlat) {
2164  std::swap(lat, lon);
2165  }
2166  // TODO: should check if POINT column should have been declared with
2167  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
2168  // throw std::runtime_error("POINT column " + cd->columnName + " is
2169  // not WGS84, cannot insert lon/lat");
2170  // }
2171  SQLTypeInfo import_ti{col_ti};
2172  if (copy_params.source_type ==
2173  import_export::SourceType::kDelimitedFile &&
2174  import_ti.get_output_srid() == 4326) {
2175  auto srid0 = copy_params.source_srid;
2176  if (srid0 > 0) {
2177  // srid0 -> 4326 transform is requested on import
2178  import_ti.set_input_srid(srid0);
2179  }
2180  }
2181  if (!importGeoFromLonLat(lon, lat, coords, import_ti)) {
2182  throw std::runtime_error(
2183  "Cannot read lon/lat to insert into POINT column " +
2184  cd->columnName);
2185  }
2186  } else {
2187  // import it
2188  SQLTypeInfo import_ti{col_ti};
2189  if (copy_params.source_type ==
2190  import_export::SourceType::kDelimitedFile &&
2191  import_ti.get_output_srid() == 4326) {
2192  auto srid0 = copy_params.source_srid;
2193  if (srid0 > 0) {
2194  // srid0 -> 4326 transform is requested on import
2195  import_ti.set_input_srid(srid0);
2196  }
2197  }
2198  if (is_null) {
2199  if (col_ti.get_notnull()) {
2200  throw std::runtime_error("NULL geo for column " + cd->columnName);
2201  }
2202  Geospatial::GeoTypesFactory::getNullGeoColumns(
2203  import_ti,
2204  coords,
2205  bounds,
2206  ring_sizes,
2207  poly_rings,
2208  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2209  } else {
2210  if (import_geometry) {
2211  // geometry already exploded
2212  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2213  import_geometry,
2214  import_ti,
2215  coords,
2216  bounds,
2217  ring_sizes,
2218  poly_rings,
2219  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2220  std::string msg =
2221  "Failed to extract valid geometry from exploded row " +
2222  std::to_string(first_row_index_this_buffer +
2223  row_index_plus_one) +
2224  " for column " + cd->columnName;
2225  throw std::runtime_error(msg);
2226  }
2227  } else {
2228  // extract geometry directly from WKT
2229  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2230  std::string(geo_string),
2231  import_ti,
2232  coords,
2233  bounds,
2234  ring_sizes,
2235  poly_rings,
2236  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2237  std::string msg = "Failed to extract valid geometry from row " +
2238  std::to_string(first_row_index_this_buffer +
2239  row_index_plus_one) +
2240  " for column " + cd->columnName;
2241  throw std::runtime_error(msg);
2242  }
2243  }
2244 
2245  // validate types
2246  if (col_type != import_ti.get_type()) {
2247  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2248  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2249  col_type == SQLTypes::kMULTIPOLYGON)) {
2250  throw std::runtime_error(
2251  "Imported geometry doesn't match the type of column " +
2252  cd->columnName);
2253  }
2254  }
2255  }
2256 
2257  // assign render group?
2258  if (columnIdToRenderGroupAnalyzerMap.size()) {
2259  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2260  if (ring_sizes.size()) {
2261  // get a suitable render group for these poly coords
2262  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2263  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2264  render_group =
2265  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2266  } else {
2267  // empty poly
2268  render_group = -1;
2269  }
2270  }
2271  }
2272  }
2273 
2274  // import extracted geo
2275  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2276  cd,
2277  import_buffers,
2278  col_idx,
2279  coords,
2280  bounds,
2281  ring_sizes,
2282  poly_rings,
2283  render_group);
2284 
2285  // skip remaining physical columns
2286  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2287  ++cd_it;
2288  }
2289  }
2290  }
2291  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2292  check_session_interrupted(query_session, executor))) {
2293  thread_import_status.load_failed = true;
2294  thread_import_status.load_msg =
2295  "Table load was cancelled via Query Interrupt";
2296  return;
2297  }
2298  thread_import_status.rows_completed++;
2299  } catch (const std::exception& e) {
2300  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2301  import_buffers[col_idx_to_pop]->pop_value();
2302  }
2303  thread_import_status.rows_rejected++;
2304  LOG(ERROR) << "Input exception thrown: " << e.what()
2305  << ". Row discarded. Data: " << shared::printContainer(row);
2306  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2307  LOG(ERROR) << "Load was cancelled due to max reject rows being reached";
2308  thread_import_status.load_failed = true;
2309  thread_import_status.load_msg =
2310  "Load was cancelled due to max reject rows being reached";
2311  }
2312  }
2313  }; // End of lambda
2314 
2315  if (copy_params.geo_explode_collections) {
2316  // explode and import
2317  auto const [collection_col_idx, collection_child_type, collection_col_name] =
2318  explode_collections_step1(col_descs);
2319  // pull out the collection WKT or WKB hex
2320  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2321  auto const& collection_geo_string = row[collection_col_idx];
2322  // convert to OGR
2323  OGRGeometry* ogr_geometry = nullptr;
2324  ScopeGuard destroy_ogr_geometry = [&] {
2325  if (ogr_geometry) {
2326  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2327  }
2328  };
2329  ogr_geometry = Geospatial::GeoTypesFactory::createOGRGeometry(
2330  std::string(collection_geo_string));
2331  // do the explode and import
2332  us = explode_collections_step2(ogr_geometry,
2333  collection_child_type,
2334  collection_col_name,
2335  first_row_index_this_buffer + row_index_plus_one,
2336  execute_import_row);
2337  } else {
2338  // import non-collection row just once
2339  us = measure<std::chrono::microseconds>::execution(
2340  [&] { execute_import_row(nullptr); });
2341  }
2342 
2343  if (thread_import_status.load_failed) {
2344  break;
2345  }
2346  } // end thread
2347  total_str_to_val_time_us += us;
2348  if (!thread_import_status.load_failed && thread_import_status.rows_completed > 0) {
2349  load_ms = measure<>::execution([&]() {
2350  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2351  });
2352  }
2353  }); // end execution
2354 
2355  if (DEBUG_TIMING && !thread_import_status.load_failed &&
2356  thread_import_status.rows_completed > 0) {
2357  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2358  << thread_import_status.rows_completed << " rows inserted in "
2359  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2360  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2361  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2362  << "sec" << std::endl;
2363  }
2364 
2365  return thread_import_status;
2366 }
2367 
2368 class ColumnNotGeoError : public std::runtime_error {
2369  public:
2370  ColumnNotGeoError(const std::string& column_name)
2371  : std::runtime_error("Column '" + column_name + "' is not a geo column") {}
2372 };
2373 
2374 static ImportStatus import_thread_shapefile(
2375  int thread_id,
2376  Importer* importer,
2377  OGRCoordinateTransformation* coordinate_transformation,
2378  const FeaturePtrVector& features,
2379  size_t firstFeature,
2380  size_t numFeatures,
2381  const FieldNameToIndexMapType& fieldNameToIndexMap,
2382  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2383  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2384  const Catalog_Namespace::SessionInfo* session_info,
2385  Executor* executor,
2386  const MetadataColumnInfos& metadata_column_infos) {
2387  ImportStatus thread_import_status;
2388  const CopyParams& copy_params = importer->get_copy_params();
2389  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2390  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2391  importer->get_import_buffers(thread_id);
2392  auto query_session = session_info ? session_info->get_session_id() : "";
2393  for (const auto& p : import_buffers) {
2394  p->clear();
2395  }
2396 
2397  auto convert_timer = timer_start();
2398 
2399  // for all the features in this chunk...
2400  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2401  // ignore null features
2402  if (!features[iFeature]) {
2403  continue;
2404  }
2405 
2406  // get this feature's geometry
2407  // for geodatabase, we need to consider features with no geometry
2408  // as we still want to create a table, even if it has no geo column
2409  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2410  if (pGeometry && coordinate_transformation) {
2411  pGeometry->transform(coordinate_transformation);
2412  }
2413 
2414  //
2415  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2416  //
2417 
2418  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2419  size_t col_idx = 0;
2420  try {
2421  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2422  check_session_interrupted(query_session, executor))) {
2423  thread_import_status.load_failed = true;
2424  thread_import_status.load_msg = "Table load was cancelled via Query Interrupt";
2425  return;
2426  }
2427 
2428  uint32_t field_column_count{0u};
2429  uint32_t metadata_column_count{0u};
2430 
2431  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2432  auto cd = *cd_it;
2433 
2434  // is this a geo column?
2435  const auto& col_ti = cd->columnType;
2436  if (col_ti.is_geometry()) {
2437  // Note that this assumes there is one and only one geo column in the
2438  // table. Currently, the importer only supports reading a single
2439  // geospatial feature from an input shapefile / geojson file, but this
2440  // code will need to be modified if that changes
2441  SQLTypes col_type = col_ti.get_type();
2442 
2443  // store null string in the base column
2444  import_buffers[col_idx]->add_value(
2445  cd, copy_params.null_str, true, copy_params);
2446  ++col_idx;
2447 
2448  // the data we now need to extract for the other columns
2449  std::vector<double> coords;
2450  std::vector<double> bounds;
2451  std::vector<int> ring_sizes;
2452  std::vector<int> poly_rings;
2453  int render_group = 0;
2454 
2455  // extract it
2456  SQLTypeInfo import_ti{col_ti};
2457  bool is_null_geo = !import_geometry;
2458  if (is_null_geo) {
2459  if (col_ti.get_notnull()) {
2460  throw std::runtime_error("NULL geo for column " + cd->columnName);
2461  }
2462  Geospatial::GeoTypesFactory::getNullGeoColumns(
2463  import_ti,
2464  coords,
2465  bounds,
2466  ring_sizes,
2467  poly_rings,
2468  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2469  } else {
2470  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2471  import_geometry,
2472  import_ti,
2473  coords,
2474  bounds,
2475  ring_sizes,
2476  poly_rings,
2477  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2478  std::string msg = "Failed to extract valid geometry from feature " +
2479  std::to_string(firstFeature + iFeature + 1) +
2480  " for column " + cd->columnName;
2481  throw std::runtime_error(msg);
2482  }
2483 
2484  // validate types
2485  if (col_type != import_ti.get_type()) {
2486  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2487  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2488  col_type == SQLTypes::kMULTIPOLYGON)) {
2489  throw std::runtime_error(
2490  "Imported geometry doesn't match the type of column " +
2491  cd->columnName);
2492  }
2493  }
2494  }
2495 
2496  if (columnIdToRenderGroupAnalyzerMap.size()) {
2497  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2498  if (ring_sizes.size()) {
2499  // get a suitable render group for these poly coords
2500  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2501  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2502  render_group =
2503  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2504  } else {
2505  // empty poly
2506  render_group = -1;
2507  }
2508  }
2509  }
2510 
2511  // create coords array value and add it to the physical column
2512  ++cd_it;
2513  auto cd_coords = *cd_it;
2514  std::vector<TDatum> td_coord_data;
2515  if (!is_null_geo) {
2516  std::vector<uint8_t> compressed_coords =
2517  Geospatial::compress_coords(coords, col_ti);
2518  for (auto cc : compressed_coords) {
2519  TDatum td_byte;
2520  td_byte.val.int_val = cc;
2521  td_coord_data.push_back(td_byte);
2522  }
2523  }
2524  TDatum tdd_coords;
2525  tdd_coords.val.arr_val = td_coord_data;
2526  tdd_coords.is_null = is_null_geo;
2527  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2528  ++col_idx;
2529 
2530  if (col_type == kMULTILINESTRING || col_type == kPOLYGON ||
2531  col_type == kMULTIPOLYGON) {
2532  // Create [linest]ring_sizes array value and add it to the physical column
2533  ++cd_it;
2534  auto cd_ring_sizes = *cd_it;
2535  std::vector<TDatum> td_ring_sizes;
2536  if (!is_null_geo) {
2537  for (auto ring_size : ring_sizes) {
2538  TDatum td_ring_size;
2539  td_ring_size.val.int_val = ring_size;
2540  td_ring_sizes.push_back(td_ring_size);
2541  }
2542  }
2543  TDatum tdd_ring_sizes;
2544  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2545  tdd_ring_sizes.is_null = is_null_geo;
2546  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2547  ++col_idx;
2548  }
2549 
2550  if (col_type == kMULTIPOLYGON) {
2551  // Create poly_rings array value and add it to the physical column
2552  ++cd_it;
2553  auto cd_poly_rings = *cd_it;
2554  std::vector<TDatum> td_poly_rings;
2555  if (!is_null_geo) {
2556  for (auto num_rings : poly_rings) {
2557  TDatum td_num_rings;
2558  td_num_rings.val.int_val = num_rings;
2559  td_poly_rings.push_back(td_num_rings);
2560  }
2561  }
2562  TDatum tdd_poly_rings;
2563  tdd_poly_rings.val.arr_val = td_poly_rings;
2564  tdd_poly_rings.is_null = is_null_geo;
2565  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2566  ++col_idx;
2567  }
2568 
2569  if (col_type == kLINESTRING || col_type == kMULTILINESTRING ||
2570  col_type == kPOLYGON || col_type == kMULTIPOLYGON ||
2571  col_type == kMULTIPOINT) {
2572  // Create bounds array value and add it to the physical column
2573  ++cd_it;
2574  auto cd_bounds = *cd_it;
2575  std::vector<TDatum> td_bounds_data;
2576  if (!is_null_geo) {
2577  for (auto b : bounds) {
2578  TDatum td_double;
2579  td_double.val.real_val = b;
2580  td_bounds_data.push_back(td_double);
2581  }
2582  }
2583  TDatum tdd_bounds;
2584  tdd_bounds.val.arr_val = td_bounds_data;
2585  tdd_bounds.is_null = is_null_geo;
2586  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2587  ++col_idx;
2588  }
2589 
2590  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2591  // Create render_group value and add it to the physical column
2592  ++cd_it;
2593  auto cd_render_group = *cd_it;
2594  TDatum td_render_group;
2595  td_render_group.val.int_val = render_group;
2596  td_render_group.is_null = is_null_geo;
2597  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2598  ++col_idx;
2599  }
2600  } else if (field_column_count < fieldNameToIndexMap.size()) {
2601  //
2602  // field column
2603  //
2604  auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2605  CHECK(cit != columnNameToSourceNameMap.end());
2606  auto const& field_name = cit->second;
2607 
2608  auto const fit = fieldNameToIndexMap.find(field_name);
2609  if (fit == fieldNameToIndexMap.end()) {
2610  throw ColumnNotGeoError(cd->columnName);
2611  }
2612 
2613  auto const& field_index = fit->second;
2614  CHECK(field_index < fieldNameToIndexMap.size());
2615 
2616  auto const& feature = features[iFeature];
2617 
2618  auto field_defn = feature->GetFieldDefnRef(field_index);
2619  CHECK(field_defn);
2620 
2621  // OGRFeature::GetFieldAsString() can only return 80 characters
2622  // so for array columns, we are obliged to fetch the actual values
2623  // and construct the concatenated string ourselves
2624 
2625  std::string value_string;
2626  int array_index = 0, array_size = 0;
2627 
2628  auto stringify_numeric_list = [&](auto* values) {
2629  value_string = "{";
2630  while (array_index < array_size) {
2631  auto separator = (array_index > 0) ? "," : "";
2632  value_string += separator + std::to_string(values[array_index]);
2633  array_index++;
2634  }
2635  value_string += "}";
2636  };
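  // e.g. an OFTIntegerList field holding 1, 2 and 3 stringifies to "{1,2,3}",
  // which is then parsed back into an array value by add_value() below.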
2637 
2638  auto field_type = field_defn->GetType();
2639  switch (field_type) {
2640  case OFTInteger:
2641  case OFTInteger64:
2642  case OFTReal:
2643  case OFTString:
2644  case OFTBinary:
2645  case OFTDate:
2646  case OFTTime:
2647  case OFTDateTime: {
2648  value_string = feature->GetFieldAsString(field_index);
2649  } break;
2650  case OFTIntegerList: {
2651  auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2652  stringify_numeric_list(values);
2653  } break;
2654  case OFTInteger64List: {
2655  auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2656  stringify_numeric_list(values);
2657  } break;
2658  case OFTRealList: {
2659  auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2660  stringify_numeric_list(values);
2661  } break;
2662  case OFTStringList: {
2663  auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2664  value_string = "{";
2665  if (array_of_strings) {
2666  while (auto* this_string = array_of_strings[array_index]) {
2667  auto separator = (array_index > 0) ? "," : "";
2668  value_string += separator + std::string(this_string);
2669  array_index++;
2670  }
2671  }
2672  value_string += "}";
2673  } break;
2674  default:
2675  throw std::runtime_error("Unsupported geo file field type (" +
2676  std::to_string(static_cast<int>(field_type)) +
2677  ")");
2678  }
2679 
2680  import_buffers[col_idx]->add_value(cd, value_string, false, copy_params);
2681  ++col_idx;
2682  field_column_count++;
2683  } else if (metadata_column_count < metadata_column_infos.size()) {
2684  //
2685  // metadata column
2686  //
2687  auto const& mci = metadata_column_infos[metadata_column_count];
2688  if (mci.column_descriptor.columnName != cd->columnName) {
2689  throw std::runtime_error("Metadata column name mismatch");
2690  }
2691  import_buffers[col_idx]->add_value(cd, mci.value, false, copy_params);
2692  ++col_idx;
2693  metadata_column_count++;
2694  } else {
2695  throw std::runtime_error("Column count mismatch");
2696  }
2697  }
2698  thread_import_status.rows_completed++;
2699  } catch (QueryExecutionError& e) {
2700  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
2701  throw e;
2702  }
2703  } catch (ColumnNotGeoError& e) {
2704  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Aborting import.";
2705  throw std::runtime_error(e.what());
2706  } catch (const std::exception& e) {
2707  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2708  import_buffers[col_idx_to_pop]->pop_value();
2709  }
2710  thread_import_status.rows_rejected++;
2711  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2712  }
2713  };
2714 
2715  if (pGeometry && copy_params.geo_explode_collections) {
2716  // explode and import
2717  auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2718  explode_collections_step1(col_descs);
2719  explode_collections_step2(pGeometry,
2720  collection_child_type,
2721  collection_col_name,
2722  firstFeature + iFeature + 1,
2723  execute_import_feature);
2724  } else {
2725  // import non-collection or null feature just once
2726  execute_import_feature(pGeometry);
2727  }
2728  } // end features
2729 
2730  float convert_s = TIMER_STOP(convert_timer);
2731 
2732  float load_s = 0.0f;
2733  if (thread_import_status.rows_completed > 0) {
2734  auto load_timer = timer_start();
2735  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2736  load_s = TIMER_STOP(load_timer);
2737  }
2738 
2739  if (DEBUG_TIMING && thread_import_status.rows_completed > 0) {
2740  LOG(INFO) << "DEBUG: Process " << convert_s << "s";
2741  LOG(INFO) << "DEBUG: Load " << load_s << "s";
2742  LOG(INFO) << "DEBUG: Total " << (convert_s + load_s) << "s";
2743  }
2744 
2745  thread_import_status.thread_id = thread_id;
2746 
2747  return thread_import_status;
2748 }
2749 
2750 bool Loader::loadNoCheckpoint(
2751  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2752  size_t row_count,
2753  const Catalog_Namespace::SessionInfo* session_info) {
2754  return loadImpl(import_buffers, row_count, false, session_info);
2755 }
2756 
2757 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2758  size_t row_count,
2759  const Catalog_Namespace::SessionInfo* session_info) {
2760  return loadImpl(import_buffers, row_count, true, session_info);
2761 }
2762 
2763 namespace {
2764 
2765 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2766  const auto& ti = import_buffer.getTypeInfo();
2767  const int8_t* values_buffer{nullptr};
2768  if (ti.is_string()) {
2769  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2770  values_buffer = import_buffer.getStringDictBuffer();
2771  } else {
2772  values_buffer = import_buffer.getAsBytes();
2773  }
2774  CHECK(values_buffer);
2775  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2776  switch (logical_size) {
2777  case 1: {
2778  return values_buffer[index];
2779  }
2780  case 2: {
2781  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2782  }
2783  case 4: {
2784  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2785  }
2786  case 8: {
2787  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2788  }
2789  default:
2790  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2791  }
2792  UNREACHABLE();
2793  return 0;
2794 }
2795 
2796 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2797  const auto& ti = import_buffer.getTypeInfo();
2798  CHECK_EQ(kFLOAT, ti.get_type());
2799  const auto values_buffer = import_buffer.getAsBytes();
2800  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2801 }
2802 
2803 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2804  const auto& ti = import_buffer.getTypeInfo();
2805  CHECK_EQ(kDOUBLE, ti.get_type());
2806  const auto values_buffer = import_buffer.getAsBytes();
2807  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2808 }
2809 
2810 } // namespace
2811 
2812 void Loader::fillShardRow(const size_t row_index,
2813  OneShardBuffers& shard_output_buffers,
2814  const OneShardBuffers& import_buffers) {
2815  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2816  const auto& input_buffer = import_buffers[col_idx];
2817  const auto& col_ti = input_buffer->getTypeInfo();
2818  const auto type =
2819  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2820 
2821  switch (type) {
2822  case kBOOLEAN:
2823  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2824  break;
2825  case kTINYINT:
2826  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2827  break;
2828  case kSMALLINT:
2829  shard_output_buffers[col_idx]->addSmallint(
2830  int_value_at(*input_buffer, row_index));
2831  break;
2832  case kINT:
2833  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2834  break;
2835  case kBIGINT:
2836  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2837  break;
2838  case kFLOAT:
2839  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2840  break;
2841  case kDOUBLE:
2842  shard_output_buffers[col_idx]->addDouble(
2843  double_value_at(*input_buffer, row_index));
2844  break;
2845  case kTEXT:
2846  case kVARCHAR:
2847  case kCHAR: {
2848  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2849  shard_output_buffers[col_idx]->addString(
2850  (*input_buffer->getStringBuffer())[row_index]);
2851  break;
2852  }
2853  case kTIME:
2854  case kTIMESTAMP:
2855  case kDATE:
2856  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2857  break;
2858  case kARRAY:
2859  if (IS_STRING(col_ti.get_subtype())) {
2860  CHECK(input_buffer->getStringArrayBuffer());
2861  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2862  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2863  shard_output_buffers[col_idx]->addStringArray(input_arr);
2864  } else {
2865  shard_output_buffers[col_idx]->addArray(
2866  (*input_buffer->getArrayBuffer())[row_index]);
2867  }
2868  break;
2869  case kPOINT:
2870  case kMULTIPOINT:
2871  case kLINESTRING:
2872  case kMULTILINESTRING:
2873  case kPOLYGON:
2874  case kMULTIPOLYGON: {
2875  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2876  shard_output_buffers[col_idx]->addGeoString(
2877  (*input_buffer->getGeoStringBuffer())[row_index]);
2878  break;
2879  }
2880  default:
2881  CHECK(false);
2882  }
2883  }
2884 }
2885 
2886 void Loader::distributeToShardsExistingColumns(
2887  std::vector<OneShardBuffers>& all_shard_import_buffers,
2888  std::vector<size_t>& all_shard_row_counts,
2889  const OneShardBuffers& import_buffers,
2890  const size_t row_count,
2891  const size_t shard_count,
2892  const Catalog_Namespace::SessionInfo* session_info) {
2893  int col_idx{0};
2894  const ColumnDescriptor* shard_col_desc{nullptr};
2895  for (const auto col_desc : column_descs_) {
2896  ++col_idx;
2897  if (col_idx == table_desc_->shardedColumnId) {
2898  shard_col_desc = col_desc;
2899  break;
2900  }
2901  }
2902  CHECK(shard_col_desc);
2903  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2904  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2905  const auto& shard_col_ti = shard_col_desc->columnType;
2906  CHECK(shard_col_ti.is_integer() ||
2907  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2908  shard_col_ti.is_time());
2909  if (shard_col_ti.is_string()) {
2910  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2911  CHECK(payloads_ptr);
2912  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2913  }
2914 
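  // e.g. with shard_count == 4, a row whose shard-key value is 10 lands in
  // shard SHARD_FOR_KEY(10, 4); dict-encoded string keys were translated to
  // their dictionary ids above so they can be hashed like integers.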
2915  for (size_t i = 0; i < row_count; ++i) {
2916  const size_t shard =
2917  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2918  auto& shard_output_buffers = all_shard_import_buffers[shard];
2919  fillShardRow(i, shard_output_buffers, import_buffers);
2920  ++all_shard_row_counts[shard];
2921  }
2922 }
2923 
2924 void Loader::distributeToShardsNewColumns(
2925  std::vector<OneShardBuffers>& all_shard_import_buffers,
2926  std::vector<size_t>& all_shard_row_counts,
2927  const OneShardBuffers& import_buffers,
2928  const size_t row_count,
2929  const size_t shard_count,
2930  const Catalog_Namespace::SessionInfo* session_info) {
2931  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2932  CHECK(shard_tds.size() == shard_count);
2933 
2934  for (size_t shard = 0; shard < shard_count; ++shard) {
2935  auto& shard_output_buffers = all_shard_import_buffers[shard];
2936  if (row_count != 0) {
2937  fillShardRow(0, shard_output_buffers, import_buffers);
2938  }
2939  // when replicating a column, the row count of a shard equals the number of
2940  // rows already on that shard, i.e. the replicate count of the column
2941  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2942  }
2943 }
2944 
2945 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2946  std::vector<size_t>& all_shard_row_counts,
2947  const OneShardBuffers& import_buffers,
2948  const size_t row_count,
2949  const size_t shard_count,
2950  const Catalog_Namespace::SessionInfo* session_info) {
2951  all_shard_row_counts.resize(shard_count);
2952  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2953  all_shard_import_buffers.emplace_back();
2954  for (const auto& typed_import_buffer : import_buffers) {
2955  all_shard_import_buffers.back().emplace_back(
2956  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2957  typed_import_buffer->getStringDictionary()));
2958  }
2959  }
2960  CHECK_GT(row_count, 0);
2961  if (isAddingColumns()) {
2962  distributeToShardsNewColumns(all_shard_import_buffers,
2963  all_shard_row_counts,
2964  import_buffers,
2965  row_count,
2966  shard_count,
2967  session_info);
2968  } else {
2969  distributeToShardsExistingColumns(all_shard_import_buffers,
2970  all_shard_row_counts,
2971  import_buffers,
2972  row_count,
2973  shard_count,
2974  session_info);
2975  }
2976 }
2977 
2978 bool Loader::loadImpl(
2979  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2980  size_t row_count,
2981  bool checkpoint,
2982  const Catalog_Namespace::SessionInfo* session_info) {
2983  if (load_callback_) {
2984  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
2985  return load_callback_(import_buffers, data_blocks, row_count);
2986  }
2987  if (table_desc_->nShards) {
2988  std::vector<OneShardBuffers> all_shard_import_buffers;
2989  std::vector<size_t> all_shard_row_counts;
2990  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2991  distributeToShards(all_shard_import_buffers,
2992  all_shard_row_counts,
2993  import_buffers,
2994  row_count,
2995  shard_tables.size(),
2996  session_info);
2997  bool success = true;
2998  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2999  success = success && loadToShard(all_shard_import_buffers[shard_idx],
3000  all_shard_row_counts[shard_idx],
3001  shard_tables[shard_idx],
3002  checkpoint,
3003  session_info);
3004  }
3005  return success;
3006  }
3007  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
3008 }
3009 
3010 std::vector<DataBlockPtr> TypedImportBuffer::get_data_block_pointers(
3011  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
3012  std::vector<DataBlockPtr> result(import_buffers.size());
3013  std::vector<std::pair<const size_t, std::future<int8_t*>>>
3014  encoded_data_block_ptrs_futures;
3015  // make all async calls to string dictionary here and then continue execution
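  // e.g. with two dict-encoded TEXT columns, two std::async tasks start
  // encoding immediately; their futures are drained at the bottom of this
  // function, overlapping dictionary encoding with the pointer setup below.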
3016  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
3017  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
3018  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
3019  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
3020  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
3021 
3022  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
3023  buf_idx,
3024  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
3025  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
3026  return import_buffers[buf_idx]->getStringDictBuffer();
3027  })));
3028  }
3029  }
3030 
3031  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
3032  DataBlockPtr p;
3033  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
3034  import_buffers[buf_idx]->getTypeInfo().is_time() ||
3035  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
3036  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
3037  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
3038  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
3039  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
3040  p.stringsPtr = string_payload_ptr;
3041  } else {
3042  // This condition means we have a dictionary-encoded string column. We
3043  // already made an async request above to obtain the encoded integer
3044  // values, so we skip this iteration and continue.
3045  continue;
3046  }
3047  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
3048  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
3049  p.stringsPtr = geo_payload_ptr;
3050  } else {
3051  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
3052  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
3053  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
3054  import_buffers[buf_idx]->addDictEncodedStringArray(
3055  *import_buffers[buf_idx]->getStringArrayBuffer());
3056  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
3057  } else {
3058  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
3059  }
3060  }
3061  result[buf_idx] = p;
3062  }
3063 
3064  // wait for the async requests we made for string dictionary
3065  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
3066  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
3067  }
3068  return result;
3069 }
3070 
3071 bool Loader::loadToShard(
3072  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3073  size_t row_count,
3074  const TableDescriptor* shard_table,
3075  bool checkpoint,
3076  const Catalog_Namespace::SessionInfo* session_info) {
3077  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
3078  Fragmenter_Namespace::InsertData ins_data(insert_data_);
3079  ins_data.numRows = row_count;
3080  bool success = false;
3081  try {
3082  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
3083  } catch (std::exception& e) {
3084  std::ostringstream oss;
3085  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
3086  << e.what();
3087 
3088  LOG(ERROR) << oss.str();
3089  error_msg_ = oss.str();
3090  return success;
3091  }
3092  if (isAddingColumns()) {
3093  // when Adding columns we omit any columns except the ones being added
3094  ins_data.columnIds.clear();
3095  ins_data.is_default.clear();
3096  for (auto& buffer : import_buffers) {
3097  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
3098  ins_data.is_default.push_back(true);
3099  }
3100  } else {
3101  ins_data.is_default.resize(ins_data.columnIds.size(), false);
3102  }
3103  // release loader_lock so that in InsertOrderFragmenter::insertData
3104  // we can have multiple threads sort/shuffle InsertData
3105  loader_lock.unlock();
3106  success = true;
3107  {
3108  try {
3109  if (checkpoint) {
3110  shard_table->fragmenter->insertData(ins_data);
3111  } else {
3112  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
3113  }
3114  } catch (std::exception& e) {
3115  std::ostringstream oss;
3116  oss << "Fragmenter Insert Exception when processing Table "
3117  << shard_table->tableName << " issue was " << e.what();
3118 
3119  LOG(ERROR) << oss.str();
3120  loader_lock.lock();
3121  error_msg_ = oss.str();
3122  success = false;
3123  }
3124  }
3125  return success;
3126 }
3127 
3128 void Loader::dropColumns(const std::vector<int>& columnIds) {
3129  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
3130  if (table_desc_->nShards) {
3131  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
3132  }
3133  for (auto table_desc : table_descs) {
3134  table_desc->fragmenter->dropColumns(columnIds);
3135  }
3136 }
3137 
3138 void Loader::init() {
3139  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
3140  insert_data_.tableId = table_desc_->tableId;
3141  for (auto cd : column_descs_) {
3142  insert_data_.columnIds.push_back(cd->columnId);
3143  if (cd->columnType.get_compression() == kENCODING_DICT) {
3144  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3145  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
3146  CHECK(dd);
3147  dict_map_[cd->columnId] = dd->stringDict.get();
3148  }
3149  }
3150  insert_data_.numRows = 0;
3151 }
3152 
3153 void Detector::init() {
3154  detect_row_delimiter();
3155  split_raw_data();
3156  find_best_sqltypes_and_headers();
3157 }
3158 
3159 ImportStatus Detector::importDelimited(
3160  const std::string& file_path,
3161  const bool decompressed,
3162  const Catalog_Namespace::SessionInfo* session_info) {
3163  // we do not check interrupt status for this detection
3164  if (!p_file) {
3165  p_file = fopen(file_path.c_str(), "rb");
3166  }
3167  if (!p_file) {
3168  throw std::runtime_error("failed to open file '" + file_path +
3169  "': " + strerror(errno));
3170  }
3171 
3172  // clang does not support ext/stdio_filebuf.h, so we roll our own
3173  // readline honoring the customized copy_params.line_delim...
3174  std::string line;
3175  line.reserve(1 * 1024 * 1024);
3176  auto end_time = std::chrono::steady_clock::now() +
3177  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
3178  try {
3179  while (!feof(p_file)) {
3180  int c;
3181  size_t n = 0;
3182  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
3183  if (n++ >= line.capacity()) {
3184  break;
3185  }
3186  line += c;
3187  }
3188  if (0 == n) {
3189  break;
3190  }
3191  // remember the first line, which is possibly a header line, so we can
3192  // ignore identical header line(s) in the 2nd+ files of an archive;
3193  // otherwise, a 2nd+ header may be mistaken for an all-string data row
3194  // and skew the final column types.
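  // e.g. an archive holding part-000.csv and part-001.csv, both starting with
  // "id,name": the second file's "id,name" line matches line1 and is skipped.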
3195  if (line1.empty()) {
3196  line1 = line;
3197  } else if (line == line1) {
3198  line.clear();
3199  continue;
3200  }
3201 
3202  raw_data += line;
3203  raw_data += copy_params.line_delim;
3204  line.clear();
3205  ++import_status_.rows_completed;
3206  if (std::chrono::steady_clock::now() > end_time) {
3207  if (import_status_.rows_completed > 10000) {
3208  // stop import when row limit reached
3209  break;
3210  }
3211  }
3212  }
3213  } catch (std::exception& e) {
3214  }
3215 
3217  import_status_.load_failed = true;
3218 
3219  fclose(p_file);
3220  p_file = nullptr;
3221  return import_status_;
3222 }
3223 
3224 void Detector::read_file() {
3225  // this becomes analogous to Importer::import()
3226  (void)DataStreamSink::archivePlumber(nullptr);
3227 }
3228 
3229 void Detector::detect_row_delimiter() {
3230  if (copy_params.delimiter == '\0') {
3231  copy_params.delimiter = ',';
3232  if (boost::filesystem::extension(file_path) == ".tsv") {
3233  copy_params.delimiter = '\t';
3234  }
3235  }
3236 }
3237 
3238 void Detector::split_raw_data() {
3239  const char* buf = raw_data.c_str();
3240  const char* buf_end = buf + raw_data.size();
3241  bool try_single_thread = false;
3242  for (const char* p = buf; p < buf_end; p++) {
3243  std::vector<std::string> row;
3244  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3245  p = import_export::delimited_parser::get_row(p,
3246  buf_end,
3247  buf_end,
3248  copy_params,
3249  nullptr,
3250  row,
3251  tmp_buffers,
3252  try_single_thread,
3253  true);
3254  raw_rows.push_back(row);
3255  if (try_single_thread) {
3256  break;
3257  }
3258  }
3259  if (try_single_thread) {
3260  copy_params.threads = 1;
3261  raw_rows.clear();
3262  for (const char* p = buf; p < buf_end; p++) {
3263  std::vector<std::string> row;
3264  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3265  p = import_export::delimited_parser::get_row(p,
3266  buf_end,
3267  buf_end,
3268  copy_params,
3269  nullptr,
3270  row,
3271  tmp_buffers,
3272  try_single_thread,
3273  true);
3274  raw_rows.push_back(row);
3275  }
3276  }
3277 }
3278 
3279 template <class T>
3280 bool try_cast(const std::string& str) {
3281  try {
3282  boost::lexical_cast<T>(str);
3283  } catch (const boost::bad_lexical_cast& e) {
3284  return false;
3285  }
3286  return true;
3287 }
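// e.g. try_cast<int32_t>("42") is true, try_cast<int32_t>("4.2") is false
// (boost::lexical_cast requires the whole string to convert), and
// try_cast<double>("4.2") is true.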
3288 
3289 SQLTypes Detector::detect_sqltype(const std::string& str) {
3290  SQLTypes type = kTEXT;
3291  if (try_cast<double>(str)) {
3292  type = kDOUBLE;
3293  /*if (try_cast<bool>(str)) {
3294  type = kBOOLEAN;
3295  }*/
3296  if (try_cast<int16_t>(str)) {
3297  type = kSMALLINT;
3298  } else if (try_cast<int32_t>(str)) {
3299  type = kINT;
3300  } else if (try_cast<int64_t>(str)) {
3301  type = kBIGINT;
3302  } else if (try_cast<float>(str)) {
3303  type = kFLOAT;
3304  }
3305  }
3306 
3307  // check for geo types
3308  if (type == kTEXT) {
3309  // convert to upper case
3310  std::string str_upper_case = str;
3311  std::transform(
3312  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3313 
3314  // then test for leading words
3315  if (str_upper_case.find("POINT") == 0) {
3316  type = kPOINT;
3317  } else if (str_upper_case.find("MULTIPOINT") == 0) {
3318  type = kMULTIPOINT;
3319  } else if (str_upper_case.find("LINESTRING") == 0) {
3320  type = kLINESTRING;
3321  } else if (str_upper_case.find("MULTILINESTRING") == 0) {
3322  type = kMULTILINESTRING;
3323  } else if (str_upper_case.find("POLYGON") == 0) {
3324  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
3325  type = kMULTIPOLYGON;
3326  } else {
3327  type = kPOLYGON;
3328  }
3329  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
3330  type = kMULTIPOLYGON;
3331  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
3332  std::string::npos &&
3333  (str_upper_case.size() % 2) == 0) {
3334  // simple hex blob (two characters per byte, not uu-encode or base64)
3335  if (str_upper_case.size() >= 10) {
3336  // match WKB blobs for supported geometry types
3337  // the first byte specifies if the data is big-endian or little-endian
3338  // the next four bytes are the geometry type (1 = POINT etc.)
3339  // @TODO support eWKB, which has extra bits set in the geometry type
3340  auto first_five_bytes = str_upper_case.substr(0, 10);
3341  if (first_five_bytes == "0000000001" || first_five_bytes == "0101000000") {
3342  type = kPOINT;
3343  } else if (first_five_bytes == "0000000004" || first_five_bytes == "0104000000") {
3344  type = kMULTIPOINT;
3345  } else if (first_five_bytes == "0000000002" || first_five_bytes == "0102000000") {
3346  type = kLINESTRING;
3347  } else if (first_five_bytes == "0000000005" || first_five_bytes == "0105000000") {
3348  type = kMULTILINESTRING;
3349  } else if (first_five_bytes == "0000000003" || first_five_bytes == "0103000000") {
3350  type = kPOLYGON;
3351  } else if (first_five_bytes == "0000000006" || first_five_bytes == "0106000000") {
3352  type = kMULTIPOLYGON;
3353  } else {
3354  // unsupported WKB type
3355  return type;
3356  }
3357  } else {
3358  // too short to be WKB
3359  return type;
3360  }
3361  }
3362  }
3363 
3364  // check for time types
3365  if (type == kTEXT) {
3366  // This won't match unix timestamp, since floats and ints were checked above.
3367  if (dateTimeParseOptional<kTIME>(str, 0)) {
3368  type = kTIME;
3369  } else if (dateTimeParseOptional<kTIMESTAMP>(str, 0)) {
3370  type = kTIMESTAMP;
3371  } else if (dateTimeParseOptional<kDATE>(str, 0)) {
3372  type = kDATE;
3373  }
3374  }
3375 
3376  return type;
3377 }
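// Detection examples: "42" -> kSMALLINT, "70000" -> kINT, "4.2" -> kFLOAT,
// "POINT (1 2)" -> kPOINT, "2021-01-27" -> kDATE, "13:24:00" -> kTIME;
// anything unrecognized remains kTEXT.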
3378 
3379 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3380  std::vector<SQLTypes> types(row.size());
3381  for (size_t i = 0; i < row.size(); i++) {
3382  types[i] = detect_sqltype(row[i]);
3383  }
3384  return types;
3385 }
3386 
3387 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
3388  static std::array<int, kSQLTYPE_LAST> typeorder;
3389  typeorder[kCHAR] = 0;
3390  typeorder[kBOOLEAN] = 2;
3391  typeorder[kSMALLINT] = 3;
3392  typeorder[kINT] = 4;
3393  typeorder[kBIGINT] = 5;
3394  typeorder[kFLOAT] = 6;
3395  typeorder[kDOUBLE] = 7;
3396  typeorder[kTIMESTAMP] = 8;
3397  typeorder[kTIME] = 9;
3398  typeorder[kDATE] = 10;
3399  typeorder[kPOINT] = 11;
3400  typeorder[kMULTIPOINT] = 11;
3401  typeorder[kLINESTRING] = 11;
3402  typeorder[kMULTILINESTRING] = 11;
3403  typeorder[kPOLYGON] = 11;
3404  typeorder[kMULTIPOLYGON] = 11;
3405  typeorder[kTEXT] = 12;
3406 
3407  // note: b < a instead of a < b because typeorder ranks types from most to least restrictive
3408  return typeorder[b] < typeorder[a];
3409 }
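// e.g. more_restrictive_sqltype(kDOUBLE, kSMALLINT) is true, so a column that
// has seen kDOUBLE keeps it when a later row parses as kSMALLINT, while the
// reverse call returns false and lets the column widen from kSMALLINT to kDOUBLE.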
3410 
3411 void Detector::find_best_sqltypes_and_headers() {
3412  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
3413  best_encodings =
3414  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3415  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3416  switch (copy_params.has_header) {
3417  case import_export::ImportHeaderRow::kAutoDetect:
3418  has_headers = detect_headers(head_types, best_sqltypes);
3419  if (has_headers) {
3420  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
3421  } else {
3422  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
3423  }
3424  break;
3425  case import_export::ImportHeaderRow::kNoHeader:
3426  has_headers = false;
3427  break;
3428  case import_export::ImportHeaderRow::kHasHeader:
3429  has_headers = true;
3430  break;
3431  }
3432 }
3433 
3434 void Detector::find_best_sqltypes() {
3435  best_sqltypes = find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3436 }
3437 
3438 std::vector<SQLTypes> Detector::find_best_sqltypes(
3439  const std::vector<std::vector<std::string>>& raw_rows,
3440  const CopyParams& copy_params) {
3441  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3442 }
3443 
3444 std::vector<SQLTypes> Detector::find_best_sqltypes(
3445  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3446  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3447  const CopyParams& copy_params) {
3448  if (raw_rows.size() < 1) {
3449  throw std::runtime_error("No rows found in: " +
3450  boost::filesystem::basename(file_path));
3451  }
3452  auto end_time = std::chrono::steady_clock::now() + timeout;
3453  size_t num_cols = raw_rows.front().size();
3454  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3455  std::vector<size_t> non_null_col_counts(num_cols, 0);
3456  for (auto row = row_begin; row != row_end; row++) {
3457  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3458  best_types.push_back(kCHAR);
3459  non_null_col_counts.push_back(0);
3460  }
3461  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3462  // do not count nulls
3463  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3464  continue;
3465  }
3466  SQLTypes t = detect_sqltype(row->at(col_idx));
3467  non_null_col_counts[col_idx]++;
3468  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3469  best_types[col_idx] = t;
3470  }
3471  }
3472  if (std::chrono::steady_clock::now() > end_time) {
3473  break;
3474  }
3475  }
3476  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3477  // if we don't have any non-null values for this column, make it text to
3478  // be safe, because that is the least restrictive type
3479  if (non_null_col_counts[col_idx] == 0) {
3480  best_types[col_idx] = kTEXT;
3481  }
3482  }
3483 
3484  return best_types;
3485 }
3486 
3487 std::vector<EncodingType> Detector::find_best_encodings(
3488  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3489  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3490  const std::vector<SQLTypes>& best_types) {
3491  if (raw_rows.size() < 1) {
3492  throw std::runtime_error("No rows found in: " +
3493  boost::filesystem::basename(file_path));
3494  }
3495  size_t num_cols = best_types.size();
3496  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3497  std::vector<size_t> num_rows_per_col(num_cols, 1);
3498  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3499  for (auto row = row_begin; row != row_end; row++) {
3500  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3501  if (IS_STRING(best_types[col_idx])) {
3502  count_set[col_idx].insert(row->at(col_idx));
3503  num_rows_per_col[col_idx]++;
3504  }
3505  }
3506  }
3507  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3508  if (IS_STRING(best_types[col_idx])) {
3509  float uniqueRatio =
3510  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3511  if (uniqueRatio < 0.75) {
3512  best_encodes[col_idx] = kENCODING_DICT;
3513  }
3514  }
3515  }
3516  return best_encodes;
3517 }
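// e.g. a TEXT column with 1,000 distinct values across 100,000 sampled rows
// has a unique ratio of 0.01 < 0.75 and is given kENCODING_DICT; a mostly
// unique column keeps kENCODING_NONE.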
3518 
3519 // detect_headers returns true if:
3520 // - all elements of the first argument are kTEXT
3521 // - there is at least one instance where tail_types is more restrictive than head_types
3522 // (ie, not kTEXT)
3523 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3524  const std::vector<SQLTypes>& tail_types) {
3525  if (head_types.size() != tail_types.size()) {
3526  return false;
3527  }
3528  bool has_headers = false;
3529  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3530  if (head_types[col_idx] != kTEXT) {
3531  return false;
3532  }
3533  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3534  }
3535  return has_headers;
3536 }
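// e.g. head_types {kTEXT, kTEXT} with tail_types {kTEXT, kSMALLINT} reports a
// header row; if every tail type is also kTEXT (or any head type is not
// kTEXT), no header is assumed.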
3537 
3538 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3539 #if defined(ENABLE_IMPORT_PARQUET)
3540  if (data_preview_.has_value()) {
3541  return data_preview_.value().sample_rows;
3542  } else
3543 #endif
3544  {
3545  n = std::min(n, raw_rows.size());
3546  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3547  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3548  raw_rows.begin() + n);
3549  return sample_rows;
3550  }
3551 }
3552 
3553 std::vector<std::string> Detector::get_headers() {
3554 #if defined(ENABLE_IMPORT_PARQUET)
3555  if (data_preview_.has_value()) {
3556  return data_preview_.value().column_names;
3557  } else
3558 #endif
3559  {
3560  std::vector<std::string> headers(best_sqltypes.size());
3561  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3562  if (has_headers && i < raw_rows[0].size()) {
3563  headers[i] = raw_rows[0][i];
3564  } else {
3565  headers[i] = "column_" + std::to_string(i + 1);
3566  }
3567  }
3568  return headers;
3569  }
3570 }
3571 
3572 std::vector<SQLTypeInfo> Detector::getBestColumnTypes() const {
3573 #if defined(ENABLE_IMPORT_PARQUET)
3574  if (data_preview_.has_value()) {
3575  return data_preview_.value().column_types;
3576  } else
3577 #endif
3578  {
3579  std::vector<SQLTypeInfo> types;
3580  CHECK_EQ(best_sqltypes.size(), best_encodings.size());
3581  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3582  types.emplace_back(best_sqltypes[i], false, best_encodings[i]);
3583  }
3584  return types;
3585  }
3586 }
3587 
3588 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3589  size_t row_count,
3590  const Catalog_Namespace::SessionInfo* session_info) {
 3591  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
 3592  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
 3593  import_status_.load_failed = true;
3594  import_status_.load_msg = loader->getErrorMessage();
3595  }
3596 }
3597 
 3598 void Importer::checkpoint(
 3599  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
 3600  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
 3601  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
 3602  if (import_status_.load_failed) {
 3603  // rollback to starting epoch - undo all the added records
3604  loader->setTableEpochs(table_epochs);
3605  } else {
3606  loader->checkpoint();
3607  }
3608  }
3609 
3610  if (loader->getTableDesc()->persistenceLevel ==
3611  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3612  // tables
 3613  auto ms = measure<>::execution([&]() {
 3614  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
 3615  if (!import_status_.load_failed) {
3616  for (auto& p : import_buffers_vec[0]) {
3617  if (!p->stringDictCheckpoint()) {
3618  LOG(ERROR) << "Checkpointing Dictionary for Column "
3619  << p->getColumnDesc()->columnName << " failed.";
3620  import_status_.load_failed = true;
3621  import_status_.load_msg = "Dictionary checkpoint failed";
3622  break;
3623  }
3624  }
3625  }
3626  });
3627  if (DEBUG_TIMING) {
3628  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3629  << std::endl;
3630  }
3631  }
3632 }
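// The method above implements a simple all-or-nothing protocol around a bulk
// load: snapshot the table epochs up front, then either roll back to that
// snapshot on failure or checkpoint on success. In sketch form, using the
// Loader hooks defined later in this file:
//
//   const auto table_epochs = loader->getTableEpochs();  // before loading
//   // ... run the import, accumulating results into import_status_ ...
//   if (import_status_.load_failed) {
//     loader->setTableEpochs(table_epochs);  // undo all the added records
//   } else {
//     loader->checkpoint();                  // persist the new epoch
//   }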
3633 
 3634 ImportStatus DataStreamSink::archivePlumber(
 3635  const Catalog_Namespace::SessionInfo* session_info) {
 3636  // in the generalized importing scheme, by the time we reach here, file_path
 3637  // may contain a single file path, a url, or a wildcard of file paths.
 3638  // see CopyTableStmt::execute.
3639 
3640  std::vector<std::string> file_paths;
 3641  try {
 3642  const shared::FilePathOptions options{copy_params.regex_path_filter,
 3643  copy_params.file_sort_order_by,
 3644  copy_params.file_sort_regex};
 3645  shared::validate_sort_options(options);
 3646  file_paths = shared::local_glob_filter_sort_files(file_path, options);
3647  } catch (const shared::FileNotFoundException& e) {
3648  // After finding no matching files locally, file_path may still be an s3 url
3649  file_paths.push_back(file_path);
3650  }
3651 
 3652  // sum up the sizes of all local files -- only for local files. if
 3653  // file_path is an s3 url, sizes will be obtained via S3Archive.
 3654  for (const auto& file_path : file_paths) {
 3655  total_file_size += get_filesize(file_path);
 3656  }
3657 
 3658  // s3 parquet goes a different route because the files do not use libarchive
 3659  // but the parquet api, and they need to be landed like .7z files.
3660  //
3661  // note: parquet must be explicitly specified by a WITH parameter
 3662  // "source_type='parquet_file'", because, for example, spark sql users may
 3663  // specify an output url w/o a file extension like this:
3664  // df.write
3665  // .mode("overwrite")
3666  // .parquet("s3://bucket/folder/parquet/mydata")
3667  // without the parameter, it means plain or compressed csv files.
 3668  // note: .ORC and AVRO files should follow a similar path to Parquet?
 3669  if (copy_params.source_type == import_export::SourceType::kParquetFile) {
 3670 #ifdef ENABLE_IMPORT_PARQUET
3671  import_parquet(file_paths, session_info);
3672 #else
3673  throw std::runtime_error("Parquet not supported!");
3674 #endif
3675  } else {
3676  import_compressed(file_paths, session_info);
3677  }
3678 
3679  return import_status_;
3680 }
3681 
3682 namespace {
3683 #ifdef ENABLE_IMPORT_PARQUET
3684 
3685 #ifdef HAVE_AWS_S3
3686 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3687 create_parquet_s3_detect_filesystem_config(const foreign_storage::ForeignServer* server,
3688  const CopyParams& copy_params) {
3689  foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3690  config;
3691 
3692  if (!copy_params.s3_access_key.empty()) {
3693  config.s3_access_key = copy_params.s3_access_key;
3694  }
3695  if (!copy_params.s3_secret_key.empty()) {
3696  config.s3_secret_key = copy_params.s3_secret_key;
3697  }
3698  if (!copy_params.s3_session_token.empty()) {
3699  config.s3_session_token = copy_params.s3_session_token;
3700  }
3701 
3702  return config;
3703 }
3704 #endif
3705 
3706 foreign_storage::DataPreview get_parquet_data_preview(const std::string& file_name,
3707  const CopyParams& copy_params) {
3708  TableDescriptor td;
3709  td.tableName = "parquet-detect-table";
3710  td.tableId = -1;
3712  auto [foreign_server, user_mapping, foreign_table] =
3713  foreign_storage::create_proxy_fsi_objects(file_name, copy_params, &td);
3714 
3715  std::shared_ptr<arrow::fs::FileSystem> file_system;
3716  auto& server_options = foreign_server->options;
 3717  if (server_options
 3718  .find(foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
 3719  ->second ==
 3720  foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE) {
 3721  file_system = std::make_shared<arrow::fs::LocalFileSystem>();
 3722 #ifdef HAVE_AWS_S3
 3723  } else if (server_options
 3724  .find(foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
 3725  ->second ==
 3726  foreign_storage::AbstractFileStorageDataWrapper::S3_STORAGE_TYPE) {
 3727  file_system = foreign_storage::ParquetS3DetectFileSystem::create(
 3728  create_parquet_s3_detect_filesystem_config(foreign_server.get(), copy_params));
3729 #endif
3730  } else {
3731  UNREACHABLE();
3732  }
3733 
3734  auto parquet_data_wrapper = std::make_unique<foreign_storage::ParquetDataWrapper>(
3735  foreign_table.get(), file_system);
3736  return parquet_data_wrapper->getDataPreview(g_detect_test_sample_size.has_value()
3737  ? g_detect_test_sample_size.value()
3739 }
3740 #endif
3741 
3742 } // namespace
3743 
3744 Detector::Detector(const boost::filesystem::path& fp, CopyParams& cp)
3745  : DataStreamSink(cp, fp.string()), file_path(fp) {
3746 #ifdef ENABLE_IMPORT_PARQUET
 3747  if (cp.source_type == import_export::SourceType::kParquetFile &&
 3748  !g_enable_legacy_parquet_import) {
3749  data_preview_ = get_parquet_data_preview(fp.string(), cp);
3750  } else
3751 #endif
3752  {
3753  read_file();
3754  init();
3755  }
3756 }
3757 
3758 #ifdef ENABLE_IMPORT_PARQUET
3759 inline auto open_parquet_table(const std::string& file_path,
3760  std::shared_ptr<arrow::io::ReadableFile>& infile,
3761  std::unique_ptr<parquet::arrow::FileReader>& reader,
3762  std::shared_ptr<arrow::Table>& table) {
3763  using namespace parquet::arrow;
3764  auto file_result = arrow::io::ReadableFile::Open(file_path);
3765  PARQUET_THROW_NOT_OK(file_result.status());
3766  infile = file_result.ValueOrDie();
3767 
3768  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3769  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3770  const auto num_row_groups = reader->num_row_groups();
3771  const auto num_columns = table->num_columns();
3772  const auto num_rows = table->num_rows();
3773  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3774  << " columns in " << num_row_groups << " groups.";
3775  return std::make_tuple(num_row_groups, num_columns, num_rows);
 3776 }
3777 
3778 void Detector::import_local_parquet(const std::string& file_path,
3779  const Catalog_Namespace::SessionInfo* session_info) {
3780  /*Skip interrupt checking in detector*/
3781  std::shared_ptr<arrow::io::ReadableFile> infile;
3782  std::unique_ptr<parquet::arrow::FileReader> reader;
3783  std::shared_ptr<arrow::Table> table;
3784  int num_row_groups, num_columns;
3785  int64_t num_rows;
3786  std::tie(num_row_groups, num_columns, num_rows) =
3787  open_parquet_table(file_path, infile, reader, table);
 3788  // make up a header line if we don't have one yet
3789  if (0 == raw_data.size()) {
3790  copy_params.has_header = ImportHeaderRow::kHasHeader;
3791  copy_params.line_delim = '\n';
3792  copy_params.delimiter = ',';
3793  // must quote values to skip any embedded delimiter
3794  copy_params.quoted = true;
3795  copy_params.quote = '"';
3796  copy_params.escape = '"';
3797  for (int c = 0; c < num_columns; ++c) {
3798  if (c) {
3799  raw_data += copy_params.delimiter;
3800  }
3801  raw_data += table->ColumnNames().at(c);
3802  }
3803  raw_data += copy_params.line_delim;
3804  }
 3805  // make up raw data... row-wise...
3806  const ColumnDescriptor cd;
3807  for (int g = 0; g < num_row_groups; ++g) {
3808  // data is columnwise
3809  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3810  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3811  arrays.resize(num_columns);
3812  for (int c = 0; c < num_columns; ++c) {
3813  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3814  for (auto chunk : arrays[c]->chunks()) {
3815  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3816  }
3817  }
3818  for (int r = 0; r < num_rows; ++r) {
3819  for (int c = 0; c < num_columns; ++c) {
3820  std::vector<std::string> buffer;
3821  for (auto chunk : arrays[c]->chunks()) {
3822  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3823  if (c) {
3824  raw_data += copy_params.delimiter;
3825  }
3826  if (!chunk->IsNull(r)) {
3827  raw_data += copy_params.quote;
3828  raw_data += boost::replace_all_copy(
3829  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3830  raw_data += copy_params.quote;
3831  }
3832  }
3833  }
 3834  raw_data += copy_params.line_delim;
 3835  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
 3836  if (++import_status_.rows_completed >= 10000) {
3837  // as if load truncated
3838  import_status_.load_failed = true;
3839  import_status_.load_msg = "Detector processed 10000 records";
3840  return;
3841  }
3842  }
3843  }
3844 }
3845 
3846 template <typename DATA_TYPE>
 3847 std::tuple<size_t, size_t> TypedImportBuffer::del_values(
 3848  std::vector<DATA_TYPE>& buffer,
3849  import_export::BadRowsTracker* const bad_rows_tracker) {
3850  const auto old_size = buffer.size();
3851  // erase backward to minimize memory movement overhead
3852  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3853  ++rit) {
3854  buffer.erase(buffer.begin() + *rit);
3855  }
3856  return std::make_tuple(old_size, buffer.size());
3857 }
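// Why the backward iteration above is correct: erasing from the highest row
// index down means no erase ever shifts an index that is still waiting to be
// erased (erasing low-to-high would invalidate every later index), and each
// erase moves less trailing data. The same idiom in isolation (illustrative
// helper; indices must be in ascending order):
namespace {
template <typename T>
void erase_rows_sketch(std::vector<T>& buffer,
                       const std::vector<size_t>& sorted_rows) {
  for (auto rit = sorted_rows.crbegin(); rit != sorted_rows.crend(); ++rit) {
    buffer.erase(buffer.begin() + *rit);  // earlier indices remain valid
  }
}
}  // namespace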
3858 
 3859 std::tuple<size_t, size_t> TypedImportBuffer::del_values(const SQLTypes type,
 3860  BadRowsTracker* const bad_rows_tracker) {
3861  switch (type) {
3862  case kBOOLEAN:
3863  return del_values(*bool_buffer_, bad_rows_tracker);
3864  case kTINYINT:
3865  return del_values(*tinyint_buffer_, bad_rows_tracker);
3866  case kSMALLINT:
3867  return del_values(*smallint_buffer_, bad_rows_tracker);
3868  case kINT:
3869  return del_values(*int_buffer_, bad_rows_tracker);
3870  case kBIGINT:
3871  case kNUMERIC:
3872  case kDECIMAL:
3873  case kTIME:
3874  case kTIMESTAMP:
3875  case kDATE:
3876  return del_values(*bigint_buffer_, bad_rows_tracker);
3877  case kFLOAT:
3878  return del_values(*float_buffer_, bad_rows_tracker);
3879  case kDOUBLE:
3880  return del_values(*double_buffer_, bad_rows_tracker);
3881  case kTEXT:
3882  case kVARCHAR:
3883  case kCHAR:
3884  return del_values(*string_buffer_, bad_rows_tracker);
3885  case kPOINT:
3886  case kMULTIPOINT:
3887  case kLINESTRING:
3888  case kMULTILINESTRING:
3889  case kPOLYGON:
3890  case kMULTIPOLYGON:
3891  return del_values(*geo_string_buffer_, bad_rows_tracker);
3892  case kARRAY:
3893  return del_values(*array_buffer_, bad_rows_tracker);
3894  default:
3895  throw std::runtime_error("Invalid Type");
3896  }
3897 }
3898 
3899 void Importer::import_local_parquet(const std::string& file_path,
3900  const Catalog_Namespace::SessionInfo* session_info) {
3901  std::shared_ptr<arrow::io::ReadableFile> infile;
3902  std::unique_ptr<parquet::arrow::FileReader> reader;
3903  std::shared_ptr<arrow::Table> table;
3904  int num_row_groups, num_columns;
3905  int64_t nrow_in_file;
3906  std::tie(num_row_groups, num_columns, nrow_in_file) =
3907  open_parquet_table(file_path, infile, reader, table);
3908  // column_list has no $deleted
3909  const auto& column_list = get_column_descs();
3910  // for now geo columns expect a wkt or wkb hex string
3911  std::vector<const ColumnDescriptor*> cds;
3912  Executor* executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID).get();
3913  int num_physical_cols = 0;
3914  for (auto& cd : column_list) {
3915  cds.push_back(cd);
3916  num_physical_cols += cd->columnType.get_physical_cols();
3917  }
3918  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3919  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3920  std::to_string(num_columns) + " columns in file vs " +
3921  std::to_string(column_list.size() - num_physical_cols) +
3922  " columns in table.");
 3923  // slice each group to import slower columns faster, e.g. geo or string
3924  max_threads = num_import_threads(copy_params.threads);
3925  VLOG(1) << "Parquet import # threads: " << max_threads;
3926  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3927  // init row estimate for this file
3928  const auto filesize = get_filesize(file_path);
3929  size_t nrow_completed{0};
3930  file_offsets.push_back(0);
 3931  // map logical column index to physical column index
3932  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3933  int physical_col_idx = 0;
3934  for (int i = 0; i < logic_col_idx; ++i) {
3935  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3936  }
3937  return physical_col_idx;
3938  };
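  // Worked example for the mapping above (counts illustrative): with table
  // columns [int_col, point_col, str_col], where the POINT column owns one
  // physical coords column, cds holds four descriptors:
  //   {int_col, point_col, its coords column, str_col}
  // so logical index 0 maps to physical 0, logical 1 to physical 1, and
  // logical 2 skips over the coords column to land on physical 3. The actual
  // per-type counts come from columnType.get_physical_cols().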
3939  // load a file = nested iteration of row groups, row slices and logical columns
3940  auto query_session = session_info ? session_info->get_session_id() : "";
3941  auto ms_load_a_file = measure<>::execution([&]() {
3942  for (int row_group = 0; row_group < num_row_groups; ++row_group) {
 3943  {
 3944  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
 3945  if (import_status_.load_failed) {
 3946  break;
3947  }
3948  }
 3949  // a sliced row group will be handled like a (logical) parquet file, with
 3950  // an entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
 3951  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
 3952  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
 3953  import_status_.load_failed = true;
3954  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3955  }
3956  import_buffers_vec.resize(num_slices);
3957  for (int slice = 0; slice < num_slices; slice++) {
3958  import_buffers_vec[slice].clear();
3959  for (const auto cd : cds) {
3960  import_buffers_vec[slice].emplace_back(
3961  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3962  }
3963  }
 3964  /*
 3965  * A caveat here is: Parquet files or arrow data are imported column-wise.
 3966  * Unlike importing row-wise csv files, an error on any row of any column
 3967  * forces us to give up the entire row group of all columns, unless there is
 3968  * a sophisticated method to trace erroneous rows in individual columns so
 3969  * that we can union the bad rows and drop them from the corresponding
 3970  * import_buffers_vec; otherwise, we may easily exceed the maximum number
 3971  * of truncated rows even with very sparse errors in the files.
 3972  */
3973  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3974  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3975  auto& bad_rows_tracker = bad_rows_trackers[slice];
3976  bad_rows_tracker.file_name = file_path;
3977  bad_rows_tracker.row_group = slice;
3978  bad_rows_tracker.importer = this;
3979  }
3980  // process arrow arrays to import buffers
3981  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3982  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3983  const auto cd = cds[physical_col_idx];
3984  std::shared_ptr<arrow::ChunkedArray> array;
3985  PARQUET_THROW_NOT_OK(
3986  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3987  const size_t array_size = array->length();
3988  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3989  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3990  for (int slice = 0; slice < num_slices; ++slice) {
 3991  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
 3992  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
 3993  import_status_.load_failed = true;
3994  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3995  }
3996  thread_controller.startThread([&, slice] {
3997  const auto slice_offset = slice % num_slices;
3998  ArraySliceRange slice_range(
3999  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
4000  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
4001  auto& bad_rows_tracker = bad_rows_trackers[slice];
4002  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
4003  import_buffer->import_buffers = &import_buffers_vec[slice];
4004  import_buffer->col_idx = physical_col_idx + 1;
4005  for (auto chunk : array->chunks()) {
4006  import_buffer->add_arrow_values(
4007  cd, *chunk, false, slice_range, &bad_rows_tracker);
4008  }
4009  });
4010  }
4011  thread_controller.finish();
4012  }
4013  std::vector<size_t> nrow_in_slice_raw(num_slices);
4014  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
4015  // trim bad rows from import buffers
4016  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
4017  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
4018  const auto cd = cds[physical_col_idx];
4019  for (int slice = 0; slice < num_slices; ++slice) {
4020  auto& bad_rows_tracker = bad_rows_trackers[slice];
4021  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
4022  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
4023  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
4024  }
4025  }
4026  // flush slices of this row group to chunks
4027  for (int slice = 0; slice < num_slices; ++slice) {
4028  load(import_buffers_vec[slice],
4029  nrow_in_slice_successfully_loaded[slice],
4030  session_info);
4031  }
4032  // update import stats
4033  const auto nrow_original =
4034  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
4035  const auto nrow_imported =
4036  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
4037  nrow_in_slice_successfully_loaded.end(),
4038  0);
4039  const auto nrow_dropped = nrow_original - nrow_imported;
4040  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
4041  << " rows, drop " << nrow_dropped << " rows.";
 4042  {
 4043  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
 4044  import_status_.rows_completed += nrow_imported;
4045  import_status_.rows_rejected += nrow_dropped;
4046  if (import_status_.rows_rejected > copy_params.max_reject) {
4047  import_status_.load_failed = true;
4048  import_status_.load_msg = "Maximum (" + std::to_string(copy_params.max_reject) +
4049  ") rows rejected exceeded. Halting load.";
4050  LOG(ERROR) << "Maximum (" << copy_params.max_reject
4051  << ") rows rejected exceeded. Halting load.";
4052  }
4053  }
4054  // row estimate
4055  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4056  nrow_completed += nrow_imported;
4057  file_offsets.back() =
4058  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
4059  // sum up current total file offsets
4060  const auto total_file_offset =
4061  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
4062  // estimate number of rows per current total file offset
 4063  if (total_file_offset) {
 4064  import_status_.rows_estimated =
 4065  (float)total_file_size / total_file_offset * import_status_.rows_completed;
4066  VLOG(3) << "rows_completed " << import_status_.rows_completed
4067  << ", rows_estimated " << import_status_.rows_estimated
4068  << ", total_file_size " << total_file_size << ", total_file_offset "
4069  << total_file_offset;
4070  }
4071  }
4072  });
4073  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
4074  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
4075 }
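// The slicing above is plain ceiling division: slice_size =
// ceil(array_size / num_slices), with each slice taking the clamped range
// [slice * slice_size, (slice + 1) * slice_size). In isolation
// (illustrative helper):
namespace {
inline std::pair<size_t, size_t> slice_range_sketch(const size_t array_size,
                                                    const size_t num_slices,
                                                    const size_t slice) {
  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
  return {std::min(slice * slice_size, array_size),
          std::min((slice + 1) * slice_size, array_size)};
}
}  // namespace
// e.g. 10 rows over 4 slices yields {0,3}, {3,6}, {6,9} and {9,10}.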
4076 
4077 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
4078  const Catalog_Namespace::SessionInfo* session_info) {
4079  auto importer = dynamic_cast<Importer*>(this);
4080  auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
4081  : std::vector<Catalog_Namespace::TableEpochInfo>{};
4082  try {
4083  std::exception_ptr teptr;
 4084  // file_paths may contain one local file path, a list of local file paths,
 4085  // or an s3/hdfs/... url that may translate to one or more remote object keys.
4086  for (auto const& file_path : file_paths) {
4087  std::map<int, std::string> url_parts;
4088  Archive::parse_url(file_path, url_parts);
4089 
 4090  // for an s3 url we need to know the object keys that it comprises
4091  std::vector<std::string> objkeys;
4092  std::unique_ptr<S3ParquetArchive> us3arch;
4093  if ("s3" == url_parts[2]) {
4094 #ifdef HAVE_AWS_S3
4095  us3arch.reset(new S3ParquetArchive(file_path,
4096  copy_params.s3_access_key,
4097  copy_params.s3_secret_key,
4098  copy_params.s3_session_token,
4099  copy_params.s3_region,
4100  copy_params.s3_endpoint,
4101  copy_params.plain_text,
4102  copy_params.regex_path_filter,
4103  copy_params.file_sort_order_by,
4104  copy_params.file_sort_regex));
4105  us3arch->init_for_read();
4106  total_file_size += us3arch->get_total_file_size();
4107  objkeys = us3arch->get_objkeys();
4108 #else
4109  throw std::runtime_error("AWS S3 support not available");
4110 #endif // HAVE_AWS_S3
4111  } else {
4112  objkeys.emplace_back(file_path);
4113  }
4114 
 4115  // each object key of an s3 url needs to be landed before
 4116  // being imported, just as is done with a 'local file'.
4117  for (auto const& objkey : objkeys) {
4118  try {
4119  auto file_path =
4120  us3arch
4121  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
4122  : objkey;
4123  import_local_parquet(file_path, session_info);
4124  if (us3arch) {
4125  us3arch->vacuum(objkey);
4126  }
4127  } catch (...) {
4128  if (us3arch) {
4129  us3arch->vacuum(objkey);
4130  }
4131  throw;
 4132  }
 4133  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
 4134  if (import_status_.load_failed) {
 4135  break;
4136  }
4137  }
4138  }
 4139  // rethrow any exception that happened before this point
4140  if (teptr) {
4141  std::rethrow_exception(teptr);
4142  }
 4143  } catch (const shared::NoRegexFilterMatchException& e) {
 4144  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
 4145  import_status_.load_failed = true;
4146  import_status_.load_msg = e.what();
4147  throw e;
 4148  } catch (const std::exception& e) {
 4149  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
 4150  import_status_.load_failed = true;
4151  import_status_.load_msg = e.what();
4152  }
4153 
4154  if (importer) {
4155  importer->checkpoint(table_epochs);
4156  }
4157 }
4158 #endif // ENABLE_IMPORT_PARQUET
4159 
 4160 void DataStreamSink::import_compressed(
 4161  std::vector<std::string>& file_paths,
4162  const Catalog_Namespace::SessionInfo* session_info) {
 4163  // a new requirement is to have one single input stream into
 4164  // Importer::importDelimited, so we need to move the pipe-related
 4165  // stuff to the outermost block.
4166  int fd[2];
4167 #ifdef _WIN32
 4168  // For some reason when folly is used to create the pipe, the reader can
 4169  // read nothing.
4170  auto pipe_res =
4171  _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
4172 #else
4173  auto pipe_res = pipe(fd);
4174 #endif
4175  if (pipe_res < 0) {
4176  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
4177  }
4178 #ifndef _WIN32
4179  signal(SIGPIPE, SIG_IGN);
4180 #endif
4181 
4182  std::exception_ptr teptr;
4183  // create a thread to read uncompressed byte stream out of pipe and
4184  // feed into importDelimited()
4185  ImportStatus ret1;
4186  auto th_pipe_reader = std::thread([&]() {
4187  try {
4188  // importDelimited will read from FILE* p_file
4189  if (0 == (p_file = fdopen(fd[0], "r"))) {
4190  throw std::runtime_error(std::string("failed to open a pipe: ") +
4191  strerror(errno));
4192  }
4193 
 4194  // in the future, depending on the data types of this uncompressed stream,
 4195  // it can be fed into other functions such as importParquet, etc.
4196  ret1 = importDelimited(file_path, true, session_info);
4197 
4198  } catch (...) {
4199  if (!teptr) { // no replace
4200  teptr = std::current_exception();
4201  }
4202  }
4203 
4204  if (p_file) {
4205  fclose(p_file);
4206  }
4207  p_file = 0;
4208  });
4209 
4210  // create a thread to iterate all files (in all archives) and
4211  // forward the uncompressed byte stream to fd[1] which is
4212  // then feed into importDelimited, importParquet, and etc.
4213  auto th_pipe_writer = std::thread([&]() {
4214  std::unique_ptr<S3Archive> us3arch;
4215  bool stop = false;
4216  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
4217  try {
4218  auto file_path = file_paths[fi];
4219  std::unique_ptr<Archive> uarch;
4220  std::map<int, std::string> url_parts;
4221  Archive::parse_url(file_path, url_parts);
4222  const std::string S3_objkey_url_scheme = "s3ok";
4223  if ("file" == url_parts[2] || "" == url_parts[2]) {
4224  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4225  } else if ("s3" == url_parts[2]) {
4226 #ifdef HAVE_AWS_S3
 4227  // construct an S3Archive with a shared s3 client.
 4228  // should be safe b/c there is no wildcard in an s3 url
4229  us3arch.reset(new S3Archive(file_path,
4230  copy_params.s3_access_key,
4231  copy_params.s3_secret_key,
4232  copy_params.s3_session_token,
4233  copy_params.s3_region,
4234  copy_params.s3_endpoint,
4235  copy_params.plain_text,
4236  copy_params.regex_path_filter,
4237  copy_params.file_sort_order_by,
4238  copy_params.file_sort_regex));
4239  us3arch->init_for_read();
4240  total_file_size += us3arch->get_total_file_size();
 4241  // don't land all the files here; they are landed one by one in the following iterations
4242  for (const auto& objkey : us3arch->get_objkeys()) {
4243  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
4244  }
4245  continue;
4246 #else
4247  throw std::runtime_error("AWS S3 support not available");
4248 #endif // HAVE_AWS_S3
4249  } else if (S3_objkey_url_scheme == url_parts[2]) {
4250 #ifdef HAVE_AWS_S3
4251  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
4252  auto file_path =
4253  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
4254  if (0 == file_path.size()) {
4255  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
4256  }
4257  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4258  // file not removed until file closed
4259  us3arch->vacuum(objkey);
4260 #else
4261  throw std::runtime_error("AWS S3 support not available");
4262 #endif // HAVE_AWS_S3
4263  }
4264 #if 0 // TODO(ppan): implement and enable any other archive class
4265  else
4266  if ("hdfs" == url_parts[2])
4267  uarch.reset(new HdfsArchive(file_path));
4268 #endif
4269  else {
4270  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
4271  }
4272 
4273  // init the archive for read
4274  auto& arch = *uarch;
4275 
 4276  // by this point, the archive at the url should be ready to be read, unarchived
 4277  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
4278  const void* buf;
4279  size_t size;
4280  bool just_saw_archive_header;
4281  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
4282  bool first_text_header_skipped = false;
4283  // start reading uncompressed bytes of this archive from libarchive
 4284  // note! this archive may contain more than one file!
4285  file_offsets.push_back(0);
4286  size_t num_block_read = 0;
4287  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
4288  bool insert_line_delim_after_this_file = false;
4289  while (!stop) {
4290  int64_t offset{-1};
4291  auto ok = arch.read_data_block(&buf, &size, &offset);
4292  // can't use (uncompressed) size, so track (max) file offset.
4293  // also we want to capture offset even on e.o.f.
4294  if (offset > 0) {
4295  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4296  file_offsets.back() = offset;
4297  }
4298  if (!ok) {
4299  break;
4300  }
 4301  // one subtle point here: we now concatenate all files into a single
 4302  // FILE stream, on which we call importDelimited only once. this would
 4303  // lead it to believe that only one header line comes with this 'single'
 4304  // stream, while we may actually have one header line for each of the
 4305  // files. so we need to skip the header lines here instead of in
 4306  // importDelimited.
4307  const char* buf2 = (const char*)buf;
4308  int size2 = size;
4309  if (copy_params.has_header != import_export::ImportHeaderRow::kNoHeader &&
4310  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
4311  while (size2-- > 0) {
4312  if (*buf2++ == copy_params.line_delim) {
4313  break;
4314  }
4315  }
4316  if (size2 <= 0) {
4317  LOG(WARNING) << "No line delimiter in block." << std::endl;
4318  } else {
4319  just_saw_archive_header = false;
4320  first_text_header_skipped = true;
4321  }
4322  }
 4323  // On very rare occasions the write pipe somehow operates in a mode similar
 4324  // to non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which
 4325  // means blocking mode. On such an unreliable blocking mode, a possible fix
 4326  // is to loop writing till no bytes are left, otherwise we get the annoying
 4327  // `failed to write pipe: Success`...
4328  if (size2 > 0) {
4329  int nremaining = size2;
4330  while (nremaining > 0) {
4331  // try to write the entire remainder of the buffer to the pipe
4332  int nwritten = write(fd[1], buf2, nremaining);
4333  // how did we do?
4334  if (nwritten < 0) {
4335  // something bad happened
4336  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4337  // ignore these, assume nothing written, try again
4338  nwritten = 0;
4339  } else if (errno == EPIPE &&
4341  // the reader thread has shut down the pipe from the read end
4342  stop = true;
4343  break;
4344  } else {
4345  // a real error
4346  throw std::runtime_error(
4347  std::string("failed or interrupted write to pipe: ") +
4348  strerror(errno));
4349  }
4350  } else if (nwritten == nremaining) {
4351  // we wrote everything; we're done
4352  break;
4353  }
4354  // only wrote some (or nothing), try again
4355  nremaining -= nwritten;
4356  buf2 += nwritten;
 4357  // no exception when too many rows are rejected
 4358  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
 4359  if (import_status_.load_failed) {
 4360  stop = true;
4361  break;
4362  }
4363  }
4364  // check that this file (buf for size) ended with a line delim
4365  if (size > 0) {
4366  const char* plast = static_cast<const char*>(buf) + (size - 1);
4367  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4368  }
4369  }
4370  ++num_block_read;
4371  }
4372 
 4373  // if that file didn't end with a line delim, we insert one here to terminate
 4374  // that file's stream. use a loop for the same reason as above.
4375  if (insert_line_delim_after_this_file) {
4376  while (true) {
4377  // write the delim char to the pipe
4378  int nwritten = write(fd[1], &copy_params.line_delim, 1);
4379  // how did we do?
4380  if (nwritten < 0) {
4381  // something bad happened
4382  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4383  // ignore these, assume nothing written, try again
4384  nwritten = 0;
4385  } else if (errno == EPIPE &&
4387  // the reader thread has shut down the pipe from the read end
4388  stop = true;
4389  break;
4390  } else {
4391  // a real error
4392  throw std::runtime_error(
4393  std::string("failed or interrupted write to pipe: ") +
4394  strerror(errno));
4395  }
4396  } else if (nwritten == 1) {
4397  // we wrote it; we're done
4398  break;
4399  }
4400  }
4401  }
4402  }
4403  } catch (...) {
 4404  // when an import is aborted because of too many data errors or because a
 4405  // detection has ended, any exception thrown by the s3 sdk or libarchive is
 4406  // okay and should be suppressed.
 4407  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
 4408  if (import_status_.load_failed) {
 4409  break;
4410  }
4411  if (import_status_.rows_completed > 0) {
4412  if (nullptr != dynamic_cast<Detector*>(this)) {
4413  break;
4414  }
4415  }
4416  if (!teptr) { // no replace
4417  teptr = std::current_exception();
4418  }
4419  break;
4420  }
4421  }
4422  // close writer end
4423  close(fd[1]);
4424  });
4425 
4426  th_pipe_reader.join();
4427  th_pipe_writer.join();
4428 
 4429  // rethrow any exception that happened before this point
4430  if (teptr) {
4431  std::rethrow_exception(teptr);
4432  }
4433 }
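// The function above, reduced to its plumbing: a writer thread streams
// decompressed bytes into a pipe while a reader thread wraps the read end in
// a FILE* and parses it as one continuous delimited stream. A minimal POSIX
// sketch of that pattern (standalone and illustrative, not HEAVY.AI API):
namespace {
void pipe_plumbing_sketch() {
  int fd[2];
  if (pipe(fd) < 0) {
    return;
  }
  std::thread reader([&] {
    FILE* in = fdopen(fd[0], "r");  // parse the stream like a local file
    char line[256];
    while (fgets(line, sizeof(line), in)) {
      // ... feed each delimited row to the parser ...
    }
    fclose(in);  // also closes fd[0]
  });
  std::thread writer([&] {
    const std::string data = "a,b,c\n1,2,3\n";  // stands in for archive bytes
    const char* p = data.data();
    size_t remaining = data.size();
    while (remaining > 0) {  // loop to survive short writes
      const auto nwritten = write(fd[1], p, remaining);
      if (nwritten < 0) {
        break;
      }
      p += nwritten;
      remaining -= nwritten;
    }
    close(fd[1]);  // signals EOF to the reader
  });
  writer.join();
  reader.join();
}
}  // namespace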
4434 
 4435 ImportStatus Importer::import(const Catalog_Namespace::SessionInfo* session_info) {
 4436  return DataStreamSink::archivePlumber(session_info);
4437 }
4438 
 4439 ImportStatus Importer::importDelimited(
 4440  const std::string& file_path,
4441  const bool decompressed,
4442  const Catalog_Namespace::SessionInfo* session_info) {
4444  auto query_session = session_info ? session_info->get_session_id() : "";
4445 
4446  if (!p_file) {
4447  p_file = fopen(file_path.c_str(), "rb");
4448  }
4449  if (!p_file) {
4450  throw std::runtime_error("failed to open file '" + file_path +
4451  "': " + strerror(errno));
4452  }
4453 
4454  if (!decompressed) {
4455  (void)fseek(p_file, 0, SEEK_END);
4456  file_size = ftell(p_file);
4457  }
4458 
4459  max_threads = num_import_threads(copy_params.threads);
4460  VLOG(1) << "Delimited import # threads: " << max_threads;
4461 
4462  // deal with small files
4463  size_t alloc_size = copy_params.buffer_size;
4464  if (!decompressed && file_size < alloc_size) {
4465  alloc_size = file_size;
4466  }
4467 
4468  for (size_t i = 0; i < max_threads; i++) {
4469  import_buffers_vec.emplace_back();
4470  for (const auto cd : loader->get_column_descs()) {
4471  import_buffers_vec[i].emplace_back(
4472  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4473  }
4474  }
4475 
4476  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4477  size_t current_pos = 0;
4478  size_t end_pos;
4479  size_t begin_pos = 0;
4480 
4481  (void)fseek(p_file, current_pos, SEEK_SET);
4482  size_t size =
4483  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4484 
4485  // make render group analyzers for each poly column
4486  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
 4487  if (copy_params.geo_assign_render_groups) {
 4488  if (g_enable_assign_render_groups) {
 4489  auto& cat = loader->getCatalog();
4490  auto* td = loader->getTableDesc();
4491  CHECK(td);
4492  auto column_descriptors =
4493  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4494  for (auto const& cd : column_descriptors) {
4495  if (IS_GEO_POLY(cd->columnType.get_type())) {
4496  auto rga = std::make_shared<RenderGroupAnalyzer>();
4497  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
4498  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4499  }
4500  }
4501  } else {
4502  fclose(p_file);
4503  throw std::runtime_error(
4504  "Render Group Assignment requested in CopyParams but disabled in Server "
4505  "Config. Set enable_assign_render_groups=true in Server Config to override.");
4506  }
4507  }
4508 
4509  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4510  loader->getTableDesc()->tableId};
4511  auto table_epochs = loader->getTableEpochs();
4513  {
4514  std::list<std::future<ImportStatus>> threads;
4515 
4516  // use a stack to track thread_ids which must not overlap among threads
4517  // because thread_id is used to index import_buffers_vec[]
4518  std::stack<size_t> stack_thread_ids;
4519  for (size_t i = 0; i < max_threads; i++) {
4520  stack_thread_ids.push(i);
4521  }
4522  // added for true row index on error
4523  size_t first_row_index_this_buffer = 0;
4524 
4525  while (size > 0) {
4526  unsigned int num_rows_this_buffer = 0;
4527  CHECK(scratch_buffer);
4528  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4529  scratch_buffer,
4530  size,
4531  copy_params,
4532  first_row_index_this_buffer,
4533  num_rows_this_buffer,
4534  p_file);
4535 
4536  // unput residual
4537  int nresidual = size - end_pos;
4538  std::unique_ptr<char[]> unbuf;
4539  if (nresidual > 0) {
4540  unbuf = std::make_unique<char[]>(nresidual);
4541  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4542  }
4543 
4544  // get a thread_id not in use
4545  auto thread_id = stack_thread_ids.top();
4546  stack_thread_ids.pop();
4547  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4548 
 4549  threads.push_back(std::async(std::launch::async,
 4550  import_thread_delimited,
 4551  thread_id,
4552  this,
4553  std::move(scratch_buffer),
4554  begin_pos,
4555  end_pos,
4556  end_pos,
4557  columnIdToRenderGroupAnalyzerMap,
4558  first_row_index_this_buffer,
4559  session_info,
4560  executor));
4561 
4562  first_row_index_this_buffer += num_rows_this_buffer;
4563 
4564  current_pos += end_pos;
4565  scratch_buffer = std::make_unique<char[]>(alloc_size);
4566  CHECK(scratch_buffer);
4567  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4568  size = nresidual +
4569  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4570 
4571  begin_pos = 0;
4572  while (threads.size() > 0) {
4573  int nready = 0;
4574  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4575  it != threads.end();) {
4576  auto& p = *it;
4577  std::chrono::milliseconds span(0);
4578  if (p.wait_for(span) == std::future_status::ready) {
4579  auto ret_import_status = p.get();
 4580  {
 4581  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
 4582  import_status_ += ret_import_status;
4583  if (ret_import_status.load_failed) {
4585  }
4586  }
4587  // sum up current total file offsets
4588  size_t total_file_offset{0};
4589  if (decompressed) {
4590  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4591  for (const auto file_offset : file_offsets) {
4592  total_file_offset += file_offset;
4593  }
4594  }
4595  // estimate number of rows per current total file offset
 4596  if (decompressed ? total_file_offset : current_pos) {
 4597  import_status_.rows_estimated =
 4598  (decompressed ? (float)total_file_size / total_file_offset
 4599  : (float)file_size / current_pos) *
 4600  import_status_.rows_completed;
 4601  }
4602  VLOG(3) << "rows_completed " << import_status_.rows_completed
4603  << ", rows_estimated " << import_status_.rows_estimated
4604  << ", total_file_size " << total_file_size << ", total_file_offset "
4605  << total_file_offset;
4607  // recall thread_id for reuse
4608  stack_thread_ids.push(ret_import_status.thread_id);
4609  threads.erase(it++);
4610  ++nready;
4611  } else {
4612  ++it;
4613  }
4614  }
4615 
4616  if (nready == 0) {
4617  std::this_thread::yield();
4618  }
4619 
 4620  // on eof, wait for all threads to finish
4621  if (0 == size) {
4622  continue;
4623  }
4624 
4625  // keep reading if any free thread slot
 4626  // this is one of the major differences from the old threading model!!
4627  if (threads.size() < max_threads) {
4628  break;
4629  }
 4630  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
 4631  if (import_status_.load_failed) {
 4632  break;
4633  }
4634  }
 4635  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
 4636  if (import_status_.rows_rejected > copy_params.max_reject) {
4637  import_status_.load_failed = true;
4638  // todo use better message
4639  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4640  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4641  break;
4642  }
 4643  if (import_status_.load_failed) {
 4644  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4645  break;
4646  }
4647  }
4648 
4649  // join dangling threads in case of LOG(ERROR) above
4650  for (auto& p : threads) {
4651  p.wait();
4652  }
4653  }
4654 
4655  checkpoint(table_epochs);
4656 
4657  fclose(p_file);
4658  p_file = nullptr;
4659  return import_status_;
4660 }
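// The thread-id bookkeeping above, in isolation: ids are handed out from a
// stack so that no two in-flight workers ever share an import_buffers_vec
// slot, and an id goes back on the stack only when its future completes.
// A minimal sketch (illustrative class, not part of the original file):
namespace {
class SlotPoolSketch {
 public:
  explicit SlotPoolSketch(const size_t n) {
    for (size_t i = 0; i < n; ++i) {
      free_slots_.push(i);
    }
  }
  bool empty() const { return free_slots_.empty(); }
  size_t acquire() {  // caller must ensure !empty()
    const size_t id = free_slots_.top();
    free_slots_.pop();
    return id;
  }
  void release(const size_t id) { free_slots_.push(id); }

 private:
  std::stack<size_t> free_slots_;
};
}  // namespace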
4661 
 4662 void Loader::checkpoint() {
 4663  if (getTableDesc()->persistenceLevel ==
4664  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4665  // tables
4667  }
4668 }
4669 
4670 std::vector<Catalog_Namespace::TableEpochInfo> Loader::getTableEpochs() const {
4671  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4672  getTableDesc()->tableId);
4673 }
4674 
 4675 void Loader::setTableEpochs(
 4676  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4677  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4678 }
4679 
4680 /* static */
 4681 Geospatial::GDAL::DataSourceUqPtr Importer::openGDALDataSource(
 4682  const std::string& file_name,
 4683  const CopyParams& copy_params) {
 4684  Geospatial::GDAL::init();
 4685  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
 4686  copy_params.s3_endpoint,
4687  copy_params.s3_access_key,
4688  copy_params.s3_secret_key,
4689  copy_params.s3_session_token);
4690  if (copy_params.source_type != import_export::SourceType::kGeoFile) {
4691  throw std::runtime_error("Unexpected CopyParams.source_type (" +
4692  std::to_string(static_cast<int>(copy_params.source_type)) +
4693  ")");
 4694  }
 4695  return Geospatial::GDAL::openDataSource(file_name, import_export::SourceType::kGeoFile);
 4696 }
4697 
4698 namespace {
4699 
4700 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
 4701  const Geospatial::GDAL::DataSourceUqPtr& poDS,
 4702  const std::string& file_name) {
4703  // get layer with specified name, or default to first layer
4704  OGRLayer* poLayer = nullptr;
4705  if (geo_layer_name.size()) {
4706  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4707  if (poLayer == nullptr) {
4708  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4709  file_name);
4710  }
4711  } else {
4712  poLayer = poDS->GetLayer(0);
4713  if (poLayer == nullptr) {
4714  throw std::runtime_error("No layers found in " + file_name);
4715  }
4716  }
4717  return *poLayer;
4718 }
4719 
4720 } // namespace
4721 
4722 /* static */
 4723 void Importer::readMetadataSampleGDAL(
 4724  const std::string& file_name,
4725  const std::string& geo_column_name,
4726  std::map<std::string, std::vector<std::string>>& sample_data,
4727  int row_limit,
4728  const CopyParams& copy_params) {
 4729  Geospatial::GDAL::DataSourceUqPtr datasource(
 4730  openGDALDataSource(file_name, copy_params));
4731  if (datasource == nullptr) {
4732  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4733  file_name);
4734  }
4735 
4736  OGRLayer& layer =
4737  getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);
4738 
4739  auto const* feature_defn = layer.GetLayerDefn();
4740  CHECK(feature_defn);
4741 
4742  // metadata columns?
4743  auto const metadata_column_infos =
4744  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4745 
4746  // get limited feature count
 4747  // the return type of GetFeatureCount() differs between GDAL 1.x (int32_t) and 2.x (int64_t)
4748  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
4749  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4750 
4751  // prepare sample data map
4752  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4753  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4754  CHECK(column_name);
4755  sample_data[column_name] = {};
4756  }
4757  sample_data[geo_column_name] = {};
4758  for (auto const& mci : metadata_column_infos) {
4759  sample_data[mci.column_descriptor.columnName] = {};
4760  }
4761 
4762  // prepare to read
4763  layer.ResetReading();
4764 
4765  // read features (up to limited count)
4766  uint64_t feature_index{0u};
4767  while (feature_index < num_features) {
4768  // get (and take ownership of) feature
4769  Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
4770  if (!feature) {
4771  break;
4772  }
4773 
4774  // get feature geometry
4775  auto const* geometry = feature->GetGeometryRef();
4776  if (geometry == nullptr) {
4777  break;
4778  }
4779 
4780  // validate geom type (again?)
4781  switch (wkbFlatten(geometry->getGeometryType())) {
4782  case wkbPoint:
4783  case wkbMultiPoint:
4784  case wkbLineString:
4785  case wkbMultiLineString:
4786  case wkbPolygon:
4787  case wkbMultiPolygon:
4788  break;
4789  default:
4790  throw std::runtime_error("Unsupported geometry type: " +
4791  std::string(geometry->getGeometryName()));
4792  }
4793 
4794  // populate sample data for regular field columns
4795  for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4796  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4797  sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4798  }
4799 
4800  // populate sample data for metadata columns?
4801  for (auto const& mci : metadata_column_infos) {
4802  sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4803  }
4804 
4805  // populate sample data for geo column with WKT string
4806  char* wkts = nullptr;
4807  geometry->exportToWkt(&wkts);
4808  CHECK(wkts);
4809  sample_data[geo_column_name].push_back(wkts);
4810  CPLFree(wkts);
4811 
4812  // next feature
4813  feature_index++;
4814  }
4815 }
4816 
4817 namespace {
4818 
4819 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4820  switch (ogr_type) {
4821  case OFTInteger:
4822  return std::make_pair(kINT, false);
4823  case OFTIntegerList:
4824  return std::make_pair(kINT, true);
4825 #if GDAL_VERSION_MAJOR > 1
4826  case OFTInteger64:
4827  return std::make_pair(kBIGINT, false);
4828  case OFTInteger64List:
4829  return std::make_pair(kBIGINT, true);
4830 #endif
4831  case OFTReal:
4832  return std::make_pair(kDOUBLE, false);
4833  case OFTRealList:
4834  return std::make_pair(kDOUBLE, true);
4835  case OFTString:
4836  return std::make_pair(kTEXT, false);
4837  case OFTStringList:
4838  return std::make_pair(kTEXT, true);
4839  case OFTDate:
4840  return std::make_pair(kDATE, false);
4841  case OFTTime:
4842  return std::make_pair(kTIME, false);
4843  case OFTDateTime:
4844  return std::make_pair(kTIMESTAMP, false);
4845  case OFTBinary:
 4846  // Interpret binary blobs as byte arrays here,
 4847  // but the actual import will store NULL, as GDAL will not
4848  // extract the blob (OGRFeature::GetFieldAsString will
4849  // result in the import buffers having an empty string)
4850  return std::make_pair(kTINYINT, true);
4851  default:
4852  break;
4853  }
4854  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4855 }
4856 
4857 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4858  switch (ogr_type) {
4859  case wkbPoint:
4860  return kPOINT;
4861  case wkbMultiPoint:
4862  return kMULTIPOINT;
4863  case wkbLineString:
4864  return kLINESTRING;
4865  case wkbMultiLineString:
4866  return kMULTILINESTRING;
4867  case wkbPolygon:
4868  return kPOLYGON;
4869  case wkbMultiPolygon:
4870  return kMULTIPOLYGON;
4871  default:
4872  break;
4873  }
4874  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4875 }
4876 
 4877 RasterImporter::PointType convert_raster_point_type(
 4878  const import_export::RasterPointType raster_point_type) {
 4879  switch (raster_point_type) {
 4880  case import_export::RasterPointType::kNone:
 4881  return RasterImporter::PointType::kNone;
 4882  case import_export::RasterPointType::kAuto:
 4883  return RasterImporter::PointType::kAuto;
 4884  case import_export::RasterPointType::kSmallInt:
 4885  return RasterImporter::PointType::kSmallInt;
 4886  case import_export::RasterPointType::kInt:
 4887  return RasterImporter::PointType::kInt;
 4888  case import_export::RasterPointType::kFloat:
 4889  return RasterImporter::PointType::kFloat;
 4890  case import_export::RasterPointType::kDouble:
 4891  return RasterImporter::PointType::kDouble;
 4892  case import_export::RasterPointType::kPoint:
 4893  return RasterImporter::PointType::kPoint;
 4894  }
 4895  UNREACHABLE();
 4896  return RasterImporter::PointType::kNone;
 4897 }
4898 
 4899 RasterImporter::PointTransform convert_raster_point_transform(
 4900  const import_export::RasterPointTransform raster_point_transform) {
 4901  switch (raster_point_transform) {
 4902  case import_export::RasterPointTransform::kNone:
 4903  return RasterImporter::PointTransform::kNone;
 4904  case import_export::RasterPointTransform::kAuto:
 4905  return RasterImporter::PointTransform::kAuto;
 4906  case import_export::RasterPointTransform::kFile:
 4907  return RasterImporter::PointTransform::kFile;
 4908  case import_export::RasterPointTransform::kWorld:
 4909  return RasterImporter::PointTransform::kWorld;
 4910  }
 4911  UNREACHABLE();
 4912  return RasterImporter::PointTransform::kNone;
 4913 }
4914 
4915 } // namespace
4916 
4917 /* static */
4918 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4919  const std::string& file_name,
4920  const bool is_raster,
4921  const std::string& geo_column_name,
4922  const CopyParams& copy_params) {
4923  if (is_raster) {
4924  return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
4925  }
4926  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
4927 }
4928 
4929 /* static */
4930 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsRaster(
4931  const std::string& file_name,
4932  const std::string& geo_column_name,
4933  const CopyParams& copy_params) {
 4934  // lazy init GDAL
 4935  Geospatial::GDAL::init();
 4936  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
 4937  copy_params.s3_endpoint,
4938  copy_params.s3_access_key,
4939  copy_params.s3_secret_key,
4940  copy_params.s3_session_token);
4941 
4942  // prepare for metadata column
4943  auto metadata_column_infos =
4944  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4945 
4946  // create a raster importer and do the detect
4947  RasterImporter raster_importer;
4948  raster_importer.detect(
4949  file_name,
4950  copy_params.raster_import_bands,
 4951  copy_params.raster_import_dimensions,
 4952  convert_raster_point_type(copy_params.raster_point_type),
 4953  convert_raster_point_transform(copy_params.raster_point_transform),
 4954  copy_params.raster_point_compute_angle,
4955  false,
4956  metadata_column_infos);
4957 
4958  // prepare to capture column descriptors
4959  std::list<ColumnDescriptor> cds;
4960 
4961  // get the point column info
4962  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
4963 
4964  // create the columns for the point in the specified type
4965  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
4966  ColumnDescriptor cd;
4967  cd.columnName = cd.sourceName = col_name;
4968  cd.columnType.set_type(sql_type);
4969  // hardwire other POINT attributes for now
 4970  if (sql_type == kPOINT) {
 4971  cd.columnType.set_subtype(kGEOMETRY);
 4972  cd.columnType.set_input_srid(4326);
 4973  cd.columnType.set_output_srid(4326);
 4974  cd.columnType.set_compression(kENCODING_GEOINT);
 4975  cd.columnType.set_comp_param(32);
4976  }
4977  cds.push_back(cd);
4978  }
4979 
4980  // get the names and types for the band column(s)
4981  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
4982 
4983  // add column descriptors for each band
4984  for (auto const& [band_name, sql_type] : band_names_and_types) {
4985  ColumnDescriptor cd;
4986  cd.columnName = cd.sourceName = band_name;
4987  cd.columnType.set_type(sql_type);
4989  cds.push_back(cd);
4990  }
4991 
4992  // metadata columns?
4993  for (auto& mci : metadata_column_infos) {
4994  cds.push_back(std::move(mci.column_descriptor));
4995  }
4996 
4997  // return the results
4998  return cds;
4999 }
5000 
5001 /* static */
5002 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsGeo(
5003  const std::string& file_name,
5004  const std::string& geo_column_name,
5005  const CopyParams& copy_params) {
5006  std::list<ColumnDescriptor> cds;
5007 
5008  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5009  if (poDS == nullptr) {
5010  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5011  file_name);
5012  }
5013  if (poDS->GetLayerCount() == 0) {
5014  throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
5015  " has no layers");
5016  }
5017 
5018  OGRLayer& layer =
5019  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
5020 
5021  layer.ResetReading();
5022  // TODO(andrewseidl): support multiple features
5023  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
5024  if (poFeature == nullptr) {
5025  throw std::runtime_error("No features found in " + file_name);
5026  }
5027  // get fields as regular columns
5028  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5029  CHECK(poFDefn);
5030  int iField;
5031  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
5032  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5033  auto typePair = ogr_to_type(poFieldDefn->GetType());
5034  ColumnDescriptor cd;
5035  cd.columnName = poFieldDefn->GetNameRef();
5036  cd.sourceName = poFieldDefn->GetNameRef();
5037  SQLTypeInfo ti;
5038  if (typePair.second) {
5039  ti.set_type(kARRAY);
5040  ti.set_subtype(typePair.first);
5041  } else {
5042  ti.set_type(typePair.first);
5043  }
 5044  if (typePair.first == kTEXT) {
 5045  ti.set_compression(kENCODING_DICT);
 5046  ti.set_comp_param(32);
5047  }
5048  ti.set_fixed_size();
5049  cd.columnType = ti;
5050  cds.push_back(cd);
5051  }
5052  // get geo column, if any
5053  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
5054  if (poGeometry) {
5055  ColumnDescriptor cd;
5056  cd.columnName = geo_column_name;
5057  cd.sourceName = geo_column_name;
5058 
5059  // get GDAL type
5060  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
5061 
5062  // if exploding, override any collection type to child type
5063  if (copy_params.geo_explode_collections) {
5064  if (ogr_type == wkbMultiPolygon) {
5065  ogr_type = wkbPolygon;
5066  } else if (ogr_type == wkbMultiLineString) {
5067  ogr_type = wkbLineString;
5068  } else if (ogr_type == wkbMultiPoint) {
5069  ogr_type = wkbPoint;
5070  }
5071  }
5072 
5073  // convert to internal type
5074  SQLTypes geoType = ogr_to_type(ogr_type);
5075 
 5076  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
 5077  if (!copy_params.geo_explode_collections) {
 5078  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
5079  }
5080 
5081  // build full internal type
5082  SQLTypeInfo ti;
5083  ti.set_type(geoType);
5084  ti.set_subtype(copy_params.geo_coords_type);
5085  ti.set_input_srid(copy_params.geo_coords_srid);
5086  ti.set_output_srid(copy_params.geo_coords_srid);
5087  ti.set_compression(copy_params.geo_coords_encoding);
5088  ti.set_comp_param(copy_params.geo_coords_comp_param);
5089  cd.columnType = ti;
5090 
5091  cds.push_back(cd);
5092  }
5093 
5094  // metadata columns?
5095  auto metadata_column_infos =
5096  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
5097  for (auto& mci : metadata_column_infos) {
5098  cds.push_back(std::move(mci.column_descriptor));
5099  }
5100 
5101  return cds;
5102 }
5103 
5104 bool Importer::gdalStatInternal(const std::string& path,
5105  const CopyParams& copy_params,
5106  bool also_dir) {
 5107  // lazy init GDAL
 5108  Geospatial::GDAL::init();
 5109  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
 5110  copy_params.s3_endpoint,
5111  copy_params.s3_access_key,
5112  copy_params.s3_secret_key,
5113  copy_params.s3_session_token);
5114 
5115 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5116  // clear GDAL stat cache
5117  // without this, file existence will be cached, even if authentication changes
5118  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
5119  VSICurlClearCache();
5120 #endif
5121 
5122  // stat path
5123  VSIStatBufL sb;
5124  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5125  if (result < 0) {
5126  return false;
5127  }
5128 
5129  // exists?
5130  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5131  return true;
5132  } else if (VSI_ISREG(sb.st_mode)) {
5133  return true;
5134  }
5135  return false;
5136 }
5137 
5138 /* static */
5139 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
5140  return gdalStatInternal(path, copy_params, false);
5141 }
5142 
5143 /* static */
5144 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
5145  const CopyParams& copy_params) {
5146  return gdalStatInternal(path, copy_params, true);
5147 }
5148 
5149 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
5150  std::vector<std::string>& files) {
5151  // prepare to gather subdirectories
5152  std::vector<std::string> subdirectories;
5153 
5154  // get entries
5155  char** entries = VSIReadDir(archive_path.c_str());
5156  if (!entries) {
5157  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
5158  return;
5159  }
5160 
5161  // force scope
5162  {
5163  // request clean-up
5164  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
5165 
5166  // check all the entries
5167  int index = 0;
5168  while (true) {
5169  // get next entry, or drop out if there isn't one
5170  char* entry_c = entries[index++];
5171  if (!entry_c) {
5172  break;
5173  }
5174  std::string entry(entry_c);
5175 
5176  // ignore '.' and '..'
5177  if (entry == "." || entry == "..") {
5178  continue;
5179  }
5180 
5181  // build the full path
5182  std::string entry_path = archive_path + std::string("/") + entry;
5183 
5184  // is it a file or a sub-folder
5185  VSIStatBufL sb;
5186  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
5187  if (result < 0) {
5188  break;
5189  }
5190 
5191  if (VSI_ISDIR(sb.st_mode)) {
 5192  // a directory that ends with .gdb could be a Geodatabase bundle.
 5193  // arguably dangerous to decide this purely by name, but any further
 5194  // validation would be very complex, especially at this scope
5195  if (boost::iends_with(entry_path, ".gdb")) {
5196  // add the directory as if it was a file and don't recurse into it
5197  files.push_back(entry_path);
5198  } else {
5199  // add subdirectory to be recursed into
5200  subdirectories.push_back(entry_path);
5201  }
5202  } else {
5203  // add this file
5204  files.push_back(entry_path);
5205  }
5206  }
5207  }
5208 
 5209  // recurse into each subdirectory we found
5210  for (const auto& subdirectory : subdirectories) {
5211  gdalGatherFilesInArchiveRecursive(subdirectory, files);
5212  }
5213 }
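// A usage sketch for the recursive walk above: thanks to GDAL's VSI layer it
// also works inside archives addressed by virtual paths. The /vsizip/ prefix
// is real GDAL syntax; the archive path below is illustrative:
//
//   std::vector<std::string> files;
//   gdalGatherFilesInArchiveRecursive("/vsizip//tmp/shapefiles.zip", files);
//   // 'files' now holds the full virtual path of every file in the archive;
//   // gdalGetAllFilesInArchive() below trims these to archive-relative paths.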
5214 
5215 /* static */
5216 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
5217  const std::string& archive_path,
5218  const CopyParams& copy_params) {
 5219  // lazy init GDAL
 5220  Geospatial::GDAL::init();
 5221  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
 5222  copy_params.s3_endpoint,
5223  copy_params.s3_access_key,
5224  copy_params.s3_secret_key,
5225  copy_params.s3_session_token);
5226 
5227  // prepare to gather files
5228  std::vector<std::string> files;
5229 
5230  // gather the files recursively
5231  gdalGatherFilesInArchiveRecursive(archive_path, files);
5232 
5233  // convert to relative paths inside archive
5234  for (auto& file : files) {
5235  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5236  }
5237 
5238  // done
5239  return files;
5240 }
5241 
5242 /* static */
5243 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
5244  const std::string& file_name,
5245  const CopyParams& copy_params) {
 5246  // lazy init GDAL
 5247  Geospatial::GDAL::init();
 5248  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
 5249  copy_params.s3_endpoint,
5250  copy_params.s3_access_key,
5251  copy_params.s3_secret_key,
5252  copy_params.s3_session_token);
5253 
5254  // prepare to gather layer info
5255  std::vector<GeoFileLayerInfo> layer_info;
5256 
5257  // open the data set
5258  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5259  if (poDS == nullptr) {
5260  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5261  file_name);
5262  }
5263 
5264  // enumerate the layers
5265  for (auto&& poLayer : poDS->GetLayers()) {
5266  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
5267  // prepare to read this layer
5268  poLayer->ResetReading();
5269  // skip layer if empty
5270  if (poLayer->GetFeatureCount() > 0) {
5271  // get first feature
5272  Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
5273  CHECK(first_feature);
5274  // check feature for geometry
5275  const OGRGeometry* geometry = first_feature->GetGeometryRef();
5276  if (!geometry) {
5277  // layer has no geometry
5278  contents = GeoFileLayerContents::NON_GEO;
5279  } else {
5280  // check the geometry type
5281  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
5282  switch (wkbFlatten(geometry_type)) {
5283  case wkbPoint:
5284  case wkbLineString:
5285  case wkbMultiLineString:
5286  case wkbPolygon:
5287  case wkbMultiPolygon:
5288  // layer has supported geo
5289  contents = GeoFileLayerContents::GEO;
5290  break;
5291  case wkbMultiPoint:
5292  // supported if geo_explode_collections is specified
5293  contents = copy_params.geo_explode_collections
5294  ? GeoFileLayerContents::GEO
5295  : GeoFileLayerContents::UNSUPPORTED_GEO;
5296  break;
5297  default:
5298  // layer has unsupported geometry
5299  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
5300  break;
5301  }
5302  }
5303  }
5304  // store info for this layer
5305  layer_info.emplace_back(poLayer->GetName(), contents);
5306  }
5307 
5308  // done
5309  return layer_info;
5310 }
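// A hedged usage sketch for the layer scan above (the file name is
// hypothetical):
//
//   for (const auto& layer : Importer::gdalGetLayersInGeoFile("cities.gdb",
//                                                             copy_params)) {
//     if (layer.contents == GeoFileLayerContents::GEO) {
//       // layer.name is importable as geo
//     }
//   }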
5311 
5312 ImportStatus Importer::importGDAL(
5313  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
5314  const Catalog_Namespace::SessionInfo* session_info,
5315  const bool is_raster) {
5316  if (is_raster) {
5317  return importGDALRaster(session_info);
5318  }
5319  return importGDALGeo(columnNameToSourceNameMap, session_info);
5320 }
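// Dispatch sketch (arguments hypothetical): importGDAL(col_map, session,
// /*is_raster=*/true) routes to importGDALRaster(), which does not use the
// column name mapping; all other imports go through importGDALGeo().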
5321 
5322 ImportStatus Importer::importGDALGeo(
5323  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
5324  const Catalog_Namespace::SessionInfo* session_info) {
5325  // initial status
5326  set_import_status(import_id, import_status_);
5327  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_path, copy_params));
5328  if (poDS == nullptr) {
5329  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5330  file_path);
5331  }
5332 
5333  OGRLayer& layer =
5334  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
5335 
5336  // get the number of features in this layer
5337  size_t numFeatures = layer.GetFeatureCount();
5338 
5339  // build a map from metadata field (additional column) name to index;
5340  // the map is passed to each worker thread
5341  FieldNameToIndexMapType fieldNameToIndexMap;
5342  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5343  CHECK(poFDefn);
5344  size_t numFields = poFDefn->GetFieldCount();
5345  for (size_t iField = 0; iField < numFields; iField++) {
5346  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5347  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5348  }
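// Example of the map built above: for a layer whose fields are ("name",
// "population"), fieldNameToIndexMap holds {"name" -> 0, "population" -> 1},
// letting worker threads fetch OGR field values by index instead of by
// repeated name lookup.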
5349 
5350  // the geographic spatial reference we want to put everything in
5351  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5352  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5353 
5354 #if GDAL_VERSION_MAJOR >= 3
5355  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5356  // this results in X and Y being transposed for angle-based
5357  // coordinate systems. This restores the previous behavior.
5358  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5359 #endif
5360 
5361 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5362  // just one "thread"
5363  max_threads = 1;
5364 #else
5365  // how many threads to use
5366  max_threads = num_import_threads(copy_params.threads);
5367 #endif
5368  VLOG(1) << "GDAL import # threads: " << max_threads;
5369 
5370  // metadata columns?
5371  auto const metadata_column_infos =
5372  parse_add_metadata_columns(copy_params.add_metadata_columns, file_path);
5373 
5374  // geo table import is handled separately in both DBHandler and QueryRunner,
5375  // i.e., outside the normal SQL execution path,
5376  // so we explicitly enroll the import session here to allow interruption
5377  // while importing the geo table
5378  auto query_session = session_info ? session_info->get_session_id() : "";
5379  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5380  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
5381  auto is_session_already_registered = false;
5382  {
5383  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(
5384  executor->getSessionLock());
5385  is_session_already_registered =
5386  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5387  }
5388  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5389  !is_session_already_registered) {
5390  executor->enrollQuerySession(query_session,
5391  "IMPORT_GEO_TABLE",
5392  query_submitted_time,
5393  Executor::UNITARY_EXECUTOR_ID,
5394  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5395  }
5396  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5397  // reset the runtime query interrupt status
5398  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5399  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5400  }
5401  };
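// Because clearInterruptStatus is a ScopeGuard, the session status is cleared
// on every exit path from this function (normal return, exception, or
// interrupt), mirroring the ScopeGuard-based clean-up used for the GDAL
// directory entries earlier in this file.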
5402 
5403  // make an import buffer for each thread
5404  CHECK_EQ(import_buffers_vec.size(), 0u);
5405  import_buffers_vec.resize(max_threads);
5406  for (size_t i = 0; i < max_threads; i++) {
5407  for (const auto cd : loader->get_column_descs()) {
5408  import_buffers_vec[i].emplace_back(
5409  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5410  }
5411  }
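// Resulting layout, assuming two threads and columns c0, c1 (illustrative):
//   import_buffers_vec[0] = {TypedImportBuffer(c0), TypedImportBuffer(c1)}
//   import_buffers_vec[1] = {TypedImportBuffer(c0), TypedImportBuffer(c1)}
// Each worker appends only to its own row, so no locking is needed on append.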
5412 
5413  // make render group analyzers for each poly column
5414  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
5415  if (copy_params.geo_assign_render_groups) {
5416  if (g_enable_assign_render_groups) {
5417  auto& cat = loader->getCatalog();
5418  auto* td = loader->getTableDesc();
5419  CHECK(td);
5420  auto column_descriptors =
5421  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5422  for (auto const& cd : column_descriptors) {
5423  if (IS_GEO_POLY(cd->columnType.get_type())) {
5424  auto rga = std::make_shared<RenderGroupAnalyzer>();
5425  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
5426  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
5427  }
5428  }
5429  } else {
5430  throw std::runtime_error(
5431  "Render Group Assignment requested in CopyParams but disabled in Server "
5432  "Config. Set enable_assign_render_groups=true in Server Config to override.");
5433  }
5434  }
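// Each analyzer is seeded from rows already in the table so that, on append,
// render groups assigned to newly imported polygons take the table's existing
// polygons into account rather than starting from an empty state.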
5435 
5436 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5437  // threads
5438  std::list<std::future<ImportStatus>> threads;
5439 
5440  // use a stack to track thread_ids which must not overlap among threads
5441  // because thread_id is used to index import_buffers_vec[]
5442  std::stack<size_t> stack_thread_ids;
5443  for (size_t i = 0; i < max_threads; i++) {
5444  stack_thread_ids.push(i);
5445  }
5446 #endif
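// The stack acts as a free-list of buffer slots: a worker's id is popped
// before launch and pushed back once its future is collected, so no two
// in-flight workers ever share an import_buffers_vec[] row. Sketch of one
// cycle (the push-back happens further down in this function):
//
//   auto thread_id = stack_thread_ids.top();
//   stack_thread_ids.pop();
//   // ... launch worker bound to import_buffers_vec[thread_id] ...
//   stack_thread_ids.push(thread_id);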
5447 
5448  // record the table epochs, so the import can be rolled back on failure
5449  auto table_epochs = loader->getTableEpochs();
5450 
5451  // reset the layer
5452  layer.ResetReading();
5453 
5454  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5455 
5456  // make a features buffer for each thread
5457  std::vector<FeaturePtrVector> features(max_threads);
5458 
5459  // make a coordinate transformation for each thread, based on the first feature's SR
5460  std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5461  max_threads);
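// One transformation per thread because OGRCoordinateTransformation instances
// are stateful and generally not safe to share across threads; each is built
// lazily from the first feature's spatial reference into poGeographicSR.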
5462 
5463  // for each feature...
5464  size_t firstFeatureThisChunk = 0;
5465  while (firstFeatureThisChunk < numFeatures) {
5466  // how many features this chunk
5467  size_t numFeaturesThisChunk =
5468  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5469 
5470 // get a thread_id not in use
5471 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5472  size_t thread_id = 0;
5473 #else
5474  auto thread_id = stack_thread_ids.top();
5475  stack_thread_ids.pop();
5476  CHECK(thread_id < max_threads);