/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * @file Importer.cpp
 * @brief Functions for Importer class
 */

#include "ImportExport/Importer.h"

#include <arrow/api.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/io/api.h>
#include <gdal.h>
#include <ogrsf_frmts.h>
#include <boost/algorithm/string.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/filesystem.hpp>
#include <boost/geometry.hpp>
#include <boost/variant.hpp>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <future>
#include <iomanip>
#include <list>
#include <memory>
#include <mutex>
#include <numeric>
#include <stack>
#include <stdexcept>
#include <thread>
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "Archive/S3Archive.h"
#include "ArrowImporter.h"
#include "Catalog/os/UserMapping.h"
#ifdef ENABLE_IMPORT_PARQUET
#endif
#if defined(ENABLE_IMPORT_PARQUET)
#include "Catalog/ForeignTable.h"
#endif
#ifdef ENABLE_IMPORT_PARQUET
#endif
#include "Geospatial/Compression.h"
#include "Geospatial/GDAL.h"
#include "Geospatial/Transforms.h"
#include "Geospatial/Types.h"
#include "Logger/Logger.h"
#include "QueryEngine/Execute.h"
#include "Shared/DateTimeParser.h"
#include "Shared/SqlTypesLayout.h"
#include "Shared/SysDefinitions.h"
#include "Shared/file_path_util.h"
#include "Shared/import_helpers.h"
#include "Shared/likely.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/scope.h"
#include "Shared/shard_key.h"
#include "Shared/thread_count.h"

#include "gen-cpp/Heavy.h"

#ifdef _WIN32
#include <fcntl.h>
#include <io.h>
#endif

#define TIMER_STOP(t)                                                                  \
  (float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>( \
       t)) /                                                                           \
   1.0E6f)
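
// Illustrative usage (a sketch, assuming the timer_start()/timer_stop()
// helpers from "Shared/measure.h"): TIMER_STOP converts the microseconds
// elapsed on a steady_clock-based timer into float seconds:
//
//   auto clock_begin = timer_start();
//   // ... do some work ...
//   LOG(INFO) << "work took " << TIMER_STOP(clock_begin) << " seconds";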

size_t g_max_import_threads =
    32;  // Max number of default import threads to use (num hardware threads will be
         // used if lower, and can also be explicitly overridden in copy statement with
         // threads option)
size_t g_archive_read_buf_size = 1 << 20;

std::optional<size_t> g_detect_test_sample_size = std::nullopt;

static constexpr int kMaxRasterScanlinesPerThread = 32;

inline auto get_filesize(const std::string& file_path) {
  boost::filesystem::path boost_file_path{file_path};
  boost::system::error_code ec;
  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
  return ec ? 0 : filesize;
}
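
// Note that file_size() reports failure through its error_code overload, so
// get_filesize() maps any error (e.g. a nonexistent path) to a size of 0
// rather than throwing:
//
//   const auto sz = get_filesize("/path/that/may/not/exist.csv");  // 0 on error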

namespace {

bool check_session_interrupted(const QuerySessionId& query_session, Executor* executor) {
  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
    heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(
        executor->getSessionLock());
    return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
  }
  return false;
}
}  // namespace

// For logging std::vector<std::string> row.
namespace boost {
namespace log {
formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
  out << '[';
  for (size_t i = 0; i < row.size(); ++i) {
    out << (i ? ", " : "") << row[i];
  }
  out << ']';
  return out;
}
}  // namespace log
}  // namespace boost
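
// Illustrative example of the formatter above: logging a row
// std::vector<std::string>{"1", "foo", "2.5"} renders as "[1, foo, 2.5]".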

namespace import_export {

using FieldNameToIndexMapType = std::map<std::string, size_t>;
using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
using FeaturePtrVector = std::vector<Geospatial::GDAL::FeatureUqPtr>;

#define DEBUG_TIMING false
#define DEBUG_RENDER_GROUP_ANALYZER 0
#define DEBUG_AWS_AUTHENTICATION 0

#define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0

// only auto-promote the polygon column type
// @TODO auto-promote linestring
static constexpr bool PROMOTE_POINT_TO_MULTIPOINT = false;
static constexpr bool PROMOTE_LINESTRING_TO_MULTILINESTRING = false;
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;

static heavyai::shared_mutex status_mutex;
static std::map<std::string, ImportStatus> import_status_map;

// max # rows to import
static const size_t kImportRowLimit = 10000;

Importer::Importer(Catalog_Namespace::Catalog& c,
                   const TableDescriptor* t,
                   const std::string& f,
                   const CopyParams& p)
    : Importer(new Loader(c, t), f, p) {}

Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
    : DataStreamSink(p, f), loader(providedLoader) {
  import_id = boost::filesystem::path(file_path).filename().string();
  file_size = 0;
  max_threads = 0;
  p_file = nullptr;
  buffer[0] = nullptr;
  buffer[1] = nullptr;
  // We may be overallocating a little more memory here due to dropping phy cols;
  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
  int i = 0;
  bool has_array = false;
  // TODO: replace this ugly way of skipping phy cols once isPhyGeo is defined
  int skip_physical_cols = 0;
  for (auto& p : loader->get_column_descs()) {
    // phy geo columns can't be in the input file
    if (skip_physical_cols-- > 0) {
      continue;
    }
    // neither are rowid or $deleted$
    // note: columns can be added after rowid/$deleted$
    if (p->isVirtualCol || p->isDeletedCol) {
      continue;
    }
    skip_physical_cols = p->columnType.get_physical_cols();
    if (p->columnType.get_type() == kARRAY) {
      is_array.get()[i] = true;
      has_array = true;
    } else {
      is_array.get()[i] = false;
    }
    ++i;
  }
  if (has_array) {
    is_array_a = std::unique_ptr<bool[]>(is_array.release());
  } else {
    is_array_a = std::unique_ptr<bool[]>(nullptr);
  }
}

Importer::~Importer() {
  if (p_file != nullptr) {
    fclose(p_file);
  }
  if (buffer[0] != nullptr) {
    free(buffer[0]);
  }
  if (buffer[1] != nullptr) {
    free(buffer[1]);
  }
}

ImportStatus Importer::get_import_status(const std::string& import_id) {
  heavyai::shared_lock<heavyai::shared_mutex> read_lock(status_mutex);
  auto it = import_status_map.find(import_id);
  if (it == import_status_map.end()) {
    throw std::runtime_error("Import status not found for id: " + import_id);
  }
  return it->second;
}

void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
  heavyai::lock_guard<heavyai::shared_mutex> write_lock(status_mutex);
  is.end = std::chrono::steady_clock::now();
  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
  import_status_map[import_id] = is;
}

static const std::string trim_space(const char* field, const size_t len) {
  size_t i = 0;
  size_t j = len;
  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
    i++;
  }
  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
    j--;
  }
  return std::string(field + i, j - i);
}
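
// Example (illustrative): only leading/trailing spaces and carriage returns
// are stripped, so trim_space(" 42\r", 4) yields "42", while interior
// whitespace is preserved: trim_space("4 2", 3) yields "4 2".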

namespace {
SQLTypes get_type_for_datum(const SQLTypeInfo& ti) {
  SQLTypes type;
  if (ti.is_decimal()) {
    type = decimal_to_int_type(ti);
  } else if (ti.is_dict_encoded_string()) {
    type = string_dict_to_int_type(ti);
  } else {
    type = ti.get_type();
  }
  return type;
}
}  // namespace

Datum NullArrayDatum(const SQLTypeInfo& ti) {
  Datum d;
  const auto type = get_type_for_datum(ti);
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_ARRAY_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_ARRAY_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
  }
  return d;
}

ArrayDatum StringToArray(const std::string& s,
                         const SQLTypeInfo& ti,
                         const CopyParams& copy_params) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
    return ArrayDatum(0, NULL, true);
  }
  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
    LOG(WARNING) << "Malformed array: " << s;
    return ArrayDatum(0, NULL, true);
  }
  std::vector<std::string> elem_strs;
  size_t last = 1;
  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
       i = s.find(copy_params.array_delim, last)) {
    elem_strs.push_back(s.substr(last, i - last));
    last = i + 1;
  }
  if (last + 1 <= s.size()) {
    elem_strs.push_back(s.substr(last, s.size() - 1 - last));
  }
  if (elem_strs.size() == 1) {
    auto str = elem_strs.front();
    auto str_trimmed = trim_space(str.c_str(), str.length());
    if (str_trimmed == "") {
      elem_strs.clear();  // Empty array
    }
  }
  if (!elem_ti.is_string()) {
    size_t len = elem_strs.size() * elem_ti.get_size();
    std::unique_ptr<int8_t, FreeDeleter> buf(
        reinterpret_cast<int8_t*>(checked_malloc(len)));
    int8_t* p = buf.get();
    for (auto& es : elem_strs) {
      auto e = trim_space(es.c_str(), es.length());
      bool is_null = (e == copy_params.null_str) || e == "NULL";
      if (!elem_ti.is_string() && e == "") {
        is_null = true;
      }
      if (elem_ti.is_number() || elem_ti.is_time()) {
        if (!isdigit(e[0]) && e[0] != '-') {
          is_null = true;
        }
      }
      Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
      p = append_datum(p, d, elem_ti);
      CHECK(p);
    }
    return ArrayDatum(len, buf.release(), false);
  }
  // must not be called for array of strings
  CHECK(false);
  return ArrayDatum(0, NULL, true);
}
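
// Illustrative example, assuming the default CopyParams array delimiters
// ('{', ',', '}'): for an INTEGER[] column, StringToArray("{1,2,3}", ti,
// copy_params) produces a 3-element ArrayDatum, while copy_params.null_str,
// "NULL", or an empty string each produce a NULL ArrayDatum.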

ArrayDatum NullArray(const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  auto len = ti.get_size();

  if (len > 0) {
    // Compose a NULL fixlen array
    int8_t* buf = (int8_t*)checked_malloc(len);
    // First scalar is a NULL_ARRAY sentinel
    Datum d = NullArrayDatum(elem_ti);
    int8_t* p = append_datum(buf, d, elem_ti);
    CHECK(p);
    // Rest is filled with normal NULL sentinels
    Datum d0 = NullDatum(elem_ti);
    while ((p - buf) < len) {
      p = append_datum(p, d0, elem_ti);
      CHECK(p);
    }
    CHECK((p - buf) == len);
    return ArrayDatum(len, buf, true);
  }
  // NULL varlen array
  return ArrayDatum(0, NULL, true);
}

ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
  return NullArray(ti);
}

ArrayDatum ImporterUtils::composeNullPointCoords(const SQLTypeInfo& coords_ti,
                                                 const SQLTypeInfo& geo_ti) {
  if (geo_ti.get_compression() == kENCODING_GEOINT) {
    CHECK(geo_ti.get_comp_param() == 32);
    std::vector<double> null_point_coords = {NULL_ARRAY_DOUBLE, NULL_DOUBLE};
    auto compressed_null_coords = Geospatial::compress_coords(null_point_coords, geo_ti);
    const size_t len = compressed_null_coords.size();
    int8_t* buf = (int8_t*)checked_malloc(len);
    memcpy(buf, compressed_null_coords.data(), len);
    return ArrayDatum(len, buf, false);
  }
  auto modified_ti = coords_ti;
  modified_ti.set_subtype(kDOUBLE);
  return ImporterUtils::composeNullArray(modified_ti);
}

void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
  const auto& arr = datum.val.arr_val;
  for (const auto& elem_datum : arr) {
    string_vec.push_back(elem_datum.val.str_val);
  }
}

Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kBIGINT:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kINT:
      d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kSMALLINT:
      d.smallintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kTINYINT:
      d.tinyintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kFLOAT:
      d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
      break;
    case kDOUBLE:
      d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
  }
  return d;
}

ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();

  CHECK(!elem_ti.is_string());

  if (datum.is_null) {
    return NullArray(ti);
  }

  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
  int8_t* buf = (int8_t*)checked_malloc(len);
  int8_t* p = buf;
  for (auto& e : datum.val.arr_val) {
    p = append_datum(p, TDatumToDatum(e, elem_ti), elem_ti);
    CHECK(p);
  }

  return ArrayDatum(len, buf, false);
}
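
// Resulting layout sketch: for a non-null BIGINT[3] TDatum, the returned
// buffer holds 3 * sizeof(int64_t) contiguous bytes, each element converted
// through TDatumToDatum and written via append_datum.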

void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
  CHECK(string_dict_);
  std::vector<std::string_view> string_view_vec;
  string_view_vec.reserve(string_vec.size());
  for (const auto& str : string_vec) {
    if (str.size() > StringDictionary::MAX_STRLEN) {
      std::ostringstream oss;
      oss << "while processing dictionary for column " << getColumnDesc()->columnName
          << " a string was detected too long for encoding, string length = "
          << str.size() << ", first 100 characters are '" << str.substr(0, 100) << "'";
      throw std::runtime_error(oss.str());
    }
    string_view_vec.push_back(str);
  }
  try {
    switch (column_desc_->columnType.get_size()) {
      case 1:
        string_dict_i8_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
        break;
      case 2:
        string_dict_i16_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
        break;
      case 4:
        string_dict_i32_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
        break;
      default:
        CHECK(false);
    }
  } catch (std::exception& e) {
    std::ostringstream oss;
    oss << "while processing dictionary for column " << getColumnDesc()->columnName
        << " : " << e.what();
    LOG(ERROR) << oss.str();
    throw std::runtime_error(oss.str());
  }
}
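
// Usage sketch (hypothetical dictionary-encoded TEXT column): calling
//   buffer->addDictEncodedString({"x", "y", "x"});
// bulk-resolves the strings through StringDictionary::getOrAddBulk, writing
// three IDs into whichever of the 8/16/32-bit ID buffers matches the column
// width, with both "x" occurrences mapped to the same ID.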

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const std::string_view val,
                                  const bool is_null,
                                  const CopyParams& copy_params,
                                  const bool check_not_null) {
  const auto type = cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBoolean(static_cast<int8_t>(d.boolval));
      }
      break;
    }
    case kTINYINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addTinyint(d.tinyintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addSmallint(d.smallintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addInt(d.intval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        DecimalOverflowValidator validator(ti);
        validator.validate(d.bigintval);
        addBigint(d.bigintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addDouble(std::atof(std::string(val).c_str()));
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        if (val.length() > cd->columnType.get_max_strlen()) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(cd->columnType.get_max_strlen()));
        }
        addString(val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (check_not_null && cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (check_not_null && is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      SQLTypeInfo ti = cd->columnType;
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImportHelpers::parseStringArray(std::string(val), copy_params, string_vec);
        if (!is_null) {
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          addStringArray(string_vec);
        } else {
          addStringArray(std::nullopt);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
          if (d.is_null) {  // val could be "NULL"
            addArray(NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + std::string(val));
            }
            addArray(d);
          }
        } else {
          addArray(NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      addGeoString(val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}
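
// Illustrative calls (hypothetical INT column descriptor cd):
//   buffer->add_value(cd, "123", /*is_null=*/false, copy_params);
//     parses "123" through StringToDatum and appends it, while
//   buffer->add_value(cd, "", /*is_null=*/true, copy_params);
//     appends the inline NULL sentinel, or throws if the column is NOT NULL.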

void TypedImportBuffer::pop_value() {
  const auto type = column_desc_->columnType.is_decimal()
                        ? decimal_to_int_type(column_desc_->columnType)
                        : column_desc_->columnType.get_type();
  switch (type) {
    case kBOOLEAN:
      bool_buffer_->pop_back();
      break;
    case kTINYINT:
      tinyint_buffer_->pop_back();
      break;
    case kSMALLINT:
      smallint_buffer_->pop_back();
      break;
    case kINT:
      int_buffer_->pop_back();
      break;
    case kBIGINT:
      bigint_buffer_->pop_back();
      break;
    case kFLOAT:
      float_buffer_->pop_back();
      break;
    case kDOUBLE:
      double_buffer_->pop_back();
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      string_buffer_->pop_back();
      break;
    case kDATE:
    case kTIME:
    case kTIMESTAMP:
      bigint_buffer_->pop_back();
      break;
    case kARRAY:
      if (IS_STRING(column_desc_->columnType.get_subtype())) {
        string_array_buffer_->pop_back();
      } else {
        array_buffer_->pop_back();
      }
      break;
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->pop_back();
      break;
    default:
      CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
  }
}

struct GeoImportException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
template <typename DATA_TYPE>
size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
    const ColumnDescriptor* cd,
    const Array& array,
    std::vector<DATA_TYPE>& buffer,
    const ArraySliceRange& slice_range,
    import_export::BadRowsTracker* const bad_rows_tracker) {
  auto data =
      std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
  if (bad_rows_tracker && cd->columnType.is_geometry()) {
    f_add_geo_phy_cols = [&](const int64_t row) {
      // Populate physical columns (ref. DBHandler::load_table)
      std::vector<double> coords, bounds;
      std::vector<int> ring_sizes, poly_rings;
      SQLTypeInfo ti;
      // replace any unexpected exception from getGeoColumns or other
      // on this path with a GeoImportException so that we won't over-push
      // a null to the logical column...
      try {
        SQLTypeInfo import_ti{ti};
        if (array.IsNull(row)) {
          Geospatial::GeoTypesFactory::getNullGeoColumns(
              import_ti, coords, bounds, ring_sizes, poly_rings);
        } else {
          const bool validate_with_geos_if_available = false;
          arrow_throw_if<GeoImportException>(
              !Geospatial::GeoTypesFactory::getGeoColumns(
                  geo_string_buffer_->back(),
                  ti,
                  coords,
                  bounds,
                  ring_sizes,
                  poly_rings,
                  validate_with_geos_if_available),
              error_context(cd, bad_rows_tracker) + "Invalid geometry");
          arrow_throw_if<GeoImportException>(
              cd->columnType.get_type() != ti.get_type(),
              error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
        }
        auto col_idx_workpad = col_idx;  // what a pitfall!!
        Importer::set_geo_physical_import_buffer(
            bad_rows_tracker->importer->getCatalog(),
            cd,
            *import_buffers,
            col_idx_workpad,
            coords,
            bounds,
            ring_sizes,
            poly_rings);
      } catch (GeoImportException&) {
        throw;
      } catch (std::runtime_error& e) {
        throw GeoImportException(e.what());
      } catch (const std::exception& e) {
        throw GeoImportException(e.what());
      } catch (...) {
        throw GeoImportException("unknown exception");
      }
    };
  }
  auto f_mark_a_bad_row = [&](const auto row) {
    std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
    bad_rows_tracker->rows.insert(row - slice_range.first);
  };
  buffer.reserve(slice_range.second - slice_range.first);
  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
    try {
      *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
      f_add_geo_phy_cols(row);
    } catch (GeoImportException&) {
      f_mark_a_bad_row(row);
    } catch (ArrowImporterException&) {
      // trace bad rows of each column; otherwise rethrow.
      if (bad_rows_tracker) {
        *data << nullptr;
        f_mark_a_bad_row(row);
      } else {
        throw;
      }
    }
  }
  return buffer.size();
}

size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
                                           const Array& col,
                                           const bool exact_type_match,
                                           const ArraySliceRange& slice_range,
                                           BadRowsTracker* const bad_rows_tracker) {
  const auto type = cd->columnType.get_type();
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
  }

  switch (type) {
    case kBOOLEAN:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
    case kTINYINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
    case kSMALLINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
    case kINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *int_buffer_, slice_range, bad_rows_tracker);
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kFLOAT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *float_buffer_, slice_range, bad_rows_tracker);
    case kDOUBLE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *double_buffer_, slice_range, bad_rows_tracker);
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                       "Expected string type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *string_buffer_, slice_range, bad_rows_tracker);
    case kTIME:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
                       "Expected time32 or time64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kTIMESTAMP:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kDATE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
                       "Expected date32 or date64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                     "Expected string type");
      return convert_arrow_val_to_import_buffer(
          cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
    case kARRAY:
      throw std::runtime_error("Arrow array appends not yet supported");
    default:
      throw std::runtime_error("Invalid Type");
  }
}

// this is exclusively used by load_table_binary_columnar
size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
  size_t dataSize = 0;
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
      throw std::runtime_error("NULL for column " + cd->columnName);
    }
  }

  switch (cd->columnType.get_type()) {
    case kBOOLEAN: {
      dataSize = col.data.int_col.size();
      bool_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bool_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kTINYINT: {
      dataSize = col.data.int_col.size();
      tinyint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kSMALLINT: {
      dataSize = col.data.int_col.size();
      smallint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kINT: {
      dataSize = col.data.int_col.size();
      int_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          int_buffer_->push_back((int32_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kFLOAT: {
      dataSize = col.data.real_col.size();
      float_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          float_buffer_->push_back(NULL_FLOAT);
        } else {
          float_buffer_->push_back((float)col.data.real_col[i]);
        }
      }
      break;
    }
    case kDOUBLE: {
      dataSize = col.data.real_col.size();
      double_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          double_buffer_->push_back(NULL_DOUBLE);
        } else {
          double_buffer_->push_back((double)col.data.real_col[i]);
        }
      }
      break;
    }
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // TODO: for now, use empty string for nulls
      dataSize = col.data.str_col.size();
      string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          string_buffer_->push_back(std::string());
        } else {
          string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
        }
      }
      break;
    }
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON: {
      dataSize = col.data.str_col.size();
      geo_string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          // TODO: add support for NULL geo
          geo_string_buffer_->push_back(std::string());
        } else {
          geo_string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kARRAY: {
      dataSize = col.data.arr_col.size();
      if (IS_STRING(cd->columnType.get_subtype())) {
        for (size_t i = 0; i < dataSize; i++) {
          OptionalStringVector& string_vec = addStringArray();
          if (!col.nulls[i]) {
            size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
            for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
              string_vec->push_back(col.data.arr_col[i].data.str_col[str_idx]);
            }
          }
        }
      } else {
        auto elem_ti = cd->columnType.get_subtype();
        switch (elem_ti) {
          case kBOOLEAN: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  // Explicitly checking the item for null because
                  // casting the null value (-128) to bool results in
                  // the incorrect value 1.
                  if (col.data.arr_col[i].nulls[j]) {
                    *p = static_cast<int8_t>(
                        inline_fixed_encoding_null_val(cd->columnType.get_elem_type()));
                  } else {
                    *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
                  }
                  p += sizeof(bool);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTINYINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int8_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kSMALLINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int16_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int16_t*)p =
                      static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int16_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int32_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int32_t*)p =
                      static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int32_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kBIGINT:
          case kNUMERIC:
          case kDECIMAL: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int64_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int64_t*)p =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kFLOAT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(float);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(float);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kDOUBLE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(double);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(double);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTIME:
          case kTIMESTAMP:
          case kDATE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteWidth = sizeof(int64_t);
                size_t byteSize = len * byteWidth;
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *reinterpret_cast<int64_t*>(p) =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          default:
            throw std::runtime_error("Invalid Array Type");
        }
      }
      break;
    }
    default:
      throw std::runtime_error("Invalid Type");
  }
  return dataSize;
}

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const TDatum& datum,
                                  const bool is_null) {
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        addBoolean((int8_t)datum.val.int_val);
      }
      break;
    }
    case kTINYINT:
      if (!is_null) {
        addTinyint((int8_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kSMALLINT:
      if (!is_null) {
        addSmallint((int16_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kINT:
      if (!is_null) {
        addInt((int32_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kBIGINT:
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kFLOAT:
      if (!is_null) {
        addFloat((float)datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        addDouble(datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        addString(datum.val.str_val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kARRAY:
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      if (IS_STRING(cd->columnType.get_subtype())) {
        OptionalStringVector& string_vec = addStringArray();
        addBinaryStringArray(datum, *string_vec);
      } else {
        if (!is_null) {
          addArray(TDatumToArrayDatum(datum, cd->columnType));
        } else {
          addArray(NullArray(cd->columnType));
        }
      }
      break;
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addGeoString(std::string());
      } else {
        addGeoString(datum.val.str_val);
      }
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

void TypedImportBuffer::addDefaultValues(const ColumnDescriptor* cd, size_t num_rows) {
  bool is_null = !cd->default_value.has_value();
  CHECK(!(is_null && cd->columnType.get_notnull()));
  const auto type = cd->columnType.get_type();
  auto ti = cd->columnType;
  auto val = cd->default_value.value_or("NULL");
  CopyParams cp;
  switch (type) {
    case kBOOLEAN: {
      if (!is_null) {
        bool_buffer_->resize(num_rows, StringToDatum(val, ti).boolval);
      } else {
        bool_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kTINYINT: {
      if (!is_null) {
        tinyint_buffer_->resize(num_rows, StringToDatum(val, ti).tinyintval);
      } else {
        tinyint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null) {
        smallint_buffer_->resize(num_rows, StringToDatum(val, ti).smallintval);
      } else {
        smallint_buffer_->resize(num_rows,
                                 inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null) {
        int_buffer_->resize(num_rows, StringToDatum(val, ti).intval);
      } else {
        int_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null) {
        bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
      } else {
        bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        const auto converted_decimal_value = convert_decimal_value_to_scale(
            StringToDatum(val, ti).bigintval, ti, cd->columnType);
        bigint_buffer_->resize(num_rows, converted_decimal_value);
      } else {
        bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null) {
        float_buffer_->resize(num_rows,
                              static_cast<float>(std::atof(std::string(val).c_str())));
      } else {
        float_buffer_->resize(num_rows, NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        double_buffer_->resize(num_rows, std::atof(std::string(val).c_str()));
      } else {
        double_buffer_->resize(num_rows, NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      if (is_null) {
        string_buffer_->resize(num_rows, "");
      } else {
        if (val.length() > ti.get_max_strlen()) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(ti.get_max_strlen()));
        }
        string_buffer_->resize(num_rows, val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null) {
        bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
      } else {
        bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImportHelpers::parseStringArray(std::string(val), cp, string_vec);
        if (!is_null) {
          // TODO: add support for NULL string arrays
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          string_array_buffer_->resize(num_rows, string_vec);
        } else {
          if (ti.get_size() > 0) {
            // TODO: remove once NULL fixlen arrays are allowed
            throw std::runtime_error("Fixed length array column " + cd->columnName +
                                     " currently cannot accept NULL arrays");
          }
          // TODO: add support for NULL string arrays, replace with addStringArray(),
          // for now add whatever parseStringArray() outputs for NULLs ("NULL")
          string_array_buffer_->resize(num_rows, string_vec);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(std::string(val), ti, cp);
          if (d.is_null) {  // val could be "NULL"
            array_buffer_->resize(num_rows, NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + std::string(val));
            }
            array_buffer_->resize(num_rows, d);
          }
        } else {
          array_buffer_->resize(num_rows, NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kMULTIPOINT:
    case kLINESTRING:
    case kMULTILINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->resize(num_rows, val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::addDefaultValues() does not support type "
                   << type;
  }
}

bool importGeoFromLonLat(double lon,
                         double lat,
                         std::vector<double>& coords,
                         std::vector<double>& bounds,
                         SQLTypeInfo& ti) {
  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
    return false;
  }
  if (ti.transforms()) {
    Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
    if (!pt.transform(ti)) {
      return false;
    }
    pt.getColumns(coords);
  } else {
    coords.push_back(lon);
    coords.push_back(lat);
  }
  // in case of promotion to MULTIPOINT
  CHECK_EQ(coords.size(), 2u);
  bounds.reserve(4);
  bounds.push_back(coords[0]);
  bounds.push_back(coords[1]);
  bounds.push_back(coords[0]);
  bounds.push_back(coords[1]);
  return true;
}
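
// Illustrative example: for an untransformed column,
// importGeoFromLonLat(-122.4, 37.8, coords, bounds, ti) appends
// coords = {-122.4, 37.8} and bounds = {-122.4, 37.8, -122.4, 37.8},
// i.e. a degenerate bounding box around the single point.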

void Importer::set_geo_physical_import_buffer(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<double>& coords,
    std::vector<double>& bounds,
    std::vector<int>& ring_sizes,
    std::vector<int>& poly_rings,
    const bool force_null) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  bool is_null_geo = false;
  bool is_null_point = false;
  if (!col_ti.get_notnull()) {
    // Check for NULL geo
    if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
      is_null_point = true;
      coords.clear();
    }
    is_null_geo = coords.empty();
    if (is_null_point) {
      coords.push_back(NULL_ARRAY_DOUBLE);
      coords.push_back(NULL_DOUBLE);
      // Treating POINT coords as notnull, need to store actual encoding
      // [un]compressed+[not]null
      is_null_geo = false;
    }
  }
  if (force_null) {
    is_null_geo = true;
  }
  TDatum tdd_coords;
  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
  // One exception - NULL POINT geo: coords need to be processed to encode nullness
  // in a fixlen array, compressed and uncompressed.
  if (!is_null_geo) {
    std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
    tdd_coords.val.arr_val.reserve(compressed_coords.size());
    for (auto cc : compressed_coords) {
      tdd_coords.val.arr_val.emplace_back();
      tdd_coords.val.arr_val.back().val.int_val = cc;
    }
  }
  tdd_coords.is_null = is_null_geo;
  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);

  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create [linest]ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_ring_sizes;
    tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
    if (!is_null_geo) {
      for (auto ring_size : ring_sizes) {
        tdd_ring_sizes.val.arr_val.emplace_back();
        tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
      }
    }
    tdd_ring_sizes.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
  }

  if (col_type == kMULTIPOLYGON) {
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_poly_rings;
    tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
    if (!is_null_geo) {
      for (auto num_rings : poly_rings) {
        tdd_poly_rings.val.arr_val.emplace_back();
        tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
      }
    }
    tdd_poly_rings.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
  }

  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
      col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_bounds;
    tdd_bounds.val.arr_val.reserve(bounds.size());
    if (!is_null_geo) {
      for (auto b : bounds) {
        tdd_bounds.val.arr_val.emplace_back();
        tdd_bounds.val.arr_val.back().val.real_val = b;
      }
    }
    tdd_bounds.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
  }
}
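
// A sketch of the physical-column layout this function fills in, for a
// MULTIPOLYGON logical column starting at col_idx: the following physical
// columns receive, in order, coords (compressed bytes), ring_sizes,
// poly_rings, and bounds, with col_idx advanced past each one.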

void Importer::set_geo_physical_import_buffer_columnar(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<std::vector<double>>& coords_column,
    std::vector<std::vector<double>>& bounds_column,
    std::vector<std::vector<int>>& ring_sizes_column,
    std::vector<std::vector<int>>& poly_rings_column) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;

  auto coords_row_count = coords_column.size();
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  for (auto& coords : coords_column) {
    bool is_null_geo = false;
    bool is_null_point = false;
    if (!col_ti.get_notnull()) {
      // Check for NULL geo
      if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
        is_null_point = true;
        coords.clear();
      }
      is_null_geo = coords.empty();
      if (is_null_point) {
        coords.push_back(NULL_ARRAY_DOUBLE);
        coords.push_back(NULL_DOUBLE);
        // Treating POINT coords as notnull, need to store actual encoding
        // [un]compressed+[not]null
        is_null_geo = false;
      }
    }
    std::vector<TDatum> td_coords_data;
    if (!is_null_geo) {
      std::vector<uint8_t> compressed_coords =
          Geospatial::compress_coords(coords, col_ti);
      for (auto const& cc : compressed_coords) {
        TDatum td_byte;
        td_byte.val.int_val = cc;
        td_coords_data.push_back(td_byte);
      }
    }
    TDatum tdd_coords;
    tdd_coords.val.arr_val = td_coords_data;
    tdd_coords.is_null = is_null_geo;
    import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
  }
  col_idx++;

  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (ring_sizes_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
    }
    // Create [linest]ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& ring_sizes : ring_sizes_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = ring_sizes.empty();
      }
      std::vector<TDatum> td_ring_sizes;
      for (auto const& ring_size : ring_sizes) {
        TDatum td_ring_size;
        td_ring_size.val.int_val = ring_size;
        td_ring_sizes.push_back(td_ring_size);
      }
      TDatum tdd_ring_sizes;
      tdd_ring_sizes.val.arr_val = td_ring_sizes;
      tdd_ring_sizes.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
    }
    col_idx++;
  }

  if (col_type == kMULTIPOLYGON) {
    if (poly_rings_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
    }
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& poly_rings : poly_rings_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = poly_rings.empty();
      }
      std::vector<TDatum> td_poly_rings;
      for (auto const& num_rings : poly_rings) {
        TDatum td_num_rings;
        td_num_rings.val.int_val = num_rings;
        td_poly_rings.push_back(td_num_rings);
      }
      TDatum tdd_poly_rings;
      tdd_poly_rings.val.arr_val = td_poly_rings;
      tdd_poly_rings.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
    }
    col_idx++;
  }

  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
      col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
    if (bounds_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: bounds column size mismatch";
    }
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& bounds : bounds_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
      }
      std::vector<TDatum> td_bounds_data;
      for (auto const& b : bounds) {
        TDatum td_double;
        td_double.val.real_val = b;
        td_bounds_data.push_back(td_double);
      }
      TDatum tdd_bounds;
      tdd_bounds.val.arr_val = td_bounds_data;
      tdd_bounds.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
    }
    col_idx++;
  }
}

namespace {

std::tuple<int, SQLTypes, std::string> explode_collections_step1(
    const std::list<const ColumnDescriptor*>& col_descs) {
  // validate the columns
  // for now we can only explode into a single destination column
  // which must be of the child type (POLYGON, LINESTRING, POINT)
  int collection_col_idx = -1;
  int col_idx = 0;
  std::string collection_col_name;
  SQLTypes collection_child_type = kNULLT;
  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
    auto const& cd = *cd_it;
    auto const col_type = cd->columnType.get_type();
    if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
      if (collection_col_idx >= 0) {
        throw std::runtime_error(
            "Explode Collections: Found more than one destination column");
      }
      collection_col_idx = col_idx;
      collection_child_type = col_type;
      collection_col_name = cd->columnName;
    }
    for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
      ++cd_it;
    }
    col_idx++;
  }
  if (collection_col_idx < 0) {
    throw std::runtime_error(
        "Explode Collections: Failed to find a supported column type to explode "
        "into");
  }
  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
}
1895 
1896 int64_t explode_collections_step2(
1897  OGRGeometry* ogr_geometry,
1898  SQLTypes collection_child_type,
1899  const std::string& collection_col_name,
1900  size_t row_or_feature_idx,
1901  std::function<void(OGRGeometry*)> execute_import_lambda) {
1902  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1903  bool is_collection = false;
1904  switch (collection_child_type) {
1905  case kPOINT:
1906  switch (ogr_geometry_type) {
1907  case wkbMultiPoint:
1908  is_collection = true;
1909  break;
1910  case wkbPoint:
1911  break;
1912  default:
1913  throw std::runtime_error(
1914  "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1915  }
1916  break;
1917  case kLINESTRING:
1918  switch (ogr_geometry_type) {
1919  case wkbMultiLineString:
1920  is_collection = true;
1921  break;
1922  case wkbLineString:
1923  break;
1924  default:
1925  throw std::runtime_error(
1926  "Explode Collections: Source geo type must be MULTILINESTRING or "
1927  "LINESTRING");
1928  }
1929  break;
1930  case kPOLYGON:
1931  switch (ogr_geometry_type) {
1932  case wkbMultiPolygon:
1933  is_collection = true;
1934  break;
1935  case wkbPolygon:
1936  break;
1937  default:
1938  throw std::runtime_error(
1939  "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1940  }
1941  break;
1942  default:
1943  CHECK(false) << "Unsupported geo child type " << collection_child_type;
1944  }
1945 
1946  int64_t us = 0LL;
1947 
1948  // explode or just import
1949  if (is_collection) {
1950  // cast to collection
1951  OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1952  CHECK(collection_geometry);
1953 
1954 #if LOG_EXPLODE_COLLECTIONS
1955  // log number of children
1956  LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
1957  << collection_col_name << "' into " << collection_geometry->getNumGeometries()
1958  << " child rows";
1959 #endif
1960 
1961  // loop over children
1962  uint32_t child_geometry_count = 0;
1963  auto child_geometry_it = collection_geometry->begin();
1964  while (child_geometry_it != collection_geometry->end()) {
1965  // get and import this child
1966  OGRGeometry* import_geometry = *child_geometry_it;
1967  us += measure<std::chrono::microseconds>::execution(
1968  [&] { execute_import_lambda(import_geometry); });
1969 
1970  // next child
1971  child_geometry_it++;
1972  child_geometry_count++;
1973  }
1974  } else {
1975  // import non-collection row just once
1976  us += measure<std::chrono::microseconds>::execution(
1977  [&] { execute_import_lambda(ogr_geometry); });
1978  }
1979 
1980  // done
1981  return us;
1982 }
1983 
1984 } // namespace
1985 
1986 static ImportStatus import_thread_delimited(
1987  int thread_id,
1988  Importer* importer,
1989  std::unique_ptr<char[]> scratch_buffer,
1990  size_t begin_pos,
1991  size_t end_pos,
1992  size_t total_size,
1993  size_t first_row_index_this_buffer,
1994  const Catalog_Namespace::SessionInfo* session_info,
1995  Executor* executor) {
1996  ImportStatus thread_import_status;
1997  int64_t total_get_row_time_us = 0;
1998  int64_t total_str_to_val_time_us = 0;
1999  auto query_session = session_info ? session_info->get_session_id() : "";
2000  CHECK(scratch_buffer);
2001  auto buffer = scratch_buffer.get();
2002  auto load_ms = measure<>::execution([]() {});
2003 
2004  thread_import_status.thread_id = thread_id;
2005 
2006  auto ms = measure<>::execution([&]() {
2007  const CopyParams& copy_params = importer->get_copy_params();
2008  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2009  size_t begin =
2010  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
2011  const char* thread_buf = buffer + begin_pos + begin;
2012  const char* thread_buf_end = buffer + end_pos;
2013  const char* buf_end = buffer + total_size;
2014  bool try_single_thread = false;
2015  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2016  importer->get_import_buffers(thread_id);
2017  auto us = 0ULL;
2018  int phys_cols = 0;
2019  int point_cols = 0;
2020  for (const auto cd : col_descs) {
2021  const auto& col_ti = cd->columnType;
2022  phys_cols += col_ti.get_physical_cols();
2023  if (cd->columnType.get_type() == kPOINT ||
2024  cd->columnType.get_type() == kMULTIPOINT) {
2025  point_cols++;
2026  }
2027  }
2028  auto num_cols = col_descs.size() - phys_cols;
2029  for (const auto& p : import_buffers) {
2030  p->clear();
2031  }
2032  std::vector<std::string_view> row;
2033  size_t row_index_plus_one = 0;
2034  for (const char* p = thread_buf; p < thread_buf_end; p++) {
2035  row.clear();
2036  std::vector<std::unique_ptr<char[]>>
2037  tmp_buffers; // holds string w/ removed escape chars, etc
2038  row_index_plus_one++;
2039  if (DEBUG_TIMING) {
2040  us = measure<std::chrono::microseconds>::execution([&]() {
2041  p = delimited_parser::get_row(p,
2042  thread_buf_end,
2043  buf_end,
2044  copy_params,
2045  importer->get_is_array(),
2046  row,
2047  tmp_buffers,
2048  try_single_thread,
2049  true);
2050  });
2051  total_get_row_time_us += us;
2052  } else {
2053  p = delimited_parser::get_row(p,
2054  thread_buf_end,
2055  buf_end,
2056  copy_params,
2057  importer->get_is_array(),
2058  row,
2059  tmp_buffers,
2060  try_single_thread,
2061  true);
2062  }
2063  // Each POINT could consume two separate coords instead of a single WKT
2064  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
2065  thread_import_status.rows_rejected++;
2066  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
2067  << row.size() << "): " << shared::printContainer(row);
2068  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2069  break;
2070  }
2071  continue;
2072  }
2073 
2074  //
2075  // lambda for importing a row (perhaps multiple times if exploding a collection)
2076  //
2077 
2078  auto execute_import_row = [&](OGRGeometry* import_geometry) {
2079  size_t import_idx = 0;
2080  size_t col_idx = 0;
2081  try {
2082  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2083  auto cd = *cd_it;
2084  const auto& col_ti = cd->columnType;
2085 
2086  bool is_null =
2087  ImportHelpers::is_null_datum(row[import_idx], copy_params.null_str);
2088  // Note: default copy_params.null_str is "\N", but everyone uses "NULL".
2089  // So initially nullness may be missed and not passed to add_value,
2090  // which then might also check and still decide it's actually a NULL, e.g.
2091  // if a kINT field doesn't start with a digit or a '-' then it's considered NULL.
2092  // So "NULL" is not recognized as NULL but then it's not recognized as
2093  // a valid kINT, so it's a NULL after all.
2094  // Checking for "NULL" here too, as a widely accepted notation for NULL.
2095 
2096  // Treating empty as NULL
2097  if (!cd->columnType.is_string() && row[import_idx].empty()) {
2098  is_null = true;
2099  }
2100  if (!cd->columnType.is_string() && !copy_params.trim_spaces) {
2101  // everything but strings should be always trimmed
2102  row[import_idx] = sv_strip(row[import_idx]);
2103  }
2104 
2105  if (col_ti.get_physical_cols() == 0) {
2106  // not geo
2107 
2108  import_buffers[col_idx]->add_value(
2109  cd, row[import_idx], is_null, copy_params);
2110 
2111  // next
2112  ++import_idx;
2113  ++col_idx;
2114  } else {
2115  // geo
2116 
2117  // store null string in the base column
2118  import_buffers[col_idx]->add_value(
2119  cd, copy_params.null_str, true, copy_params);
2120 
2121  // WKT from string we're not storing
2122  auto const& geo_string = row[import_idx];
2123 
2124  // next
2125  ++import_idx;
2126  ++col_idx;
2127 
2128  SQLTypes col_type = col_ti.get_type();
2129  CHECK(IS_GEO(col_type));
2130 
2131  std::vector<double> coords;
2132  std::vector<double> bounds;
2133  std::vector<int> ring_sizes;
2134  std::vector<int> poly_rings;
2135 
2136  // if this is a POINT or MULTIPOINT column, and the field is not null, and
2137  // looks like a scalar numeric value (and not a hex blob), attempt to import
2138  // two columns as lon/lat (or lat/lon)
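  // e.g. with copy_params.lonlat true, two adjacent fields like "-122.42","37.77"
  // (hypothetical values) are consumed together as one point (-122.42 37.77)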
2139  if ((col_type == kPOINT || col_type == kMULTIPOINT) && !is_null &&
2140  geo_string.size() > 0 &&
2141  (geo_string[0] == '.' || isdigit(geo_string[0]) ||
2142  geo_string[0] == '-') &&
2143  geo_string.find_first_of("ABCDEFabcdef") == std::string::npos) {
2144  double lon = std::atof(std::string(geo_string).c_str());
2145  double lat = NAN;
2146  auto lat_str = row[import_idx];
2147  ++import_idx;
2148  if (lat_str.size() > 0 &&
2149  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
2150  lat = std::atof(std::string(lat_str).c_str());
2151  }
2152  // Swap coordinates if this table uses a reverse order: lat/lon
2153  if (!copy_params.lonlat) {
2154  std::swap(lat, lon);
2155  }
2156  // TODO: should check if POINT/MULTIPOINT column should have been declared
2157  // with SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
2158  // throw std::runtime_error("POINT column " + cd->columnName + " is
2159  // not WGS84, cannot insert lon/lat");
2160  // }
2161  SQLTypeInfo import_ti{col_ti};
2162  if (copy_params.source_type ==
2163  SourceType::kDelimitedFile &&
2164  import_ti.get_output_srid() == 4326) {
2165  auto srid0 = copy_params.source_srid;
2166  if (srid0 > 0) {
2167  // srid0 -> 4326 transform is requested on import
2168  import_ti.set_input_srid(srid0);
2169  }
2170  }
2171  if (!importGeoFromLonLat(lon, lat, coords, bounds, import_ti)) {
2172  throw std::runtime_error(
2173  "Cannot read lon/lat to insert into POINT/MULTIPOINT column " +
2174  cd->columnName);
2175  }
2176  } else {
2177  // import it
2178  SQLTypeInfo import_ti{col_ti};
2179  if (copy_params.source_type ==
2180  SourceType::kDelimitedFile &&
2181  import_ti.get_output_srid() == 4326) {
2182  auto srid0 = copy_params.source_srid;
2183  if (srid0 > 0) {
2184  // srid0 -> 4326 transform is requested on import
2185  import_ti.set_input_srid(srid0);
2186  }
2187  }
2188  if (is_null) {
2189  if (col_ti.get_notnull()) {
2190  throw std::runtime_error("NULL geo for column " + cd->columnName);
2191  }
2192  Geospatial::GeoTypesFactory::getNullGeoColumns(
2193  import_ti, coords, bounds, ring_sizes, poly_rings);
2194  } else {
2195  if (import_geometry) {
2196  // geometry already exploded
2197  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2198  import_geometry,
2199  import_ti,
2200  coords,
2201  bounds,
2202  ring_sizes,
2203  poly_rings,
2204  copy_params.geo_validate_geometry)) {
2205  std::string msg =
2206  "Failed to extract valid geometry from exploded row " +
2207  std::to_string(first_row_index_this_buffer +
2208  row_index_plus_one) +
2209  " for column " + cd->columnName;
2210  throw std::runtime_error(msg);
2211  }
2212  } else {
2213  // extract geometry directly from WKT
2214  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2215  std::string(geo_string),
2216  import_ti,
2217  coords,
2218  bounds,
2219  ring_sizes,
2220  poly_rings,
2221  copy_params.geo_validate_geometry)) {
2222  std::string msg = "Failed to extract valid geometry from row " +
2223  std::to_string(first_row_index_this_buffer +
2224  row_index_plus_one) +
2225  " for column " + cd->columnName;
2226  throw std::runtime_error(msg);
2227  }
2228  }
2229 
2230  // validate types
2231  if (!geo_promoted_type_match(import_ti.get_type(), col_type)) {
2232  throw std::runtime_error(
2233  "Imported geometry doesn't match the type of column " +
2234  cd->columnName);
2235  }
2236  }
2237  }
2238 
2239  // import extracted geo
2240  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2241  cd,
2242  import_buffers,
2243  col_idx,
2244  coords,
2245  bounds,
2246  ring_sizes,
2247  poly_rings);
2248 
2249  // skip remaining physical columns
2250  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2251  ++cd_it;
2252  }
2253  }
2254  }
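  // note: (rows_completed & 0xFFFF) == 0 throttles the interrupt check below
  // to once every 65,536 completed rows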
2255  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2256  check_session_interrupted(query_session, executor))) {
2257  thread_import_status.load_failed = true;
2258  thread_import_status.load_msg =
2259  "Table load was cancelled via Query Interrupt";
2260  return;
2261  }
2262  thread_import_status.rows_completed++;
2263  } catch (const std::exception& e) {
2264  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2265  import_buffers[col_idx_to_pop]->pop_value();
2266  }
2267  thread_import_status.rows_rejected++;
2268  LOG(ERROR) << "Input exception thrown: " << e.what()
2269  << ". Row discarded. Data: " << shared::printContainer(row);
2270  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2271  LOG(ERROR) << "Load was cancelled due to max reject rows being reached";
2272  thread_import_status.load_failed = true;
2273  thread_import_status.load_msg =
2274  "Load was cancelled due to max reject rows being reached";
2275  }
2276  }
2277  }; // End of lambda
2278 
2279  if (copy_params.geo_explode_collections) {
2280  // explode and import
2281  auto const [collection_col_idx, collection_child_type, collection_col_name] =
2282  explode_collections_step1(col_descs);
2283  // pull out the collection WKT or WKB hex
2284  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2285  auto const& collection_geo_string = row[collection_col_idx];
2286  // convert to OGR
2287  OGRGeometry* ogr_geometry = nullptr;
2288  ScopeGuard destroy_ogr_geometry = [&] {
2289  if (ogr_geometry) {
2290  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2291  }
2292  };
2293  ogr_geometry = Geospatial::GeoTypesFactory::createOGRGeometry(
2294  std::string(collection_geo_string), copy_params.geo_validate_geometry);
2295  // do the explode and import
2296  us = explode_collections_step2(ogr_geometry,
2297  collection_child_type,
2298  collection_col_name,
2299  first_row_index_this_buffer + row_index_plus_one,
2300  execute_import_row);
2301  } else {
2302  // import non-collection row just once
2303  us = measure<std::chrono::microseconds>::execution(
2304  [&] { execute_import_row(nullptr); });
2305  }
2306 
2307  if (thread_import_status.load_failed) {
2308  break;
2309  }
2310  } // end thread
2311  total_str_to_val_time_us += us;
2312  if (!thread_import_status.load_failed && thread_import_status.rows_completed > 0) {
2313  load_ms = measure<>::execution([&]() {
2314  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2315  });
2316  }
2317  }); // end execution
2318 
2319  if (DEBUG_TIMING && !thread_import_status.load_failed &&
2320  thread_import_status.rows_completed > 0) {
2321  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2322  << thread_import_status.rows_completed << " rows inserted in "
2323  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2324  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2325  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2326  << "sec" << std::endl;
2327  }
2328 
2329  return thread_import_status;
2330 }
2331 
2332 class ColumnNotGeoError : public std::runtime_error {
2333  public:
2334  ColumnNotGeoError(const std::string& column_name)
2335  : std::runtime_error("Column '" + column_name + "' is not a geo column") {}
2336 };
2337 
2338 static ImportStatus import_thread_shapefile(
2339  int thread_id,
2340  Importer* importer,
2341  OGRCoordinateTransformation* coordinate_transformation,
2342  const FeaturePtrVector& features,
2343  size_t firstFeature,
2344  size_t numFeatures,
2345  const FieldNameToIndexMapType& fieldNameToIndexMap,
2346  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2347  const Catalog_Namespace::SessionInfo* session_info,
2348  Executor* executor,
2349  const MetadataColumnInfos& metadata_column_infos) {
2350  ImportStatus thread_import_status;
2351  const CopyParams& copy_params = importer->get_copy_params();
2352  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2353  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2354  importer->get_import_buffers(thread_id);
2355  auto query_session = session_info ? session_info->get_session_id() : "";
2356  for (const auto& p : import_buffers) {
2357  p->clear();
2358  }
2359 
2360  auto convert_timer = timer_start();
2361 
2362  // for all the features in this chunk...
2363  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2364  // ignore null features
2365  if (!features[iFeature]) {
2366  continue;
2367  }
2368 
2369  // get this feature's geometry
2370  // for geodatabase, we need to consider features with no geometry
2371  // as we still want to create a table, even if it has no geo column
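  // (a null geometry reaches execute_import_feature below and is stored as a
  // NULL geo value, provided the column is nullable)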
2372  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2373  if (pGeometry && coordinate_transformation) {
2374  pGeometry->transform(coordinate_transformation);
2375  }
2376 
2377  //
2378  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2379  //
2380 
2381  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2382  size_t col_idx = 0;
2383  try {
2384  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2385  check_session_interrupted(query_session, executor))) {
2386  thread_import_status.load_failed = true;
2387  thread_import_status.load_msg = "Table load was cancelled via Query Interrupt";
2388  throw QueryExecutionError(ErrorCode::INTERRUPTED);
2389  }
2390 
2391  uint32_t field_column_count{0u};
2392  uint32_t metadata_column_count{0u};
2393 
2394  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2395  auto cd = *cd_it;
2396 
2397  // is this a geo column?
2398  const auto& col_ti = cd->columnType;
2399  if (col_ti.is_geometry()) {
2400  // Note that this assumes there is one and only one geo column in the
2401  // table. Currently, the importer only supports reading a single
2402  // geospatial feature from an input shapefile / geojson file, but this
2403  // code will need to be modified if that changes
2404  SQLTypes col_type = col_ti.get_type();
2405 
2406  // store null string in the base column
2407  import_buffers[col_idx]->add_value(
2408  cd, copy_params.null_str, true, copy_params);
2409  ++col_idx;
2410 
2411  // the data we now need to extract for the other columns
2412  std::vector<double> coords;
2413  std::vector<double> bounds;
2414  std::vector<int> ring_sizes;
2415  std::vector<int> poly_rings;
2416 
2417  // extract it
2418  SQLTypeInfo import_ti{col_ti};
2419  bool is_null_geo = !import_geometry;
2420  if (is_null_geo) {
2421  if (col_ti.get_notnull()) {
2422  throw std::runtime_error("NULL geo for column " + cd->columnName);
2423  }
2424  Geospatial::GeoTypesFactory::getNullGeoColumns(
2425  import_ti, coords, bounds, ring_sizes, poly_rings);
2426  } else {
2427  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2428  import_geometry,
2429  import_ti,
2430  coords,
2431  bounds,
2432  ring_sizes,
2433  poly_rings,
2434  copy_params.geo_validate_geometry)) {
2435  std::string msg = "Failed to extract valid geometry from feature " +
2436  std::to_string(firstFeature + iFeature + 1) +
2437  " for column " + cd->columnName;
2438  throw std::runtime_error(msg);
2439  }
2440 
2441  // validate types
2442  if (!geo_promoted_type_match(import_ti.get_type(), col_type)) {
2443  throw std::runtime_error(
2444  "Imported geometry doesn't match the type of column " +
2445  cd->columnName);
2446  }
2447  }
2448 
2449  // create coords array value and add it to the physical column
2450  ++cd_it;
2451  auto cd_coords = *cd_it;
2452  std::vector<TDatum> td_coord_data;
2453  if (!is_null_geo) {
2454  std::vector<uint8_t> compressed_coords =
2455  Geospatial::compress_coords(coords, col_ti);
2456  for (auto cc : compressed_coords) {
2457  TDatum td_byte;
2458  td_byte.val.int_val = cc;
2459  td_coord_data.push_back(td_byte);
2460  }
2461  }
2462  TDatum tdd_coords;
2463  tdd_coords.val.arr_val = td_coord_data;
2464  tdd_coords.is_null = is_null_geo;
2465  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2466  ++col_idx;
2467 
2468  if (col_type == kMULTILINESTRING || col_type == kPOLYGON ||
2469  col_type == kMULTIPOLYGON) {
2470  // Create [linest]ring_sizes array value and add it to the physical column
2471  ++cd_it;
2472  auto cd_ring_sizes = *cd_it;
2473  std::vector<TDatum> td_ring_sizes;
2474  if (!is_null_geo) {
2475  for (auto ring_size : ring_sizes) {
2476  TDatum td_ring_size;
2477  td_ring_size.val.int_val = ring_size;
2478  td_ring_sizes.push_back(td_ring_size);
2479  }
2480  }
2481  TDatum tdd_ring_sizes;
2482  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2483  tdd_ring_sizes.is_null = is_null_geo;
2484  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2485  ++col_idx;
2486  }
2487 
2488  if (col_type == kMULTIPOLYGON) {
2489  // Create poly_rings array value and add it to the physical column
2490  ++cd_it;
2491  auto cd_poly_rings = *cd_it;
2492  std::vector<TDatum> td_poly_rings;
2493  if (!is_null_geo) {
2494  for (auto num_rings : poly_rings) {
2495  TDatum td_num_rings;
2496  td_num_rings.val.int_val = num_rings;
2497  td_poly_rings.push_back(td_num_rings);
2498  }
2499  }
2500  TDatum tdd_poly_rings;
2501  tdd_poly_rings.val.arr_val = td_poly_rings;
2502  tdd_poly_rings.is_null = is_null_geo;
2503  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2504  ++col_idx;
2505  }
2506 
2507  if (col_type == kLINESTRING || col_type == kMULTILINESTRING ||
2508  col_type == kPOLYGON || col_type == kMULTIPOLYGON ||
2509  col_type == kMULTIPOINT) {
2510  // Create bounds array value and add it to the physical column
2511  ++cd_it;
2512  auto cd_bounds = *cd_it;
2513  std::vector<TDatum> td_bounds_data;
2514  if (!is_null_geo) {
2515  for (auto b : bounds) {
2516  TDatum td_double;
2517  td_double.val.real_val = b;
2518  td_bounds_data.push_back(td_double);
2519  }
2520  }
2521  TDatum tdd_bounds;
2522  tdd_bounds.val.arr_val = td_bounds_data;
2523  tdd_bounds.is_null = is_null_geo;
2524  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2525  ++col_idx;
2526  }
2527  } else if (field_column_count < fieldNameToIndexMap.size()) {
2528  //
2529  // field column
2530  //
2531  auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2532  CHECK(cit != columnNameToSourceNameMap.end());
2533  auto const& field_name = cit->second;
2534 
2535  auto const fit = fieldNameToIndexMap.find(field_name);
2536  if (fit == fieldNameToIndexMap.end()) {
2537  throw ColumnNotGeoError(cd->columnName);
2538  }
2539 
2540  auto const& field_index = fit->second;
2541  CHECK(field_index < fieldNameToIndexMap.size());
2542 
2543  auto const& feature = features[iFeature];
2544 
2545  auto field_defn = feature->GetFieldDefnRef(field_index);
2546  CHECK(field_defn);
2547 
2548  // OGRFeature::GetFieldAsString() can only return 80 characters
2549  // so for array columns, we are obliged to fetch the actual values
2550  // and construct the concatenated string ourselves
2551 
2552  std::string value_string;
2553  int array_index = 0, array_size = 0;
2554 
2555  auto stringify_numeric_list = [&](auto* values) {
2556  value_string = "{";
2557  while (array_index < array_size) {
2558  auto separator = (array_index > 0) ? "," : "";
2559  value_string += separator + std::to_string(values[array_index]);
2560  array_index++;
2561  }
2562  value_string += "}";
2563  };
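  // e.g. an integer-list field holding 1, 2 and 3 is rendered as the string
  // "{1,2,3}" and imported like any other delimited field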
2564 
2565  auto field_type = field_defn->GetType();
2566  switch (field_type) {
2567  case OFTInteger:
2568  case OFTInteger64:
2569  case OFTReal:
2570  case OFTString:
2571  case OFTBinary:
2572  case OFTDate:
2573  case OFTTime:
2574  case OFTDateTime: {
2575  value_string = feature->GetFieldAsString(field_index);
2576  } break;
2577  case OFTIntegerList: {
2578  auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2579  stringify_numeric_list(values);
2580  } break;
2581  case OFTInteger64List: {
2582  auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2583  stringify_numeric_list(values);
2584  } break;
2585  case OFTRealList: {
2586  auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2587  stringify_numeric_list(values);
2588  } break;
2589  case OFTStringList: {
2590  auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2591  value_string = "{";
2592  if (array_of_strings) {
2593  while (auto* this_string = array_of_strings[array_index]) {
2594  auto separator = (array_index > 0) ? "," : "";
2595  value_string += separator + std::string(this_string);
2596  array_index++;
2597  }
2598  }
2599  value_string += "}";
2600  } break;
2601  default:
2602  throw std::runtime_error("Unsupported geo file field type (" +
2603  std::to_string(static_cast<int>(field_type)) +
2604  ")");
2605  }
2606 
2607  import_buffers[col_idx]->add_value(cd, value_string, false, copy_params);
2608  ++col_idx;
2609  field_column_count++;
2610  } else if (metadata_column_count < metadata_column_infos.size()) {
2611  //
2612  // metadata column
2613  //
2614  auto const& mci = metadata_column_infos[metadata_column_count];
2615  if (mci.column_descriptor.columnName != cd->columnName) {
2616  throw std::runtime_error("Metadata column name mismatch");
2617  }
2618  import_buffers[col_idx]->add_value(cd, mci.value, false, copy_params);
2619  ++col_idx;
2620  metadata_column_count++;
2621  } else {
2622  throw std::runtime_error("Column count mismatch");
2623  }
2624  }
2625  thread_import_status.rows_completed++;
2626  } catch (QueryExecutionError& e) {
2627  if (e.hasErrorCode(ErrorCode::INTERRUPTED)) {
2628  throw e;
2629  }
2630  } catch (ColumnNotGeoError& e) {
2631  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Aborting import.";
2632  throw std::runtime_error(e.what());
2633  } catch (const std::exception& e) {
2634  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2635  import_buffers[col_idx_to_pop]->pop_value();
2636  }
2637  thread_import_status.rows_rejected++;
2638  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2639  }
2640  };
2641 
2642  if (pGeometry && copy_params.geo_explode_collections) {
2643  // explode and import
2644  auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2645  explode_collections_step1(col_descs);
2646  explode_collections_step2(pGeometry,
2647  collection_child_type,
2648  collection_col_name,
2649  firstFeature + iFeature + 1,
2650  execute_import_feature);
2651  } else {
2652  // import non-collection or null feature just once
2653  execute_import_feature(pGeometry);
2654  }
2655  } // end features
2656 
2657  float convert_s = TIMER_STOP(convert_timer);
2658 
2659  float load_s = 0.0f;
2660  if (thread_import_status.rows_completed > 0) {
2661  auto load_timer = timer_start();
2662  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2663  load_s = TIMER_STOP(load_timer);
2664  }
2665 
2666  if (DEBUG_TIMING && thread_import_status.rows_completed > 0) {
2667  LOG(INFO) << "DEBUG: Process " << convert_s << "s";
2668  LOG(INFO) << "DEBUG: Load " << load_s << "s";
2669  LOG(INFO) << "DEBUG: Total " << (convert_s + load_s) << "s";
2670  }
2671 
2672  thread_import_status.thread_id = thread_id;
2673 
2674  return thread_import_status;
2675 }
2676 
2677 bool Loader::loadNoCheckpoint(
2678  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2679  size_t row_count,
2680  const Catalog_Namespace::SessionInfo* session_info) {
2681  return loadImpl(import_buffers, row_count, false, session_info);
2682 }
2683 
2684 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2685  size_t row_count,
2686  const Catalog_Namespace::SessionInfo* session_info) {
2687  return loadImpl(import_buffers, row_count, true, session_info);
2688 }
2689 
2690 namespace {
2691 
2692 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2693  const auto& ti = import_buffer.getTypeInfo();
2694  const int8_t* values_buffer{nullptr};
2695  if (ti.is_string()) {
2696  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2697  values_buffer = import_buffer.getStringDictBuffer();
2698  } else {
2699  values_buffer = import_buffer.getAsBytes();
2700  }
2701  CHECK(values_buffer);
2702  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2703  switch (logical_size) {
2704  case 1: {
2705  return values_buffer[index];
2706  }
2707  case 2: {
2708  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2709  }
2710  case 4: {
2711  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2712  }
2713  case 8: {
2714  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2715  }
2716  default:
2717  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2718  }
2719  UNREACHABLE();
2720  return 0;
2721 }
2722 
2723 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2724  const auto& ti = import_buffer.getTypeInfo();
2725  CHECK_EQ(kFLOAT, ti.get_type());
2726  const auto values_buffer = import_buffer.getAsBytes();
2727  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2728 }
2729 
2730 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2731  const auto& ti = import_buffer.getTypeInfo();
2732  CHECK_EQ(kDOUBLE, ti.get_type());
2733  const auto values_buffer = import_buffer.getAsBytes();
2734  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2735 }
2736 
2737 } // namespace
2738 
2739 void Loader::fillShardRow(const size_t row_index,
2740  OneShardBuffers& shard_output_buffers,
2741  const OneShardBuffers& import_buffers) {
2742  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2743  const auto& input_buffer = import_buffers[col_idx];
2744  const auto& col_ti = input_buffer->getTypeInfo();
2745  const auto type =
2746  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2747 
2748  switch (type) {
2749  case kBOOLEAN:
2750  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2751  break;
2752  case kTINYINT:
2753  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2754  break;
2755  case kSMALLINT:
2756  shard_output_buffers[col_idx]->addSmallint(
2757  int_value_at(*input_buffer, row_index));
2758  break;
2759  case kINT:
2760  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2761  break;
2762  case kBIGINT:
2763  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2764  break;
2765  case kFLOAT:
2766  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2767  break;
2768  case kDOUBLE:
2769  shard_output_buffers[col_idx]->addDouble(
2770  double_value_at(*input_buffer, row_index));
2771  break;
2772  case kTEXT:
2773  case kVARCHAR:
2774  case kCHAR: {
2775  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2776  shard_output_buffers[col_idx]->addString(
2777  (*input_buffer->getStringBuffer())[row_index]);
2778  break;
2779  }
2780  case kTIME:
2781  case kTIMESTAMP:
2782  case kDATE:
2783  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2784  break;
2785  case kARRAY:
2786  if (IS_STRING(col_ti.get_subtype())) {
2787  CHECK(input_buffer->getStringArrayBuffer());
2788  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2789  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2790  shard_output_buffers[col_idx]->addStringArray(input_arr);
2791  } else {
2792  shard_output_buffers[col_idx]->addArray(
2793  (*input_buffer->getArrayBuffer())[row_index]);
2794  }
2795  break;
2796  case kPOINT:
2797  case kMULTIPOINT:
2798  case kLINESTRING:
2799  case kMULTILINESTRING:
2800  case kPOLYGON:
2801  case kMULTIPOLYGON: {
2802  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2803  shard_output_buffers[col_idx]->addGeoString(
2804  (*input_buffer->getGeoStringBuffer())[row_index]);
2805  break;
2806  }
2807  default:
2808  CHECK(false);
2809  }
2810  }
2811 }
2812 
2813 void Loader::distributeToShardsExistingColumns(
2814  std::vector<OneShardBuffers>& all_shard_import_buffers,
2815  std::vector<size_t>& all_shard_row_counts,
2816  const OneShardBuffers& import_buffers,
2817  const size_t row_count,
2818  const size_t shard_count,
2819  const Catalog_Namespace::SessionInfo* session_info) {
2820  int col_idx{0};
2821  const ColumnDescriptor* shard_col_desc{nullptr};
2822  for (const auto col_desc : column_descs_) {
2823  ++col_idx;
2824  if (col_idx == table_desc_->shardedColumnId) {
2825  shard_col_desc = col_desc;
2826  break;
2827  }
2828  }
2829  CHECK(shard_col_desc);
2830  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2831  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2832  const auto& shard_col_ti = shard_col_desc->columnType;
2833  CHECK(shard_col_ti.is_integer() ||
2834  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2835  shard_col_ti.is_time());
2836  if (shard_col_ti.is_string()) {
2837  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2838  CHECK(payloads_ptr);
2839  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2840  }
2841 
2842  for (size_t i = 0; i < row_count; ++i) {
2843  const size_t shard =
2844  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2845  auto& shard_output_buffers = all_shard_import_buffers[shard];
2846  fillShardRow(i, shard_output_buffers, import_buffers);
2847  ++all_shard_row_counts[shard];
2848  }
2849 }
2850 
2851 void Loader::distributeToShardsNewColumns(
2852  std::vector<OneShardBuffers>& all_shard_import_buffers,
2853  std::vector<size_t>& all_shard_row_counts,
2854  const OneShardBuffers& import_buffers,
2855  const size_t row_count,
2856  const size_t shard_count,
2857  const Catalog_Namespace::SessionInfo* session_info) {
2858  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2859  CHECK(shard_tds.size() == shard_count);
2860 
2861  for (size_t shard = 0; shard < shard_count; ++shard) {
2862  auto& shard_output_buffers = all_shard_import_buffers[shard];
2863  if (row_count != 0) {
2864  fillShardRow(0, shard_output_buffers, import_buffers);
2865  }
2866  // when replicating a column, the row count of a shard == the replicate count
2867  // of the column on that shard
2868  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2869  }
2870 }
2871 
2872 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2873  std::vector<size_t>& all_shard_row_counts,
2874  const OneShardBuffers& import_buffers,
2875  const size_t row_count,
2876  const size_t shard_count,
2877  const Catalog_Namespace::SessionInfo* session_info) {
2878  all_shard_row_counts.resize(shard_count);
2879  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2880  all_shard_import_buffers.emplace_back();
2881  for (const auto& typed_import_buffer : import_buffers) {
2882  all_shard_import_buffers.back().emplace_back(
2883  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2884  typed_import_buffer->getStringDictionary()));
2885  }
2886  }
2887  CHECK_GT(table_desc_->shardedColumnId, 0);
2888  if (isAddingColumns()) {
2889  distributeToShardsNewColumns(all_shard_import_buffers,
2890  all_shard_row_counts,
2891  import_buffers,
2892  row_count,
2893  shard_count,
2894  session_info);
2895  } else {
2896  distributeToShardsExistingColumns(all_shard_import_buffers,
2897  all_shard_row_counts,
2898  import_buffers,
2899  row_count,
2900  shard_count,
2901  session_info);
2902  }
2903 }
2904 
2904 
2905 bool Loader::loadImpl(
2906  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2907  size_t row_count,
2908  bool checkpoint,
2909  const Catalog_Namespace::SessionInfo* session_info) {
2910  if (load_callback_) {
2911  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
2912  return load_callback_(import_buffers, data_blocks, row_count);
2913  }
2914  if (table_desc_->nShards) {
2915  std::vector<OneShardBuffers> all_shard_import_buffers;
2916  std::vector<size_t> all_shard_row_counts;
2917  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2918  distributeToShards(all_shard_import_buffers,
2919  all_shard_row_counts,
2920  import_buffers,
2921  row_count,
2922  shard_tables.size(),
2923  session_info);
2924  bool success = true;
2925  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2926  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2927  all_shard_row_counts[shard_idx],
2928  shard_tables[shard_idx],
2929  checkpoint,
2930  session_info);
2931  }
2932  return success;
2933  }
2934  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
2935 }
2936 
2937 std::vector<DataBlockPtr> TypedImportBuffer::get_data_block_pointers(
2938  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2939  std::vector<DataBlockPtr> result(import_buffers.size());
2940  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2941  encoded_data_block_ptrs_futures;
2942  // make all async calls to string dictionary here and then continue execution
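  // (the DataBlockPtr slots of those dictionary-encoded columns are filled in
  // at the end, when the futures are collected)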
2943  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2944  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2945  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2946  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2947  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2948 
2949  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2950  buf_idx,
2951  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2952  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2953  return import_buffers[buf_idx]->getStringDictBuffer();
2954  })));
2955  }
2956  }
2957 
2958  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2959  DataBlockPtr p;
2960  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2961  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2962  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2963  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2964  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2965  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2966  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2967  p.stringsPtr = string_payload_ptr;
2968  } else {
2969  // This condition means we have a dictionary-encoded string column. We already
2970  // made an async request to obtain the encoded integer values above, so we skip
2971  // this iteration and continue.
2972  continue;
2973  }
2974  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2975  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2976  p.stringsPtr = geo_payload_ptr;
2977  } else {
2978  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2979  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2980  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2981  import_buffers[buf_idx]->addDictEncodedStringArray(
2982  *import_buffers[buf_idx]->getStringArrayBuffer());
2983  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2984  } else {
2985  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2986  }
2987  }
2988  result[buf_idx] = p;
2989  }
2990 
2991  // wait for the async requests we made for string dictionary
2992  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2993  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2994  }
2995  return result;
2996 }
2997 
2998 bool Loader::loadToShard(
2999  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3000  size_t row_count,
3001  const TableDescriptor* shard_table,
3002  bool checkpoint,
3003  const Catalog_Namespace::SessionInfo* session_info) {
3004  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
3005  Fragmenter_Namespace::InsertData ins_data(insert_data_);
3006  ins_data.numRows = row_count;
3007  bool success = false;
3008  try {
3009  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
3010  } catch (std::exception& e) {
3011  std::ostringstream oss;
3012  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
3013  << e.what();
3014 
3015  LOG(ERROR) << oss.str();
3016  error_msg_ = oss.str();
3017  return success;
3018  }
3019  if (isAddingColumns()) {
3020  // when Adding columns we omit any columns except the ones being added
3021  ins_data.columnIds.clear();
3022  ins_data.is_default.clear();
3023  for (auto& buffer : import_buffers) {
3024  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
3025  ins_data.is_default.push_back(true);
3026  }
3027  } else {
3028  ins_data.is_default.resize(ins_data.columnIds.size(), false);
3029  }
3030  // release loader_lock so that in InsertOrderFragmenter::insertData
3031  // we can have multiple threads sort/shuffle InsertData
3032  loader_lock.unlock();
3033  success = true;
3034  {
3035  try {
3036  if (checkpoint) {
3037  shard_table->fragmenter->insertData(ins_data);
3038  } else {
3039  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
3040  }
3041  } catch (std::exception& e) {
3042  std::ostringstream oss;
3043  oss << "Fragmenter Insert Exception when processing Table "
3044  << shard_table->tableName << " issue was " << e.what();
3045 
3046  LOG(ERROR) << oss.str();
3047  loader_lock.lock();
3048  error_msg_ = oss.str();
3049  success = false;
3050  }
3051  }
3052  return success;
3053 }
3054 
3055 void Loader::dropColumns(const std::vector<int>& columnIds) {
3056  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
3057  if (table_desc_->nShards) {
3058  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
3059  }
3060  for (auto table_desc : table_descs) {
3061  table_desc->fragmenter->dropColumns(columnIds);
3062  }
3063 }
3064 
3065 void Loader::init() {
3066  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
3067  insert_data_.tableId = table_desc_->tableId;
3068  for (auto cd : column_descs_) {
3069  insert_data_.columnIds.push_back(cd->columnId);
3070  if (cd->columnType.get_compression() == kENCODING_DICT) {
3071  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3072  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
3073  CHECK(dd);
3074  dict_map_[cd->columnId] = dd->stringDict.get();
3075  }
3076  }
3077  insert_data_.numRows = 0;
3078 }
3079 
3079 
3080 void Detector::init() {
3081  detect_row_delimiter();
3082  split_raw_data();
3083  find_best_sqltypes_and_headers();
3084 }
3085 
3086 ImportStatus Detector::importDelimited(
3087  const std::string& file_path,
3088  const bool decompressed,
3089  const Catalog_Namespace::SessionInfo* session_info) {
3090  // we do not check interrupt status for this detection
3091  if (!p_file) {
3092  p_file = fopen(file_path.c_str(), "rb");
3093  }
3094  if (!p_file) {
3095  throw std::runtime_error("failed to open file '" + file_path +
3096  "': " + strerror(errno));
3097  }
3098 
3099  // somehow clang does not support ext/stdio_filebuf.h, so we
3100  // need to DIY a readline with the customized copy_params.line_delim...
3101  std::string line;
3102  line.reserve(1 * 1024 * 1024);
3103  auto end_time = std::chrono::steady_clock::now() +
3104  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
3105  try {
3106  while (!feof(p_file)) {
3107  int c;
3108  size_t n = 0;
3109  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
3110  if (n++ >= line.capacity()) {
3111  break;
3112  }
3113  line += c;
3114  }
3115  if (0 == n) {
3116  break;
3117  }
3118  // remember the first line, which is possibly a header line, to
3119  // ignore identical header line(s) in 2nd+ files of an archive;
3120  // otherwise, a 2nd+ header may be mistaken for an all-string row
3121  // and so skew the final column types.
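  // e.g. a tar of monthly CSVs that each begin with "id,name,ts" keeps only
  // the first such line in raw_data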
3122  if (line1.empty()) {
3123  line1 = line;
3124  } else if (line == line1) {
3125  line.clear();
3126  continue;
3127  }
3128 
3129  raw_data += line;
3130  raw_data += copy_params.line_delim;
3131  line.clear();
3132  ++import_status_.rows_completed;
3133  if (std::chrono::steady_clock::now() > end_time) {
3134  if (import_status_.rows_completed > 10000) {
3135  // stop import when row limit reached
3136  break;
3137  }
3138  }
3139  }
3140  } catch (std::exception& e) {
3141  }
3142 
3143  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3144  import_status_.load_failed = true;
3145 
3146  fclose(p_file);
3147  p_file = nullptr;
3148  return import_status_;
3149 }
3150 
3151 void Detector::read_file() {
3152  // this becomes analogous to Importer::import()
3153  (void)DataStreamSink::archivePlumber(nullptr);
3154 }
3155 
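// default the field delimiter when none was specified: ',' in general, or
// '\t' when the file extension is .tsv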
3156 void Detector::detect_row_delimiter() {
3157  if (copy_params.delimiter == '\0') {
3158  copy_params.delimiter = ',';
3159  if (boost::filesystem::path(file_path).extension() == ".tsv") {
3160  copy_params.delimiter = '\t';
3161  }
3162  }
3163 }
3164 
3165 void Detector::split_raw_data() {
3166  const char* buf = raw_data.c_str();
3167  const char* buf_end = buf + raw_data.size();
3168  bool try_single_thread = false;
3169  for (const char* p = buf; p < buf_end; p++) {
3170  std::vector<std::string> row;
3171  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3172  p = delimited_parser::get_row(p,
3173  buf_end,
3174  buf_end,
3175  copy_params,
3176  nullptr,
3177  row,
3178  tmp_buffers,
3179  try_single_thread,
3180  true);
3181  raw_rows.push_back(row);
3182  if (try_single_thread) {
3183  break;
3184  }
3185  }
3186  if (try_single_thread) {
3187  copy_params.threads = 1;
3188  raw_rows.clear();
3189  for (const char* p = buf; p < buf_end; p++) {
3190  std::vector<std::string> row;
3191  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3192  p = delimited_parser::get_row(p,
3193  buf_end,
3194  buf_end,
3195  copy_params,
3196  nullptr,
3197  row,
3198  tmp_buffers,
3199  try_single_thread,
3200  true);
3201  raw_rows.push_back(row);
3202  }
3203  }
3204 }
3205 
3206 template <class T>
3207 bool try_cast(const std::string& str) {
3208  try {
3209  boost::lexical_cast<T>(str);
3210  } catch (const boost::bad_lexical_cast& e) {
3211  return false;
3212  }
3213  return true;
3214 }
3215 
3216 SQLTypes Detector::detect_sqltype(const std::string& str) {
3217  SQLTypes type = kTEXT;
3218  if (try_cast<double>(str)) {
3219  type = kDOUBLE;
3220  /*if (try_cast<bool>(str)) {
3221  type = kBOOLEAN;
3222  }*/
3223  if (try_cast<int16_t>(str)) {
3224  type = kSMALLINT;
3225  } else if (try_cast<int32_t>(str)) {
3226  type = kINT;
3227  } else if (try_cast<int64_t>(str)) {
3228  type = kBIGINT;
3229  } else if (try_cast<float>(str)) {
3230  type = kFLOAT;
3231  }
3232  }
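  // e.g. "12" -> kSMALLINT, "70000" -> kINT, "1.5" -> kFLOAT (the float cast
  // also succeeds), "1e300" -> kDOUBLE (overflows a float)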
3233 
3234  // check for geo types
3235  if (type == kTEXT) {
3236  // convert to upper case
3237  std::string str_upper_case = str;
3238  std::transform(
3239  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3240 
3241  // then test for leading words
3242  if (str_upper_case.find("POINT") == 0) {
3243  // promote column type?
3244  type = kPOINT;
3245  } else if (str_upper_case.find("MULTIPOINT") == 0) {
3246  type = kMULTIPOINT;
3247  } else if (str_upper_case.find("LINESTRING") == 0) {
3248  // promote column type?
3249  type = kLINESTRING;
3250  } else if (str_upper_case.find("MULTILINESTRING") == 0) {
3251  type = kMULTILINESTRING;
3252  } else if (str_upper_case.find("POLYGON") == 0) {
3253  // promote column type?
3254  type = kPOLYGON;
3255  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
3256  type = kMULTIPOLYGON;
3257  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
3258  std::string::npos &&
3259  (str_upper_case.size() % 2) == 0) {
3260  // simple hex blob (two characters per byte, not uu-encode or base64)
3261  if (str_upper_case.size() >= 10) {
3262  // match WKB blobs for supported geometry types
3263  // the first byte specifies if the data is big-endian or little-endian
3264  // the next four bytes are the geometry type (1 = POINT etc.)
3265  // @TODO support eWKB, which has extra bits set in the geometry type
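  // e.g. "0101000000..." is little-endian (01) WKB type 1 = POINT, while
  // "0000000003..." is big-endian (00) WKB type 3 = POLYGON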
3266  auto first_five_bytes = str_upper_case.substr(0, 10);
3267  if (first_five_bytes == "0000000001" || first_five_bytes == "0101000000") {
3268  type = kPOINT;
3269  } else if (first_five_bytes == "0000000004" || first_five_bytes == "0104000000") {
3270  type = kMULTIPOINT;
3271  } else if (first_five_bytes == "0000000002" || first_five_bytes == "0102000000") {
3272  type = kLINESTRING;
3273  } else if (first_five_bytes == "0000000005" || first_five_bytes == "0105000000") {
3274  type = kMULTILINESTRING;
3275  } else if (first_five_bytes == "0000000003" || first_five_bytes == "0103000000") {
3276  type = kPOLYGON;
3277  } else if (first_five_bytes == "0000000006" || first_five_bytes == "0106000000") {
3278  type = kMULTIPOLYGON;
3279  } else {
3280  // unsupported WKB type
3281  return type;
3282  }
3283  } else {
3284  // too short to be WKB
3285  return type;
3286  }
3287  }
3288  }
3289 
3290  // check for time types
3291  if (type == kTEXT) {
3292  // This won't match unix timestamp, since floats and ints were checked above.
3293  if (dateTimeParseOptional<kTIME>(str, 0)) {
3294  type = kTIME;
3295  } else if (dateTimeParseOptional<kTIMESTAMP>(str, 0)) {
3296  type = kTIMESTAMP;
3297  } else if (dateTimeParseOptional<kDATE>(str, 0)) {
3298  type = kDATE;
3299  }
3300  }
3301 
3302  return type;
3303 }
3304 
3305 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3306  std::vector<SQLTypes> types(row.size());
3307  for (size_t i = 0; i < row.size(); i++) {
3308  types[i] = detect_sqltype(row[i]);
3309  }
3310  return types;
3311 }
3312 
3313 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
3314  static std::array<int, kSQLTYPE_LAST> typeorder;
3315  typeorder[kCHAR] = 0;
3316  typeorder[kBOOLEAN] = 2;
3317  typeorder[kSMALLINT] = 3;
3318  typeorder[kINT] = 4;
3319  typeorder[kBIGINT] = 5;
3320  typeorder[kFLOAT] = 6;
3321  typeorder[kDOUBLE] = 7;
3322  typeorder[kTIMESTAMP] = 8;
3323  typeorder[kTIME] = 9;
3324  typeorder[kDATE] = 10;
3325  typeorder[kPOINT] = 11;
3326  typeorder[kMULTIPOINT] = 11;
3327  typeorder[kLINESTRING] = 11;
3328  typeorder[kMULTILINESTRING] = 11;
3329  typeorder[kPOLYGON] = 11;
3330  typeorder[kMULTIPOLYGON] = 11;
3331  typeorder[kTEXT] = 12;
3332 
3333  // note: b < a instead of a < b because typeorder is ordered most to least restrictive
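  // e.g. more_restrictive_sqltype(kTEXT, kSMALLINT) is true (typeorder 3 < 12),
  // so a column already promoted to kTEXT never narrows back to a numeric type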
3334  return typeorder[b] < typeorder[a];
3335 }
3336 
3337 void Detector::find_best_sqltypes_and_headers() {
3338  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
3339  best_encodings =
3340  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3341  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3342  switch (copy_params.has_header) {
3343  case ImportHeaderRow::kAutoDetect:
3344  has_headers = detect_headers(head_types, best_sqltypes);
3345  if (has_headers) {
3346  copy_params.has_header = ImportHeaderRow::kHasHeader;
3347  } else {
3348  copy_params.has_header = ImportHeaderRow::kNoHeader;
3349  }
3350  break;
3351  case ImportHeaderRow::kNoHeader:
3352  has_headers = false;
3353  break;
3354  case ImportHeaderRow::kHasHeader:
3355  has_headers = true;
3356  break;
3357  }
3358 }
3359 
3360 std::vector<SQLTypes> Detector::find_best_sqltypes() {
3361  return find_best_sqltypes(raw_rows, copy_params);
3362 }
3363 
3364 std::vector<SQLTypes> Detector::find_best_sqltypes(
3365  const std::vector<std::vector<std::string>>& raw_rows,
3366  const CopyParams& copy_params) {
3367  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3368 }
3369 
3370 std::vector<SQLTypes> Detector::find_best_sqltypes(
3371  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3372  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3373  const CopyParams& copy_params) {
3374  if (raw_rows.size() < 1) {
3375  throw std::runtime_error("No rows found in: " +
3376  boost::filesystem::path(file_path).stem().string());
3377  }
3378  auto end_time = std::chrono::steady_clock::now() + timeout;
3379  size_t num_cols = raw_rows.front().size();
3380  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3381  std::vector<size_t> non_null_col_counts(num_cols, 0);
3382  for (auto row = row_begin; row != row_end; row++) {
3383  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3384  best_types.push_back(kCHAR);
3385  non_null_col_counts.push_back(0);
3386  }
3387  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3388  // do not count nulls
3389  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3390  continue;
3391  }
3392  SQLTypes t = detect_sqltype(row->at(col_idx));
3393  non_null_col_counts[col_idx]++;
3394  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3395  best_types[col_idx] = t;
3396  }
3397  }
3398  if (std::chrono::steady_clock::now() > end_time) {
3399  break;
3400  }
3401  }
3402  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3403  // if we don't have any non-null values for this column, make it text to be
3404  // safe b/c that is the least restrictive type
3405  if (non_null_col_counts[col_idx] == 0) {
3406  best_types[col_idx] = kTEXT;
3407  }
3408  }
3409 
3410  return best_types;
3411 }
3412 
3413 std::vector<EncodingType> Detector::find_best_encodings(
3414  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3415  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3416  const std::vector<SQLTypes>& best_types) {
3417  if (raw_rows.size() < 1) {
3418  throw std::runtime_error("No rows found in: " +
3419  boost::filesystem::path(file_path).stem().string());
3420  }
3421  size_t num_cols = best_types.size();
3422  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3423  std::vector<size_t> num_rows_per_col(num_cols, 1);
3424  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3425  for (auto row = row_begin; row != row_end; row++) {
3426  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3427  if (IS_STRING(best_types[col_idx])) {
3428  count_set[col_idx].insert(row->at(col_idx));
3429  num_rows_per_col[col_idx]++;
3430  }
3431  }
3432  }
3433  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3434  if (IS_STRING(best_types[col_idx])) {
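  // dictionary-encode a string column when fewer than 75% of its sampled
  // values are unique; e.g. 200 distinct strings in a 1000-row sample
  // (ratio 0.2) selects kENCODING_DICT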
3435  float uniqueRatio =
3436  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3437  if (uniqueRatio < 0.75) {
3438  best_encodes[col_idx] = kENCODING_DICT;
3439  }
3440  }
3441  }
3442  return best_encodes;
3443 }
3444 
3445 // detect_headers returns true if:
3446 // - all elements of the first argument are kTEXT
3447 // - there is at least one instance where tail_types is more restrictive than head_types
3448 // (ie, not kTEXT)
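// e.g. head_types = {kTEXT, kTEXT} over tail_types = {kTEXT, kINT} returns true:
// a row of text labels above a numeric column is taken to be a header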
3449 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3450  const std::vector<SQLTypes>& tail_types) {
3451  if (head_types.size() != tail_types.size()) {
3452  return false;
3453  }
3454  bool has_headers = false;
3455  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3456  if (head_types[col_idx] != kTEXT) {
3457  return false;
3458  }
3459  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3460  }
3461  return has_headers;
3462 }
3463 
3464 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3465 #if defined(ENABLE_IMPORT_PARQUET)
3466  if (data_preview_.has_value()) {
3467  return data_preview_.value().sample_rows;
3468  } else
3469 #endif
3470  {
3471  n = std::min(n, raw_rows.size());
3472  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3473  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3474  raw_rows.begin() + n);
3475  return sample_rows;
3476  }
3477 }
3478 
3479 std::vector<std::string> Detector::get_headers() {
3480 #if defined(ENABLE_IMPORT_PARQUET)
3481  if (data_preview_.has_value()) {
3482  return data_preview_.value().column_names;
3483  } else
3484 #endif
3485  {
3486  std::vector<std::string> headers(best_sqltypes.size());
3487  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3488  if (has_headers && i < raw_rows[0].size()) {
3489  headers[i] = raw_rows[0][i];
3490  } else {
3491  headers[i] = "column_" + std::to_string(i + 1);
3492  }
3493  }
3494  return headers;
3495  }
3496 }
3497 
3498 std::vector<SQLTypeInfo> Detector::getBestColumnTypes() const {
3499 #if defined(ENABLE_IMPORT_PARQUET)
3500  if (data_preview_.has_value()) {
3501  return data_preview_.value().column_types;
3502  } else
3503 #endif
3504  {
3505  std::vector<SQLTypeInfo> types;
3506  CHECK_EQ(best_sqltypes.size(), best_encodings.size());
3507  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3508  types.emplace_back(best_sqltypes[i], false, best_encodings[i]);
3509  }
3510  return types;
3511  }
3512 }
3513 
3514 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3515  size_t row_count,
3516  const Catalog_Namespace::SessionInfo* session_info) {
3517  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3518  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3519  import_status_.load_failed = true;
3520  import_status_.load_msg = loader->getErrorMessage();
3521  }
3522 }
3523 
3523 
3524 void Importer::checkpoint(
3525  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3526  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3527  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
3528  if (import_status_.load_failed) {
3529  // rollback to starting epoch - undo all the added records
3530  loader->setTableEpochs(table_epochs);
3531  } else {
3532  loader->checkpoint();
3533  }
3534  }
3535 
3536  if (loader->getTableDesc()->persistenceLevel ==
3537  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3538  // tables
3539  auto ms = measure<>::execution([&]() {
3540  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3541  if (!import_status_.load_failed) {
3542  for (auto& p : import_buffers_vec[0]) {
3543  if (!p->stringDictCheckpoint()) {
3544  LOG(ERROR) << "Checkpointing Dictionary for Column "
3545  << p->getColumnDesc()->columnName << " failed.";
3546  import_status_.load_failed = true;
3547  import_status_.load_msg = "Dictionary checkpoint failed";
3548  break;
3549  }
3550  }
3551  }
3552  });
3553  if (DEBUG_TIMING) {
3554  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3555  << std::endl;
3556  }
3557  }
3558 }
3559 
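// Illustrative sketch: the rollback above relies on table epochs captured
// before the load began. A minimal calling pattern (hypothetical importer
// instance, not part of the build):
#if 0
  auto epochs = importer.getLoader()->getTableEpochs();  // snapshot before loading
  // ... load rows; any failure sets import_status_.load_failed ...
  importer.checkpoint(epochs);  // commits on success, rewinds to the snapshot on failure
#endif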
3560 ImportStatus DataStreamSink::archivePlumber(
3561  const Catalog_Namespace::SessionInfo* session_info) {
3562  // in the generalized importing scheme, by the time we reach here file_path
3563  // may contain a single file path, a url, or a wildcard of file paths.
3564  // see CopyTableStmt::execute.
3565 
3566  std::vector<std::string> file_paths;
3567  try {
3568  const shared::FilePathOptions options{copy_params.regex_path_filter,
3569  copy_params.file_sort_order_by,
3570  copy_params.file_sort_regex};
3571  shared::validate_sort_options(options);
3572  file_paths = shared::local_glob_filter_sort_files(file_path, options);
3573  } catch (const shared::FileNotFoundException& e) {
3574  // After finding no matching files locally, file_path may still be an s3 url
3575  file_paths.push_back(file_path);
3576  }
3577 
3578  // sum up the sizes of all local files -- only for local files. if
3579  // file_path is an s3 url, sizes will be obtained via S3Archive.
3580  for (const auto& file_path : file_paths) {
3581  total_file_size += get_filesize(file_path);
3582  }
3583 
3584  // s3 parquet goes a different route because the files do not use libarchive
3585  // but the parquet api, and they need to be landed like .7z files.
3586  //
3587  // note: parquet must be explicitly specified by a WITH parameter
3588  // "source_type='parquet_file'", because, for example, spark sql users may specify an
3589  // output url w/o a file extension like this:
3590  // df.write
3591  // .mode("overwrite")
3592  // .parquet("s3://bucket/folder/parquet/mydata")
3593  // without the parameter, it means plain or compressed csv files.
3594  // note: .ORC and AVRO files should follow a similar path to Parquet?
3595  if (copy_params.source_type == import_export::SourceType::kParquetFile) {
3596 #ifdef ENABLE_IMPORT_PARQUET
3597  import_parquet(file_paths, session_info);
3598 #else
3599  throw std::runtime_error("Parquet not supported!");
3600 #endif
3601  } else {
3602  import_compressed(file_paths, session_info);
3603  }
3604 
3605  return import_status_;
3606 }
3607 
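// Illustrative sketch: the FileNotFoundException fallback in archivePlumber
// means a path that matches nothing locally is forwarded verbatim, on the
// assumption that it may be a remote (e.g. s3://) url for S3Archive to
// resolve later. Hypothetical path; default-constructed options assumed;
// not part of the build:
#if 0
  std::vector<std::string> paths;
  try {
    paths = shared::local_glob_filter_sort_files("s3://bucket/data/*.csv", {});
  } catch (const shared::FileNotFoundException&) {
    paths.push_back("s3://bucket/data/*.csv");  // resolved remotely later
  }
#endif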
3608 namespace {
3609 #ifdef ENABLE_IMPORT_PARQUET
3610 
3611 #ifdef HAVE_AWS_S3
3612 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3613 create_parquet_s3_detect_filesystem_config(const foreign_storage::ForeignServer* server,
3614  const CopyParams& copy_params) {
3615  foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3616  config;
3617 
3618  if (!copy_params.s3_access_key.empty()) {
3619  config.s3_access_key = copy_params.s3_access_key;
3620  }
3621  if (!copy_params.s3_secret_key.empty()) {
3622  config.s3_secret_key = copy_params.s3_secret_key;
3623  }
3624  if (!copy_params.s3_session_token.empty()) {
3625  config.s3_session_token = copy_params.s3_session_token;
3626  }
3627 
3628  return config;
3629 }
3630 #endif
3631 
3632 foreign_storage::DataPreview get_parquet_data_preview(const std::string& file_name,
3633  const CopyParams& copy_params) {
3634  TableDescriptor td;
3635  td.tableName = "parquet-detect-table";
3636  td.tableId = -1;
3638  auto [foreign_server, user_mapping, foreign_table] =
3639  foreign_storage::create_proxy_fsi_objects(file_name, copy_params, &td);
3640 
3641  std::shared_ptr<arrow::fs::FileSystem> file_system;
3642  auto& server_options = foreign_server->options;
3643  if (server_options
3644  .find(foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
3645  ->second ==
3646  foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE) {
3647  file_system = std::make_shared<arrow::fs::LocalFileSystem>();
3648 #ifdef HAVE_AWS_S3
3649  } else if (server_options
3650  .find(foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
3651  ->second ==
3652  foreign_storage::AbstractFileStorageDataWrapper::S3_STORAGE_TYPE) {
3653  file_system = foreign_storage::ParquetS3DetectFileSystem::create(
3654  create_parquet_s3_detect_filesystem_config(foreign_server.get(), copy_params));
3655 #endif
3656  } else {
3657  UNREACHABLE();
3658  }
3659 
3660  auto parquet_data_wrapper = std::make_unique<foreign_storage::ParquetDataWrapper>(
3661  foreign_table.get(), file_system);
3662  return parquet_data_wrapper->getDataPreview(g_detect_test_sample_size.has_value()
3663  ? g_detect_test_sample_size.value()
3665 }
3666 #endif
3667 
3668 } // namespace
3669 
3670 Detector::Detector(const boost::filesystem::path& fp, CopyParams& cp)
3671  : DataStreamSink(cp, fp.string()), file_path(fp) {
3672 #ifdef ENABLE_IMPORT_PARQUET
3673  if (cp.source_type == import_export::SourceType::kParquetFile &&
3674  !g_enable_legacy_parquet_import) {
3675  data_preview_ = get_parquet_data_preview(fp.string(), cp);
3676  } else
3677 #endif
3678  {
3679  read_file();
3680  init();
3681  }
3682 }
3683 
3684 #ifdef ENABLE_IMPORT_PARQUET
3685 inline auto open_parquet_table(const std::string& file_path,
3686  std::shared_ptr<arrow::io::ReadableFile>& infile,
3687  std::unique_ptr<parquet::arrow::FileReader>& reader,
3688  std::shared_ptr<arrow::Table>& table) {
3689  using namespace parquet::arrow;
3690  auto file_result = arrow::io::ReadableFile::Open(file_path);
3691  PARQUET_THROW_NOT_OK(file_result.status());
3692  infile = file_result.ValueOrDie();
3693 
3694  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3695  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3696  const auto num_row_groups = reader->num_row_groups();
3697  const auto num_columns = table->num_columns();
3698  const auto num_rows = table->num_rows();
3699  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3700  << " columns in " << num_row_groups << " groups.";
3701  return std::make_tuple(num_row_groups, num_columns, num_rows);
3702 }
3703 
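// Illustrative sketch of the helper above (hypothetical file name, not part
// of the build). The three out-parameters must outlive any subsequent
// RowGroup()/Column() reads through the reader.
#if 0
  std::shared_ptr<arrow::io::ReadableFile> infile;
  std::unique_ptr<parquet::arrow::FileReader> reader;
  std::shared_ptr<arrow::Table> table;
  auto [num_groups, num_cols, num_rows] =
      open_parquet_table("/tmp/data.parquet", infile, reader, table);
#endif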
3704 void Detector::import_local_parquet(const std::string& file_path,
3705  const Catalog_Namespace::SessionInfo* session_info) {
3706  /*Skip interrupt checking in detector*/
3707  std::shared_ptr<arrow::io::ReadableFile> infile;
3708  std::unique_ptr<parquet::arrow::FileReader> reader;
3709  std::shared_ptr<arrow::Table> table;
3710  int num_row_groups, num_columns;
3711  int64_t num_rows;
3712  std::tie(num_row_groups, num_columns, num_rows) =
3713  open_parquet_table(file_path, infile, reader, table);
3714  // make up a header line if we don't have one yet
3715  if (0 == raw_data.size()) {
3716  copy_params.has_header = ImportHeaderRow::kHasHeader;
3717  copy_params.line_delim = '\n';
3718  copy_params.delimiter = ',';
3719  // must quote values to skip any embedded delimiter
3720  copy_params.quoted = true;
3721  copy_params.quote = '"';
3722  copy_params.escape = '"';
3723  for (int c = 0; c < num_columns; ++c) {
3724  if (c) {
3725  raw_data += copy_params.delimiter;
3726  }
3727  raw_data += table->ColumnNames().at(c);
3728  }
3729  raw_data += copy_params.line_delim;
3730  }
3731  // make up raw data... row-wise...
3732  const ColumnDescriptor cd;
3733  for (int g = 0; g < num_row_groups; ++g) {
3734  // data is columnwise
3735  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3736  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3737  arrays.resize(num_columns);
3738  for (int c = 0; c < num_columns; ++c) {
3739  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3740  for (auto chunk : arrays[c]->chunks()) {
3741  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3742  }
3743  }
3744  for (int r = 0; r < num_rows; ++r) {
3745  for (int c = 0; c < num_columns; ++c) {
3746  std::vector<std::string> buffer;
3747  for (auto chunk : arrays[c]->chunks()) {
3748  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3749  if (c) {
3750  raw_data += copy_params.delimiter;
3751  }
3752  if (!chunk->IsNull(r)) {
3753  raw_data += copy_params.quote;
3754  raw_data += boost::replace_all_copy(
3755  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3756  raw_data += copy_params.quote;
3757  }
3758  }
3759  }
3760  raw_data += copy_params.line_delim;
3761  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3762  if (++import_status_.rows_completed >= 10000) {
3763  // as if load truncated
3764  import_status_.load_failed = true;
3765  import_status_.load_msg = "Detector processed 10000 records";
3766  return;
3767  }
3768  }
3769  }
3770 }
3771 
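// Note on the quoting above: embedded double-quotes are escaped by doubling
// them, per the usual csv convention. Illustration (not part of the build):
#if 0
  // value:   say "hi"
  // quoted:  "say ""hi"""
  auto escaped = boost::replace_all_copy(std::string("say \"hi\""), "\"", "\"\"");
  // escaped == R"(say ""hi"")"; the surrounding quote chars are added by the caller
#endif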
3772 template <typename DATA_TYPE>
3773 auto TypedImportBuffer::del_values(
3774  std::vector<DATA_TYPE>& buffer,
3775  import_export::BadRowsTracker* const bad_rows_tracker) {
3776  const auto old_size = buffer.size();
3777  // erase backward to minimize memory movement overhead
3778  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3779  ++rit) {
3780  buffer.erase(buffer.begin() + *rit);
3781  }
3782  return std::make_tuple(old_size, buffer.size());
3783 }
3784 
3785 std::tuple<size_t, size_t> TypedImportBuffer::del_values(const SQLTypes type,
3786  BadRowsTracker* const bad_rows_tracker) {
3787  switch (type) {
3788  case kBOOLEAN:
3789  return del_values(*bool_buffer_, bad_rows_tracker);
3790  case kTINYINT:
3791  return del_values(*tinyint_buffer_, bad_rows_tracker);
3792  case kSMALLINT:
3793  return del_values(*smallint_buffer_, bad_rows_tracker);
3794  case kINT:
3795  return del_values(*int_buffer_, bad_rows_tracker);
3796  case kBIGINT:
3797  case kNUMERIC:
3798  case kDECIMAL:
3799  case kTIME:
3800  case kTIMESTAMP:
3801  case kDATE:
3802  return del_values(*bigint_buffer_, bad_rows_tracker);
3803  case kFLOAT:
3804  return del_values(*float_buffer_, bad_rows_tracker);
3805  case kDOUBLE:
3806  return del_values(*double_buffer_, bad_rows_tracker);
3807  case kTEXT:
3808  case kVARCHAR:
3809  case kCHAR:
3810  return del_values(*string_buffer_, bad_rows_tracker);
3811  case kPOINT:
3812  case kMULTIPOINT:
3813  case kLINESTRING:
3814  case kMULTILINESTRING:
3815  case kPOLYGON:
3816  case kMULTIPOLYGON:
3817  return del_values(*geo_string_buffer_, bad_rows_tracker);
3818  case kARRAY:
3819  return del_values(*array_buffer_, bad_rows_tracker);
3820  default:
3821  throw std::runtime_error("Invalid Type");
3822  }
3823 }
3824 
3825 void Importer::import_local_parquet(const std::string& file_path,
3826  const Catalog_Namespace::SessionInfo* session_info) {
3827  std::shared_ptr<arrow::io::ReadableFile> infile;
3828  std::unique_ptr<parquet::arrow::FileReader> reader;
3829  std::shared_ptr<arrow::Table> table;
3830  int num_row_groups, num_columns;
3831  int64_t nrow_in_file;
3832  std::tie(num_row_groups, num_columns, nrow_in_file) =
3833  open_parquet_table(file_path, infile, reader, table);
3834  // column_list has no $deleted
3835  const auto& column_list = get_column_descs();
3836  // for now geo columns expect a wkt or wkb hex string
3837  std::vector<const ColumnDescriptor*> cds;
3838  Executor* executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID).get();
3839  int num_physical_cols = 0;
3840  for (auto& cd : column_list) {
3841  cds.push_back(cd);
3842  num_physical_cols += cd->columnType.get_physical_cols();
3843  }
3844  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3845  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3846  std::to_string(num_columns) + " columns in file vs " +
3847  std::to_string(column_list.size() - num_physical_cols) +
3848  " columns in table.");
3849  // slice each row group to import slower columns faster, e.g. geo or string
3850  max_threads = num_import_threads(copy_params.threads);
3851  VLOG(1) << "Parquet import # threads: " << max_threads;
3852  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3853  // init row estimate for this file
3854  const auto filesize = get_filesize(file_path);
3855  size_t nrow_completed{0};
3856  file_offsets.push_back(0);
3857  // map logical column index to physical column index
3858  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto{
3859  int physical_col_idx = 0;
3860  for (int i = 0; i < logic_col_idx; ++i) {
3861  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3862  }
3863  return physical_col_idx;
3864  };
3865  // load a file = nested iteration of row groups, row slices and logical columns
3866  auto query_session = session_info ? session_info->get_session_id() : "";
3867  auto ms_load_a_file = measure<>::execution([&]() {
3868  for (int row_group = 0; row_group < num_row_groups; ++row_group) {
3869  {
3870  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
3871  if (import_status_.load_failed) {
3872  break;
3873  }
3874  }
3875  // a sliced row group will be handled like a (logical) parquet file, with
3876  // an entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
3877  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3878  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3879  import_status_.load_failed = true;
3880  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3881  }
3882  import_buffers_vec.resize(num_slices);
3883  for (int slice = 0; slice < num_slices; slice++) {
3884  import_buffers_vec[slice].clear();
3885  for (const auto cd : cds) {
3886  import_buffers_vec[slice].emplace_back(
3887  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3888  }
3889  }
3890  /*
3891  * A caveat here: Parquet files (or arrow data) are imported column-wise.
3892  * Unlike importing row-wise csv files, an error on any row of any column
3893  * forces us to give up the entire row group across all columns, unless there is a
3894  * sophisticated method to trace erroneous rows in individual columns so
3895  * that we can union the bad rows and drop them from the corresponding
3896  * import_buffers_vec; otherwise, we may exceed the maximum number of
3897  * truncated rows easily even with very sparse errors in the files.
3898  */
3899  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3900  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3901  auto& bad_rows_tracker = bad_rows_trackers[slice];
3902  bad_rows_tracker.file_name = file_path;
3903  bad_rows_tracker.row_group = slice;
3904  bad_rows_tracker.importer = this;
3905  }
3906  // process arrow arrays to import buffers
3907  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3908  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3909  const auto cd = cds[physical_col_idx];
3910  std::shared_ptr<arrow::ChunkedArray> array;
3911  PARQUET_THROW_NOT_OK(
3912  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3913  const size_t array_size = array->length();
3914  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3915  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3916  for (int slice = 0; slice < num_slices; ++slice) {
3917  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3918  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3919  import_status_.load_failed = true;
3920  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3921  }
3922  thread_controller.startThread([&, slice] {
3923  const auto slice_offset = slice % num_slices;
3924  ArraySliceRange slice_range(
3925  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3926  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3927  auto& bad_rows_tracker = bad_rows_trackers[slice];
3928  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3929  import_buffer->import_buffers = &import_buffers_vec[slice];
3930  import_buffer->col_idx = physical_col_idx + 1;
3931  for (auto chunk : array->chunks()) {
3932  import_buffer->add_arrow_values(
3933  cd, *chunk, false, slice_range, &bad_rows_tracker);
3934  }
3935  });
3936  }
3937  thread_controller.finish();
3938  }
3939  std::vector<size_t> nrow_in_slice_raw(num_slices);
3940  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3941  // trim bad rows from import buffers
3942  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3943  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3944  const auto cd = cds[physical_col_idx];
3945  for (int slice = 0; slice < num_slices; ++slice) {
3946  auto& bad_rows_tracker = bad_rows_trackers[slice];
3947  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3948  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3949  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3950  }
3951  }
3952  // flush slices of this row group to chunks
3953  for (int slice = 0; slice < num_slices; ++slice) {
3954  load(import_buffers_vec[slice],
3955  nrow_in_slice_successfully_loaded[slice],
3956  session_info);
3957  }
3958  // update import stats
3959  const auto nrow_original =
3960  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3961  const auto nrow_imported =
3962  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3963  nrow_in_slice_successfully_loaded.end(),
3964  0);
3965  const auto nrow_dropped = nrow_original - nrow_imported;
3966  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3967  << " rows, drop " << nrow_dropped << " rows.";
3968  {
3969  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3970  import_status_.rows_completed += nrow_imported;
3971  import_status_.rows_rejected += nrow_dropped;
3972  if (import_status_.rows_rejected > copy_params.max_reject) {
3973  import_status_.load_failed = true;
3974  import_status_.load_msg = "Maximum (" + std::to_string(copy_params.max_reject) +
3975  ") rows rejected exceeded. Halting load.";
3976  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3977  << ") rows rejected exceeded. Halting load.";
3978  }
3979  }
3980  // row estimate
3981  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3982  nrow_completed += nrow_imported;
3983  file_offsets.back() =
3984  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3985  // sum up current total file offsets
3986  const auto total_file_offset =
3987  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
3988  // estimate number of rows per current total file offset
3989  if (total_file_offset) {
3990  import_status_.rows_estimated =
3991  (float)total_file_size / total_file_offset * import_status_.rows_completed;
3992  LOG(INFO) << "rows_completed " << import_status_.rows_completed
3993  << ", rows_estimated " << import_status_.rows_estimated
3994  << ", total_file_size " << total_file_size << ", total_file_offset "
3995  << total_file_offset;
3996  }
3997  }
3998  });
3999  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
4000  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
4001 }
4002 
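// The slice partitioning above is a ceiling division,
// slice_size = (array_size + num_slices - 1) / num_slices, with each
// [begin, end) range clamped to array_size, so trailing slices may be short
// or empty. Illustration (not part of the build):
#if 0
  const size_t array_size = 10, num_slices = 4;
  const size_t slice_size = (array_size + num_slices - 1) / num_slices;  // 3
  for (size_t s = 0; s < num_slices; ++s) {
    const size_t begin = std::min(s * slice_size, array_size);      // 0, 3, 6, 9
    const size_t end = std::min((s + 1) * slice_size, array_size);  // 3, 6, 9, 10
  }
#endif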
4003 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
4004  const Catalog_Namespace::SessionInfo* session_info) {
4005  auto importer = dynamic_cast<Importer*>(this);
4006  auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
4007  : std::vector<Catalog_Namespace::TableEpochInfo>{};
4008  try {
4009  std::exception_ptr teptr;
4010  // file_paths may contain one local file path, a list of local file paths,
4011  // or an s3/hdfs/... url that may translate to one or more remote object keys.
4012  for (auto const& file_path : file_paths) {
4013  std::map<int, std::string> url_parts;
4014  Archive::parse_url(file_path, url_parts);
4015 
4016  // for an s3 url we need to know the obj keys that it comprises
4017  std::vector<std::string> objkeys;
4018  std::unique_ptr<S3ParquetArchive> us3arch;
4019  if ("s3" == url_parts[2]) {
4020 #ifdef HAVE_AWS_S3
4021  us3arch.reset(new S3ParquetArchive(file_path,
4022  copy_params.s3_access_key,
4023  copy_params.s3_secret_key,
4024  copy_params.s3_session_token,
4025  copy_params.s3_region,
4026  copy_params.s3_endpoint,
4027  copy_params.plain_text,
4028  copy_params.regex_path_filter,
4029  copy_params.file_sort_order_by,
4030  copy_params.file_sort_regex));
4031  us3arch->init_for_read();
4032  total_file_size += us3arch->get_total_file_size();
4033  objkeys = us3arch->get_objkeys();
4034 #else
4035  throw std::runtime_error("AWS S3 support not available");
4036 #endif // HAVE_AWS_S3
4037  } else {
4038  objkeys.emplace_back(file_path);
4039  }
4040 
4041  // for each obj key of an s3 url we need to land it before
4042  // importing it, as we do with a 'local file'.
4043  for (auto const& objkey : objkeys) {
4044  try {
4045  auto file_path =
4046  us3arch
4047  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
4048  : objkey;
4049  import_local_parquet(file_path, session_info);
4050  if (us3arch) {
4051  us3arch->vacuum(objkey);
4052  }
4053  } catch (...) {
4054  if (us3arch) {
4055  us3arch->vacuum(objkey);
4056  }
4057  throw;
4058  }
4059  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
4060  if (import_status_.load_failed) {
4061  break;
4062  }
4063  }
4064  }
4065  // rethrow any exception that happened before this point
4066  if (teptr) {
4067  std::rethrow_exception(teptr);
4068  }
4069  } catch (const shared::NoRegexFilterMatchException& e) {
4070  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
4071  import_status_.load_failed = true;
4072  import_status_.load_msg = e.what();
4073  throw e;
4074  } catch (const std::exception& e) {
4075  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
4076  import_status_.load_failed = true;
4077  import_status_.load_msg = e.what();
4078  }
4079 
4080  if (importer) {
4081  importer->checkpoint(table_epochs);
4082  }
4083 }
4084 #endif // ENABLE_IMPORT_PARQUET
4085 
4086 void DataStreamSink::import_compressed(
4087  std::vector<std::string>& file_paths,
4088  const Catalog_Namespace::SessionInfo* session_info) {
4089  // a new requirement is to have one single input stream into
4090  // Importer::importDelimited, so we need to move the pipe-related
4091  // stuff to the outermost block.
4092  int fd[2];
4093 #ifdef _WIN32
4094  // For some reason when folly is used to create the pipe, the reader can
4095  // read nothing.
4096  auto pipe_res =
4097  _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
4098 #else
4099  auto pipe_res = pipe(fd);
4100 #endif
4101  if (pipe_res < 0) {
4102  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
4103  }
4104 #ifndef _WIN32
4105  signal(SIGPIPE, SIG_IGN);
4106 #endif
4107 
4108  std::exception_ptr teptr;
4109  // create a thread to read the uncompressed byte stream out of the pipe and
4110  // feed it into importDelimited()
4111  ImportStatus ret1;
4112  auto th_pipe_reader = std::thread([&]() {
4113  try {
4114  // importDelimited will read from FILE* p_file
4115  if (0 == (p_file = fdopen(fd[0], "r"))) {
4116  throw std::runtime_error(std::string("failed to open a pipe: ") +
4117  strerror(errno));
4118  }
4119 
4120  // in the future, depending on the data types of this uncompressed stream,
4121  // it can be fed into other functions such as importParquet, etc.
4122  ret1 = importDelimited(file_path, true, session_info);
4123 
4124  } catch (...) {
4125  if (!teptr) { // no replace
4126  teptr = std::current_exception();
4127  }
4128  }
4129 
4130  if (p_file) {
4131  fclose(p_file);
4132  }
4133  p_file = 0;
4134  });
4135 
4136  // create a thread to iterate all files (in all archives) and
4137  // forward the uncompressed byte stream to fd[1] which is
4138  // then fed into importDelimited, importParquet, etc.
4139  auto th_pipe_writer = std::thread([&]() {
4140  std::unique_ptr<S3Archive> us3arch;
4141  bool stop = false;
4142  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
4143  try {
4144  auto file_path = file_paths[fi];
4145  std::unique_ptr<Archive> uarch;
4146  std::map<int, std::string> url_parts;
4147  Archive::parse_url(file_path, url_parts);
4148  const std::string S3_objkey_url_scheme = "s3ok";
4149  if ("file" == url_parts[2] || "" == url_parts[2]) {
4150  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4151  } else if ("s3" == url_parts[2]) {
4152 #ifdef HAVE_AWS_S3
4153  // create an S3Archive with a shared s3 client.
4154  // should be safe b/c there is no wildcard with an s3 url
4155  us3arch.reset(new S3Archive(file_path,
4156  copy_params.s3_access_key,
4157  copy_params.s3_secret_key,
4158  copy_params.s3_session_token,
4159  copy_params.s3_region,
4160  copy_params.s3_endpoint,
4161  copy_params.plain_text,
4162  copy_params.regex_path_filter,
4163  copy_params.file_sort_order_by,
4164  copy_params.file_sort_regex));
4165  us3arch->init_for_read();
4166  total_file_size += us3arch->get_total_file_size();
4167  // don't land all files here; land them one by one in the following iterations
4168  for (const auto& objkey : us3arch->get_objkeys()) {
4169  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
4170  }
4171  continue;
4172 #else
4173  throw std::runtime_error("AWS S3 support not available");
4174 #endif // HAVE_AWS_S3
4175  } else if (S3_objkey_url_scheme == url_parts[2]) {
4176 #ifdef HAVE_AWS_S3
4177  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
4178  auto file_path =
4179  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
4180  if (0 == file_path.size()) {
4181  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
4182  }
4183  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4184  // file not removed until file closed
4185  us3arch->vacuum(objkey);
4186 #else
4187  throw std::runtime_error("AWS S3 support not available");
4188 #endif // HAVE_AWS_S3
4189  }
4190 #if 0 // TODO(ppan): implement and enable any other archive class
4191  else
4192  if ("hdfs" == url_parts[2])
4193  uarch.reset(new HdfsArchive(file_path));
4194 #endif
4195  else {
4196  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
4197  }
4198 
4199  // init the archive for read
4200  auto& arch = *uarch;
4201 
4202  // coming here, the archive at the url should be ready to be read, unarchived
4203  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
4204  const void* buf;
4205  size_t size;
4206  bool just_saw_archive_header;
4207  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
4208  bool first_text_header_skipped = false;
4209  // start reading uncompressed bytes of this archive from libarchive
4210  // note! this archive may contain more than one file!
4211  file_offsets.push_back(0);
4212  size_t num_block_read = 0;
4213  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
4214  bool insert_line_delim_after_this_file = false;
4215  while (!stop) {
4216  int64_t offset{-1};
4217  auto ok = arch.read_data_block(&buf, &size, &offset);
4218  // can't use the (uncompressed) size, so track the (max) file offset.
4219  // also we want to capture the offset even on e.o.f.
4220  if (offset > 0) {
4221  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4222  file_offsets.back() = offset;
4223  }
4224  if (!ok) {
4225  break;
4226  }
4227  // one subtle point: we concatenate all files into a single FILE
4228  // stream, on which we call importDelimited only once. that would make
4229  // it assume, wrongly, that only one header line comes with this
4230  // 'single' stream, while we may actually have one header line for
4231  // each of the files. so we need to skip header lines here instead of
4232  // in importDelimited.
4233  const char* buf2 = (const char*)buf;
4234  int size2 = size;
4235  if (copy_params.has_header != import_export::ImportHeaderRow::kNoHeader &&
4236  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
4237  while (size2-- > 0) {
4238  if (*buf2++ == copy_params.line_delim) {
4239  break;
4240  }
4241  }
4242  if (size2 <= 0) {
4243  LOG(WARNING) << "No line delimiter in block." << std::endl;
4244  } else {
4245  just_saw_archive_header = false;
4246  first_text_header_skipped = true;
4247  }
4248  }
4249  // On very rare occasions the write pipe somehow operates in a mode similar
4250  // to non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which
4251  // means blocking mode. With such an unreliable blocking mode, a possible fix
4252  // is to loop writing till no bytes are left, otherwise we see the annoying
4253  // `failed to write pipe: Success`...
4254  if (size2 > 0) {
4255  int nremaining = size2;
4256  while (nremaining > 0) {
4257  // try to write the entire remainder of the buffer to the pipe
4258  int nwritten = write(fd[1], buf2, nremaining);
4259  // how did we do?
4260  if (nwritten < 0) {
4261  // something bad happened
4262  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4263  // ignore these, assume nothing written, try again
4264  nwritten = 0;
4265  } else if (errno == EPIPE &&
4266  import_status_.rows_completed > 0) {
4267  // the reader thread has shut down the pipe from the read end
4268  stop = true;
4269  break;
4270  } else {
4271  // a real error
4272  throw std::runtime_error(
4273  std::string("failed or interrupted write to pipe: ") +
4274  strerror(errno));
4275  }
4276  } else if (nwritten == nremaining) {
4277  // we wrote everything; we're done
4278  break;
4279  }
4280  // only wrote some (or nothing), try again
4281  nremaining -= nwritten;
4282  buf2 += nwritten;
4283  // no exception when too many rows are rejected
4284  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
4285  if (import_status_.load_failed) {
4286  stop = true;
4287  break;
4288  }
4289  }
4290  // check that this file (buf for size) ended with a line delim
4291  if (size > 0) {
4292  const char* plast = static_cast<const char*>(buf) + (size - 1);
4293  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4294  }
4295  }
4296  ++num_block_read;
4297  }
4298 
4299  // if that file didn't end with a line delim, we insert one here to terminate
4300  // that file's stream. use a loop for the same reason as above.
4301  if (insert_line_delim_after_this_file) {
4302  while (true) {
4303  // write the delim char to the pipe
4304  int nwritten = write(fd[1], &copy_params.line_delim, 1);
4305  // how did we do?
4306  if (nwritten < 0) {
4307  // something bad happened
4308  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4309  // ignore these, assume nothing written, try again
4310  nwritten = 0;
4311  } else if (errno == EPIPE &&
4312  import_status_.rows_completed > 0) {
4313  // the reader thread has shut down the pipe from the read end
4314  stop = true;
4315  break;
4316  } else {
4317  // a real error
4318  throw std::runtime_error(
4319  std::string("failed or interrupted write to pipe: ") +
4320  strerror(errno));
4321  }
4322  } else if (nwritten == 1) {
4323  // we wrote it; we're done
4324  break;
4325  }
4326  }
4327  }
4328  }
4329  } catch (...) {
4330  // when import is aborted because of too many data errors or because a
4331  // detection has ended, any exception thrown by the s3 sdk or libarchive is
4332  // okay and should be suppressed.
4333  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
4334  if (import_status_.load_failed) {
4335  break;
4336  }
4337  if (import_status_.rows_completed > 0) {
4338  if (nullptr != dynamic_cast<Detector*>(this)) {
4339  break;
4340  }
4341  }
4342  if (!teptr) { // no replace
4343  teptr = std::current_exception();
4344  }
4345  break;
4346  }
4347  }
4348  // close writer end
4349  close(fd[1]);
4350  });
4351 
4352  th_pipe_reader.join();
4353  th_pipe_writer.join();
4354 
4355  // rethrow any exception happened herebefore
4356  if (teptr) {
4357  std::rethrow_exception(teptr);
4358  }
4359 }
4360 
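// Both write loops above implement the same "write-all" pattern for short
// writes on a pipe. A self-contained sketch of that pattern using POSIX
// write(2) (not part of the build):
#if 0
  ssize_t write_all(const int fd, const char* buf, size_t nremaining) {
    while (nremaining > 0) {
      const ssize_t nwritten = write(fd, buf, nremaining);
      if (nwritten < 0) {
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
          continue;  // transient condition; nothing consumed, try again
        }
        return -1;  // real error (e.g. EPIPE with no reader left)
      }
      buf += nwritten;
      nremaining -= nwritten;
    }
    return 0;
  }
#endif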
4361 ImportStatus Importer::import(const Catalog_Namespace::SessionInfo* session_info) {
4362  return DataStreamSink::archivePlumber(session_info);
4363 }
4364 
4365 ImportStatus Importer::importDelimited(
4366  const std::string& file_path,
4367  const bool decompressed,
4368  const Catalog_Namespace::SessionInfo* session_info) {
4369  Executor* executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID).get();
4370  auto query_session = session_info ? session_info->get_session_id() : "";
4371 
4372  if (!p_file) {
4373  p_file = fopen(file_path.c_str(), "rb");
4374  }
4375  if (!p_file) {
4376  throw std::runtime_error("failed to open file '" + file_path +
4377  "': " + strerror(errno));
4378  }
4379 
4380  if (!decompressed) {
4381  (void)fseek(p_file, 0, SEEK_END);
4382  file_size = ftell(p_file);
4383  }
4384 
4385  max_threads = num_import_threads(copy_params.threads);
4386  VLOG(1) << "Delimited import # threads: " << max_threads;
4387 
4388  // deal with small files
4389  size_t alloc_size = copy_params.buffer_size;
4390  if (!decompressed && file_size < alloc_size) {
4391  alloc_size = file_size;
4392  }
4393 
4394  for (size_t i = 0; i < max_threads; i++) {
4395  import_buffers_vec.emplace_back();
4396  for (const auto cd : loader->get_column_descs()) {
4397  import_buffers_vec[i].emplace_back(
4398  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4399  }
4400  }
4401 
4402  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4403  size_t current_pos = 0;
4404  size_t end_pos;
4405  size_t begin_pos = 0;
4406 
4407  (void)fseek(p_file, current_pos, SEEK_SET);
4408  size_t size =
4409  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4410 
4411  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4412  loader->getTableDesc()->tableId};
4413  auto table_epochs = loader->getTableEpochs();
4415  {
4416  std::list<std::future<ImportStatus>> threads;
4417 
4418  // use a stack to track thread_ids which must not overlap among threads
4419  // because thread_id is used to index import_buffers_vec[]
4420  std::stack<size_t> stack_thread_ids;
4421  for (size_t i = 0; i < max_threads; i++) {
4422  stack_thread_ids.push(i);
4423  }
4424  // added for true row index on error
4425  size_t first_row_index_this_buffer = 0;
4426 
4427  while (size > 0) {
4428  unsigned int num_rows_this_buffer = 0;
4429  CHECK(scratch_buffer);
4430  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4431  scratch_buffer,
4432  size,
4433  copy_params,
4434  first_row_index_this_buffer,
4435  num_rows_this_buffer,
4436  p_file);
4437 
4438  // unput residual
4439  int nresidual = size - end_pos;
4440  std::unique_ptr<char[]> unbuf;
4441  if (nresidual > 0) {
4442  unbuf = std::make_unique<char[]>(nresidual);
4443  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4444  }
4445 
4446  // get a thread_id not in use
4447  auto thread_id = stack_thread_ids.top();
4448  stack_thread_ids.pop();
4449  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4450 
4451  threads.push_back(std::async(std::launch::async,
4452  import_thread_delimited,
4453  thread_id,
4454  this,
4455  std::move(scratch_buffer),
4456  begin_pos,
4457  end_pos,
4458  end_pos,
4459  first_row_index_this_buffer,
4460  session_info,
4461  executor));
4462 
4463  first_row_index_this_buffer += num_rows_this_buffer;
4464 
4465  current_pos += end_pos;
4466  scratch_buffer = std::make_unique<char[]>(alloc_size);
4467  CHECK(scratch_buffer);
4468  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4469  size = nresidual +
4470  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4471 
4472  begin_pos = 0;
4473  while (threads.size() > 0) {
4474  int nready = 0;
4475  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4476  it != threads.end();) {
4477  auto& p = *it;
4478  std::chrono::milliseconds span(0);
4479  if (p.wait_for(span) == std::future_status::ready) {
4480  auto ret_import_status = p.get();
4481  {
4482  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
4483  import_status_ += ret_import_status;
4484  if (ret_import_status.load_failed) {
4485  set_import_status(import_id, import_status_);
4486  }
4487  }
4488  // sum up current total file offsets
4489  size_t total_file_offset{0};
4490  if (decompressed) {
4491  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4492  for (const auto file_offset : file_offsets) {
4493  total_file_offset += file_offset;
4494  }
4495  }
4496  // estimate number of rows per current total file offset
4497  if (decompressed ? total_file_offset : current_pos) {
4498  import_status_.rows_estimated =
4499  (decompressed ? (float)total_file_size / total_file_offset
4500  : (float)file_size / current_pos) *
4501  import_status_.rows_completed;
4502  }
4503 
4504  LOG(INFO) << "rows_completed " << import_status_.rows_completed
4505  << ", rows_estimated " << import_status_.rows_estimated
4506  << ", total_file_size " << total_file_size << ", total_file_offset "
4507  << total_file_offset;
4509  // recall thread_id for reuse
4510  stack_thread_ids.push(ret_import_status.thread_id);
4511  threads.erase(it++);
4512  ++nready;
4513  } else {
4514  ++it;
4515  }
4516  }
4517 
4518  if (nready == 0) {
4519  std::this_thread::yield();
4520  }
4521 
4522  // on eof, wait all threads to finish
4523  if (0 == size) {
4524  continue;
4525  }
4526 
4527  // keep reading if any free thread slot
4528  // this is one of the major differences from the old threading model !!
4529  if (threads.size() < max_threads) {
4530  break;
4531  }
4532  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
4533  if (import_status_.load_failed) {
4534  break;
4535  }
4536  }
4537  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
4538  if (import_status_.rows_rejected > copy_params.max_reject) {
4539  import_status_.load_failed = true;
4540  // todo use better message
4541  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4542  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4543  break;
4544  }
4545  if (import_status_.load_failed) {
4546  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4547  break;
4548  }
4549  }
4550 
4551  // join dangling threads in case of LOG(ERROR) above
4552  for (auto& p : threads) {
4553  p.wait();
4554  }
4555  }
4556 
4557  checkpoint(table_epochs);
4558 
4559  fclose(p_file);
4560  p_file = nullptr;
4561  return import_status_;
4562 }
4563 
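// The reader loop above carries the bytes after the last complete row (the
// "residual") to the front of the next scratch buffer, so a row split across
// read boundaries is never handed to a worker in halves. Illustration (not
// part of the build):
#if 0
  // scratch: | row1\n row2\n row3\n row4-part... |
  //                               ^ end_pos from find_row_end_pos()
  // nresidual = size - end_pos bytes of "row4-part" are memcpy'd to the start
  // of the fresh buffer, then fread() appends up to alloc_size - nresidual.
#endif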
4564 void Loader::checkpoint() {
4565  if (getTableDesc()->persistenceLevel ==
4566  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4567  // tables
4568  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4569  }
4570 }
4571 
4572 std::vector<Catalog_Namespace::TableEpochInfo> Loader::getTableEpochs() const {
4573  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4574  getTableDesc()->tableId);
4575 }
4576 
4577 void Loader::setTableEpochs(
4578  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4579  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4580 }
4581 
4582 /* static */
4583 Geospatial::GDAL::DataSourceUqPtr Importer::openGDALDataSource(
4584  const std::string& file_name,
4585  const CopyParams& copy_params) {
4586  Geospatial::GDAL::init();
4587  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
4588  copy_params.s3_endpoint,
4589  copy_params.s3_access_key,
4590  copy_params.s3_secret_key,
4591  copy_params.s3_session_token);
4592  if (copy_params.source_type != import_export::SourceType::kGeoFile) {
4593  throw std::runtime_error("Unexpected CopyParams.source_type (" +
4594  std::to_string(static_cast<int>(copy_params.source_type)) +
4595  ")");
4596  }
4597  return Geospatial::GDAL::openDataSource(file_name, import_export::SourceType::kGeoFile);
4598 }
4599 
4600 namespace {
4601 
4602 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4603  const Geospatial::GDAL::DataSourceUqPtr& poDS,
4604  const std::string& file_name) {
4605  // get layer with specified name, or default to first layer
4606  OGRLayer* poLayer = nullptr;
4607  if (geo_layer_name.size()) {
4608  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4609  if (poLayer == nullptr) {
4610  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4611  file_name);
4612  }
4613  } else {
4614  poLayer = poDS->GetLayer(0);
4615  if (poLayer == nullptr) {
4616  throw std::runtime_error("No layers found in " + file_name);
4617  }
4618  }
4619  return *poLayer;
4620 }
4621 
4622 } // namespace
4623 
4624 /* static */
4625 void Importer::readMetadataSampleGDAL(
4626  const std::string& file_name,
4627  const std::string& geo_column_name,
4628  std::map<std::string, std::vector<std::string>>& sample_data,
4629  int row_limit,
4630  const CopyParams& copy_params) {
4631  Geospatial::GDAL::DataSourceUqPtr datasource(
4632  openGDALDataSource(file_name, copy_params));
4633  if (datasource == nullptr) {
4634  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4635  file_name);
4636  }
4637 
4638  OGRLayer& layer =
4639  getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);
4640 
4641  auto const* feature_defn = layer.GetLayerDefn();
4642  CHECK(feature_defn);
4643 
4644  // metadata columns?
4645  auto const metadata_column_infos =
4646  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4647 
4648  // get limited feature count
4649  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4650  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
4651  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4652 
4653  // prepare sample data map
4654  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4655  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4656  CHECK(column_name);
4657  sample_data[column_name] = {};
4658  }
4659  sample_data[geo_column_name] = {};
4660  for (auto const& mci : metadata_column_infos) {
4661  sample_data[mci.column_descriptor.columnName] = {};
4662  }
4663 
4664  // prepare to read
4665  layer.ResetReading();
4666 
4667  // read features (up to limited count)
4668  uint64_t feature_index{0u};
4669  while (feature_index < num_features) {
4670  // get (and take ownership of) feature
4671  Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
4672  if (!feature) {
4673  break;
4674  }
4675 
4676  // get feature geometry
4677  auto const* geometry = feature->GetGeometryRef();
4678  if (geometry == nullptr) {
4679  break;
4680  }
4681 
4682  // validate geom type (again?)
4683  switch (wkbFlatten(geometry->getGeometryType())) {
4684  case wkbPoint:
4685  case wkbMultiPoint:
4686  case wkbLineString:
4687  case wkbMultiLineString:
4688  case wkbPolygon:
4689  case wkbMultiPolygon:
4690  break;
4691  default:
4692  throw std::runtime_error("Unsupported geometry type: " +
4693  std::string(geometry->getGeometryName()));
4694  }
4695 
4696  // populate sample data for regular field columns
4697  for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4698  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4699  sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4700  }
4701 
4702  // populate sample data for metadata columns?
4703  for (auto const& mci : metadata_column_infos) {
4704  sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4705  }
4706 
4707  // populate sample data for geo column with WKT string
4708  char* wkts = nullptr;
4709  geometry->exportToWkt(&wkts);
4710  CHECK(wkts);
4711  sample_data[geo_column_name].push_back(wkts);
4712  CPLFree(wkts);
4713 
4714  // next feature
4715  feature_index++;
4716  }
4717 }
4718 
4719 namespace {
4720 
4721 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4722  switch (ogr_type) {
4723  case OFTInteger:
4724  return std::make_pair(kINT, false);
4725  case OFTIntegerList:
4726  return std::make_pair(kINT, true);
4727 #if GDAL_VERSION_MAJOR > 1
4728  case OFTInteger64:
4729  return std::make_pair(kBIGINT, false);
4730  case OFTInteger64List:
4731  return std::make_pair(kBIGINT, true);
4732 #endif
4733  case OFTReal:
4734  return std::make_pair(kDOUBLE, false);
4735  case OFTRealList:
4736  return std::make_pair(kDOUBLE, true);
4737  case OFTString:
4738  return std::make_pair(kTEXT, false);
4739  case OFTStringList:
4740  return std::make_pair(kTEXT, true);
4741  case OFTDate:
4742  return std::make_pair(kDATE, false);
4743  case OFTTime:
4744  return std::make_pair(kTIME, false);
4745  case OFTDateTime:
4746  return std::make_pair(kTIMESTAMP, false);
4747  case OFTBinary:
4748  // Interpret binary blobs as byte arrays here
4749  // but actual import will store NULL as GDAL will not
4750  // extract the blob (OGRFeature::GetFieldAsString will
4751  // result in the import buffers having an empty string)
4752  return std::make_pair(kTINYINT, true);
4753  default:
4754  break;
4755  }
4756  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4757 }
4758 
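// Illustration of the mapping above (not part of the build): the bool in the
// pair marks OGR list types, which become kARRAY columns with the base type
// as subtype when the descriptor is built further below.
#if 0
  auto [base_type, is_list] = ogr_to_type(OFTRealList);  // -> {kDOUBLE, true}
  SQLTypeInfo ti;
  if (is_list) {
    ti.set_type(kARRAY);
    ti.set_subtype(base_type);
  } else {
    ti.set_type(base_type);
  }
#endif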
4759 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4760  switch (ogr_type) {
4761  case wkbPoint:
4762  return kPOINT;
4763  case wkbMultiPoint:
4764  return kMULTIPOINT;
4765  case wkbLineString:
4766  return kLINESTRING;
4767  case wkbMultiLineString:
4768  return kMULTILINESTRING;
4769  case wkbPolygon:
4770  return kPOLYGON;
4771  case wkbMultiPolygon:
4772  return kMULTIPOLYGON;
4773  default:
4774  break;
4775  }
4776  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4777 }
4778 
4779 RasterImporter::PointType convert_raster_point_type(
4780  const import_export::RasterPointType raster_point_type) {
4781  switch (raster_point_type) {
4782  case import_export::RasterPointType::kNone:
4783  return RasterImporter::PointType::kNone;
4784  case import_export::RasterPointType::kAuto:
4785  return RasterImporter::PointType::kAuto;
4786  case import_export::RasterPointType::kSmallInt:
4787  return RasterImporter::PointType::kSmallInt;
4788  case import_export::RasterPointType::kInt:
4789  return RasterImporter::PointType::kInt;
4790  case import_export::RasterPointType::kFloat:
4791  return RasterImporter::PointType::kFloat;
4792  case import_export::RasterPointType::kDouble:
4793  return RasterImporter::PointType::kDouble;
4794  case import_export::RasterPointType::kPoint:
4795  return RasterImporter::PointType::kPoint;
4796  }
4797  UNREACHABLE();
4798  return RasterImporter::PointType::kNone;
4799 }
4800 
4801 RasterImporter::PointTransform convert_raster_point_transform(
4802  const import_export::RasterPointTransform raster_point_transform) {
4803  switch (raster_point_transform) {
4804  case import_export::RasterPointTransform::kNone:
4805  return RasterImporter::PointTransform::kNone;
4806  case import_export::RasterPointTransform::kAuto:
4807  return RasterImporter::PointTransform::kAuto;
4808  case import_export::RasterPointTransform::kFile:
4809  return RasterImporter::PointTransform::kFile;
4810  case import_export::RasterPointTransform::kWorld:
4811  return RasterImporter::PointTransform::kWorld;
4812  }
4813  UNREACHABLE();
4814  return RasterImporter::PointTransform::kNone;
4815 }
4816 
4817 } // namespace
4818 
4819 /* static */
4820 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4821  const std::string& file_name,
4822  const bool is_raster,
4823  const std::string& geo_column_name,
4824  const CopyParams& copy_params) {
4825  if (is_raster) {
4826  return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
4827  }
4828  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
4829 }
4830 
4831 /* static */
4832 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsRaster(
4833  const std::string& file_name,
4834  const std::string& geo_column_name,
4835  const CopyParams& copy_params) {
4836  // lazy init GDAL
4837  Geospatial::GDAL::init();
4838  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
4839  copy_params.s3_endpoint,
4840  copy_params.s3_access_key,
4841  copy_params.s3_secret_key,
4842  copy_params.s3_session_token);
4843 
4844  // prepare for metadata column
4845  auto metadata_column_infos =
4846  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4847 
4848  // create a raster importer and do the detect
4849  RasterImporter raster_importer;
4850  raster_importer.detect(
4851  file_name,
4852  copy_params.raster_import_bands,
4853  copy_params.raster_import_dimensions,
4854  convert_raster_point_type(copy_params.raster_point_type),
4855  convert_raster_point_transform(copy_params.raster_point_transform),
4856  copy_params.raster_point_compute_angle,
4857  false,
4858  metadata_column_infos);
4859 
4860  // prepare to capture column descriptors
4861  std::list<ColumnDescriptor> cds;
4862 
4863  // get the point column info
4864  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
4865 
4866  // create the columns for the point in the specified type
4867  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
4868  ColumnDescriptor cd;
4869  cd.columnName = cd.sourceName = col_name;
4870  cd.columnType.set_type(sql_type);
4871  // hardwire other POINT attributes for now
4872  if (sql_type == kPOINT) {
4873  cd.columnType.set_subtype(kGEOMETRY);
4874  cd.columnType.set_input_srid(4326);
4875  cd.columnType.set_output_srid(4326);
4876  cd.columnType.set_compression(kENCODING_GEOINT);
4877  cd.columnType.set_comp_param(32);
4878  }
4879  cds.push_back(cd);
4880  }
4881 
4882  // get the names and types for the band column(s)
4883  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
4884 
4885  // add column descriptors for each band
4886  for (auto const& [band_name, sql_type] : band_names_and_types) {
4887  ColumnDescriptor cd;
4888  cd.columnName = cd.sourceName = band_name;
4889  cd.columnType.set_type(sql_type);
4890  cd.columnType.set_fixed_size();
4891  cds.push_back(cd);
4892  }
4893 
4894  // metadata columns?
4895  for (auto& mci : metadata_column_infos) {
4896  cds.push_back(std::move(mci.column_descriptor));
4897  }
4898 
4899  // return the results
4900  return cds;
4901 }
4902 
4903 /* static */
4904 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsGeo(
4905  const std::string& file_name,
4906  const std::string& geo_column_name,
4907  const CopyParams& copy_params) {
4908  std::list<ColumnDescriptor> cds;
4909 
4910  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
4911  if (poDS == nullptr) {
4912  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4913  file_name);
4914  }
4915  if (poDS->GetLayerCount() == 0) {
4916  throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
4917  " has no layers");
4918  }
4919 
4920  OGRLayer& layer =
4921  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4922 
4923  layer.ResetReading();
4924  // TODO(andrewseidl): support multiple features
4925  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
4926  if (poFeature == nullptr) {
4927  throw std::runtime_error("No features found in " + file_name);
4928  }
4929  // get fields as regular columns
4930  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4931  CHECK(poFDefn);
4932  int iField;
4933  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4934  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4935  auto typePair = ogr_to_type(poFieldDefn->GetType());
4936  ColumnDescriptor cd;
4937  cd.columnName = poFieldDefn->GetNameRef();
4938  cd.sourceName = poFieldDefn->GetNameRef();
4939  SQLTypeInfo ti;
4940  if (typePair.second) {
4941  ti.set_type(kARRAY);
4942  ti.set_subtype(typePair.first);
4943  } else {
4944  ti.set_type(typePair.first);
4945  }
4946  if (typePair.first == kTEXT) {
4947  ti.set_compression(kENCODING_DICT);
4948  ti.set_comp_param(0);
4949  }
4950  ti.set_fixed_size();
4951  cd.columnType = ti;
4952  cds.push_back(cd);
4953  }
4954  // try getting the geo column type from the layer
4955  auto ogr_type = wkbFlatten(layer.GetGeomType());
4956  if (ogr_type == wkbUnknown) {
4957  // layer geo type unknown, so try the feature (that we already got)
4958  auto const* ogr_geometry = poFeature->GetGeometryRef();
4959  if (ogr_geometry) {
4960  ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
4961  }
4962  }
4963  // do we have a geo column?
4964  if (ogr_type != wkbNone) {
4965  ColumnDescriptor cd;
4966  cd.columnName = geo_column_name;
4967  cd.sourceName = geo_column_name;
4968 
4969  // if exploding, override any collection type to child type
4970  if (copy_params.geo_explode_collections) {
4971  if (ogr_type == wkbMultiPolygon) {
4972  ogr_type = wkbPolygon;
4973  } else if (ogr_type == wkbMultiLineString) {
4974  ogr_type = wkbLineString;
4975  } else if (ogr_type == wkbMultiPoint) {
4976  ogr_type = wkbPoint;
4977  }
4978  }
4979 
4980  // convert to internal type
4981  // this will throw if the type is unsupported
4982  SQLTypes geoType = ogr_to_type(ogr_type);
4983 
4984  // promote column type? (unless exploding)
4985  if (!copy_params.geo_explode_collections) {
4986  if (PROMOTE_POINT_TO_MULTIPOINT && geoType == kPOINT) {
4987  geoType = kMULTIPOINT;
4988  } else if (PROMOTE_LINESTRING_TO_MULTILINESTRING && geoType == kLINESTRING) {
4989  geoType = kMULTILINESTRING;
4990  } else if (PROMOTE_POLYGON_TO_MULTIPOLYGON && geoType == kPOLYGON) {
4991  geoType = kMULTIPOLYGON;
4992  }
4993  }
4994 
4995  // build full internal type
4996  SQLTypeInfo ti;
4997  ti.set_type(geoType);
4998  ti.set_subtype(copy_params.geo_coords_type);
4999  ti.set_input_srid(copy_params.geo_coords_srid);
5000  ti.set_output_srid(copy_params.geo_coords_srid);
5001  ti.set_compression(copy_params.geo_coords_encoding);
5002  ti.set_comp_param(copy_params.geo_coords_comp_param);
5003  cd.columnType = ti;
5004 
5005  cds.push_back(cd);
5006  }
5007 
5008  // metadata columns?
5009  auto metadata_column_infos =
5010  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
5011  for (auto& mci : metadata_column_infos) {
5012  cds.push_back(std::move(mci.column_descriptor));
5013  }
5014 
5015  return cds;
5016 }
5017 
5018 bool Importer::gdalStatInternal(const std::string& path,
5019  const CopyParams& copy_params,
5020  bool also_dir) {
5021  // lazy init GDAL
5022  Geospatial::GDAL::init();
5023  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5024  copy_params.s3_endpoint,
5025  copy_params.s3_access_key,
5026  copy_params.s3_secret_key,
5027  copy_params.s3_session_token);
5028 
5029 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5030  // clear GDAL stat cache
5031  // without this, file existence will be cached, even if authentication changes
5032  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
5033  VSICurlClearCache();
5034 #endif
5035 
5036  // stat path
5037  VSIStatBufL sb;
5038  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5039  if (result < 0) {
5040  return false;
5041  }
5042 
5043  // exists?
5044  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5045  return true;
5046  } else if (VSI_ISREG(sb.st_mode)) {
5047  return true;
5048  }
5049  return false;
5050 }
5051 
5052 /* static */
5053 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
5054  return gdalStatInternal(path, copy_params, false);
5055 }
5056 
5057 /* static */
5058 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
5059  const CopyParams& copy_params) {
5060  return gdalStatInternal(path, copy_params, true);
5061 }
5062 
5063 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
5064  std::vector<std::string>& files) {
5065  // prepare to gather subdirectories
5066  std::vector<std::string> subdirectories;
5067 
5068  // get entries
5069  char** entries = VSIReadDir(archive_path.c_str());
5070  if (!entries) {
5071  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
5072  return;
5073  }
5074 
5075  // force scope
5076  {
5077  // request clean-up
5078  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
5079 
5080  // check all the entries
5081  int index = 0;
5082  while (true) {
5083  // get next entry, or drop out if there isn't one
5084  char* entry_c = entries[index++];
5085  if (!entry_c) {
5086  break;
5087  }
5088  std::string entry(entry_c);
5089 
5090  // ignore '.' and '..'
5091  if (entry == "." || entry == "..") {
5092  continue;
5093  }
5094 
5095  // build the full path
5096  std::string entry_path = archive_path + std::string("/") + entry;
5097 
5098  // is it a file or a sub-folder
5099  VSIStatBufL sb;
5100  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
5101  if (result < 0) {
5102  break;
5103  }
5104 
5105  if (VSI_ISDIR(sb.st_mode)) {
5106  // a directory that ends with .gdb could be a Geodatabase bundle
5107  // arguably dangerous to decide this purely by name, but any further
5108  // validation would be very complex especially at this scope
5109  if (boost::iends_with(entry_path, ".gdb")) {
5110  // add the directory as if it was a file and don't recurse into it
5111  files.push_back(entry_path);
5112  } else {
5113  // add subdirectory to be recursed into
5114  subdirectories.push_back(entry_path);
5115  }
5116  } else {
5117  // add this file
5118  files.push_back(entry_path);
5119  }
5120  }
5121  }
5122 
5123  // recurse into each subdirectory we found
5124  for (const auto& subdirectory : subdirectories) {
5125  gdalGatherFilesInArchiveRecursive(subdirectory, files);
5126  }
5127 }
5128 
5129 /* static */
5130 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
5131  const std::string& archive_path,
5132  const CopyParams& copy_params) {
5133  // lazy init GDAL
5134  Geospatial::GDAL::init();
5135  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5136  copy_params.s3_endpoint,
5137  copy_params.s3_access_key,
5138  copy_params.s3_secret_key,
5139  copy_params.s3_session_token);
5140 
5141  // prepare to gather files
5142  std::vector<std::string> files;
5143 
5144  // gather the files recursively
5145  gdalGatherFilesInArchiveRecursive(archive_path, files);
5146 
5147  // convert to relative paths inside archive
5148  for (auto& file : files) {
5149  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5150  }
5151 
5152  // done
5153  return files;
5154 }
5155 
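// Illustration of the relative-path conversion above (hypothetical paths,
// not part of the build):
#if 0
  std::string archive_path = "/vsizip//tmp/bundle.zip";
  std::string file = archive_path + "/shapes/roads.shp";
  file.erase(0, archive_path.size() + 1);  // -> "shapes/roads.shp"
#endif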
5156 /* static */
5157 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
5158  const std::string& file_name,
5159  const CopyParams& copy_params) {
5160  // lazy init GDAL
5161  Geospatial::GDAL::init();
5162  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5163  copy_params.s3_endpoint,
5164  copy_params.s3_access_key,
5165  copy_params.s3_secret_key,
5166  copy_params.s3_session_token);
5167 
5168  // prepare to gather layer info
5169  std::vector<GeoFileLayerInfo> layer_info;
5170 
5171  // open the data set
5172  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5173  if (poDS == nullptr) {
5174  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5175  file_name);
5176  }
5177 
5178  // enumerate the layers
5179  for (auto&& poLayer : poDS->GetLayers()) {
5180  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
5181  // prepare to read this layer
5182  poLayer->ResetReading();
5183  // skip layer if empty
5184  if (poLayer->GetFeatureCount() > 0) {
5185  // first read layer geo type
5186  auto ogr_type = wkbFlatten(poLayer->GetGeomType());
5187  if (ogr_type == wkbUnknown) {
5188  // layer geo type unknown, so try reading from the first feature
5189  Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
5190  CHECK(first_feature);
5191  auto const* ogr_geometry = first_feature->GetGeometryRef();
5192  if (ogr_geometry) {
5193  ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
5194  } else {
5195  ogr_type = wkbNone;
5196  }
5197  }
5198  switch (ogr_type) {
5199  case wkbNone:
5200  // no geo
5201  contents = GeoFileLayerContents::NON_GEO;
5202  break;
5203  case wkbPoint:
5204  case wkbMultiPoint:
5205  case wkbLineString:
5206  case wkbMultiLineString:
5207  case wkbPolygon:
5208  case wkbMultiPolygon:
5209  // layer has supported geo
5210  contents = GeoFileLayerContents::GEO;
5211  break;
5212  default:
5213  // layer has unsupported geometry
5214  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
5215  break;
5216  }
5217  }
5218  // store info for this layer
5219  layer_info.emplace_back(poLayer->GetName(), contents);
5220  }
5221 
5222  // done
5223  return layer_info;
5224 }
5225 
5226 ImportStatus Importer::importGDAL(
5227  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
5228  const Catalog_Namespace::SessionInfo* session_info,
5229  const bool is_raster) {
5230  if (is_raster) {
5231  return importGDALRaster(session_info);
5232  }
5233  return importGDALGeo(columnNameToSourceNameMap, session_info);
5234 }
5235 
5236 ImportStatus Importer::importGDALGeo(
5237  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
5238  const Catalog_Namespace::SessionInfo* session_info) {
5239  // initial status
5241  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_path, copy_params));
5242  if (poDS == nullptr) {
5243  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5244  file_path);
5245  }
5246 
5247  OGRLayer& layer =
5248  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
5249 
5250  // get the number of features in this layer
5251  size_t numFeatures = layer.GetFeatureCount();
5252 
5253  // build map of metadata field (additional columns) name to index,
5254  // passed to each worker so it can look up OGR field indices by name
5255  FieldNameToIndexMapType fieldNameToIndexMap;
5256  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5257  CHECK(poFDefn);
5258  size_t numFields = poFDefn->GetFieldCount();
5259  for (size_t iField = 0; iField < numFields; iField++) {
5260  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5261  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5262  }
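// e.g. a shapefile with attribute columns NAME and POP would yield
// {"NAME" -> 0, "POP" -> 1}, letting workers recover an OGR field index from
// a source column name when reading feature attributes (illustrative example)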
5263 
5264  // the geographic spatial reference we want to put everything in
5265  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5266  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5267 
5268 #if GDAL_VERSION_MAJOR >= 3
5269  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5270  // this results in X and Y being transposed for angle-based
5271  // coordinate systems. This restores the previous behavior.
5272  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5273 #endif
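// For example, with the GDAL 3.x default axis mapping an EPSG:4326 transform
// returns coordinates as (lat, lon); OAMS_TRADITIONAL_GIS_ORDER keeps them in
// the traditional (lon, lat) == (X, Y) order that the import path expects.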
5274 
5275 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5276  // just one "thread"
5277  max_threads = 1;
5278 #else
5279  // how many threads to use
5280  max_threads = num_import_threads(copy_params.threads);
5281 #endif
5282  VLOG(1) << "GDAL import # threads: " << max_threads;
5283 
5284  // metadata columns?
5285  auto const metadata_column_infos =
5286  parse_add_metadata_columns(copy_params.add_metadata_columns, file_path);
5287 
5288  // geo table import is handled specially in both DBHandler and QueryRunner,
5289  // on a path separate from normal SQL execution,
5290  // so we explicitly enroll the import session here to allow interruption
5291  // while the geo table is being imported
5292  auto query_session = session_info ? session_info->get_session_id() : "";
5293  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5294  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
5295  auto is_session_already_registered = false;
5296  {
5297  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(
5298  executor->getSessionLock());
5299  is_session_already_registered =
5300  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5301  }
5302  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5303  !is_session_already_registered) {
5304  executor->enrollQuerySession(query_session,
5305  "IMPORT_GEO_TABLE",
5306  query_submitted_time,
5307  Executor::UNITARY_EXECUTOR_ID,
5308  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5309  }
5310  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5311  // reset the runtime query interrupt status
5312  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5313  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5314  }
5315  };
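// From here on, an interrupt issued against query_session can abort the
// import; the ScopeGuard above clears the enrolled session status on every
// exit path, normal or exceptional.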
5316 
5317  // make an import buffer for each thread
5318  CHECK_EQ(import_buffers_vec.size(), 0u);
5319  import_buffers_vec.resize(max_threads);
5320  for (size_t i = 0; i < max_threads; i++) {
5321  for (const auto cd : loader->get_column_descs()) {
5322  import_buffers_vec[i].emplace_back(
5323  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5324  }
5325  }
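// Each worker thread owns one TypedImportBuffer per column, so value
// conversion and appending can proceed without cross-thread synchronization
// on shared buffers.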
5326 
5327 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5328  // threads
5329  std::list<std::future<ImportStatus>> threads;
5330 
5331  // use a stack to track thread_ids which must not overlap among threads
5332  // because thread_id is used to index import_buffers_vec[]
5333  std::stack<size_t> stack_thread_ids;
5334  for (size_t i = 0; i < max_threads; i++) {
5335  stack_thread_ids.push(i);
5336  }
5337 #endif
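// The stack acts as a small free-list of buffer slots: an id is popped before
// a worker launches and pushed back when its future completes, so each live
// worker uniquely indexes import_buffers_vec[]. Acquire/release sketch
// (illustrative only):
//
//   auto id = stack_thread_ids.top();   // acquire a free slot
//   stack_thread_ids.pop();
//   // ... run the worker against import_buffers_vec[id] ...
//   stack_thread_ids.push(id);          // release the slot on completion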
5338 
5339  // checkpoint the table
5340  auto table_epochs = loader->getTableEpochs();
5341 
5342  // reset the layer
5343  layer.ResetReading();
5344 
5345  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5346 
5347  // make a features buffer for each thread
5348  std::vector<FeaturePtrVector> features(max_threads);
5349 
5350  // make one of these for each thread, based on the first feature's SR
5351  std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5352  max_threads);
5353 
5354  // for each feature...
5355  size_t firstFeatureThisChunk = 0;
5356  while (firstFeatureThisChunk < numFeatures) {
5357  // how many features this chunk
5358  size_t numFeaturesThisChunk =
5359  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5360 
5361 // get a thread_id not in use
5362 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5363  size_t thread_id = 0;
5364 #else
5365  auto thread_id = stack_thread_ids.top();
5366  stack_thread_ids.pop();
5367  CHECK(thread_id < max_threads);
5368 #endif
5369 
5370  // fill features buffer for new thread
5371  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5372  features[thread_id].emplace_back(layer.GetNextFeature());
5373  }
5374 
5375  // construct a coordinate transformation for each thread, if needed
5376  // some features may not have geometry, so look for the first one that does
5377  if (coordinate_transformations[thread_id] == nullptr) {
5378  for (auto const& feature : features[thread_id]) {
5379  auto const* geometry = feature->GetGeometryRef();
5380  if (geometry) {
5381  auto const* geometry_sr = geometry->getSpatialReference();
5382  // if the SR is non-null and non-empty and different from what we want
5383  // we need to make a reusable CoordinateTransformation
5384  if (geometry_sr &&
5385 #if GDAL_VERSION_MAJOR >= 3
5386  !geometry_sr->IsEmpty() &&
5387 #endif
5388  !geometry_sr->IsSame(poGeographicSR.get())) {
5389  // validate the SR before trying to use it
5390  if (geometry_sr->Validate() != OGRERR_NONE) {
5391  throw std::runtime_error("Incoming geo has invalid Spatial Reference");
5392  }
5393  // create the OGRCoordinateTransformation that will be used for
5394  // all the features in this chunk
5395  coordinate_transformations[thread_id].reset(
5396  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR.get()));
5397  if (coordinate_transformations[thread_id] == nullptr) {
5398  throw std::runtime_error(
5399  "Failed to create a GDAL CoordinateTransformation for incoming geo");
5400  }
5401  }
5402  // once we find at least one geometry with an SR, we're done
5403  break;
5404  }
5405  }
5406  }
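// A standalone equivalent of this transformation setup (a sketch with
// hypothetical source/target SRs, not the importer's own configuration):
//
//   OGRSpatialReference src, dst;
//   src.importFromEPSG(3857);  // e.g. incoming web-mercator geo
//   dst.importFromEPSG(4326);  // desired geographic SR
//   std::unique_ptr<OGRCoordinateTransformation> ct(
//       OGRCreateCoordinateTransformation(&src, &dst));
//   if (ct) {
//     // geometry->transform(ct.get()) would then reproject each feature
//   }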
5407 
5408 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5409  // call worker function directly
5410  auto ret_import_status =
5411  import_thread_shapefile(thread_id,
5412  this,
5413  coordinate_transformations[thread_id].get(),
5414  std::move(features[thread_id]),
5415  firstFeatureThisChunk,
5416  numFeaturesThisChunk,
5417  fieldNameToIndexMap,
5418  columnNameToSourceNameMap,
5419  session_info,
5420  executor.get(),
5421  metadata_column_infos);
5422  import_status += ret_import_status;
5423  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
5424  import_status.rows_completed;
5425  set_import_status(import_id, import_status);
5426 #else
5427  // fire up that thread to import this geometry
5428  threads.push_back(std::async(std::launch::async,
5429  import_thread_shapefile,
5430  thread_id,
5431  this,
5432  coordinate_transformations[thread_id].get(),
5433  std::move(features[thread_id]),
5434  firstFeatureThisChunk,
5435  numFeaturesThisChunk,
5436  fieldNameToIndexMap,
5437  columnNameToSourceNameMap,
5438  session_info,
5439  executor.get(),
5440  metadata_column_infos));
5441 
5442  // let the threads run
5443  while (threads.size() > 0) {
5444  int nready = 0;
5445  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5446  it != threads.end();) {
5447  auto& p = *it;
5448  std::chrono::milliseconds span(
5449  0); //(std::distance(it, threads.end()) == 1? 1: 0);
5450  if (p.wait_for(span) == std::future_status::ready) {
5451  auto ret_import_status = p.get();
5452  {
5453  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
5454  import_status_ += ret_import_status;
5455  import_status_.rows_estimated =
5456  ((float)firstFeatureThisChunk / (float)numFeatures) *
5457  import_status_.rows_completed;
5458  set_import_status(import_id, import_status_);
5459  if (import_status_.load_failed) {
5460  break;
5461  }
5462  }
5463  // recall thread_id for reuse
5464  stack_thread_ids.push(ret_import_status.thread_id);
5465 
5466  threads.erase(it++);
5467  ++nready;
5468  } else {
5469  ++it;
5470  }
5471  }
5472 
5473  if (nready == 0) {
5474  std::this_thread::yield();
5475  }
5476 
5477  // keep reading if any thread slot is free
5478  // this is one of the major differences from the old threading model
5479  if (threads.size() < max_threads) {
5480  break;
5481  }
5482  }
5483 #endif
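// Note the harvest loop above drains only until a slot frees up
// (threads.size() < max_threads), so reading the next chunk of features
// overlaps with in-flight imports rather than waiting for the whole pool.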
5484 
5485  // out of rows?
5486 
5487  heavyai::unique_lock<heavyai::shared_mutex> write_lock(import_mutex_);
5488  if (import_status_.rows_rejected > copy_params.max_reject) {
5489  import_status_.load_failed = true;
5490  // TODO: use a better message
5491  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
5492  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";