OmniSciDB  c1a53651b2
Importer.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file Importer.cpp
19  * @brief Functions for Importer class
20  *
21  */
22 
23 #include "ImportExport/Importer.h"
24 
25 #include <arrow/api.h>
26 #include <arrow/filesystem/localfs.h>
27 #include <arrow/io/api.h>
28 #include <gdal.h>
29 #include <ogrsf_frmts.h>
30 #include <boost/algorithm/string.hpp>
31 #include <boost/dynamic_bitset.hpp>
32 #include <boost/filesystem.hpp>
33 #include <boost/geometry.hpp>
34 #include <boost/variant.hpp>
35 #include <csignal>
36 #include <cstdio>
37 #include <cstdlib>
38 #include <fstream>
39 #include <future>
40 #include <iomanip>
41 #include <list>
42 #include <memory>
43 #include <mutex>
44 #include <numeric>
45 #include <stack>
46 #include <stdexcept>
47 #include <thread>
48 #include <typeinfo>
49 #include <unordered_map>
50 #include <unordered_set>
51 #include <utility>
52 #include <vector>
53 
55 #include "Archive/S3Archive.h"
56 #include "ArrowImporter.h"
57 #include "Catalog/os/UserMapping.h"
58 #ifdef ENABLE_IMPORT_PARQUET
60 #endif
61 #if defined(ENABLE_IMPORT_PARQUET)
62 #include "Catalog/ForeignTable.h"
64 #endif
65 #ifdef ENABLE_IMPORT_PARQUET
67 #endif
68 #include "Geospatial/Compression.h"
69 #include "Geospatial/GDAL.h"
70 #include "Geospatial/Transforms.h"
71 #include "Geospatial/Types.h"
76 #include "Logger/Logger.h"
78 #include "QueryEngine/Execute.h"
80 #include "RenderGroupAnalyzer.h"
81 #include "Shared/DateTimeParser.h"
82 #include "Shared/SqlTypesLayout.h"
84 #include "Shared/file_path_util.h"
85 #include "Shared/import_helpers.h"
86 #include "Shared/likely.h"
87 #include "Shared/measure.h"
88 #include "Shared/misc.h"
89 #include "Shared/scope.h"
90 #include "Shared/shard_key.h"
91 #include "Shared/thread_count.h"
93 
94 #include "gen-cpp/Heavy.h"
95 
96 #ifdef _WIN32
97 #include <fcntl.h>
98 #include <io.h>
99 #endif
100 
101 #define TIMER_STOP(t) \
102  (float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>( \
103  t)) / \
104  1.0E6f)
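// Usage sketch (editor illustration, not part of the original source): TIMER_STOP
// pairs with timer_start() from "Shared/measure.h", which returns a
// std::chrono::steady_clock::time_point; the macro converts the elapsed
// microseconds into floating-point seconds.
//
//   auto clock_begin = timer_start();
//   // ... do the import work ...
//   LOG(INFO) << "import took " << TIMER_STOP(clock_begin) << "s";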
105 
106 size_t g_max_import_threads =
107     32;  // Max number of default import threads to use (num hardware threads will be used
108 // if lower, and can also be explicitly overridden in copy statement with threads
109 // option)
110 size_t g_archive_read_buf_size = 1 << 20;
111 
112 std::optional<size_t> g_detect_test_sample_size = std::nullopt;
113 
114 static constexpr int kMaxRasterScanlinesPerThread = 32;
115 
116 inline auto get_filesize(const std::string& file_path) {
117  boost::filesystem::path boost_file_path{file_path};
118  boost::system::error_code ec;
119  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
120  return ec ? 0 : filesize;
121 }
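// Behavior note (illustration): because the error_code overload of
// boost::filesystem::file_size never throws, get_filesize simply reports 0
// for a missing or unreadable path, e.g. get_filesize("/no/such/file") == 0.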
122 
123 namespace {
124 
125 bool check_session_interrupted(const QuerySessionId& query_session, Executor* executor) {
126  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
127  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(
128  executor->getSessionLock());
129  return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
130  }
131  return false;
132 }
133 } // namespace
134 
135 // For logging std::vector<std::string> row.
136 namespace boost {
137 namespace log {
138 formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
139  out << '[';
140  for (size_t i = 0; i < row.size(); ++i) {
141  out << (i ? ", " : "") << row[i];
142  }
143  out << ']';
144  return out;
145 }
146 } // namespace log
147 } // namespace boost
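// Example (illustration): with this operator in scope, a rejected row logs in
// bracketed, comma-separated form:
//
//   std::vector<std::string> row{"1", "foo", "2.5"};
//   LOG(ERROR) << "Rejecting row " << row;  // prints: Rejecting row [1, foo, 2.5]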
148 
149 namespace import_export {
150 
151 using FieldNameToIndexMapType = std::map<std::string, size_t>;
152 using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
153 using RenderGroupAnalyzerMapType =
154  std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
155 using FeaturePtrVector = std::vector<Geospatial::GDAL::FeatureUqPtr>;
156 
157 #define DEBUG_TIMING false
158 #define DEBUG_RENDER_GROUP_ANALYZER 0
159 #define DEBUG_AWS_AUTHENTICATION 0
160 
161 #define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0
162 
163 static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;
164 
165 static heavyai::shared_mutex status_mutex;
166 static std::map<std::string, ImportStatus> import_status_map;
167 
168 // max # rows to import
169 static const size_t kImportRowLimit = 10000;
170 
171 Importer::Importer(Catalog_Namespace::Catalog& c,
172  const TableDescriptor* t,
173  const std::string& f,
174  const CopyParams& p)
175  : Importer(new Loader(c, t), f, p) {}
176 
177 Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
178  : DataStreamSink(p, f), loader(providedLoader) {
179  import_id = boost::filesystem::path(file_path).filename().string();
180  file_size = 0;
181  max_threads = 0;
182  p_file = nullptr;
183  buffer[0] = nullptr;
184  buffer[1] = nullptr;
185  // we may be overallocating a little more memory here due to dropping phy cols.
186  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
187  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
188  int i = 0;
189  bool has_array = false;
190  // TODO: replace this ugly way of skipping phy cols once isPhyGeo is defined
191  int skip_physical_cols = 0;
192  for (auto& p : loader->get_column_descs()) {
193  // phy geo columns can't be in input file
194  if (skip_physical_cols-- > 0) {
195  continue;
196  }
197  // neither are rowid or $deleted$
198  // note: columns can be added after rowid/$deleted$
199  if (p->isVirtualCol || p->isDeletedCol) {
200  continue;
201  }
202  skip_physical_cols = p->columnType.get_physical_cols();
203  if (p->columnType.get_type() == kARRAY) {
204  is_array.get()[i] = true;
205  has_array = true;
206  } else {
207  is_array.get()[i] = false;
208  }
209  ++i;
210  }
211  if (has_array) {
212  is_array_a = std::unique_ptr<bool[]>(is_array.release());
213  } else {
214  is_array_a = std::unique_ptr<bool[]>(nullptr);
215  }
216 }
217 
218 Importer::~Importer() {
219  if (p_file != nullptr) {
220  fclose(p_file);
221  }
222  if (buffer[0] != nullptr) {
223  free(buffer[0]);
224  }
225  if (buffer[1] != nullptr) {
226  free(buffer[1]);
227  }
228 }
229 
230 ImportStatus Importer::get_import_status(const std::string& import_id) {
231  heavyai::shared_lock<heavyai::shared_mutex> read_lock(status_mutex);
232  auto it = import_status_map.find(import_id);
233  if (it == import_status_map.end()) {
234  throw std::runtime_error("Import status not found for id: " + import_id);
235  }
236  return it->second;
237 }
238 
239 void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
240  heavyai::lock_guard<heavyai::shared_mutex> write_lock(status_mutex);
241  is.end = std::chrono::steady_clock::now();
242  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
243  import_status_map[import_id] = is;
244 }
245 
246 static const std::string trim_space(const char* field, const size_t len) {
247  size_t i = 0;
248  size_t j = len;
249  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
250  i++;
251  }
252  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
253  j--;
254  }
255  return std::string(field + i, j - i);
256 }
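// Examples (illustration): trim_space strips only leading/trailing spaces and
// carriage returns; tabs and interior whitespace are preserved:
//
//   trim_space(" a b \r", 6) == "a b"
//   trim_space("\tx", 2) == "\tx"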
257 
258 namespace {
259 SQLTypes get_type_for_datum(const SQLTypeInfo& ti) {
260  SQLTypes type;
261  if (ti.is_decimal()) {
262  type = decimal_to_int_type(ti);
263  } else if (ti.is_dict_encoded_string()) {
264  type = string_dict_to_int_type(ti);
265  } else {
266  type = ti.get_type();
267  }
268  return type;
269 }
270 } // namespace
271 
272 Datum NullArrayDatum(SQLTypeInfo& ti) {
273  Datum d;
274  const auto type = get_type_for_datum(ti);
275  switch (type) {
276  case kBOOLEAN:
277  d.boolval = inline_fixed_encoding_null_array_val(ti);
278  break;
279  case kBIGINT:
280  d.bigintval = inline_fixed_encoding_null_array_val(ti);
281  break;
282  case kINT:
283  d.intval = inline_fixed_encoding_null_array_val(ti);
284  break;
285  case kSMALLINT:
286  d.smallintval = inline_fixed_encoding_null_array_val(ti);
287  break;
288  case kTINYINT:
289  d.tinyintval = inline_fixed_encoding_null_array_val(ti);
290  break;
291  case kFLOAT:
292  d.floatval = NULL_ARRAY_FLOAT;
293  break;
294  case kDOUBLE:
295  d.doubleval = NULL_ARRAY_DOUBLE;
296  break;
297  case kTIME:
298  case kTIMESTAMP:
299  case kDATE:
300  d.bigintval = inline_fixed_encoding_null_array_val(ti);
301  break;
302  case kPOINT:
303  case kMULTIPOINT:
304  case kLINESTRING:
305  case kMULTILINESTRING:
306  case kPOLYGON:
307  case kMULTIPOLYGON:
308  throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
309  default:
310  throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
311  }
312  return d;
313 }
314 
315 ArrayDatum StringToArray(const std::string& s,
316  const SQLTypeInfo& ti,
317  const CopyParams& copy_params) {
318  SQLTypeInfo elem_ti = ti.get_elem_type();
319  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
320  return ArrayDatum(0, NULL, true);
321  }
322  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
323  LOG(WARNING) << "Malformed array: " << s;
324  return ArrayDatum(0, NULL, true);
325  }
326  std::vector<std::string> elem_strs;
327  size_t last = 1;
328  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
329  i = s.find(copy_params.array_delim, last)) {
330  elem_strs.push_back(s.substr(last, i - last));
331  last = i + 1;
332  }
333  if (last + 1 <= s.size()) {
334  elem_strs.push_back(s.substr(last, s.size() - 1 - last));
335  }
336  if (elem_strs.size() == 1) {
337  auto str = elem_strs.front();
338  auto str_trimmed = trim_space(str.c_str(), str.length());
339  if (str_trimmed == "") {
340  elem_strs.clear(); // Empty array
341  }
342  }
343  if (!elem_ti.is_string()) {
344  size_t len = elem_strs.size() * elem_ti.get_size();
345  std::unique_ptr<int8_t, FreeDeleter> buf(
346  reinterpret_cast<int8_t*>(checked_malloc(len)));
347  int8_t* p = buf.get();
348  for (auto& es : elem_strs) {
349  auto e = trim_space(es.c_str(), es.length());
350  bool is_null = (e == copy_params.null_str) || e == "NULL";
351  if (!elem_ti.is_string() && e == "") {
352  is_null = true;
353  }
354  if (elem_ti.is_number() || elem_ti.is_time()) {
355  if (!isdigit(e[0]) && e[0] != '-') {
356  is_null = true;
357  }
358  }
359  Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
360  p = append_datum(p, d, elem_ti);
361  CHECK(p);
362  }
363  return ArrayDatum(len, buf.release(), false);
364  }
365  // must not be called for array of strings
366  CHECK(false);
367  return ArrayDatum(0, NULL, true);
368 }
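// Parsing sketch (illustration; assumes the CopyParams defaults of '{', '}'
// and ',' for array_begin/array_end/array_delim):
//
//   "{1,2,3}" -> three-element numeric ArrayDatum
//   "{}"      -> empty array (the single all-whitespace element is dropped)
//   "NULL"    -> null ArrayDatum (is_null == true)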
369 
370 ArrayDatum NullArray(const SQLTypeInfo& ti) {
371  SQLTypeInfo elem_ti = ti.get_elem_type();
372  auto len = ti.get_size();
373 
374  if (len > 0) {
375  // Compose a NULL fixlen array
376  int8_t* buf = (int8_t*)checked_malloc(len);
377  // First scalar is a NULL_ARRAY sentinel
378  Datum d = NullArrayDatum(elem_ti);
379  int8_t* p = append_datum(buf, d, elem_ti);
380  CHECK(p);
381  // Rest is filled with normal NULL sentinels
382  Datum d0 = NullDatum(elem_ti);
383  while ((p - buf) < len) {
384  p = append_datum(p, d0, elem_ti);
385  CHECK(p);
386  }
387  CHECK((p - buf) == len);
388  return ArrayDatum(len, buf, true);
389  }
390  // NULL varlen array
391  return ArrayDatum(0, NULL, true);
392 }
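// Layout sketch (illustration): for a fixed-length column such as INT[3],
// NullArray yields 12 bytes laid out as
//
//   [ NULL_ARRAY_INT | NULL_INT | NULL_INT ]
//
// i.e. a NULL_ARRAY sentinel in the first slot marks the whole array as null,
// and the remaining slots carry ordinary NULL sentinels.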
393 
394 ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
395  return NullArray(ti);
396 }
397 
398 ArrayDatum ImporterUtils::composeNullPointCoords(const SQLTypeInfo& coords_ti,
399  const SQLTypeInfo& geo_ti) {
400  if (geo_ti.get_compression() == kENCODING_GEOINT) {
401  CHECK(geo_ti.get_comp_param() == 32);
402  std::vector<double> null_point_coords = {NULL_ARRAY_DOUBLE, NULL_DOUBLE};
403  auto compressed_null_coords = Geospatial::compress_coords(null_point_coords, geo_ti);
404  const size_t len = compressed_null_coords.size();
405  int8_t* buf = (int8_t*)checked_malloc(len);
406  memcpy(buf, compressed_null_coords.data(), len);
407  return ArrayDatum(len, buf, false);
408  }
409  auto modified_ti = coords_ti;
410  modified_ti.set_subtype(kDOUBLE);
411  return composeNullArray(modified_ti);
412 }
413 
414 void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
415  const auto& arr = datum.val.arr_val;
416  for (const auto& elem_datum : arr) {
417  string_vec.push_back(elem_datum.val.str_val);
418  }
419 }
420 
421 Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
422  Datum d;
423  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
424  switch (type) {
425  case kBOOLEAN:
426  d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
427  break;
428  case kBIGINT:
429  d.bigintval =
430  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
431  break;
432  case kINT:
433  d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
434  break;
435  case kSMALLINT:
436  d.smallintval =
437  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
438  break;
439  case kTINYINT:
440  d.tinyintval =
441  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
442  break;
443  case kFLOAT:
444  d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
445  break;
446  case kDOUBLE:
447  d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
448  break;
449  case kTIME:
450  case kTIMESTAMP:
451  case kDATE:
452  d.bigintval =
453  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
454  break;
455  case kPOINT:
456  case kMULTIPOINT:
457  case kLINESTRING:
458  case kMULTILINESTRING:
459  case kPOLYGON:
460  case kMULTIPOLYGON:
461  throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
462  default:
463  throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
464  }
465  return d;
466 }
467 
468 ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
469  SQLTypeInfo elem_ti = ti.get_elem_type();
470 
471  CHECK(!elem_ti.is_string());
472 
473  if (datum.is_null) {
474  return NullArray(ti);
475  }
476 
477  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
478  int8_t* buf = (int8_t*)checked_malloc(len);
479  int8_t* p = buf;
480  for (auto& e : datum.val.arr_val) {
481  p = append_datum(p, TDatumToDatum(e, elem_ti), elem_ti);
482  CHECK(p);
483  }
484 
485  return ArrayDatum(len, buf, false);
486 }
487 
488 void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
489  CHECK(string_dict_);
490  std::vector<std::string_view> string_view_vec;
491  string_view_vec.reserve(string_vec.size());
492  for (const auto& str : string_vec) {
493  if (str.size() > StringDictionary::MAX_STRLEN) {
494  std::ostringstream oss;
495  oss << "while processing dictionary for column " << getColumnDesc()->columnName
496  << " a string was detected too long for encoding, string length = "
497  << str.size() << ", first 100 characters are '" << str.substr(0, 100) << "'";
498  throw std::runtime_error(oss.str());
499  }
500  string_view_vec.push_back(str);
501  }
502  try {
503  switch (column_desc_->columnType.get_size()) {
504  case 1:
505  string_dict_i8_buffer_->resize(string_view_vec.size());
506  string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
507  break;
508  case 2:
509  string_dict_i16_buffer_->resize(string_view_vec.size());
510  string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
511  break;
512  case 4:
513  string_dict_i32_buffer_->resize(string_view_vec.size());
514  string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
515  break;
516  default:
517  CHECK(false);
518  }
519  } catch (std::exception& e) {
520  std::ostringstream oss;
521  oss << "while processing dictionary for column " << getColumnDesc()->columnName
522  << " : " << e.what();
523  LOG(ERROR) << oss.str();
524  throw std::runtime_error(oss.str());
525  }
526 }
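// Note (illustration): the id buffer width follows the column's dictionary
// encoding, e.g. TEXT ENCODING DICT(8) fills string_dict_i8_buffer_,
// DICT(16) fills string_dict_i16_buffer_, and the default 32-bit encoding
// fills string_dict_i32_buffer_.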
527 
528 void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
529  const std::string_view val,
530  const bool is_null,
531  const CopyParams& copy_params,
532  const bool check_not_null) {
533  const auto type = cd->columnType.get_type();
534  switch (type) {
535  case kBOOLEAN: {
536  if (is_null) {
537  if (check_not_null && cd->columnType.get_notnull()) {
538  throw std::runtime_error("NULL for column " + cd->columnName);
539  }
540  addBoolean(inline_fixed_encoding_null_val(cd->columnType));
541  } else {
542  auto ti = cd->columnType;
543  Datum d = StringToDatum(val, ti);
544  addBoolean(static_cast<int8_t>(d.boolval));
545  }
546  break;
547  }
548  case kTINYINT: {
549  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
550  auto ti = cd->columnType;
551  Datum d = StringToDatum(val, ti);
552  addTinyint(d.tinyintval);
553  } else {
554  if (check_not_null && cd->columnType.get_notnull()) {
555  throw std::runtime_error("NULL for column " + cd->columnName);
556  }
557  addTinyint(inline_fixed_encoding_null_val(cd->columnType));
558  }
559  break;
560  }
561  case kSMALLINT: {
562  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
563  auto ti = cd->columnType;
564  Datum d = StringToDatum(val, ti);
565  addSmallint(d.smallintval);
566  } else {
567  if (check_not_null && cd->columnType.get_notnull()) {
568  throw std::runtime_error("NULL for column " + cd->columnName);
569  }
570  addSmallint(inline_fixed_encoding_null_val(cd->columnType));
571  }
572  break;
573  }
574  case kINT: {
575  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
576  auto ti = cd->columnType;
577  Datum d = StringToDatum(val, ti);
578  addInt(d.intval);
579  } else {
580  if (check_not_null && cd->columnType.get_notnull()) {
581  throw std::runtime_error("NULL for column " + cd->columnName);
582  }
583  addInt(inline_fixed_encoding_null_val(cd->columnType));
584  }
585  break;
586  }
587  case kBIGINT: {
588  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
589  auto ti = cd->columnType;
590  Datum d = StringToDatum(val, ti);
591  addBigint(d.bigintval);
592  } else {
593  if (check_not_null && cd->columnType.get_notnull()) {
594  throw std::runtime_error("NULL for column " + cd->columnName);
595  }
596  addBigint(inline_fixed_encoding_null_val(cd->columnType));
597  }
598  break;
599  }
600  case kDECIMAL:
601  case kNUMERIC: {
602  if (!is_null) {
603  auto ti = cd->columnType;
604  Datum d = StringToDatum(val, ti);
605  DecimalOverflowValidator validator(ti);
606  validator.validate(d.bigintval);
607  addBigint(d.bigintval);
608  } else {
609  if (check_not_null && cd->columnType.get_notnull()) {
610  throw std::runtime_error("NULL for column " + cd->columnName);
611  }
612  addBigint(inline_fixed_encoding_null_val(cd->columnType));
613  }
614  break;
615  }
616  case kFLOAT:
617  if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
618  addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
619  } else {
620  if (check_not_null && cd->columnType.get_notnull()) {
621  throw std::runtime_error("NULL for column " + cd->columnName);
622  }
623  addFloat(NULL_FLOAT);
624  }
625  break;
626  case kDOUBLE:
627  if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
628  addDouble(std::atof(std::string(val).c_str()));
629  } else {
630  if (check_not_null && cd->columnType.get_notnull()) {
631  throw std::runtime_error("NULL for column " + cd->columnName);
632  }
633  addDouble(NULL_DOUBLE);
634  }
635  break;
636  case kTEXT:
637  case kVARCHAR:
638  case kCHAR: {
639  // @TODO(wei) for now, use empty string for nulls
640  if (is_null) {
641  if (check_not_null && cd->columnType.get_notnull()) {
642  throw std::runtime_error("NULL for column " + cd->columnName);
643  }
644  addString(std::string());
645  } else {
646  if (val.length() > StringDictionary::MAX_STRLEN) {
647  throw std::runtime_error("String too long for column " + cd->columnName +
648  " was " + std::to_string(val.length()) + " max is " +
649  std::to_string(StringDictionary::MAX_STRLEN));
650  }
651  addString(val);
652  }
653  break;
654  }
655  case kTIME:
656  case kTIMESTAMP:
657  case kDATE:
658  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
659  SQLTypeInfo ti = cd->columnType;
660  Datum d = StringToDatum(val, ti);
661  addBigint(d.bigintval);
662  } else {
663  if (check_not_null && cd->columnType.get_notnull()) {
664  throw std::runtime_error("NULL for column " + cd->columnName);
665  }
666  addBigint(inline_fixed_encoding_null_val(cd->columnType));
667  }
668  break;
669  case kARRAY: {
670  if (check_not_null && is_null && cd->columnType.get_notnull()) {
671  throw std::runtime_error("NULL for column " + cd->columnName);
672  }
673  SQLTypeInfo ti = cd->columnType;
674  if (IS_STRING(ti.get_subtype())) {
675  std::vector<std::string> string_vec;
676  // Just parse string array, don't push it to buffer yet as we might throw
677  ImportHelpers::parseStringArray(
678  std::string(val), copy_params, string_vec);
679  if (!is_null) {
680  if (ti.get_size() > 0) {
681  auto sti = ti.get_elem_type();
682  size_t expected_size = ti.get_size() / sti.get_size();
683  size_t actual_size = string_vec.size();
684  if (actual_size != expected_size) {
685  throw std::runtime_error("Fixed length array column " + cd->columnName +
686  " expects " + std::to_string(expected_size) +
687  " values, received " +
688  std::to_string(actual_size));
689  }
690  }
691  addStringArray(string_vec);
692  } else {
693  addStringArray(std::nullopt);
694  }
695  } else {
696  if (!is_null) {
697  ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
698  if (d.is_null) { // val could be "NULL"
699  addArray(NullArray(ti));
700  } else {
701  if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
702  throw std::runtime_error("Fixed length array for column " + cd->columnName +
703  " has incorrect length: " + std::string(val));
704  }
705  addArray(d);
706  }
707  } else {
708  addArray(NullArray(ti));
709  }
710  }
711  break;
712  }
713  case kPOINT:
714  case kMULTIPOINT:
715  case kLINESTRING:
716  case kMULTILINESTRING:
717  case kPOLYGON:
718  case kMULTIPOLYGON:
719  addGeoString(val);
720  break;
721  default:
722  CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
723  }
724 }
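// Usage sketch (illustration; cd and import_buffer are hypothetical): feeding
// one delimited field at a time into the buffer of a nullable TEXT column.
//
//   CopyParams copy_params;
//   import_buffer->add_value(cd, "hello", /*is_null=*/false, copy_params, true);
//   import_buffer->add_value(cd, "", /*is_null=*/true, copy_params, true);  // NULL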
725 
726 void TypedImportBuffer::pop_value() {
727  const auto type = column_desc_->columnType.is_decimal()
728  ? decimal_to_int_type(column_desc_->columnType)
729  : column_desc_->columnType.get_type();
730  switch (type) {
731  case kBOOLEAN:
732  bool_buffer_->pop_back();
733  break;
734  case kTINYINT:
735  tinyint_buffer_->pop_back();
736  break;
737  case kSMALLINT:
738  smallint_buffer_->pop_back();
739  break;
740  case kINT:
741  int_buffer_->pop_back();
742  break;
743  case kBIGINT:
744  bigint_buffer_->pop_back();
745  break;
746  case kFLOAT:
747  float_buffer_->pop_back();
748  break;
749  case kDOUBLE:
750  double_buffer_->pop_back();
751  break;
752  case kTEXT:
753  case kVARCHAR:
754  case kCHAR:
755  string_buffer_->pop_back();
756  break;
757  case kDATE:
758  case kTIME:
759  case kTIMESTAMP:
760  bigint_buffer_->pop_back();
761  break;
762  case kARRAY:
763  if (IS_STRING(column_desc_->columnType.get_subtype())) {
764  string_array_buffer_->pop_back();
765  } else {
766  array_buffer_->pop_back();
767  }
768  break;
769  case kPOINT:
770  case kMULTIPOINT:
771  case kLINESTRING:
772  case kMULTILINESTRING:
773  case kPOLYGON:
774  case kMULTIPOLYGON:
775  geo_string_buffer_->pop_back();
776  break;
777  default:
778  CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
779  }
780 }
781 
782 struct GeoImportException : std::runtime_error {
783  using std::runtime_error::runtime_error;
784 };
785 
786 // appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
787 template <typename DATA_TYPE>
788 size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
789  const ColumnDescriptor* cd,
790  const Array& array,
791  std::vector<DATA_TYPE>& buffer,
792  const ArraySliceRange& slice_range,
793  import_export::BadRowsTracker* const bad_rows_tracker) {
794  auto data =
795  std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
796  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
797  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
798  if (bad_rows_tracker && cd->columnType.is_geometry()) {
799  f_add_geo_phy_cols = [&](const int64_t row) {
800  // Populate physical columns (ref. DBHandler::load_table)
801  std::vector<double> coords, bounds;
802  std::vector<int> ring_sizes, poly_rings;
803  int render_group = 0;
804  SQLTypeInfo ti;
805  // replace any unexpected exception from getGeoColumns or other
806  // on this path with a GeoImportException so that we wont over
807  // push a null to the logical column...
808  try {
809  SQLTypeInfo import_ti{ti};
810  if (array.IsNull(row)) {
811  Geospatial::GeoTypesFactory::getNullGeoColumns(
812  import_ti, coords, bounds, ring_sizes, poly_rings, false);
813  } else {
814  arrow_throw_if<GeoImportException>(
816  ti,
817  coords,
818  bounds,
819  ring_sizes,
820  poly_rings,
821  false),
822  error_context(cd, bad_rows_tracker) + "Invalid geometry");
823  arrow_throw_if<GeoImportException>(
824  cd->columnType.get_type() != ti.get_type(),
825  error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
826  }
827  auto col_idx_workpad = col_idx; // what a pitfall!!
828  Importer::set_geo_physical_import_buffer(
829  bad_rows_tracker->importer->getCatalog(),
830  cd,
832  col_idx_workpad,
833  coords,
834  bounds,
835  ring_sizes,
836  poly_rings,
837  render_group);
838  } catch (GeoImportException&) {
839  throw;
840  } catch (std::runtime_error& e) {
841  throw GeoImportException(e.what());
842  } catch (const std::exception& e) {
843  throw GeoImportException(e.what());
844  } catch (...) {
845  throw GeoImportException("unknown exception");
846  }
847  };
848  }
849  auto f_mark_a_bad_row = [&](const auto row) {
850  std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
851  bad_rows_tracker->rows.insert(row - slice_range.first);
852  };
853  buffer.reserve(slice_range.second - slice_range.first);
854  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
855  try {
856  *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
857  f_add_geo_phy_cols(row);
858  } catch (GeoImportException&) {
859  f_mark_a_bad_row(row);
860  } catch (ArrowImporterException&) {
861  // trace bad rows of each column; otherwise rethrow.
862  if (bad_rows_tracker) {
863  *data << nullptr;
864  f_mark_a_bad_row(row);
865  } else {
866  throw;
867  }
868  }
869  }
870  return buffer.size();
871 }
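// Concurrency note (illustration): callers split an Arrow column into
// disjoint [first, second) slice_ranges and run this conversion on one thread
// per slice, each thread owning its own TypedImportBuffer; rejected row
// indices are recorded in bad_rows_tracker under its mutex, offset so they
// are relative to the slice start.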
872 
873 size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
874  const Array& col,
875  const bool exact_type_match,
876  const ArraySliceRange& slice_range,
877  BadRowsTracker* const bad_rows_tracker) {
878  const auto type = cd->columnType.get_type();
879  if (cd->columnType.get_notnull()) {
880  // We can't have any null values for this column; to have them is an error
881  arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
882  }
883 
884  switch (type) {
885  case kBOOLEAN:
886  if (exact_type_match) {
887  arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
888  }
889  return convert_arrow_val_to_import_buffer<int8_t>(
890  cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
891  case kTINYINT:
892  if (exact_type_match) {
893  arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
894  }
895  return convert_arrow_val_to_import_buffer<int8_t>(
896  cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
897  case kSMALLINT:
898  if (exact_type_match) {
899  arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
900  }
901  return convert_arrow_val_to_import_buffer<int16_t>(
902  cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
903  case kINT:
904  if (exact_type_match) {
905  arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
906  }
907  return convert_arrow_val_to_import_buffer<int32_t>(
908  cd, col, *int_buffer_, slice_range, bad_rows_tracker);
909  case kBIGINT:
910  case kNUMERIC:
911  case kDECIMAL:
912  if (exact_type_match) {
913  arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
914  }
915  return convert_arrow_val_to_import_buffer<int64_t>(
916  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
917  case kFLOAT:
918  if (exact_type_match) {
919  arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
920  }
921  return convert_arrow_val_to_import_buffer<float>(
922  cd, col, *float_buffer_, slice_range, bad_rows_tracker);
923  case kDOUBLE:
924  if (exact_type_match) {
925  arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
926  }
927  return convert_arrow_val_to_import_buffer<double>(
928  cd, col, *double_buffer_, slice_range, bad_rows_tracker);
929  case kTEXT:
930  case kVARCHAR:
931  case kCHAR:
932  if (exact_type_match) {
933  arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
934  "Expected string type");
935  }
936  return convert_arrow_val_to_import_buffer<std::string>(
937  cd, col, *string_buffer_, slice_range, bad_rows_tracker);
938  case kTIME:
939  if (exact_type_match) {
940  arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
941  "Expected time32 or time64 type");
942  }
943  return convert_arrow_val_to_import_buffer<int64_t>(
944  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
945  case kTIMESTAMP:
946  if (exact_type_match) {
947  arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
948  }
949  return convert_arrow_val_to_import_buffer<int64_t>(
950  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
951  case kDATE:
952  if (exact_type_match) {
953  arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
954  "Expected date32 or date64 type");
955  }
956  return convert_arrow_val_to_import_buffer<int64_t>(
957  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
958  case kPOINT:
959  case kMULTIPOINT:
960  case kLINESTRING:
961  case kMULTILINESTRING:
962  case kPOLYGON:
963  case kMULTIPOLYGON:
964  arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
965  "Expected string type");
966  return convert_arrow_val_to_import_buffer<std::string>(
967  cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
968  case kARRAY:
969  throw std::runtime_error("Arrow array appends not yet supported");
970  default:
971  throw std::runtime_error("Invalid Type");
972  }
973 }
974 
975 // this is exclusively used by load_table_binary_columnar
976 size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
977  size_t dataSize = 0;
978  if (cd->columnType.get_notnull()) {
979  // We can't have any null values for this column; to have them is an error
980  if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
981  throw std::runtime_error("NULL for column " + cd->columnName);
982  }
983  }
984 
985  switch (cd->columnType.get_type()) {
986  case kBOOLEAN: {
987  dataSize = col.data.int_col.size();
988  bool_buffer_->reserve(dataSize);
989  for (size_t i = 0; i < dataSize; i++) {
990  if (col.nulls[i]) {
991  bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
992  } else {
993  bool_buffer_->push_back((int8_t)col.data.int_col[i]);
994  }
995  }
996  break;
997  }
998  case kTINYINT: {
999  dataSize = col.data.int_col.size();
1000  tinyint_buffer_->reserve(dataSize);
1001  for (size_t i = 0; i < dataSize; i++) {
1002  if (col.nulls[i]) {
1003  tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1004  } else {
1005  tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
1006  }
1007  }
1008  break;
1009  }
1010  case kSMALLINT: {
1011  dataSize = col.data.int_col.size();
1012  smallint_buffer_->reserve(dataSize);
1013  for (size_t i = 0; i < dataSize; i++) {
1014  if (col.nulls[i]) {
1015  smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1016  } else {
1017  smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
1018  }
1019  }
1020  break;
1021  }
1022  case kINT: {
1023  dataSize = col.data.int_col.size();
1024  int_buffer_->reserve(dataSize);
1025  for (size_t i = 0; i < dataSize; i++) {
1026  if (col.nulls[i]) {
1027  int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1028  } else {
1029  int_buffer_->push_back((int32_t)col.data.int_col[i]);
1030  }
1031  }
1032  break;
1033  }
1034  case kBIGINT:
1035  case kNUMERIC:
1036  case kDECIMAL: {
1037  dataSize = col.data.int_col.size();
1038  bigint_buffer_->reserve(dataSize);
1039  for (size_t i = 0; i < dataSize; i++) {
1040  if (col.nulls[i]) {
1041  bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1042  } else {
1043  bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
1044  }
1045  }
1046  break;
1047  }
1048  case kFLOAT: {
1049  dataSize = col.data.real_col.size();
1050  float_buffer_->reserve(dataSize);
1051  for (size_t i = 0; i < dataSize; i++) {
1052  if (col.nulls[i]) {
1053  float_buffer_->push_back(NULL_FLOAT);
1054  } else {
1055  float_buffer_->push_back((float)col.data.real_col[i]);
1056  }
1057  }
1058  break;
1059  }
1060  case kDOUBLE: {
1061  dataSize = col.data.real_col.size();
1062  double_buffer_->reserve(dataSize);
1063  for (size_t i = 0; i < dataSize; i++) {
1064  if (col.nulls[i]) {
1065  double_buffer_->push_back(NULL_DOUBLE);
1066  } else {
1067  double_buffer_->push_back((double)col.data.real_col[i]);
1068  }
1069  }
1070  break;
1071  }
1072  case kTEXT:
1073  case kVARCHAR:
1074  case kCHAR: {
1075  // TODO: for now, use empty string for nulls
1076  dataSize = col.data.str_col.size();
1077  string_buffer_->reserve(dataSize);
1078  for (size_t i = 0; i < dataSize; i++) {
1079  if (col.nulls[i]) {
1080  string_buffer_->push_back(std::string());
1081  } else {
1082  string_buffer_->push_back(col.data.str_col[i]);
1083  }
1084  }
1085  break;
1086  }
1087  case kTIME:
1088  case kTIMESTAMP:
1089  case kDATE: {
1090  dataSize = col.data.int_col.size();
1091  bigint_buffer_->reserve(dataSize);
1092  for (size_t i = 0; i < dataSize; i++) {
1093  if (col.nulls[i]) {
1094  bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1095  } else {
1096  bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
1097  }
1098  }
1099  break;
1100  }
1101  case kPOINT:
1102  case kMULTIPOINT:
1103  case kLINESTRING:
1104  case kMULTILINESTRING:
1105  case kPOLYGON:
1106  case kMULTIPOLYGON: {
1107  dataSize = col.data.str_col.size();
1108  geo_string_buffer_->reserve(dataSize);
1109  for (size_t i = 0; i < dataSize; i++) {
1110  if (col.nulls[i]) {
1111  // TODO: add support for NULL geo
1112  geo_string_buffer_->push_back(std::string());
1113  } else {
1114  geo_string_buffer_->push_back(col.data.str_col[i]);
1115  }
1116  }
1117  break;
1118  }
1119  case kARRAY: {
1120  dataSize = col.data.arr_col.size();
1121  if (IS_STRING(cd->columnType.get_subtype())) {
1122  for (size_t i = 0; i < dataSize; i++) {
1123  OptionalStringVector& string_vec = addStringArray();
1124  if (!col.nulls[i]) {
1125  size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
1126  for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
1127  string_vec->push_back(col.data.arr_col[i].data.str_col[str_idx]);
1128  }
1129  }
1130  }
1131  } else {
1132  auto elem_ti = cd->columnType.get_subtype();
1133  switch (elem_ti) {
1134  case kBOOLEAN: {
1135  for (size_t i = 0; i < dataSize; i++) {
1136  if (col.nulls[i]) {
1137  addArray(NullArray(cd->columnType));
1138  } else {
1139  size_t len = col.data.arr_col[i].data.int_col.size();
1140  size_t byteSize = len * sizeof(int8_t);
1141  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1142  int8_t* p = buf;
1143  for (size_t j = 0; j < len; ++j) {
1144  // Explicitly checking the item for null because
1145  // casting null value (-128) to bool results
1146  // incorrect value 1.
1147  if (col.data.arr_col[i].nulls[j]) {
1148  *p = static_cast<int8_t>(
1149  inline_fixed_encoding_null_val(cd->columnType.get_elem_type()));
1150  } else {
1151  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
1152  }
1153  p += sizeof(bool);
1154  }
1155  addArray(ArrayDatum(byteSize, buf, false));
1156  }
1157  }
1158  break;
1159  }
1160  case kTINYINT: {
1161  for (size_t i = 0; i < dataSize; i++) {
1162  if (col.nulls[i]) {
1163  addArray(NullArray(cd->columnType));
1164  } else {
1165  size_t len = col.data.arr_col[i].data.int_col.size();
1166  size_t byteSize = len * sizeof(int8_t);
1167  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1168  int8_t* p = buf;
1169  for (size_t j = 0; j < len; ++j) {
1170  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
1171  p += sizeof(int8_t);
1172  }
1173  addArray(ArrayDatum(byteSize, buf, false));
1174  }
1175  }
1176  break;
1177  }
1178  case kSMALLINT: {
1179  for (size_t i = 0; i < dataSize; i++) {
1180  if (col.nulls[i]) {
1181  addArray(NullArray(cd->columnType));
1182  } else {
1183  size_t len = col.data.arr_col[i].data.int_col.size();
1184  size_t byteSize = len * sizeof(int16_t);
1185  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1186  int8_t* p = buf;
1187  for (size_t j = 0; j < len; ++j) {
1188  *(int16_t*)p =
1189  static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
1190  p += sizeof(int16_t);
1191  }
1192  addArray(ArrayDatum(byteSize, buf, false));
1193  }
1194  }
1195  break;
1196  }
1197  case kINT: {
1198  for (size_t i = 0; i < dataSize; i++) {
1199  if (col.nulls[i]) {
1200  addArray(NullArray(cd->columnType));
1201  } else {
1202  size_t len = col.data.arr_col[i].data.int_col.size();
1203  size_t byteSize = len * sizeof(int32_t);
1204  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1205  int8_t* p = buf;
1206  for (size_t j = 0; j < len; ++j) {
1207  *(int32_t*)p =
1208  static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
1209  p += sizeof(int32_t);
1210  }
1211  addArray(ArrayDatum(byteSize, buf, false));
1212  }
1213  }
1214  break;
1215  }
1216  case kBIGINT:
1217  case kNUMERIC:
1218  case kDECIMAL: {
1219  for (size_t i = 0; i < dataSize; i++) {
1220  if (col.nulls[i]) {
1221  addArray(NullArray(cd->columnType));
1222  } else {
1223  size_t len = col.data.arr_col[i].data.int_col.size();
1224  size_t byteSize = len * sizeof(int64_t);
1225  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1226  int8_t* p = buf;
1227  for (size_t j = 0; j < len; ++j) {
1228  *(int64_t*)p =
1229  static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1230  p += sizeof(int64_t);
1231  }
1232  addArray(ArrayDatum(byteSize, buf, false));
1233  }
1234  }
1235  break;
1236  }
1237  case kFLOAT: {
1238  for (size_t i = 0; i < dataSize; i++) {
1239  if (col.nulls[i]) {
1240  addArray(NullArray(cd->columnType));
1241  } else {
1242  size_t len = col.data.arr_col[i].data.real_col.size();
1243  size_t byteSize = len * sizeof(float);
1244  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1245  int8_t* p = buf;
1246  for (size_t j = 0; j < len; ++j) {
1247  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
1248  p += sizeof(float);
1249  }
1250  addArray(ArrayDatum(byteSize, buf, false));
1251  }
1252  }
1253  break;
1254  }
1255  case kDOUBLE: {
1256  for (size_t i = 0; i < dataSize; i++) {
1257  if (col.nulls[i]) {
1258  addArray(NullArray(cd->columnType));
1259  } else {
1260  size_t len = col.data.arr_col[i].data.real_col.size();
1261  size_t byteSize = len * sizeof(double);
1262  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1263  int8_t* p = buf;
1264  for (size_t j = 0; j < len; ++j) {
1265  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
1266  p += sizeof(double);
1267  }
1268  addArray(ArrayDatum(byteSize, buf, false));
1269  }
1270  }
1271  break;
1272  }
1273  case kTIME:
1274  case kTIMESTAMP:
1275  case kDATE: {
1276  for (size_t i = 0; i < dataSize; i++) {
1277  if (col.nulls[i]) {
1278  addArray(NullArray(cd->columnType));
1279  } else {
1280  size_t len = col.data.arr_col[i].data.int_col.size();
1281  size_t byteWidth = sizeof(int64_t);
1282  size_t byteSize = len * byteWidth;
1283  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1284  int8_t* p = buf;
1285  for (size_t j = 0; j < len; ++j) {
1286  *reinterpret_cast<int64_t*>(p) =
1287  static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1288  p += sizeof(int64_t);
1289  }
1290  addArray(ArrayDatum(byteSize, buf, false));
1291  }
1292  }
1293  break;
1294  }
1295  default:
1296  throw std::runtime_error("Invalid Array Type");
1297  }
1298  }
1299  break;
1300  }
1301  default:
1302  throw std::runtime_error("Invalid Type");
1303  }
1304  return dataSize;
1305 }
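// Wire-format sketch (illustration): a Thrift TColumn for an INT[] column
// carrying the rows {1, 2} and NULL would arrive as
//
//   col.nulls = [0, 1]
//   col.data.arr_col[0].data.int_col = [1, 2]
//   col.data.arr_col[1] = {}  // contents ignored, row is null
//
// and each non-null row is packed into a malloc'ed buffer of
// len * sizeof(int32_t) bytes appended as one ArrayDatum.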
1306 
1307 void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
1308  const TDatum& datum,
1309  const bool is_null) {
1310  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
1311  : cd->columnType.get_type();
1312  switch (type) {
1313  case kBOOLEAN: {
1314  if (is_null) {
1315  if (cd->columnType.get_notnull()) {
1316  throw std::runtime_error("NULL for column " + cd->columnName);
1317  }
1318  addBoolean(inline_fixed_encoding_null_val(cd->columnType));
1319  } else {
1320  addBoolean((int8_t)datum.val.int_val);
1321  }
1322  break;
1323  }
1324  case kTINYINT:
1325  if (!is_null) {
1326  addTinyint((int8_t)datum.val.int_val);
1327  } else {
1328  if (cd->columnType.get_notnull()) {
1329  throw std::runtime_error("NULL for column " + cd->columnName);
1330  }
1331  addTinyint(inline_fixed_encoding_null_val(cd->columnType));
1332  }
1333  break;
1334  case kSMALLINT:
1335  if (!is_null) {
1336  addSmallint((int16_t)datum.val.int_val);
1337  } else {
1338  if (cd->columnType.get_notnull()) {
1339  throw std::runtime_error("NULL for column " + cd->columnName);
1340  }
1341  addSmallint(inline_fixed_encoding_null_val(cd->columnType));
1342  }
1343  break;
1344  case kINT:
1345  if (!is_null) {
1346  addInt((int32_t)datum.val.int_val);
1347  } else {
1348  if (cd->columnType.get_notnull()) {
1349  throw std::runtime_error("NULL for column " + cd->columnName);
1350  }
1351  addInt(inline_fixed_encoding_null_val(cd->columnType));
1352  }
1353  break;
1354  case kBIGINT:
1355  if (!is_null) {
1356  addBigint(datum.val.int_val);
1357  } else {
1358  if (cd->columnType.get_notnull()) {
1359  throw std::runtime_error("NULL for column " + cd->columnName);
1360  }
1361  addBigint(inline_fixed_encoding_null_val(cd->columnType));
1362  }
1363  break;
1364  case kFLOAT:
1365  if (!is_null) {
1366  addFloat((float)datum.val.real_val);
1367  } else {
1368  if (cd->columnType.get_notnull()) {
1369  throw std::runtime_error("NULL for column " + cd->columnName);
1370  }
1371  addFloat(NULL_FLOAT);
1372  }
1373  break;
1374  case kDOUBLE:
1375  if (!is_null) {
1376  addDouble(datum.val.real_val);
1377  } else {
1378  if (cd->columnType.get_notnull()) {
1379  throw std::runtime_error("NULL for column " + cd->columnName);
1380  }
1381  addDouble(NULL_DOUBLE);
1382  }
1383  break;
1384  case kTEXT:
1385  case kVARCHAR:
1386  case kCHAR: {
1387  // @TODO(wei) for now, use empty string for nulls
1388  if (is_null) {
1389  if (cd->columnType.get_notnull()) {
1390  throw std::runtime_error("NULL for column " + cd->columnName);
1391  }
1392  addString(std::string());
1393  } else {
1394  addString(datum.val.str_val);
1395  }
1396  break;
1397  }
1398  case kTIME:
1399  case kTIMESTAMP:
1400  case kDATE: {
1401  if (!is_null) {
1402  addBigint(datum.val.int_val);
1403  } else {
1404  if (cd->columnType.get_notnull()) {
1405  throw std::runtime_error("NULL for column " + cd->columnName);
1406  }
1407  addBigint(inline_fixed_encoding_null_val(cd->columnType));
1408  }
1409  break;
1410  }
1411  case kARRAY:
1412  if (is_null && cd->columnType.get_notnull()) {
1413  throw std::runtime_error("NULL for column " + cd->columnName);
1414  }
1415  if (IS_STRING(cd->columnType.get_subtype())) {
1416  OptionalStringVector& string_vec = addStringArray();
1417  addBinaryStringArray(datum, *string_vec);
1418  } else {
1419  if (!is_null) {
1420  addArray(TDatumToArrayDatum(datum, cd->columnType));
1421  } else {
1422  addArray(NullArray(cd->columnType));
1423  }
1424  }
1425  break;
1426  case kPOINT:
1427  case kMULTIPOINT:
1428  case kLINESTRING:
1429  case kMULTILINESTRING:
1430  case kPOLYGON:
1431  case kMULTIPOLYGON:
1432  if (is_null) {
1433  if (cd->columnType.get_notnull()) {
1434  throw std::runtime_error("NULL for column " + cd->columnName);
1435  }
1436  addGeoString(std::string());
1437  } else {
1438  addGeoString(datum.val.str_val);
1439  }
1440  break;
1441  default:
1442  CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
1443  }
1444 }
1445 
1446 void TypedImportBuffer::addDefaultValues(const ColumnDescriptor* cd, size_t num_rows) {
1447  bool is_null = !cd->default_value.has_value();
1448  CHECK(!(is_null && cd->columnType.get_notnull()));
1449  const auto type = cd->columnType.get_type();
1450  auto ti = cd->columnType;
1451  auto val = cd->default_value.value_or("NULL");
1452  CopyParams cp;
1453  switch (type) {
1454  case kBOOLEAN: {
1455  if (!is_null) {
1456  bool_buffer_->resize(num_rows, StringToDatum(val, ti).boolval);
1457  } else {
1458  bool_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1459  }
1460  break;
1461  }
1462  case kTINYINT: {
1463  if (!is_null) {
1464  tinyint_buffer_->resize(num_rows, StringToDatum(val, ti).tinyintval);
1465  } else {
1466  tinyint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1467  }
1468  break;
1469  }
1470  case kSMALLINT: {
1471  if (!is_null) {
1472  smallint_buffer_->resize(num_rows, StringToDatum(val, ti).smallintval);
1473  } else {
1474  smallint_buffer_->resize(num_rows,
1475  inline_fixed_encoding_null_val(cd->columnType));
1476  }
1477  break;
1478  }
1479  case kINT: {
1480  if (!is_null) {
1481  int_buffer_->resize(num_rows, StringToDatum(val, ti).intval);
1482  } else {
1483  int_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1484  }
1485  break;
1486  }
1487  case kBIGINT: {
1488  if (!is_null) {
1489  bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
1490  } else {
1491  bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1492  }
1493  break;
1494  }
1495  case kDECIMAL:
1496  case kNUMERIC: {
1497  if (!is_null) {
1498  const auto converted_decimal_value = convert_decimal_value_to_scale(
1499  StringToDatum(val, ti).bigintval, ti, cd->columnType);
1500  bigint_buffer_->resize(num_rows, converted_decimal_value);
1501  } else {
1502  bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1503  }
1504  break;
1505  }
1506  case kFLOAT:
1507  if (!is_null) {
1508  float_buffer_->resize(num_rows,
1509  static_cast<float>(std::atof(std::string(val).c_str())));
1510  } else {
1511  float_buffer_->resize(num_rows, NULL_FLOAT);
1512  }
1513  break;
1514  case kDOUBLE:
1515  if (!is_null) {
1516  double_buffer_->resize(num_rows, std::atof(std::string(val).c_str()));
1517  } else {
1518  double_buffer_->resize(num_rows, NULL_DOUBLE);
1519  }
1520  break;
1521  case kTEXT:
1522  case kVARCHAR:
1523  case kCHAR: {
1524  if (is_null) {
1525  string_buffer_->resize(num_rows, "");
1526  } else {
1527  if (val.length() > StringDictionary::MAX_STRLEN) {
1528  throw std::runtime_error("String too long for column " + cd->columnName +
1529  " was " + std::to_string(val.length()) + " max is " +
1530  std::to_string(StringDictionary::MAX_STRLEN));
1531  }
1532  string_buffer_->resize(num_rows, val);
1533  }
1534  break;
1535  }
1536  case kTIME:
1537  case kTIMESTAMP:
1538  case kDATE:
1539  if (!is_null) {
1540  bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
1541  } else {
1542  bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1543  }
1544  break;
1545  case kARRAY: {
1546  if (IS_STRING(ti.get_subtype())) {
1547  std::vector<std::string> string_vec;
1548  // Just parse string array, don't push it to buffer yet as we might throw
1549  ImportHelpers::parseStringArray(
1550  std::string(val), cp, string_vec);
1551  if (!is_null) {
1552  // TODO: add support for NULL string arrays
1553  if (ti.get_size() > 0) {
1554  auto sti = ti.get_elem_type();
1555  size_t expected_size = ti.get_size() / sti.get_size();
1556  size_t actual_size = string_vec.size();
1557  if (actual_size != expected_size) {
1558  throw std::runtime_error("Fixed length array column " + cd->columnName +
1559  " expects " + std::to_string(expected_size) +
1560  " values, received " +
1561  std::to_string(actual_size));
1562  }
1563  }
1564  string_array_buffer_->resize(num_rows, string_vec);
1565  } else {
1566  if (ti.get_size() > 0) {
1567  // TODO: remove once NULL fixlen arrays are allowed
1568  throw std::runtime_error("Fixed length array column " + cd->columnName +
1569  " currently cannot accept NULL arrays");
1570  }
1571  // TODO: add support for NULL string arrays, replace with addStringArray(),
1572  // for now add whatever parseStringArray() outputs for NULLs ("NULL")
1573  string_array_buffer_->resize(num_rows, string_vec);
1574  }
1575  } else {
1576  if (!is_null) {
1577  ArrayDatum d = StringToArray(std::string(val), ti, cp);
1578  if (d.is_null) { // val could be "NULL"
1579  array_buffer_->resize(num_rows, NullArray(ti));
1580  } else {
1581  if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
1582  throw std::runtime_error("Fixed length array for column " + cd->columnName +
1583  " has incorrect length: " + std::string(val));
1584  }
1585  array_buffer_->resize(num_rows, d);
1586  }
1587  } else {
1588  array_buffer_->resize(num_rows, NullArray(ti));
1589  }
1590  }
1591  break;
1592  }
1593  case kPOINT:
1594  case kMULTIPOINT:
1595  case kLINESTRING:
1596  case kMULTILINESTRING:
1597  case kPOLYGON:
1598  case kMULTIPOLYGON:
1599  geo_string_buffer_->resize(num_rows, val);
1600  break;
1601  default:
1602  CHECK(false) << "TypedImportBuffer::addDefaultValues() does not support type "
1603  << type;
1604  }
1605 }
1606 
1607 bool importGeoFromLonLat(double lon,
1608  double lat,
1609  std::vector<double>& coords,
1610  SQLTypeInfo& ti) {
1611  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
1612  return false;
1613  }
1614  if (ti.transforms()) {
1615  Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
1616  if (!pt.transform(ti)) {
1617  return false;
1618  }
1619  pt.getColumns(coords);
1620  return true;
1621  }
1622  coords.push_back(lon);
1623  coords.push_back(lat);
1624  return true;
1625 }
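// Usage sketch (illustration): for a geo column whose SQLTypeInfo carries a
// transform (e.g. input SRID 4326 to output SRID 900913), the point is
// validated and transformed; otherwise the lon/lat pair is passed through.
//
//   std::vector<double> coords;
//   if (!importGeoFromLonLat(-122.4, 37.8, coords, ti)) {
//     // reject the row: NaN/infinite input or a failed transform
//   }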
1626 
1627 void Importer::set_geo_physical_import_buffer(
1628  const Catalog_Namespace::Catalog& catalog,
1629  const ColumnDescriptor* cd,
1630  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1631  size_t& col_idx,
1632  std::vector<double>& coords,
1633  std::vector<double>& bounds,
1634  std::vector<int>& ring_sizes,
1635  std::vector<int>& poly_rings,
1636  int render_group,
1637  const bool force_null) {
1638  const auto col_ti = cd->columnType;
1639  const auto col_type = col_ti.get_type();
1640  auto columnId = cd->columnId;
1641  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1642  bool is_null_geo = false;
1643  bool is_null_point = false;
1644  if (!col_ti.get_notnull()) {
1645  // Check for NULL geo
1646  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1647  is_null_point = true;
1648  coords.clear();
1649  }
1650  is_null_geo = coords.empty();
1651  if (is_null_point) {
1652  coords.push_back(NULL_ARRAY_DOUBLE);
1653  coords.push_back(NULL_DOUBLE);
1654  // Treating POINT coords as notnull, need to store actual encoding
1655  // [un]compressed+[not]null
1656  is_null_geo = false;
1657  }
1658  }
1659  if (force_null) {
1660  is_null_geo = true;
1661  }
1662  TDatum tdd_coords;
1663  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1664  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1665  // in a fixlen array, compressed and uncompressed.
1666  if (!is_null_geo) {
1667  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
1668  tdd_coords.val.arr_val.reserve(compressed_coords.size());
1669  for (auto cc : compressed_coords) {
1670  tdd_coords.val.arr_val.emplace_back();
1671  tdd_coords.val.arr_val.back().val.int_val = cc;
1672  }
1673  }
1674  tdd_coords.is_null = is_null_geo;
1675  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);
1676 
1677  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1678  // Create [linest]ring_sizes array value and add it to the physical column
1679  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1680  TDatum tdd_ring_sizes;
1681  tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1682  if (!is_null_geo) {
1683  for (auto ring_size : ring_sizes) {
1684  tdd_ring_sizes.val.arr_val.emplace_back();
1685  tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1686  }
1687  }
1688  tdd_ring_sizes.is_null = is_null_geo;
1689  import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1690  }
1691 
1692  if (col_type == kMULTIPOLYGON) {
1693  // Create poly_rings array value and add it to the physical column
1694  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1695  TDatum tdd_poly_rings;
1696  tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1697  if (!is_null_geo) {
1698  for (auto num_rings : poly_rings) {
1699  tdd_poly_rings.val.arr_val.emplace_back();
1700  tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1701  }
1702  }
1703  tdd_poly_rings.is_null = is_null_geo;
1704  import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
1705  }
1706 
1707  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
1708  col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
1709  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1710  TDatum tdd_bounds;
1711  tdd_bounds.val.arr_val.reserve(bounds.size());
1712  if (!is_null_geo) {
1713  for (auto b : bounds) {
1714  tdd_bounds.val.arr_val.emplace_back();
1715  tdd_bounds.val.arr_val.back().val.real_val = b;
1716  }
1717  }
1718  tdd_bounds.is_null = is_null_geo;
1719  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
1720  }
1721 
1722  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1723  // Create render_group value and add it to the physical column
1724  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1725  TDatum td_render_group;
1726  td_render_group.val.int_val = render_group;
1727  td_render_group.is_null = is_null_geo;
1728  import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
1729  }
1730 }
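// Physical-column layout (summary of the appends above): each logical geo
// column is followed by physical columns filled in this order:
//
//   POINT:           coords
//   MULTIPOINT:      coords, bounds
//   LINESTRING:      coords, bounds
//   MULTILINESTRING: coords, ring_sizes, bounds
//   POLYGON:         coords, ring_sizes, bounds, render_group
//   MULTIPOLYGON:    coords, ring_sizes, poly_rings, bounds, render_group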
1731 
1732 void Importer::set_geo_physical_import_buffer_columnar(
1733  const Catalog_Namespace::Catalog& catalog,
1734  const ColumnDescriptor* cd,
1735  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1736  size_t& col_idx,
1737  std::vector<std::vector<double>>& coords_column,
1738  std::vector<std::vector<double>>& bounds_column,
1739  std::vector<std::vector<int>>& ring_sizes_column,
1740  std::vector<std::vector<int>>& poly_rings_column,
1741  std::vector<int>& render_groups_column) {
1742  const auto col_ti = cd->columnType;
1743  const auto col_type = col_ti.get_type();
1744  auto columnId = cd->columnId;
1745 
1746  auto coords_row_count = coords_column.size();
1747  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1748  for (auto& coords : coords_column) {
1749  bool is_null_geo = false;
1750  bool is_null_point = false;
1751  if (!col_ti.get_notnull()) {
1752  // Check for NULL geo
1753  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1754  is_null_point = true;
1755  coords.clear();
1756  }
1757  is_null_geo = coords.empty();
1758  if (is_null_point) {
1759  coords.push_back(NULL_ARRAY_DOUBLE);
1760  coords.push_back(NULL_DOUBLE);
1761  // Treating POINT coords as notnull, need to store actual encoding
1762  // [un]compressed+[not]null
1763  is_null_geo = false;
1764  }
1765  }
1766  std::vector<TDatum> td_coords_data;
1767  if (!is_null_geo) {
1768  std::vector<uint8_t> compressed_coords =
1769  Geospatial::compress_coords(coords, col_ti);
1770  for (auto const& cc : compressed_coords) {
1771  TDatum td_byte;
1772  td_byte.val.int_val = cc;
1773  td_coords_data.push_back(td_byte);
1774  }
1775  }
1776  TDatum tdd_coords;
1777  tdd_coords.val.arr_val = td_coords_data;
1778  tdd_coords.is_null = is_null_geo;
1779  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1780  }
1781  col_idx++;
1782 
1783  if (col_type == kMULTILINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1784  if (ring_sizes_column.size() != coords_row_count) {
1785  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1786  }
1787  // Create [linest]ring_sizes array value and add it to the physical column
1788  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1789  for (auto const& ring_sizes : ring_sizes_column) {
1790  bool is_null_geo = false;
1791  if (!col_ti.get_notnull()) {
1792  // Check for NULL geo
1793  is_null_geo = ring_sizes.empty();
1794  }
1795  std::vector<TDatum> td_ring_sizes;
1796  for (auto const& ring_size : ring_sizes) {
1797  TDatum td_ring_size;
1798  td_ring_size.val.int_val = ring_size;
1799  td_ring_sizes.push_back(td_ring_size);
1800  }
1801  TDatum tdd_ring_sizes;
1802  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1803  tdd_ring_sizes.is_null = is_null_geo;
1804  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1805  }
1806  col_idx++;
1807  }
1808 
1809  if (col_type == kMULTIPOLYGON) {
1810  if (poly_rings_column.size() != coords_row_count) {
1811  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1812  }
1813  // Create poly_rings array value and add it to the physical column
1814  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1815  for (auto const& poly_rings : poly_rings_column) {
1816  bool is_null_geo = false;
1817  if (!col_ti.get_notnull()) {
1818  // Check for NULL geo
1819  is_null_geo = poly_rings.empty();
1820  }
1821  std::vector<TDatum> td_poly_rings;
1822  for (auto const& num_rings : poly_rings) {
1823  TDatum td_num_rings;
1824  td_num_rings.val.int_val = num_rings;
1825  td_poly_rings.push_back(td_num_rings);
1826  }
1827  TDatum tdd_poly_rings;
1828  tdd_poly_rings.val.arr_val = td_poly_rings;
1829  tdd_poly_rings.is_null = is_null_geo;
1830  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1831  }
1832  col_idx++;
1833  }
1834 
1835  if (col_type == kLINESTRING || col_type == kMULTILINESTRING || col_type == kPOLYGON ||
1836  col_type == kMULTIPOLYGON || col_type == kMULTIPOINT) {
1837  if (bounds_column.size() != coords_row_count) {
1838  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1839  }
1840  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1841  for (auto const& bounds : bounds_column) {
1842  bool is_null_geo = false;
1843  if (!col_ti.get_notnull()) {
1844  // Check for NULL geo
1845  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1846  }
1847  std::vector<TDatum> td_bounds_data;
1848  for (auto const& b : bounds) {
1849  TDatum td_double;
1850  td_double.val.real_val = b;
1851  td_bounds_data.push_back(td_double);
1852  }
1853  TDatum tdd_bounds;
1854  tdd_bounds.val.arr_val = td_bounds_data;
1855  tdd_bounds.is_null = is_null_geo;
1856  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1857  }
1858  col_idx++;
1859  }
1860 
1861  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1862  // Create render_group value and add it to the physical column
1863  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1864  for (auto const& render_group : render_groups_column) {
1865  TDatum td_render_group;
1866  td_render_group.val.int_val = render_group;
1867  td_render_group.is_null = false;
1868  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
1869  }
1870  col_idx++;
1871  }
1872 }
1873 
1874 namespace {
1875 
1876 std::tuple<int, SQLTypes, std::string> explode_collections_step1(
1877  const std::list<const ColumnDescriptor*>& col_descs) {
1878  // validate the columns
1879  // for now we can only explode into a single destination column
1880  // which must be of the child type (POLYGON, LINESTRING, POINT)
1881  int collection_col_idx = -1;
1882  int col_idx = 0;
1883  std::string collection_col_name;
1884  SQLTypes collection_child_type = kNULLT;
1885  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1886  auto const& cd = *cd_it;
1887  auto const col_type = cd->columnType.get_type();
1888  if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
1889  if (collection_col_idx >= 0) {
1890  throw std::runtime_error(
1891  "Explode Collections: Found more than one destination column");
1892  }
1893  collection_col_idx = col_idx;
1894  collection_child_type = col_type;
1895  collection_col_name = cd->columnName;
1896  }
1897  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1898  ++cd_it;
1899  }
1900  col_idx++;
1901  }
1902  if (collection_col_idx < 0) {
1903  throw std::runtime_error(
1904  "Explode Collections: Failed to find a supported column type to explode "
1905  "into");
1906  }
1907  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
1908 }
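
// Example: a minimal sketch of the validation rule above, assuming only the
// standard-library headers already included in this file (the real routine also
// steps over each geo column's physical companion columns). At most one
// POINT/LINESTRING/POLYGON destination column may appear; zero, or more than
// one, is an error, mirroring the two throws above.
static int example_find_single_destination(const std::vector<SQLTypes>& col_types) {
  int found = -1;
  for (int i = 0; i < static_cast<int>(col_types.size()); ++i) {
    if (col_types[i] == kPOINT || col_types[i] == kLINESTRING ||
        col_types[i] == kPOLYGON) {
      if (found >= 0) {
        throw std::runtime_error("Explode Collections: more than one destination");
      }
      found = i;
    }
  }
  if (found < 0) {
    throw std::runtime_error("Explode Collections: no explodable column");
  }
  return found;  // index of the single explodable column
}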
1909 
1910 int64_t explode_collections_step2(
1911  OGRGeometry* ogr_geometry,
1912  SQLTypes collection_child_type,
1913  const std::string& collection_col_name,
1914  size_t row_or_feature_idx,
1915  std::function<void(OGRGeometry*)> execute_import_lambda) {
1916  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1917  bool is_collection = false;
1918  switch (collection_child_type) {
1919  case kPOINT:
1920  switch (ogr_geometry_type) {
1921  case wkbMultiPoint:
1922  is_collection = true;
1923  break;
1924  case wkbPoint:
1925  break;
1926  default:
1927  throw std::runtime_error(
1928  "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1929  }
1930  break;
1931  case kLINESTRING:
1932  switch (ogr_geometry_type) {
1933  case wkbMultiLineString:
1934  is_collection = true;
1935  break;
1936  case wkbLineString:
1937  break;
1938  default:
1939  throw std::runtime_error(
1940  "Explode Collections: Source geo type must be MULTILINESTRING or "
1941  "LINESTRING");
1942  }
1943  break;
1944  case kPOLYGON:
1945  switch (ogr_geometry_type) {
1946  case wkbMultiPolygon:
1947  is_collection = true;
1948  break;
1949  case wkbPolygon:
1950  break;
1951  default:
1952  throw std::runtime_error(
1953  "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1954  }
1955  break;
1956  default:
1957  CHECK(false) << "Unsupported geo child type " << collection_child_type;
1958  }
1959 
1960  int64_t us = 0LL;
1961 
1962  // explode or just import
1963  if (is_collection) {
1964  // cast to collection
1965  OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1966  CHECK(collection_geometry);
1967 
1968 #if LOG_EXPLODE_COLLECTIONS
1969  // log number of children
1970  LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
1971  << collection_col_name << "' into " << collection_geometry->getNumGeometries()
1972  << " child rows";
1973 #endif
1974 
1975  // loop over children
1976  uint32_t child_geometry_count = 0;
1977  auto child_geometry_it = collection_geometry->begin();
1978  while (child_geometry_it != collection_geometry->end()) {
1979  // get and import this child
1980  OGRGeometry* import_geometry = *child_geometry_it;
1981  us += measure<std::chrono::microseconds>::execution(
1982  [&] { execute_import_lambda(import_geometry); });
1983 
1984  // next child
1985  child_geometry_it++;
1986  child_geometry_count++;
1987  }
1988  } else {
1989  // import non-collection row just once
1990  us = measure<std::chrono::microseconds>::execution(
1991  [&] { execute_import_lambda(ogr_geometry); });
1992  }
1993 
1994  // done
1995  return us;
1996 }
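
// Example: the net effect of the two steps above. With a POLYGON destination
// column, a source row whose geo field is a MULTIPOLYGON of two child polygons
// invokes execute_import_lambda twice -- once per child -- so the row's other
// fields are duplicated into two output rows; a plain POLYGON invokes it once.
// The accumulated microseconds are returned either way.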
1997 
1998 } // namespace
1999 
2000 static ImportStatus import_thread_delimited(
2001  int thread_id,
2002  Importer* importer,
2003  std::unique_ptr<char[]> scratch_buffer,
2004  size_t begin_pos,
2005  size_t end_pos,
2006  size_t total_size,
2007  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2008  size_t first_row_index_this_buffer,
2009  const Catalog_Namespace::SessionInfo* session_info,
2010  Executor* executor) {
2011  ImportStatus thread_import_status;
2012  int64_t total_get_row_time_us = 0;
2013  int64_t total_str_to_val_time_us = 0;
2014  auto query_session = session_info ? session_info->get_session_id() : "";
2015  CHECK(scratch_buffer);
2016  auto buffer = scratch_buffer.get();
2017  auto load_ms = measure<>::execution([]() {});
2018 
2019  thread_import_status.thread_id = thread_id;
2020 
2021  auto ms = measure<>::execution([&]() {
2022  const CopyParams& copy_params = importer->get_copy_params();
2023  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2024  size_t begin =
2025  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
2026  const char* thread_buf = buffer + begin_pos + begin;
2027  const char* thread_buf_end = buffer + end_pos;
2028  const char* buf_end = buffer + total_size;
2029  bool try_single_thread = false;
2030  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2031  importer->get_import_buffers(thread_id);
2032  int64_t us = 0LL;
2033  int phys_cols = 0;
2034  int point_cols = 0;
2035  for (const auto cd : col_descs) {
2036  const auto& col_ti = cd->columnType;
2037  phys_cols += col_ti.get_physical_cols();
2038  if (cd->columnType.get_type() == kPOINT) {
2039  point_cols++;
2040  }
2041  }
2042  auto num_cols = col_descs.size() - phys_cols;
2043  for (const auto& p : import_buffers) {
2044  p->clear();
2045  }
2046  std::vector<std::string_view> row;
2047  size_t row_index_plus_one = 0;
2048  for (const char* p = thread_buf; p < thread_buf_end; p++) {
2049  row.clear();
2050  std::vector<std::unique_ptr<char[]>>
2051  tmp_buffers; // holds string w/ removed escape chars, etc
2052  if (DEBUG_TIMING) {
2053  us = measure<std::chrono::microseconds>::execution([&]() {
2054  p = import_export::delimited_parser::get_row(p,
2055  thread_buf_end,
2056  buf_end,
2057  copy_params,
2058  importer->get_is_array(),
2059  row,
2060  tmp_buffers,
2061  try_single_thread,
2062  true);
2063  });
2064  total_get_row_time_us += us;
2065  } else {
2066  p = import_export::delimited_parser::get_row(p,
2067  thread_buf_end,
2068  buf_end,
2069  copy_params,
2070  importer->get_is_array(),
2071  row,
2072  tmp_buffers,
2073  try_single_thread,
2074  true);
2075  }
2076  row_index_plus_one++;
2077  // Each POINT could consume two separate coords instead of a single WKT
2078  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
2079  thread_import_status.rows_rejected++;
2080  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
2081  << row.size() << "): " << shared::printContainer(row);
2082  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2083  break;
2084  }
2085  continue;
2086  }
2087 
2088  //
2089  // lambda for importing a row (perhaps multiple times if exploding a collection)
2090  //
2091 
2092  auto execute_import_row = [&](OGRGeometry* import_geometry) {
2093  size_t import_idx = 0;
2094  size_t col_idx = 0;
2095  try {
2096  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2097  auto cd = *cd_it;
2098  const auto& col_ti = cd->columnType;
2099 
2100  bool is_null =
2101  ImportHelpers::is_null_datum(row[import_idx], copy_params.null_str);
2102  // Note: default copy_params.null_str is "\N", but everyone uses "NULL".
2103  // So initially nullness may be missed and not passed to add_value,
2104  // which then might also check and still decide it's actually a NULL, e.g.
2105  // if kINT doesn't start with a digit or a '-' then it's considered NULL.
2106  // So "NULL" is not recognized as NULL but then it's not recognized as
2107  // a valid kINT, so it's a NULL after all.
2108  // Checking for "NULL" here too, as a widely accepted notation for NULL.
2109 
2110  // Treating empty as NULL
2111  if (!cd->columnType.is_string() && row[import_idx].empty()) {
2112  is_null = true;
2113  }
2114  if (!cd->columnType.is_string() && !copy_params.trim_spaces) {
2115  // everything but strings should always be trimmed
2116  row[import_idx] = sv_strip(row[import_idx]);
2117  }
2118 
2119  if (col_ti.get_physical_cols() == 0) {
2120  // not geo
2121 
2122  import_buffers[col_idx]->add_value(
2123  cd, row[import_idx], is_null, copy_params);
2124 
2125  // next
2126  ++import_idx;
2127  ++col_idx;
2128  } else {
2129  // geo
2130 
2131  // store null string in the base column
2132  import_buffers[col_idx]->add_value(
2133  cd, copy_params.null_str, true, copy_params);
2134 
2135  // the geo string (WKT or WKB hex) that we parse below; the raw string itself is not stored
2136  auto const& geo_string = row[import_idx];
2137 
2138  // next
2139  ++import_idx;
2140  ++col_idx;
2141 
2142  SQLTypes col_type = col_ti.get_type();
2143  CHECK(IS_GEO(col_type));
2144 
2145  std::vector<double> coords;
2146  std::vector<double> bounds;
2147  std::vector<int> ring_sizes;
2148  std::vector<int> poly_rings;
2149  int render_group = 0;
2150 
2151  // if this is a POINT column, and the field is not null, and
2152  // looks like a scalar numeric value (and not a hex blob)
2153  // attempt to import two columns as lon/lat (or lat/lon)
2154  if (col_type == kPOINT && !is_null && geo_string.size() > 0 &&
2155  (geo_string[0] == '.' || isdigit(geo_string[0]) ||
2156  geo_string[0] == '-') &&
2157  geo_string.find_first_of("ABCDEFabcdef") == std::string::npos) {
2158  double lon = std::atof(std::string(geo_string).c_str());
2159  double lat = NAN;
2160  auto lat_str = row[import_idx];
2161  ++import_idx;
2162  if (lat_str.size() > 0 &&
2163  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
2164  lat = std::atof(std::string(lat_str).c_str());
2165  }
2166  // Swap coordinates if this table uses a reverse order: lat/lon
2167  if (!copy_params.lonlat) {
2168  std::swap(lat, lon);
2169  }
2170  // TODO: should check if POINT column should have been declared with
2171  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
2172  // throw std::runtime_error("POINT column " + cd->columnName + " is
2173  // not WGS84, cannot insert lon/lat");
2174  // }
2175  SQLTypeInfo import_ti{col_ti};
2176  if (copy_params.source_type ==
2177  import_export::SourceType::kDelimitedFile &&
2178  import_ti.get_output_srid() == 4326) {
2179  auto srid0 = copy_params.source_srid;
2180  if (srid0 > 0) {
2181  // srid0 -> 4326 transform is requested on import
2182  import_ti.set_input_srid(srid0);
2183  }
2184  }
2185  if (!importGeoFromLonLat(lon, lat, coords, import_ti)) {
2186  throw std::runtime_error(
2187  "Cannot read lon/lat to insert into POINT column " +
2188  cd->columnName);
2189  }
2190  } else {
2191  // import it
2192  SQLTypeInfo import_ti{col_ti};
2193  if (copy_params.source_type ==
2194  import_export::SourceType::kDelimitedFile &&
2195  import_ti.get_output_srid() == 4326) {
2196  auto srid0 = copy_params.source_srid;
2197  if (srid0 > 0) {
2198  // srid0 -> 4326 transform is requested on import
2199  import_ti.set_input_srid(srid0);
2200  }
2201  }
2202  if (is_null) {
2203  if (col_ti.get_notnull()) {
2204  throw std::runtime_error("NULL geo for column " + cd->columnName);
2205  }
2206  Geospatial::GeoTypesFactory::getNullGeoColumns(
2207  import_ti,
2208  coords,
2209  bounds,
2210  ring_sizes,
2211  poly_rings,
2212  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2213  } else {
2214  if (import_geometry) {
2215  // geometry already exploded
2216  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2217  import_geometry,
2218  import_ti,
2219  coords,
2220  bounds,
2221  ring_sizes,
2222  poly_rings,
2223  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2224  std::string msg =
2225  "Failed to extract valid geometry from exploded row " +
2226  std::to_string(first_row_index_this_buffer +
2227  row_index_plus_one) +
2228  " for column " + cd->columnName;
2229  throw std::runtime_error(msg);
2230  }
2231  } else {
2232  // extract geometry directly from WKT
2233  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2234  std::string(geo_string),
2235  import_ti,
2236  coords,
2237  bounds,
2238  ring_sizes,
2239  poly_rings,
2240  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2241  std::string msg = "Failed to extract valid geometry from row " +
2242  std::to_string(first_row_index_this_buffer +
2243  row_index_plus_one) +
2244  " for column " + cd->columnName;
2245  throw std::runtime_error(msg);
2246  }
2247  }
2248 
2249  // validate types
2250  if (col_type != import_ti.get_type()) {
2251  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2252  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2253  col_type == SQLTypes::kMULTIPOLYGON)) {
2254  throw std::runtime_error(
2255  "Imported geometry doesn't match the type of column " +
2256  cd->columnName);
2257  }
2258  }
2259  }
2260 
2261  // assign render group?
2262  if (columnIdToRenderGroupAnalyzerMap.size()) {
2263  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2264  if (ring_sizes.size()) {
2265  // get a suitable render group for these poly coords
2266  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2267  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2268  render_group =
2269  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2270  } else {
2271  // empty poly
2272  render_group = -1;
2273  }
2274  }
2275  }
2276  }
2277 
2278  // import extracted geo
2279  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2280  cd,
2281  import_buffers,
2282  col_idx,
2283  coords,
2284  bounds,
2285  ring_sizes,
2286  poly_rings,
2287  render_group);
2288 
2289  // skip remaining physical columns
2290  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2291  ++cd_it;
2292  }
2293  }
2294  }
2295  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2296  check_session_interrupted(query_session, executor))) {
2297  thread_import_status.load_failed = true;
2298  thread_import_status.load_msg =
2299  "Table load was cancelled via Query Interrupt";
2300  return;
2301  }
2302  thread_import_status.rows_completed++;
2303  } catch (const std::exception& e) {
2304  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2305  import_buffers[col_idx_to_pop]->pop_value();
2306  }
2307  thread_import_status.rows_rejected++;
2308  LOG(ERROR) << "Input exception thrown: " << e.what()
2309  << ". Row discarded. Data: " << shared::printContainer(row);
2310  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2311  LOG(ERROR) << "Load was cancelled due to max reject rows being reached";
2312  thread_import_status.load_failed = true;
2313  thread_import_status.load_msg =
2314  "Load was cancelled due to max reject rows being reached";
2315  }
2316  }
2317  }; // End of lambda
2318 
2319  if (copy_params.geo_explode_collections) {
2320  // explode and import
2321  auto const [collection_col_idx, collection_child_type, collection_col_name] =
2322  explode_collections_step1(col_descs);
2323  // pull out the collection WKT or WKB hex
2324  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2325  auto const& collection_geo_string = row[collection_col_idx];
2326  // convert to OGR
2327  OGRGeometry* ogr_geometry = nullptr;
2328  ScopeGuard destroy_ogr_geometry = [&] {
2329  if (ogr_geometry) {
2330  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2331  }
2332  };
2333  ogr_geometry = Geospatial::GeoTypesFactory::createOGRGeometry(
2334  std::string(collection_geo_string));
2335  // do the explode and import
2336  us = explode_collections_step2(ogr_geometry,
2337  collection_child_type,
2338  collection_col_name,
2339  first_row_index_this_buffer + row_index_plus_one,
2340  execute_import_row);
2341  } else {
2342  // import non-collection row just once
2343  us = measure<std::chrono::microseconds>::execution(
2344  [&] { execute_import_row(nullptr); });
2345  }
2346 
2347  if (thread_import_status.load_failed) {
2348  break;
2349  }
2350  } // end thread
2351  total_str_to_val_time_us += us;
2352  if (!thread_import_status.load_failed && thread_import_status.rows_completed > 0) {
2353  load_ms = measure<>::execution([&]() {
2354  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2355  });
2356  }
2357  }); // end execution
2358 
2359  if (DEBUG_TIMING && !thread_import_status.load_failed &&
2360  thread_import_status.rows_completed > 0) {
2361  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2362  << thread_import_status.rows_completed << " rows inserted in "
2363  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2364  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2365  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2366  << "sec" << std::endl;
2367  }
2368 
2369  return thread_import_status;
2370 }
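
// Example: a minimal sketch of the two-field POINT ordering rule used above,
// assuming only <utility>. When copy_params.lonlat is false the file stores
// latitude first, so the pair is swapped before the lon/lat import.
static std::pair<double, double> example_to_lon_lat(const double first,
                                                    const double second,
                                                    const bool lonlat) {
  double lon = first;
  double lat = second;
  if (!lonlat) {
    std::swap(lat, lon);  // file order was lat/lon
  }
  // e.g. fields (40.7, -74.0) with lonlat=false yield lon=-74.0, lat=40.7
  return {lon, lat};
}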
2371 
2372 class ColumnNotGeoError : public std::runtime_error {
2373  public:
2374  ColumnNotGeoError(const std::string& column_name)
2375  : std::runtime_error("Column '" + column_name + "' is not a geo column") {}
2376 };
2377 
2378 static ImportStatus import_thread_shapefile(
2379  int thread_id,
2380  Importer* importer,
2381  OGRCoordinateTransformation* coordinate_transformation,
2382  const FeaturePtrVector& features,
2383  size_t firstFeature,
2384  size_t numFeatures,
2385  const FieldNameToIndexMapType& fieldNameToIndexMap,
2386  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2387  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2388  const Catalog_Namespace::SessionInfo* session_info,
2389  Executor* executor,
2390  const MetadataColumnInfos& metadata_column_infos) {
2391  ImportStatus thread_import_status;
2392  const CopyParams& copy_params = importer->get_copy_params();
2393  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2394  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2395  importer->get_import_buffers(thread_id);
2396  auto query_session = session_info ? session_info->get_session_id() : "";
2397  for (const auto& p : import_buffers) {
2398  p->clear();
2399  }
2400 
2401  auto convert_timer = timer_start();
2402 
2403  // for all the features in this chunk...
2404  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2405  // ignore null features
2406  if (!features[iFeature]) {
2407  continue;
2408  }
2409 
2410  // get this feature's geometry
2411  // for geodatabase, we need to consider features with no geometry
2412  // as we still want to create a table, even if it has no geo column
2413  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2414  if (pGeometry && coordinate_transformation) {
2415  pGeometry->transform(coordinate_transformation);
2416  }
2417 
2418  //
2419  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2420  //
2421 
2422  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2423  size_t col_idx = 0;
2424  try {
2425  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2426  check_session_interrupted(query_session, executor))) {
2427  thread_import_status.load_failed = true;
2428  thread_import_status.load_msg = "Table load was cancelled via Query Interrupt";
2429  return;
2430  }
2431 
2432  uint32_t field_column_count{0u};
2433  uint32_t metadata_column_count{0u};
2434 
2435  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2436  auto cd = *cd_it;
2437 
2438  // is this a geo column?
2439  const auto& col_ti = cd->columnType;
2440  if (col_ti.is_geometry()) {
2441  // Note that this assumes there is one and only one geo column in the
2442  // table. Currently, the importer only supports reading a single
2443  // geospatial feature from an input shapefile / geojson file, but this
2444  // code will need to be modified if that changes
2445  SQLTypes col_type = col_ti.get_type();
2446 
2447  // store null string in the base column
2448  import_buffers[col_idx]->add_value(
2449  cd, copy_params.null_str, true, copy_params);
2450  ++col_idx;
2451 
2452  // the data we now need to extract for the other columns
2453  std::vector<double> coords;
2454  std::vector<double> bounds;
2455  std::vector<int> ring_sizes;
2456  std::vector<int> poly_rings;
2457  int render_group = 0;
2458 
2459  // extract it
2460  SQLTypeInfo import_ti{col_ti};
2461  bool is_null_geo = !import_geometry;
2462  if (is_null_geo) {
2463  if (col_ti.get_notnull()) {
2464  throw std::runtime_error("NULL geo for column " + cd->columnName);
2465  }
2466  Geospatial::GeoTypesFactory::getNullGeoColumns(
2467  import_ti,
2468  coords,
2469  bounds,
2470  ring_sizes,
2471  poly_rings,
2472  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2473  } else {
2474  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2475  import_geometry,
2476  import_ti,
2477  coords,
2478  bounds,
2479  ring_sizes,
2480  poly_rings,
2481  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2482  std::string msg = "Failed to extract valid geometry from feature " +
2483  std::to_string(firstFeature + iFeature + 1) +
2484  " for column " + cd->columnName;
2485  throw std::runtime_error(msg);
2486  }
2487 
2488  // validate types
2489  if (col_type != import_ti.get_type()) {
2490  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2491  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2492  col_type == SQLTypes::kMULTIPOLYGON)) {
2493  throw std::runtime_error(
2494  "Imported geometry doesn't match the type of column " +
2495  cd->columnName);
2496  }
2497  }
2498  }
2499 
2500  if (columnIdToRenderGroupAnalyzerMap.size()) {
2501  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2502  if (ring_sizes.size()) {
2503  // get a suitable render group for these poly coords
2504  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2505  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2506  render_group =
2507  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2508  } else {
2509  // empty poly
2510  render_group = -1;
2511  }
2512  }
2513  }
2514 
2515  // create coords array value and add it to the physical column
2516  ++cd_it;
2517  auto cd_coords = *cd_it;
2518  std::vector<TDatum> td_coord_data;
2519  if (!is_null_geo) {
2520  std::vector<uint8_t> compressed_coords =
2521  Geospatial::compress_coords(coords, col_ti);
2522  for (auto cc : compressed_coords) {
2523  TDatum td_byte;
2524  td_byte.val.int_val = cc;
2525  td_coord_data.push_back(td_byte);
2526  }
2527  }
2528  TDatum tdd_coords;
2529  tdd_coords.val.arr_val = td_coord_data;
2530  tdd_coords.is_null = is_null_geo;
2531  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2532  ++col_idx;
2533 
2534  if (col_type == kMULTILINESTRING || col_type == kPOLYGON ||
2535  col_type == kMULTIPOLYGON) {
2536  // Create [linest]ring_sizes array value and add it to the physical column
2537  ++cd_it;
2538  auto cd_ring_sizes = *cd_it;
2539  std::vector<TDatum> td_ring_sizes;
2540  if (!is_null_geo) {
2541  for (auto ring_size : ring_sizes) {
2542  TDatum td_ring_size;
2543  td_ring_size.val.int_val = ring_size;
2544  td_ring_sizes.push_back(td_ring_size);
2545  }
2546  }
2547  TDatum tdd_ring_sizes;
2548  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2549  tdd_ring_sizes.is_null = is_null_geo;
2550  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2551  ++col_idx;
2552  }
2553 
2554  if (col_type == kMULTIPOLYGON) {
2555  // Create poly_rings array value and add it to the physical column
2556  ++cd_it;
2557  auto cd_poly_rings = *cd_it;
2558  std::vector<TDatum> td_poly_rings;
2559  if (!is_null_geo) {
2560  for (auto num_rings : poly_rings) {
2561  TDatum td_num_rings;
2562  td_num_rings.val.int_val = num_rings;
2563  td_poly_rings.push_back(td_num_rings);
2564  }
2565  }
2566  TDatum tdd_poly_rings;
2567  tdd_poly_rings.val.arr_val = td_poly_rings;
2568  tdd_poly_rings.is_null = is_null_geo;
2569  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2570  ++col_idx;
2571  }
2572 
2573  if (col_type == kLINESTRING || col_type == kMULTILINESTRING ||
2574  col_type == kPOLYGON || col_type == kMULTIPOLYGON ||
2575  col_type == kMULTIPOINT) {
2576  // Create bounds array value and add it to the physical column
2577  ++cd_it;
2578  auto cd_bounds = *cd_it;
2579  std::vector<TDatum> td_bounds_data;
2580  if (!is_null_geo) {
2581  for (auto b : bounds) {
2582  TDatum td_double;
2583  td_double.val.real_val = b;
2584  td_bounds_data.push_back(td_double);
2585  }
2586  }
2587  TDatum tdd_bounds;
2588  tdd_bounds.val.arr_val = td_bounds_data;
2589  tdd_bounds.is_null = is_null_geo;
2590  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2591  ++col_idx;
2592  }
2593 
2594  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2595  // Create render_group value and add it to the physical column
2596  ++cd_it;
2597  auto cd_render_group = *cd_it;
2598  TDatum td_render_group;
2599  td_render_group.val.int_val = render_group;
2600  td_render_group.is_null = is_null_geo;
2601  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2602  ++col_idx;
2603  }
2604  } else if (field_column_count < fieldNameToIndexMap.size()) {
2605  //
2606  // field column
2607  //
2608  auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2609  CHECK(cit != columnNameToSourceNameMap.end());
2610  auto const& field_name = cit->second;
2611 
2612  auto const fit = fieldNameToIndexMap.find(field_name);
2613  if (fit == fieldNameToIndexMap.end()) {
2614  throw ColumnNotGeoError(cd->columnName);
2615  }
2616 
2617  auto const& field_index = fit->second;
2618  CHECK(field_index < fieldNameToIndexMap.size());
2619 
2620  auto const& feature = features[iFeature];
2621 
2622  auto field_defn = feature->GetFieldDefnRef(field_index);
2623  CHECK(field_defn);
2624 
2625  // OGRFeature::GetFieldAsString() can only return 80 characters
2626  // so for array columns, we are obliged to fetch the actual values
2627  // and construct the concatenated string ourselves
2628 
2629  std::string value_string;
2630  int array_index = 0, array_size = 0;
2631 
2632  auto stringify_numeric_list = [&](auto* values) {
2633  value_string = "{";
2634  while (array_index < array_size) {
2635  auto separator = (array_index > 0) ? "," : "";
2636  value_string += separator + std::to_string(values[array_index]);
2637  array_index++;
2638  }
2639  value_string += "}";
2640  };
2641 
2642  auto field_type = field_defn->GetType();
2643  switch (field_type) {
2644  case OFTInteger:
2645  case OFTInteger64:
2646  case OFTReal:
2647  case OFTString:
2648  case OFTBinary:
2649  case OFTDate:
2650  case OFTTime:
2651  case OFTDateTime: {
2652  value_string = feature->GetFieldAsString(field_index);
2653  } break;
2654  case OFTIntegerList: {
2655  auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2656  stringify_numeric_list(values);
2657  } break;
2658  case OFTInteger64List: {
2659  auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2660  stringify_numeric_list(values);
2661  } break;
2662  case OFTRealList: {
2663  auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2664  stringify_numeric_list(values);
2665  } break;
2666  case OFTStringList: {
2667  auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2668  value_string = "{";
2669  if (array_of_strings) {
2670  while (auto* this_string = array_of_strings[array_index]) {
2671  auto separator = (array_index > 0) ? "," : "";
2672  value_string += separator + std::string(this_string);
2673  array_index++;
2674  }
2675  }
2676  value_string += "}";
2677  } break;
2678  default:
2679  throw std::runtime_error("Unsupported geo file field type (" +
2680  std::to_string(static_cast<int>(field_type)) +
2681  ")");
2682  }
2683 
2684  import_buffers[col_idx]->add_value(cd, value_string, false, copy_params);
2685  ++col_idx;
2686  field_column_count++;
2687  } else if (metadata_column_count < metadata_column_infos.size()) {
2688  //
2689  // metadata column
2690  //
2691  auto const& mci = metadata_column_infos[metadata_column_count];
2692  if (mci.column_descriptor.columnName != cd->columnName) {
2693  throw std::runtime_error("Metadata column name mismatch");
2694  }
2695  import_buffers[col_idx]->add_value(cd, mci.value, false, copy_params);
2696  ++col_idx;
2697  metadata_column_count++;
2698  } else {
2699  throw std::runtime_error("Column count mismatch");
2700  }
2701  }
2702  thread_import_status.rows_completed++;
2703  } catch (QueryExecutionError& e) {
2704  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
2705  throw e;
2706  }
2707  } catch (ColumnNotGeoError& e) {
2708  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Aborting import.";
2709  throw std::runtime_error(e.what());
2710  } catch (const std::exception& e) {
2711  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2712  import_buffers[col_idx_to_pop]->pop_value();
2713  }
2714  thread_import_status.rows_rejected++;
2715  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2716  }
2717  };
2718 
2719  if (pGeometry && copy_params.geo_explode_collections) {
2720  // explode and import
2721  auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2722  explode_collections_step1(col_descs);
2723  explode_collections_step2(pGeometry,
2724  collection_child_type,
2725  collection_col_name,
2726  firstFeature + iFeature + 1,
2727  execute_import_feature);
2728  } else {
2729  // import non-collection or null feature just once
2730  execute_import_feature(pGeometry);
2731  }
2732  } // end features
2733 
2734  float convert_s = TIMER_STOP(convert_timer);
2735 
2736  float load_s = 0.0f;
2737  if (thread_import_status.rows_completed > 0) {
2738  auto load_timer = timer_start();
2739  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2740  load_s = TIMER_STOP(load_timer);
2741  }
2742 
2743  if (DEBUG_TIMING && thread_import_status.rows_completed > 0) {
2744  LOG(INFO) << "DEBUG: Process " << convert_s << "s";
2745  LOG(INFO) << "DEBUG: Load " << load_s << "s";
2746  LOG(INFO) << "DEBUG: Total " << (convert_s + load_s) << "s";
2747  }
2748 
2749  thread_import_status.thread_id = thread_id;
2750 
2751  return thread_import_status;
2752 }
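
// Example: why stringify_numeric_list above builds the array literal itself --
// OGRFeature::GetFieldAsString() truncates its output, so for an OFTIntegerList
// holding {1,2,3} the importer concatenates "{1,2,3}" manually. A standalone
// sketch of the joiner, assuming only the standard library:
static std::string example_join_as_array(const int* values, const int count) {
  std::string out = "{";
  for (int i = 0; i < count; ++i) {
    out += (i > 0 ? "," : "") + std::to_string(values[i]);  // comma-separated
  }
  return out + "}";
}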
2753 
2754 bool Loader::loadNoCheckpoint(
2755  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2756  size_t row_count,
2757  const Catalog_Namespace::SessionInfo* session_info) {
2758  return loadImpl(import_buffers, row_count, false, session_info);
2759 }
2760 
2761 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2762  size_t row_count,
2763  const Catalog_Namespace::SessionInfo* session_info) {
2764  return loadImpl(import_buffers, row_count, true, session_info);
2765 }
2766 
2767 namespace {
2768 
2769 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2770  const auto& ti = import_buffer.getTypeInfo();
2771  const int8_t* values_buffer{nullptr};
2772  if (ti.is_string()) {
2773  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2774  values_buffer = import_buffer.getStringDictBuffer();
2775  } else {
2776  values_buffer = import_buffer.getAsBytes();
2777  }
2778  CHECK(values_buffer);
2779  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2780  switch (logical_size) {
2781  case 1: {
2782  return values_buffer[index];
2783  }
2784  case 2: {
2785  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2786  }
2787  case 4: {
2788  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2789  }
2790  case 8: {
2791  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2792  }
2793  default:
2794  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2795  }
2796  UNREACHABLE();
2797  return 0;
2798 }
2799 
2800 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2801  const auto& ti = import_buffer.getTypeInfo();
2802  CHECK_EQ(kFLOAT, ti.get_type());
2803  const auto values_buffer = import_buffer.getAsBytes();
2804  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2805 }
2806 
2807 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2808  const auto& ti = import_buffer.getTypeInfo();
2809  CHECK_EQ(kDOUBLE, ti.get_type());
2810  const auto values_buffer = import_buffer.getAsBytes();
2811  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2812 }
2813 
2814 } // namespace
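
// Example: the size-switched widening used by int_value_at() above, in
// isolation (a sketch, assuming only the standard library). A fixed-width
// integer column is reinterpreted at its logical size and widened to int64_t.
static int64_t example_widen_at(const int8_t* bytes,
                                const size_t logical_size,
                                const size_t index) {
  switch (logical_size) {
    case 1:
      return bytes[index];
    case 2:
      return reinterpret_cast<const int16_t*>(bytes)[index];
    case 4:
      return reinterpret_cast<const int32_t*>(bytes)[index];
    case 8:
      return reinterpret_cast<const int64_t*>(bytes)[index];
    default:
      throw std::runtime_error("unexpected logical size");
  }
}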
2815 
2816 void Loader::fillShardRow(const size_t row_index,
2817  OneShardBuffers& shard_output_buffers,
2818  const OneShardBuffers& import_buffers) {
2819  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2820  const auto& input_buffer = import_buffers[col_idx];
2821  const auto& col_ti = input_buffer->getTypeInfo();
2822  const auto type =
2823  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2824 
2825  switch (type) {
2826  case kBOOLEAN:
2827  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2828  break;
2829  case kTINYINT:
2830  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2831  break;
2832  case kSMALLINT:
2833  shard_output_buffers[col_idx]->addSmallint(
2834  int_value_at(*input_buffer, row_index));
2835  break;
2836  case kINT:
2837  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2838  break;
2839  case kBIGINT:
2840  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2841  break;
2842  case kFLOAT:
2843  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2844  break;
2845  case kDOUBLE:
2846  shard_output_buffers[col_idx]->addDouble(
2847  double_value_at(*input_buffer, row_index));
2848  break;
2849  case kTEXT:
2850  case kVARCHAR:
2851  case kCHAR: {
2852  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2853  shard_output_buffers[col_idx]->addString(
2854  (*input_buffer->getStringBuffer())[row_index]);
2855  break;
2856  }
2857  case kTIME:
2858  case kTIMESTAMP:
2859  case kDATE:
2860  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2861  break;
2862  case kARRAY:
2863  if (IS_STRING(col_ti.get_subtype())) {
2864  CHECK(input_buffer->getStringArrayBuffer());
2865  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2866  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2867  shard_output_buffers[col_idx]->addStringArray(input_arr);
2868  } else {
2869  shard_output_buffers[col_idx]->addArray(
2870  (*input_buffer->getArrayBuffer())[row_index]);
2871  }
2872  break;
2873  case kPOINT:
2874  case kMULTIPOINT:
2875  case kLINESTRING:
2876  case kMULTILINESTRING:
2877  case kPOLYGON:
2878  case kMULTIPOLYGON: {
2879  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2880  shard_output_buffers[col_idx]->addGeoString(
2881  (*input_buffer->getGeoStringBuffer())[row_index]);
2882  break;
2883  }
2884  default:
2885  CHECK(false);
2886  }
2887  }
2888 }
2889 
2890 void Loader::distributeToShardsExistingColumns(
2891  std::vector<OneShardBuffers>& all_shard_import_buffers,
2892  std::vector<size_t>& all_shard_row_counts,
2893  const OneShardBuffers& import_buffers,
2894  const size_t row_count,
2895  const size_t shard_count,
2896  const Catalog_Namespace::SessionInfo* session_info) {
2897  int col_idx{0};
2898  const ColumnDescriptor* shard_col_desc{nullptr};
2899  for (const auto col_desc : column_descs_) {
2900  ++col_idx;
2901  if (col_idx == table_desc_->shardedColumnId) {
2902  shard_col_desc = col_desc;
2903  break;
2904  }
2905  }
2906  CHECK(shard_col_desc);
2907  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2908  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2909  const auto& shard_col_ti = shard_col_desc->columnType;
2910  CHECK(shard_col_ti.is_integer() ||
2911  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2912  shard_col_ti.is_time());
2913  if (shard_col_ti.is_string()) {
2914  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2915  CHECK(payloads_ptr);
2916  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2917  }
2918 
2919  for (size_t i = 0; i < row_count; ++i) {
2920  const size_t shard =
2921  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2922  auto& shard_output_buffers = all_shard_import_buffers[shard];
2923  fillShardRow(i, shard_output_buffers, import_buffers);
2924  ++all_shard_row_counts[shard];
2925  }
2926 }
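
// Example: a sketch of the row routing above, assuming SHARD_FOR_KEY
// (Shared/shard_key.h) reduces a shard-key value to a shard index by
// non-negative modulo:
static size_t example_route_to_shard(const int64_t key, const size_t shard_count) {
  const uint64_t k =
      key < 0 ? static_cast<uint64_t>(-key) : static_cast<uint64_t>(key);
  return static_cast<size_t>(k % shard_count);  // e.g. key 10, 4 shards -> shard 2
}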
2927 
2928 void Loader::distributeToShardsNewColumns(
2929  std::vector<OneShardBuffers>& all_shard_import_buffers,
2930  std::vector<size_t>& all_shard_row_counts,
2931  const OneShardBuffers& import_buffers,
2932  const size_t row_count,
2933  const size_t shard_count,
2934  const Catalog_Namespace::SessionInfo* session_info) {
2935  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2936  CHECK(shard_tds.size() == shard_count);
2937 
2938  for (size_t shard = 0; shard < shard_count; ++shard) {
2939  auto& shard_output_buffers = all_shard_import_buffers[shard];
2940  if (row_count != 0) {
2941  fillShardRow(0, shard_output_buffers, import_buffers);
2942  }
2943  // when replicating a column, row count of a shard == replicate count of the column
2944  // on the shard
2945  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2946  }
2947 }
2948 
2949 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2950  std::vector<size_t>& all_shard_row_counts,
2951  const OneShardBuffers& import_buffers,
2952  const size_t row_count,
2953  const size_t shard_count,
2954  const Catalog_Namespace::SessionInfo* session_info) {
2955  all_shard_row_counts.resize(shard_count);
2956  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2957  all_shard_import_buffers.emplace_back();
2958  for (const auto& typed_import_buffer : import_buffers) {
2959  all_shard_import_buffers.back().emplace_back(
2960  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2961  typed_import_buffer->getStringDictionary()));
2962  }
2963  }
2964  CHECK_GT(table_desc_->shardedColumnId, 0);
2965  if (isAddingColumns()) {
2966  distributeToShardsNewColumns(all_shard_import_buffers,
2967  all_shard_row_counts,
2968  import_buffers,
2969  row_count,
2970  shard_count,
2971  session_info);
2972  } else {
2973  distributeToShardsExistingColumns(all_shard_import_buffers,
2974  all_shard_row_counts,
2975  import_buffers,
2976  row_count,
2977  shard_count,
2978  session_info);
2979  }
2980 }
2981 
2982 bool Loader::loadImpl(
2983  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2984  size_t row_count,
2985  bool checkpoint,
2986  const Catalog_Namespace::SessionInfo* session_info) {
2987  if (load_callback_) {
2988  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
2989  return load_callback_(import_buffers, data_blocks, row_count);
2990  }
2991  if (table_desc_->nShards) {
2992  std::vector<OneShardBuffers> all_shard_import_buffers;
2993  std::vector<size_t> all_shard_row_counts;
2994  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2995  distributeToShards(all_shard_import_buffers,
2996  all_shard_row_counts,
2997  import_buffers,
2998  row_count,
2999  shard_tables.size(),
3000  session_info);
3001  bool success = true;
3002  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
3003  success = success && loadToShard(all_shard_import_buffers[shard_idx],
3004  all_shard_row_counts[shard_idx],
3005  shard_tables[shard_idx],
3006  checkpoint,
3007  session_info);
3008  }
3009  return success;
3010  }
3011  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
3012 }
3013 
3014 std::vector<DataBlockPtr> TypedImportBuffer::get_data_block_pointers(
3015  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
3016  std::vector<DataBlockPtr> result(import_buffers.size());
3017  std::vector<std::pair<const size_t, std::future<int8_t*>>>
3018  encoded_data_block_ptrs_futures;
3019  // make all async calls to string dictionary here and then continue execution
3020  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
3021  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
3022  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
3023  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
3024  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
3025 
3026  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
3027  buf_idx,
3028  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
3029  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
3030  return import_buffers[buf_idx]->getStringDictBuffer();
3031  })));
3032  }
3033  }
3034 
3035  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
3036  DataBlockPtr p;
3037  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
3038  import_buffers[buf_idx]->getTypeInfo().is_time() ||
3039  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
3040  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
3041  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
3042  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
3043  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
3044  p.stringsPtr = string_payload_ptr;
3045  } else {
3046  // This condition means we have a column which is a dictionary-ENCODED string.
3047  // We already made an async request above to obtain the encoded integer values,
3048  // so we should skip this iteration and continue.
3049  continue;
3050  }
3051  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
3052  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
3053  p.stringsPtr = geo_payload_ptr;
3054  } else {
3055  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
3056  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
3057  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
3058  import_buffers[buf_idx]->addDictEncodedStringArray(
3059  *import_buffers[buf_idx]->getStringArrayBuffer());
3060  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
3061  } else {
3062  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
3063  }
3064  }
3065  result[buf_idx] = p;
3066  }
3067 
3068  // wait for the async requests we made for string dictionary
3069  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
3070  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
3071  }
3072  return result;
3073 }
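
// Example: the async/join pattern used above, in miniature (a sketch using
// std::async and std::future from <future>, already included). Work for every
// dictionary-encoded column is launched first; the futures are joined
// afterwards, in submission order.
static std::vector<size_t> example_parallel_sizes(
    const std::vector<std::string>& payloads) {
  std::vector<std::future<size_t>> futures;
  for (const auto& payload : payloads) {
    futures.push_back(
        std::async(std::launch::async, [&payload] { return payload.size(); }));
  }
  std::vector<size_t> results;
  for (auto& f : futures) {
    results.push_back(f.get());  // blocks until that task completes
  }
  return results;
}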
3074 
3075 bool Loader::loadToShard(
3076  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3077  size_t row_count,
3078  const TableDescriptor* shard_table,
3079  bool checkpoint,
3080  const Catalog_Namespace::SessionInfo* session_info) {
3081  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
3082  Fragmenter_Namespace::InsertData ins_data(insert_data_);
3083  ins_data.numRows = row_count;
3084  bool success = false;
3085  try {
3086  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
3087  } catch (std::exception& e) {
3088  std::ostringstream oss;
3089  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
3090  << e.what();
3091 
3092  LOG(ERROR) << oss.str();
3093  error_msg_ = oss.str();
3094  return success;
3095  }
3096  if (isAddingColumns()) {
3097  // when Adding columns we omit any columns except the ones being added
3098  ins_data.columnIds.clear();
3099  ins_data.is_default.clear();
3100  for (auto& buffer : import_buffers) {
3101  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
3102  ins_data.is_default.push_back(true);
3103  }
3104  } else {
3105  ins_data.is_default.resize(ins_data.columnIds.size(), false);
3106  }
3107  // release loader_lock so that in InsertOrderFragmenter::insertData
3108  // we can have multiple threads sort/shuffle InsertData
3109  loader_lock.unlock();
3110  success = true;
3111  {
3112  try {
3113  if (checkpoint) {
3114  shard_table->fragmenter->insertData(ins_data);
3115  } else {
3116  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
3117  }
3118  } catch (std::exception& e) {
3119  std::ostringstream oss;
3120  oss << "Fragmenter Insert Exception when processing Table "
3121  << shard_table->tableName << " issue was " << e.what();
3122 
3123  LOG(ERROR) << oss.str();
3124  loader_lock.lock();
3125  error_msg_ = oss.str();
3126  success = false;
3127  }
3128  }
3129  return success;
3130 }
3131 
3132 void Loader::dropColumns(const std::vector<int>& columnIds) {
3133  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
3134  if (table_desc_->nShards) {
3135  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
3136  }
3137  for (auto table_desc : table_descs) {
3138  table_desc->fragmenter->dropColumns(columnIds);
3139  }
3140 }
3141 
3142 void Loader::init() {
3143  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
3144  insert_data_.tableId = table_desc_->tableId;
3145  for (auto cd : column_descs_) {
3146  insert_data_.columnIds.push_back(cd->columnId);
3147  if (cd->columnType.get_compression() == kENCODING_DICT) {
3148  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3149  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
3150  CHECK(dd);
3151  dict_map_[cd->columnId] = dd->stringDict.get();
3152  }
3153  }
3154  insert_data_.numRows = 0;
3155 }
3156 
3157 void Detector::init() {
3158  detect_row_delimiter();
3159  split_raw_data();
3160  find_best_sqltypes_and_headers();
3161 }
3162 
3163 ImportStatus Detector::importDelimited(
3164  const std::string& file_path,
3165  const bool decompressed,
3166  const Catalog_Namespace::SessionInfo* session_info) {
3167  // we do not check interrupt status for this detection
3168  if (!p_file) {
3169  p_file = fopen(file_path.c_str(), "rb");
3170  }
3171  if (!p_file) {
3172  throw std::runtime_error("failed to open file '" + file_path +
3173  "': " + strerror(errno));
3174  }
3175 
3176  // somehow clang does not support ext/stdio_filebuf.h, so we
3177  // need to roll our own readline with the customized copy_params.line_delim...
3178  std::string line;
3179  line.reserve(1 * 1024 * 1024);
3180  auto end_time = std::chrono::steady_clock::now() +
3181  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
3182  try {
3183  while (!feof(p_file)) {
3184  int c;
3185  size_t n = 0;
3186  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
3187  if (n++ >= line.capacity()) {
3188  break;
3189  }
3190  line += c;
3191  }
3192  if (0 == n) {
3193  break;
3194  }
3195  // remember the first line, which is possibly a header line, to
3196  // ignore identical header line(s) in 2nd+ files of an archive;
3197  // otherwise, a 2nd+ header may be mistaken for an all-string row
3198  // and so skew the final column types.
3199  if (line1.empty()) {
3200  line1 = line;
3201  } else if (line == line1) {
3202  line.clear();
3203  continue;
3204  }
3205 
3206  raw_data += line;
3207  raw_data += copy_params.line_delim;
3208  line.clear();
3209  ++import_status_.rows_completed;
3210  if (std::chrono::steady_clock::now() > end_time) {
3211  if (import_status_.rows_completed > 10000) {
3212  // stop import when row limit reached
3213  break;
3214  }
3215  }
3216  }
3217  } catch (std::exception& e) {
3218  }
3219 
3221  import_status_.load_failed = true;
3222 
3223  fclose(p_file);
3224  p_file = nullptr;
3225  return import_status_;
3226 }
3227 
3228 void Detector::read_file() {
3229  // this becomes analogous to Importer::import()
3230  (void)DataStreamSink::archivePlumber(nullptr);
3231 }
3232 
3233 void Detector::detect_row_delimiter() {
3234  if (copy_params.delimiter == '\0') {
3235  copy_params.delimiter = ',';
3236  if (boost::filesystem::extension(file_path) == ".tsv") {
3237  copy_params.delimiter = '\t';
3238  }
3239  }
3240 }
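
// Example: "trips.tsv" gets '\t' and anything else (e.g. "trips.csv") keeps the
// ',' default, while a delimiter set explicitly in the COPY statement ('\0' is
// the "unset" marker above) is left untouched.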
3241 
3242 void Detector::split_raw_data() {
3243  const char* buf = raw_data.c_str();
3244  const char* buf_end = buf + raw_data.size();
3245  bool try_single_thread = false;
3246  for (const char* p = buf; p < buf_end; p++) {
3247  std::vector<std::string> row;
3248  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3249  p = import_export::delimited_parser::get_row(p,
3250  buf_end,
3251  buf_end,
3252  copy_params,
3253  nullptr,
3254  row,
3255  tmp_buffers,
3256  try_single_thread,
3257  true);
3258  raw_rows.push_back(row);
3259  if (try_single_thread) {
3260  break;
3261  }
3262  }
3263  if (try_single_thread) {
3264  copy_params.threads = 1;
3265  raw_rows.clear();
3266  for (const char* p = buf; p < buf_end; p++) {
3267  std::vector<std::string> row;
3268  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3269  p = import_export::delimited_parser::get_row(p,
3270  buf_end,
3271  buf_end,
3272  copy_params,
3273  nullptr,
3274  row,
3275  tmp_buffers,
3276  try_single_thread,
3277  true);
3278  raw_rows.push_back(row);
3279  }
3280  }
3281 }
3282 
3283 template <class T>
3284 bool try_cast(const std::string& str) {
3285  try {
3286  boost::lexical_cast<T>(str);
3287  } catch (const boost::bad_lexical_cast& e) {
3288  return false;
3289  }
3290  return true;
3291 }
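
// Example usage of try_cast<> in the detection ladder below (a sketch; CHECK is
// the logging assertion used throughout this file):
static void example_try_cast() {
  CHECK(try_cast<double>("1.5e3"));        // numeric, so at least kDOUBLE
  CHECK(try_cast<int16_t>("123"));         // narrows further, to kSMALLINT
  CHECK(!try_cast<int16_t>("70000"));      // overflows int16_t; kINT is tried next
  CHECK(!try_cast<double>("POINT(1 2)"));  // not numeric; geo/time/text checks follow
}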
3292 
3293 SQLTypes Detector::detect_sqltype(const std::string& str) {
3294  SQLTypes type = kTEXT;
3295  if (try_cast<double>(str)) {
3296  type = kDOUBLE;
3297  /*if (try_cast<bool>(str)) {
3298  type = kBOOLEAN;
3299  }*/
3300  if (try_cast<int16_t>(str)) {
3301  type = kSMALLINT;
3302  } else if (try_cast<int32_t>(str)) {
3303  type = kINT;
3304  } else if (try_cast<int64_t>(str)) {
3305  type = kBIGINT;
3306  } else if (try_cast<float>(str)) {
3307  type = kFLOAT;
3308  }
3309  }
3310 
3311  // check for geo types
3312  if (type == kTEXT) {
3313  // convert to upper case
3314  std::string str_upper_case = str;
3315  std::transform(
3316  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3317 
3318  // then test for leading words
3319  if (str_upper_case.find("POINT") == 0) {
3320  type = kPOINT;
3321  } else if (str_upper_case.find("MULTIPOINT") == 0) {
3322  type = kMULTIPOINT;
3323  } else if (str_upper_case.find("LINESTRING") == 0) {
3324  type = kLINESTRING;
3325  } else if (str_upper_case.find("MULTILINESTRING") == 0) {
3326  type = kMULTILINESTRING;
3327  } else if (str_upper_case.find("POLYGON") == 0) {
3328  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
3329  type = kMULTIPOLYGON;
3330  } else {
3331  type = kPOLYGON;
3332  }
3333  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
3334  type = kMULTIPOLYGON;
3335  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
3336  std::string::npos &&
3337  (str_upper_case.size() % 2) == 0) {
3338  // simple hex blob (two characters per byte, not uu-encode or base64)
3339  if (str_upper_case.size() >= 10) {
3340  // match WKB blobs for supported geometry types
3341  // the first byte specifies if the data is big-endian or little-endian
3342  // the next four bytes are the geometry type (1 = POINT etc.)
3343  // @TODO support eWKB, which has extra bits set in the geometry type
3344  auto first_five_bytes = str_upper_case.substr(0, 10);
3345  if (first_five_bytes == "0000000001" || first_five_bytes == "0101000000") {
3346  type = kPOINT;
3347  } else if (first_five_bytes == "0000000004" || first_five_bytes == "0104000000") {
3348  type = kMULTIPOINT;
3349  } else if (first_five_bytes == "0000000002" || first_five_bytes == "0102000000") {
3350  type = kLINESTRING;
3351  } else if (first_five_bytes == "0000000005" || first_five_bytes == "0105000000") {
3352  type = kMULTILINESTRING;
3353  } else if (first_five_bytes == "0000000003" || first_five_bytes == "0103000000") {
3354  type = kPOLYGON;
3355  } else if (first_five_bytes == "0000000006" || first_five_bytes == "0106000000") {
3356  type = kMULTIPOLYGON;
3357  } else {
3358  // unsupported WKB type
3359  return type;
3360  }
3361  } else {
3362  // too short to be WKB
3363  return type;
3364  }
3365  }
3366  }
3367 
3368  // check for time types
3369  if (type == kTEXT) {
3370  // This won't match unix timestamp, since floats and ints were checked above.
3371  if (dateTimeParseOptional<kTIME>(str, 0)) {
3372  type = kTIME;
3373  } else if (dateTimeParseOptional<kTIMESTAMP>(str, 0)) {
3374  type = kTIMESTAMP;
3375  } else if (dateTimeParseOptional<kDATE>(str, 0)) {
3376  type = kDATE;
3377  }
3378  }
3379 
3380  return type;
3381 }
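
// Example: the WKB-prefix test above in action. The hex string
// "0101000000000000000000F03F0000000000000040" is a little-endian WKB
// POINT(1 2): byte 01 (little-endian), type 01000000 (POINT), then two
// 8-byte doubles. Its first ten characters "0101000000" match the POINT
// case, so detect_sqltype() returns kPOINT for it.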
3382 
3383 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3384  std::vector<SQLTypes> types(row.size());
3385  for (size_t i = 0; i < row.size(); i++) {
3386  types[i] = detect_sqltype(row[i]);
3387  }
3388  return types;
3389 }
3390 
3391 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
3392  static std::array<int, kSQLTYPE_LAST> typeorder;
3393  typeorder[kCHAR] = 0;
3394  typeorder[kBOOLEAN] = 2;
3395  typeorder[kSMALLINT] = 3;
3396  typeorder[kINT] = 4;
3397  typeorder[kBIGINT] = 5;
3398  typeorder[kFLOAT] = 6;
3399  typeorder[kDOUBLE] = 7;
3400  typeorder[kTIMESTAMP] = 8;
3401  typeorder[kTIME] = 9;
3402  typeorder[kDATE] = 10;
3403  typeorder[kPOINT] = 11;
3404  typeorder[kMULTIPOINT] = 11;
3405  typeorder[kLINESTRING] = 11;
3406  typeorder[kMULTILINESTRING] = 11;
3407  typeorder[kPOLYGON] = 11;
3408  typeorder[kMULTIPOLYGON] = 11;
3409  typeorder[kTEXT] = 12;
3410 
3411  // note: b < a instead of a < b because the map is ordered most to least restrictive
3412  return typeorder[b] < typeorder[a];
3413 }
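
// Example: this comparator drives a per-column fold in find_best_sqltypes()
// below. Starting from kCHAR, each sampled value's detected type replaces the
// running best unless it is strictly more restrictive: a column sampled as
// {kSMALLINT, kBIGINT, kSMALLINT} settles on kBIGINT, and a single kTEXT
// sample pins the column to kTEXT.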
3414 
3415 void Detector::find_best_sqltypes_and_headers() {
3416  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
3417  best_encodings =
3418  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3419  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3420  switch (copy_params.has_header) {
3421  case import_export::ImportHeaderRow::kAutoDetect:
3422  has_headers = detect_headers(head_types, best_sqltypes);
3423  if (has_headers) {
3424  copy_params.has_header = import_export::ImportHeaderRow::kHasHeader;
3425  } else {
3426  copy_params.has_header = import_export::ImportHeaderRow::kNoHeader;
3427  }
3428  break;
3429  case import_export::ImportHeaderRow::kNoHeader:
3430  has_headers = false;
3431  break;
3432  case import_export::ImportHeaderRow::kHasHeader:
3433  has_headers = true;
3434  break;
3435  }
3436 }
3437 
3438 std::vector<SQLTypes> Detector::find_best_sqltypes() {
3439  return find_best_sqltypes(raw_rows, copy_params);
3440 }
3441 
3442 std::vector<SQLTypes> Detector::find_best_sqltypes(
3443  const std::vector<std::vector<std::string>>& raw_rows,
3444  const CopyParams& copy_params) {
3445  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3446 }
3447 
3448 std::vector<SQLTypes> Detector::find_best_sqltypes(
3449  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3450  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3451  const CopyParams& copy_params) {
3452  if (raw_rows.size() < 1) {
3453  throw std::runtime_error("No rows found in: " +
3454  boost::filesystem::basename(file_path));
3455  }
3456  auto end_time = std::chrono::steady_clock::now() + timeout;
3457  size_t num_cols = raw_rows.front().size();
3458  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3459  std::vector<size_t> non_null_col_counts(num_cols, 0);
3460  for (auto row = row_begin; row != row_end; row++) {
3461  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3462  best_types.push_back(kCHAR);
3463  non_null_col_counts.push_back(0);
3464  }
3465  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3466  // do not count nulls
3467  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3468  continue;
3469  }
3470  SQLTypes t = detect_sqltype(row->at(col_idx));
3471  non_null_col_counts[col_idx]++;
3472  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3473  best_types[col_idx] = t;
3474  }
3475  }
3476  if (std::chrono::steady_clock::now() > end_time) {
3477  break;
3478  }
3479  }
3480  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3481  // if we don't have any non-null values for this column make it text to be
3482  // safe b/c that is least restrictive type
3483  if (non_null_col_counts[col_idx] == 0) {
3484  best_types[col_idx] = kTEXT;
3485  }
3486  }
3487 
3488  return best_types;
3489 }
3490 
3491 std::vector<EncodingType> Detector::find_best_encodings(
3492  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3493  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3494  const std::vector<SQLTypes>& best_types) {
3495  if (raw_rows.size() < 1) {
3496  throw std::runtime_error("No rows found in: " +
3497  boost::filesystem::basename(file_path));
3498  }
3499  size_t num_cols = best_types.size();
3500  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3501  std::vector<size_t> num_rows_per_col(num_cols, 1);
3502  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3503  for (auto row = row_begin; row != row_end; row++) {
3504  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3505  if (IS_STRING(best_types[col_idx])) {
3506  count_set[col_idx].insert(row->at(col_idx));
3507  num_rows_per_col[col_idx]++;
3508  }
3509  }
3510  }
3511  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3512  if (IS_STRING(best_types[col_idx])) {
3513  float uniqueRatio =
3514  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3515  if (uniqueRatio < 0.75) {
3516  best_encodes[col_idx] = kENCODING_DICT;
3517  }
3518  }
3519  }
3520  return best_encodes;
3521 }
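
// Example: the 0.75 cut-off above in isolation (a sketch). A sampled TEXT
// column with few distinct values is assigned dictionary encoding:
static bool example_would_dict_encode(const size_t distinct_count,
                                      const size_t sampled_rows) {
  const float unique_ratio =
      static_cast<float>(distinct_count) / static_cast<float>(sampled_rows);
  return unique_ratio < 0.75f;  // 20 distinct in 100 rows -> 0.2 -> kENCODING_DICT
}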
3522 
3523 // detect_headers returns true if:
3524 // - all elements of the first argument are kTEXT
3525 // - there is at least one instance where tail_types is more restrictive than head_types
3526 // (ie, not kTEXT)
3527 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3528  const std::vector<SQLTypes>& tail_types) {
3529  if (head_types.size() != tail_types.size()) {
3530  return false;
3531  }
3532  bool has_headers = false;
3533  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3534  if (head_types[col_idx] != kTEXT) {
3535  return false;
3536  }
3537  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3538  }
3539  return has_headers;
3540 }
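
// Example: detect_headers() above on two samples:
//   head {kTEXT, kTEXT} with tail {kSMALLINT, kTEXT} -> true: the first row is
//     all text while a later row is more restrictive in some column, so the
//     first row reads as a header;
//   head {kTEXT, kTEXT} with tail {kTEXT, kTEXT} -> false: nothing separates
//     the first row from ordinary all-text data.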
3541 
3542 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3543 #if defined(ENABLE_IMPORT_PARQUET)
3544  if (data_preview_.has_value()) {
3545  return data_preview_.value().sample_rows;
3546  } else
3547 #endif
3548  {
3549  n = std::min(n, raw_rows.size());
3550  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3551  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3552  raw_rows.begin() + n);
3553  return sample_rows;
3554  }
3555 }
3556 
3557 std::vector<std::string> Detector::get_headers() {
3558 #if defined(ENABLE_IMPORT_PARQUET)
3559  if (data_preview_.has_value()) {
3560  return data_preview_.value().column_names;
3561  } else
3562 #endif
3563  {
3564  std::vector<std::string> headers(best_sqltypes.size());
3565  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3566  if (has_headers && i < raw_rows[0].size()) {
3567  headers[i] = raw_rows[0][i];
3568  } else {
3569  headers[i] = "column_" + std::to_string(i + 1);
3570  }
3571  }
3572  return headers;
3573  }
3574 }
3575 
3576 std::vector<SQLTypeInfo> Detector::getBestColumnTypes() const {
3577 #if defined(ENABLE_IMPORT_PARQUET)
3578  if (data_preview_.has_value()) {
3579  return data_preview_.value().column_types;
3580  } else
3581 #endif
3582  {
3583  std::vector<SQLTypeInfo> types;
3584  CHECK_EQ(best_sqltypes.size(), best_encodings.size());
3585  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3586  types.emplace_back(best_sqltypes[i], false, best_encodings[i]);
3587  }
3588  return types;
3589  }
3590 }
3591 
3592 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3593  size_t row_count,
3594  const Catalog_Namespace::SessionInfo* session_info) {
3595  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3596  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3597  import_status_.load_failed = true;
3598  import_status_.load_msg = loader->getErrorMessage();
3599  }
3600 }
3601 
3602 void Importer::checkpoint(
3603  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3604  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3605  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
3606  if (import_status_.load_failed) {
3607  // rollback to starting epoch - undo all the added records
3608  loader->setTableEpochs(table_epochs);
3609  } else {
3610  loader->checkpoint();
3611  }
3612  }
3613 
3614  if (loader->getTableDesc()->persistenceLevel ==
3615  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3616  // tables
3617  auto ms = measure<>::execution([&]() {
3618  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
3619  if (!import_status_.load_failed) {
3620  for (auto& p : import_buffers_vec[0]) {
3621  if (!p->stringDictCheckpoint()) {
3622  LOG(ERROR) << "Checkpointing Dictionary for Column "
3623  << p->getColumnDesc()->columnName << " failed.";
3624  import_status_.load_failed = true;
3625  import_status_.load_msg = "Dictionary checkpoint failed";
3626  break;
3627  }
3628  }
3629  }
3630  });
3631  if (DEBUG_TIMING) {
3632  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3633  << std::endl;
3634  }
3635  }
3636 }
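/*
 * Editor's note (illustrative sketch, not part of the original source): table
 * epochs act as a coarse undo log for imports. Callers snapshot the epochs
 * before loading and hand them back to checkpoint(), which either commits or
 * rolls back, as in the import_parquet() path later in this file:
 *
 *   auto table_epochs = loader->getTableEpochs();   // snapshot before loading
 *   // ... load rows ...
 *   importer->checkpoint(table_epochs);             // commit, or roll back on failure
 */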
3637 
3638 ImportStatus DataStreamSink::archivePlumber(
3639  const Catalog_Namespace::SessionInfo* session_info) {
3640  // in generalized importing scheme, reaching here file_path may
3641  // contain a file path, a url or a wildcard of file paths.
3642  // see CopyTableStmt::execute.
3643 
3644  std::vector<std::string> file_paths;
3645  try {
3646  const shared::FilePathOptions options{copy_params.regex_path_filter,
3647  copy_params.file_sort_order_by,
3648  copy_params.file_sort_regex};
3649  shared::validate_sort_options(options);
3650  file_paths = shared::local_glob_filter_sort_files(file_path, options);
3651  } catch (const shared::FileNotFoundException& e) {
3652  // After finding no matching files locally, file_path may still be an s3 url
3653  file_paths.push_back(file_path);
3654  }
3655 
3656  // sum up sizes of all local files -- only for local files. if
3657  // file_path is an s3 url, sizes will be obtained via S3Archive.
3658  for (const auto& file_path : file_paths) {
3659  total_file_size += get_filesize(file_path);
3660  }
3661 
3662  // s3 parquet goes a different route because the files do not use libarchive
3663  // but the parquet api, and they need to be landed like .7z files.
3664  //
3665  // note: parquet must be explicitly specified by a WITH parameter
3666  // "source_type='parquet_file'", because for example spark sql users may specify a
3667  // output url w/o file extension like this:
3668  // df.write
3669  // .mode("overwrite")
3670  // .parquet("s3://bucket/folder/parquet/mydata")
3671  // without the parameter, it means plain or compressed csv files.
3672  // note: .ORC and AVRO files should follow a similar path to Parquet?
3673  if (copy_params.source_type == import_export::SourceType::kParquetFile) {
3674 #ifdef ENABLE_IMPORT_PARQUET
3675  import_parquet(file_paths, session_info);
3676 #else
3677  throw std::runtime_error("Parquet not supported!");
3678 #endif
3679  } else {
3680  import_compressed(file_paths, session_info);
3681  }
3682 
3683  return import_status_;
3684 }
3685 
3686 namespace {
3687 #ifdef ENABLE_IMPORT_PARQUET
3688 
3689 #ifdef HAVE_AWS_S3
3690 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3691 create_parquet_s3_detect_filesystem_config(const foreign_storage::ForeignServer* server,
3692  const CopyParams& copy_params) {
3693  foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3694  config;
3695 
3696  if (!copy_params.s3_access_key.empty()) {
3697  config.s3_access_key = copy_params.s3_access_key;
3698  }
3699  if (!copy_params.s3_secret_key.empty()) {
3700  config.s3_secret_key = copy_params.s3_secret_key;
3701  }
3702  if (!copy_params.s3_session_token.empty()) {
3703  config.s3_session_token = copy_params.s3_session_token;
3704  }
3705 
3706  return config;
3707 }
3708 #endif
3709 
3710 foreign_storage::DataPreview get_parquet_data_preview(const std::string& file_name,
3711  const CopyParams& copy_params) {
3712  TableDescriptor td;
3713  td.tableName = "parquet-detect-table";
3714  td.tableId = -1;
3716  auto [foreign_server, user_mapping, foreign_table] =
3717  foreign_storage::create_proxy_fsi_objects(file_name, copy_params, &td);
3718 
3719  std::shared_ptr<arrow::fs::FileSystem> file_system;
3720  auto& server_options = foreign_server->options;
3721  if (server_options
3722  .find(foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
3723  ->second ==
3724  foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE) {
3725  file_system = std::make_shared<arrow::fs::LocalFileSystem>();
3726 #ifdef HAVE_AWS_S3
3727  } else if (server_options
3728  .find(foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
3729  ->second ==
3730  foreign_storage::AbstractFileStorageDataWrapper::S3_STORAGE_TYPE) {
3731  file_system = foreign_storage::ParquetS3DetectFileSystem::create(
3732  create_parquet_s3_detect_filesystem_config(foreign_server.get(), copy_params));
3733 #endif
3734  } else {
3735  UNREACHABLE();
3736  }
3737 
3738  auto parquet_data_wrapper = std::make_unique<foreign_storage::ParquetDataWrapper>(
3739  foreign_table.get(), file_system);
3740  return parquet_data_wrapper->getDataPreview(g_detect_test_sample_size.has_value()
3741  ? g_detect_test_sample_size.value()
3743 }
3744 #endif
3745 
3746 } // namespace
3747 
3748 Detector::Detector(const boost::filesystem::path& fp, CopyParams& cp)
3749  : DataStreamSink(cp, fp.string()), file_path(fp) {
3750 #ifdef ENABLE_IMPORT_PARQUET
3751  if (cp.source_type == import_export::SourceType::kParquetFile &&
3752  !g_enable_legacy_parquet_import) {
3753  data_preview_ = get_parquet_data_preview(fp.string(), cp);
3754  } else
3755 #endif
3756  {
3757  read_file();
3758  init();
3759  }
3760 }
3761 
3762 #ifdef ENABLE_IMPORT_PARQUET
3763 inline auto open_parquet_table(const std::string& file_path,
3764  std::shared_ptr<arrow::io::ReadableFile>& infile,
3765  std::unique_ptr<parquet::arrow::FileReader>& reader,
3766  std::shared_ptr<arrow::Table>& table) {
3767  using namespace parquet::arrow;
3768  auto file_result = arrow::io::ReadableFile::Open(file_path);
3769  PARQUET_THROW_NOT_OK(file_result.status());
3770  infile = file_result.ValueOrDie();
3771 
3772  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3773  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3774  const auto num_row_groups = reader->num_row_groups();
3775  const auto num_columns = table->num_columns();
3776  const auto num_rows = table->num_rows();
3777  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3778  << " columns in " << num_row_groups << " groups.";
3779  return std::make_tuple(num_row_groups, num_columns, num_rows);
3780 }
3781 
3782 void Detector::import_local_parquet(const std::string& file_path,
3783  const Catalog_Namespace::SessionInfo* session_info) {
3784  /*Skip interrupt checking in detector*/
3785  std::shared_ptr<arrow::io::ReadableFile> infile;
3786  std::unique_ptr<parquet::arrow::FileReader> reader;
3787  std::shared_ptr<arrow::Table> table;
3788  int num_row_groups, num_columns;
3789  int64_t num_rows;
3790  std::tie(num_row_groups, num_columns, num_rows) =
3791  open_parquet_table(file_path, infile, reader, table);
3792  // make up a header line if we don't have one yet
3793  if (0 == raw_data.size()) {
3794  copy_params.has_header = ImportHeaderRow::kHasHeader;
3795  copy_params.line_delim = '\n';
3796  copy_params.delimiter = ',';
3797  // must quote values to skip any embedded delimiter
3798  copy_params.quoted = true;
3799  copy_params.quote = '"';
3800  copy_params.escape = '"';
3801  for (int c = 0; c < num_columns; ++c) {
3802  if (c) {
3803  raw_data += copy_params.delimiter;
3804  }
3805  raw_data += table->ColumnNames().at(c);
3806  }
3807  raw_data += copy_params.line_delim;
3808  }
3809  // make up raw data... row-wise...
3810  const ColumnDescriptor cd;
3811  for (int g = 0; g < num_row_groups; ++g) {
3812  // data is columnwise
3813  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3814  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3815  arrays.resize(num_columns);
3816  for (int c = 0; c < num_columns; ++c) {
3817  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3818  for (auto chunk : arrays[c]->chunks()) {
3819  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3820  }
3821  }
3822  for (int r = 0; r < num_rows; ++r) {
3823  for (int c = 0; c < num_columns; ++c) {
3824  std::vector<std::string> buffer;
3825  for (auto chunk : arrays[c]->chunks()) {
3826  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3827  if (c) {
3828  raw_data += copy_params.delimiter;
3829  }
3830  if (!chunk->IsNull(r)) {
3831  raw_data += copy_params.quote;
3832  raw_data += boost::replace_all_copy(
3833  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3834  raw_data += copy_params.quote;
3835  }
3836  }
3837  }
3838  raw_data += copy_params.line_delim;
3839  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3840  if (++import_status_.rows_completed >= 10000) {
3841  // as if load truncated
3842  import_status_.load_failed = true;
3843  import_status_.load_msg = "Detector processed 10000 records";
3844  return;
3845  }
3846  }
3847  }
3848 }
3849 
3850 template <typename DATA_TYPE>
3851 std::tuple<size_t, size_t> TypedImportBuffer::del_values(
3852  std::vector<DATA_TYPE>& buffer,
3853  import_export::BadRowsTracker* const bad_rows_tracker) {
3854  const auto old_size = buffer.size();
3855  // erase backward to minimize memory movement overhead
3856  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3857  ++rit) {
3858  buffer.erase(buffer.begin() + *rit);
3859  }
3860  return std::make_tuple(old_size, buffer.size());
3861 }
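/*
 * Editor's note (illustrative example, not part of the original source): erasing
 * in reverse keeps the not-yet-erased indices valid, because each erase only
 * shifts elements that come after it. With buffer = {a, b, c, d} and bad row
 * indices {1, 3}:
 *
 *   erase index 3 -> {a, b, c}   // index 1 still refers to b
 *   erase index 1 -> {a, c}
 *
 * Erasing forward instead would require adjusting every remaining index.
 */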
3862 
3863 std::tuple<size_t, size_t> TypedImportBuffer::del_values(const SQLTypes type,
3864  BadRowsTracker* const bad_rows_tracker) {
3865  switch (type) {
3866  case kBOOLEAN:
3867  return del_values(*bool_buffer_, bad_rows_tracker);
3868  case kTINYINT:
3869  return del_values(*tinyint_buffer_, bad_rows_tracker);
3870  case kSMALLINT:
3871  return del_values(*smallint_buffer_, bad_rows_tracker);
3872  case kINT:
3873  return del_values(*int_buffer_, bad_rows_tracker);
3874  case kBIGINT:
3875  case kNUMERIC:
3876  case kDECIMAL:
3877  case kTIME:
3878  case kTIMESTAMP:
3879  case kDATE:
3880  return del_values(*bigint_buffer_, bad_rows_tracker);
3881  case kFLOAT:
3882  return del_values(*float_buffer_, bad_rows_tracker);
3883  case kDOUBLE:
3884  return del_values(*double_buffer_, bad_rows_tracker);
3885  case kTEXT:
3886  case kVARCHAR:
3887  case kCHAR:
3888  return del_values(*string_buffer_, bad_rows_tracker);
3889  case kPOINT:
3890  case kMULTIPOINT:
3891  case kLINESTRING:
3892  case kMULTILINESTRING:
3893  case kPOLYGON:
3894  case kMULTIPOLYGON:
3895  return del_values(*geo_string_buffer_, bad_rows_tracker);
3896  case kARRAY:
3897  return del_values(*array_buffer_, bad_rows_tracker);
3898  default:
3899  throw std::runtime_error("Invalid Type");
3900  }
3901 }
3902 
3903 void Importer::import_local_parquet(const std::string& file_path,
3904  const Catalog_Namespace::SessionInfo* session_info) {
3905  std::shared_ptr<arrow::io::ReadableFile> infile;
3906  std::unique_ptr<parquet::arrow::FileReader> reader;
3907  std::shared_ptr<arrow::Table> table;
3908  int num_row_groups, num_columns;
3909  int64_t nrow_in_file;
3910  std::tie(num_row_groups, num_columns, nrow_in_file) =
3911  open_parquet_table(file_path, infile, reader, table);
3912  // column_list has no $deleted
3913  const auto& column_list = get_column_descs();
3914  // for now geo columns expect a wkt or wkb hex string
3915  std::vector<const ColumnDescriptor*> cds;
3916  Executor* executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID).get();
3917  int num_physical_cols = 0;
3918  for (auto& cd : column_list) {
3919  cds.push_back(cd);
3920  num_physical_cols += cd->columnType.get_physical_cols();
3921  }
3922  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3923  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3924  std::to_string(num_columns) + " columns in file vs " +
3925  std::to_string(column_list.size() - num_physical_cols) +
3926  " columns in table.");
3927  // slice each row group so that slower columns (e.g. geo or string) import faster
3928  max_threads = num_import_threads(copy_params.threads);
3929  VLOG(1) << "Parquet import # threads: " << max_threads;
3930  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3931  // init row estimate for this file
3932  const auto filesize = get_filesize(file_path);
3933  size_t nrow_completed{0};
3934  file_offsets.push_back(0);
3935  // map logical column index to physical column index
3936  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3937  int physical_col_idx = 0;
3938  for (int i = 0; i < logic_col_idx; ++i) {
3939  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3940  }
3941  return physical_col_idx;
3942  };
3943  // load a file = nested iteration of row groups, row slices and logical columns
3944  auto query_session = session_info ? session_info->get_session_id() : "";
3945  auto ms_load_a_file = measure<>::execution([&]() {
3946  for (int row_group = 0; row_group < num_row_groups; ++row_group) {
3947  {
3948  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
3949  if (import_status_.load_failed) {
3950  break;
3951  }
3952  }
3953  // a sliced row group will be handled like a (logical) parquet file, with
3954  // an entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
3955  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3956  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3957  import_status_.load_failed = true;
3958  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3959  }
3960  import_buffers_vec.resize(num_slices);
3961  for (int slice = 0; slice < num_slices; slice++) {
3962  import_buffers_vec[slice].clear();
3963  for (const auto cd : cds) {
3964  import_buffers_vec[slice].emplace_back(
3965  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3966  }
3967  }
3968  /*
3969  * A caveat here is: Parquet files or arrow data are imported column-wise.
3970  * Unlike importing row-wise csv files, an error on any row of any column
3971  * forces us to give up the entire row group across all columns, unless there
3972  * is a sophisticated method to trace erroneous rows in individual columns so
3973  * that we can union bad rows and drop them from the corresponding
3974  * import_buffers_vec; otherwise, we may easily exceed the maximum number of
3975  * truncated rows even with very sparse errors in the files.
3976  */
3977  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3978  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3979  auto& bad_rows_tracker = bad_rows_trackers[slice];
3980  bad_rows_tracker.file_name = file_path;
3981  bad_rows_tracker.row_group = slice;
3982  bad_rows_tracker.importer = this;
3983  }
3984  // process arrow arrays to import buffers
3985  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3986  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3987  const auto cd = cds[physical_col_idx];
3988  std::shared_ptr<arrow::ChunkedArray> array;
3989  PARQUET_THROW_NOT_OK(
3990  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3991  const size_t array_size = array->length();
3992  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3993  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3994  for (int slice = 0; slice < num_slices; ++slice) {
3995  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3996  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
3997  import_status_.load_failed = true;
3998  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3999  }
4000  thread_controller.startThread([&, slice] {
4001  const auto slice_offset = slice % num_slices;
4002  ArraySliceRange slice_range(
4003  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
4004  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
4005  auto& bad_rows_tracker = bad_rows_trackers[slice];
4006  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
4007  import_buffer->import_buffers = &import_buffers_vec[slice];
4008  import_buffer->col_idx = physical_col_idx + 1;
4009  for (auto chunk : array->chunks()) {
4010  import_buffer->add_arrow_values(
4011  cd, *chunk, false, slice_range, &bad_rows_tracker);
4012  }
4013  });
4014  }
4015  thread_controller.finish();
4016  }
4017  std::vector<size_t> nrow_in_slice_raw(num_slices);
4018  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
4019  // trim bad rows from import buffers
4020  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
4021  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
4022  const auto cd = cds[physical_col_idx];
4023  for (int slice = 0; slice < num_slices; ++slice) {
4024  auto& bad_rows_tracker = bad_rows_trackers[slice];
4025  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
4026  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
4027  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
4028  }
4029  }
4030  // flush slices of this row group to chunks
4031  for (int slice = 0; slice < num_slices; ++slice) {
4032  load(import_buffers_vec[slice],
4033  nrow_in_slice_successfully_loaded[slice],
4034  session_info);
4035  }
4036  // update import stats
4037  const auto nrow_original =
4038  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
4039  const auto nrow_imported =
4040  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
4041  nrow_in_slice_successfully_loaded.end(),
4042  0);
4043  const auto nrow_dropped = nrow_original - nrow_imported;
4044  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
4045  << " rows, drop " << nrow_dropped << " rows.";
4046  {
4047  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
4048  import_status_.rows_completed += nrow_imported;
4049  import_status_.rows_rejected += nrow_dropped;
4050  if (import_status_.rows_rejected > copy_params.max_reject) {
4051  import_status_.load_failed = true;
4052  import_status_.load_msg = "Maximum (" + std::to_string(copy_params.max_reject) +
4053  ") rows rejected exceeded. Halting load.";
4054  LOG(ERROR) << "Maximum (" << copy_params.max_reject
4055  << ") rows rejected exceeded. Halting load.";
4056  }
4057  }
4058  // row estimate
4059  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4060  nrow_completed += nrow_imported;
4061  file_offsets.back() =
4062  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
4063  // sum up current total file offsets
4064  const auto total_file_offset =
4065  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
4066  // estimate number of rows per current total file offset
4067  if (total_file_offset) {
4068  import_status_.rows_estimated =
4069  (float)total_file_size / total_file_offset * import_status_.rows_completed;
4070  VLOG(3) << "rows_completed " << import_status_.rows_completed
4071  << ", rows_estimated " << import_status_.rows_estimated
4072  << ", total_file_size " << total_file_size << ", total_file_offset "
4073  << total_file_offset;
4074  }
4075  }
4076  });
4077  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
4078  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
4079 }
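/*
 * Editor's note (illustrative arithmetic, not part of the original source): the
 * slicing above uses ceiling division, slice_size = (array_size + num_slices - 1)
 * / num_slices, with each range clamped to array_size. For array_size = 10 and
 * num_slices = 4, slice_size = 3 and the ranges are [0,3), [3,6), [6,9), [9,10),
 * so every row is covered exactly once and only the last slice runs short.
 */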
4080 
4081 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
4082  const Catalog_Namespace::SessionInfo* session_info) {
4083  auto importer = dynamic_cast<Importer*>(this);
4084  auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
4085  : std::vector<Catalog_Namespace::TableEpochInfo>{};
4086  try {
4087  std::exception_ptr teptr;
4088  // file_paths may contain one local file path, a list of local file paths
4089  // or an s3/hdfs/... url that may translate to one or more remote object keys.
4090  for (auto const& file_path : file_paths) {
4091  std::map<int, std::string> url_parts;
4092  Archive::parse_url(file_path, url_parts);
4093 
4094  // for an s3 url we need to know the obj keys that it comprises
4095  std::vector<std::string> objkeys;
4096  std::unique_ptr<S3ParquetArchive> us3arch;
4097  if ("s3" == url_parts[2]) {
4098 #ifdef HAVE_AWS_S3
4099  us3arch.reset(new S3ParquetArchive(file_path,
4100  copy_params.s3_access_key,
4101  copy_params.s3_secret_key,
4102  copy_params.s3_session_token,
4103  copy_params.s3_region,
4104  copy_params.s3_endpoint,
4105  copy_params.plain_text,
4106  copy_params.regex_path_filter,
4107  copy_params.file_sort_order_by,
4108  copy_params.file_sort_regex));
4109  us3arch->init_for_read();
4110  total_file_size += us3arch->get_total_file_size();
4111  objkeys = us3arch->get_objkeys();
4112 #else
4113  throw std::runtime_error("AWS S3 support not available");
4114 #endif // HAVE_AWS_S3
4115  } else {
4116  objkeys.emplace_back(file_path);
4117  }
4118 
4119  // for each obj key of an s3 url we need to land it before
4120  // importing it, as we do with a 'local file'.
4121  for (auto const& objkey : objkeys) {
4122  try {
4123  auto file_path =
4124  us3arch
4125  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
4126  : objkey;
4127  import_local_parquet(file_path, session_info);
4128  if (us3arch) {
4129  us3arch->vacuum(objkey);
4130  }
4131  } catch (...) {
4132  if (us3arch) {
4133  us3arch->vacuum(objkey);
4134  }
4135  throw;
4136  }
4137  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
4138  if (import_status_.load_failed) {
4139  break;
4140  }
4141  }
4142  }
4143  // rethrow any exception that happened earlier
4144  if (teptr) {
4145  std::rethrow_exception(teptr);
4146  }
4147  } catch (const shared::NoRegexFilterMatchException& e) {
4148  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
4149  import_status_.load_failed = true;
4150  import_status_.load_msg = e.what();
4151  throw e;
4152  } catch (const std::exception& e) {
4153  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
4154  import_status_.load_failed = true;
4155  import_status_.load_msg = e.what();
4156  }
4157 
4158  if (importer) {
4159  importer->checkpoint(table_epochs);
4160  }
4161 }
4162 #endif // ENABLE_IMPORT_PARQUET
4163 
4164 void DataStreamSink::import_compressed(
4165  std::vector<std::string>& file_paths,
4166  const Catalog_Namespace::SessionInfo* session_info) {
4167  // a new requirement is to have one single input stream into
4168  // Importer::importDelimited, so need to move pipe related
4169  // stuff to the outermost block.
4170  int fd[2];
4171 #ifdef _WIN32
4172  // For some reason, when folly is used to create the pipe, the reader can
4173  // read nothing.
4174  auto pipe_res =
4175  _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
4176 #else
4177  auto pipe_res = pipe(fd);
4178 #endif
4179  if (pipe_res < 0) {
4180  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
4181  }
4182 #ifndef _WIN32
4183  signal(SIGPIPE, SIG_IGN);
4184 #endif
4185 
4186  std::exception_ptr teptr;
4187  // create a thread to read uncompressed byte stream out of pipe and
4188  // feed into importDelimited()
4189  ImportStatus ret1;
4190  auto th_pipe_reader = std::thread([&]() {
4191  try {
4192  // importDelimited will read from FILE* p_file
4193  if (0 == (p_file = fdopen(fd[0], "r"))) {
4194  throw std::runtime_error(std::string("failed to open a pipe: ") +
4195  strerror(errno));
4196  }
4197 
4198  // in the future, depending on the data types of this uncompressed stream,
4199  // it can be fed into other functions such as importParquet, etc.
4200  ret1 = importDelimited(file_path, true, session_info);
4201 
4202  } catch (...) {
4203  if (!teptr) { // no replace
4204  teptr = std::current_exception();
4205  }
4206  }
4207 
4208  if (p_file) {
4209  fclose(p_file);
4210  }
4211  p_file = 0;
4212  });
4213 
4214  // create a thread to iterate all files (in all archives) and
4215  // forward the uncompressed byte stream to fd[1] which is
4216  // then feed into importDelimited, importParquet, and etc.
4217  auto th_pipe_writer = std::thread([&]() {
4218  std::unique_ptr<S3Archive> us3arch;
4219  bool stop = false;
4220  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
4221  try {
4222  auto file_path = file_paths[fi];
4223  std::unique_ptr<Archive> uarch;
4224  std::map<int, std::string> url_parts;
4225  Archive::parse_url(file_path, url_parts);
4226  const std::string S3_objkey_url_scheme = "s3ok";
4227  if ("file" == url_parts[2] || "" == url_parts[2]) {
4228  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4229  } else if ("s3" == url_parts[2]) {
4230 #ifdef HAVE_AWS_S3
4231  // create an S3Archive with a shared s3 client.
4232  // should be safe b/c there is no wildcard with an s3 url
4233  us3arch.reset(new S3Archive(file_path,
4234  copy_params.s3_access_key,
4235  copy_params.s3_secret_key,
4236  copy_params.s3_session_token,
4237  copy_params.s3_region,
4238  copy_params.s3_endpoint,
4239  copy_params.plain_text,
4240  copy_params.regex_path_filter,
4241  copy_params.file_sort_order_by,
4242  copy_params.file_sort_regex));
4243  us3arch->init_for_read();
4244  total_file_size += us3arch->get_total_file_size();
4245  // do not land all files here; land them one by one in the following iterations
4246  for (const auto& objkey : us3arch->get_objkeys()) {
4247  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
4248  }
4249  continue;
4250 #else
4251  throw std::runtime_error("AWS S3 support not available");
4252 #endif // HAVE_AWS_S3
4253  } else if (S3_objkey_url_scheme == url_parts[2]) {
4254 #ifdef HAVE_AWS_S3
4255  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
4256  auto file_path =
4257  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
4258  if (0 == file_path.size()) {
4259  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
4260  }
4261  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4262  // file not removed until file closed
4263  us3arch->vacuum(objkey);
4264 #else
4265  throw std::runtime_error("AWS S3 support not available");
4266 #endif // HAVE_AWS_S3
4267  }
4268 #if 0 // TODO(ppan): implement and enable any other archive class
4269  else
4270  if ("hdfs" == url_parts[2])
4271  uarch.reset(new HdfsArchive(file_path));
4272 #endif
4273  else {
4274  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
4275  }
4276 
4277  // init the archive for read
4278  auto& arch = *uarch;
4279 
4280  // coming here, the archive of url should be ready to be read, unarchived
4281  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
4282  const void* buf;
4283  size_t size;
4284  bool just_saw_archive_header;
4285  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
4286  bool first_text_header_skipped = false;
4287  // start reading uncompressed bytes of this archive from libarchive
4288  // note! this archive may contain more than one files!
4289  file_offsets.push_back(0);
4290  size_t num_block_read = 0;
4291  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
4292  bool insert_line_delim_after_this_file = false;
4293  while (!stop) {
4294  int64_t offset{-1};
4295  auto ok = arch.read_data_block(&buf, &size, &offset);
4296  // can't use (uncompressed) size, so track (max) file offset.
4297  // also we want to capture offset even on e.o.f.
4298  if (offset > 0) {
4299  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4300  file_offsets.back() = offset;
4301  }
4302  if (!ok) {
4303  break;
4304  }
4305  // one subtle point here is that we now concatenate all files
4306  // into a single FILE stream with which we call importDelimited
4307  // only once. this would make it assume that only one
4308  // header line comes with this 'single' stream, while actually
4309  // we may have one header line for each of the files.
4310  // so we need to skip header lines here instead of in importDelimited.
4311  const char* buf2 = (const char*)buf;
4312  int size2 = size;
4313  if (copy_params.has_header != import_export::ImportHeaderRow::kNoHeader &&
4314  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
4315  while (size2-- > 0) {
4316  if (*buf2++ == copy_params.line_delim) {
4317  break;
4318  }
4319  }
4320  if (size2 <= 0) {
4321  LOG(WARNING) << "No line delimiter in block." << std::endl;
4322  } else {
4323  just_saw_archive_header = false;
4324  first_text_header_skipped = true;
4325  }
4326  }
4327  // On very rare occasions the write pipe somehow operates in a mode similar
4328  // to non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which
4329  // means blocking mode. With such an unreliable blocking mode, a possible fix
4330  // is to loop writing till no bytes are left; otherwise we get the annoying
4331  // `failed to write pipe: Success`...
4332  if (size2 > 0) {
4333  int nremaining = size2;
4334  while (nremaining > 0) {
4335  // try to write the entire remainder of the buffer to the pipe
4336  int nwritten = write(fd[1], buf2, nremaining);
4337  // how did we do?
4338  if (nwritten < 0) {
4339  // something bad happened
4340  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4341  // ignore these, assume nothing written, try again
4342  nwritten = 0;
4343  } else if (errno == EPIPE &&
4344  import_status_.load_failed) {
4345  // the reader thread has shut down the pipe from the read end
4346  stop = true;
4347  break;
4348  } else {
4349  // a real error
4350  throw std::runtime_error(
4351  std::string("failed or interrupted write to pipe: ") +
4352  strerror(errno));
4353  }
4354  } else if (nwritten == nremaining) {
4355  // we wrote everything; we're done
4356  break;
4357  }
4358  // only wrote some (or nothing), try again
4359  nremaining -= nwritten;
4360  buf2 += nwritten;
4361  // no exception when too many rejected
4362  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
4363  if (import_status_.load_failed) {
4364  stop = true;
4365  break;
4366  }
4367  }
4368  // check that this file (buf for size) ended with a line delim
4369  if (size > 0) {
4370  const char* plast = static_cast<const char*>(buf) + (size - 1);
4371  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4372  }
4373  }
4374  ++num_block_read;
4375  }
4376 
4377  // if that file didn't end with a line delim, we insert one here to terminate
4378  // that file's stream; use a loop for the same reason as above
4379  if (insert_line_delim_after_this_file) {
4380  while (true) {
4381  // write the delim char to the pipe
4382  int nwritten = write(fd[1], &copy_params.line_delim, 1);
4383  // how did we do?
4384  if (nwritten < 0) {
4385  // something bad happened
4386  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4387  // ignore these, assume nothing written, try again
4388  nwritten = 0;
4389  } else if (errno == EPIPE &&
4390  import_status_.load_failed) {
4391  // the reader thread has shut down the pipe from the read end
4392  stop = true;
4393  break;
4394  } else {
4395  // a real error
4396  throw std::runtime_error(
4397  std::string("failed or interrupted write to pipe: ") +
4398  strerror(errno));
4399  }
4400  } else if (nwritten == 1) {
4401  // we wrote it; we're done
4402  break;
4403  }
4404  }
4405  }
4406  }
4407  } catch (...) {
4408  // when import is aborted because of too many data errors or because a
4409  // detection has ended, any exception thrown by the s3 sdk or libarchive is
4410  // okay and should be suppressed.
4411  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
4412  if (import_status_.load_failed) {
4413  break;
4414  }
4415  if (import_status_.rows_completed > 0) {
4416  if (nullptr != dynamic_cast<Detector*>(this)) {
4417  break;
4418  }
4419  }
4420  if (!teptr) { // no replace
4421  teptr = std::current_exception();
4422  }
4423  break;
4424  }
4425  }
4426  // close writer end
4427  close(fd[1]);
4428  });
4429 
4430  th_pipe_reader.join();
4431  th_pipe_writer.join();
4432 
4433  // rethrow any exception that happened earlier
4434  if (teptr) {
4435  std::rethrow_exception(teptr);
4436  }
4437 }
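/*
 * Editor's note (illustrative sketch, not part of the original source): the
 * function above is a single-producer/single-consumer pipe. A minimal skeleton
 * of the same shape, assuming POSIX pipe()/fdopen():
 *
 *   int fd[2];
 *   pipe(fd);
 *   std::thread reader([&] {
 *     FILE* in = fdopen(fd[0], "r");  // parse rows from the read end
 *     // ... consume until EOF ...
 *     fclose(in);                     // also closes fd[0]
 *   });
 *   std::thread writer([&] {
 *     // ... write decompressed archive bytes to fd[1], retrying short writes ...
 *     close(fd[1]);                   // signals EOF to the reader
 *   });
 *   reader.join();
 *   writer.join();
 */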
4438 
4439 ImportStatus Importer::import(const Catalog_Namespace::SessionInfo* session_info) {
4440  return DataStreamSink::archivePlumber(session_info);
4441 }
4442 
4443 ImportStatus Importer::importDelimited(
4444  const std::string& file_path,
4445  const bool decompressed,
4446  const Catalog_Namespace::SessionInfo* session_info) {
4447  set_import_status(import_id, import_status_);
4448  auto query_session = session_info ? session_info->get_session_id() : "";
4449 
4450  if (!p_file) {
4451  p_file = fopen(file_path.c_str(), "rb");
4452  }
4453  if (!p_file) {
4454  throw std::runtime_error("failed to open file '" + file_path +
4455  "': " + strerror(errno));
4456  }
4457 
4458  if (!decompressed) {
4459  (void)fseek(p_file, 0, SEEK_END);
4460  file_size = ftell(p_file);
4461  }
4462 
4463  max_threads = num_import_threads(copy_params.threads);
4464  VLOG(1) << "Delimited import # threads: " << max_threads;
4465 
4466  // deal with small files
4467  size_t alloc_size = copy_params.buffer_size;
4468  if (!decompressed && file_size < alloc_size) {
4469  alloc_size = file_size;
4470  }
4471 
4472  for (size_t i = 0; i < max_threads; i++) {
4473  import_buffers_vec.emplace_back();
4474  for (const auto cd : loader->get_column_descs()) {
4475  import_buffers_vec[i].emplace_back(
4476  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4477  }
4478  }
4479 
4480  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4481  size_t current_pos = 0;
4482  size_t end_pos;
4483  size_t begin_pos = 0;
4484 
4485  (void)fseek(p_file, current_pos, SEEK_SET);
4486  size_t size =
4487  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4488 
4489  // make render group analyzers for each poly column
4490  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4491  if (copy_params.geo_assign_render_groups) {
4492  if (g_enable_assign_render_groups) {
4493  auto& cat = loader->getCatalog();
4494  auto* td = loader->getTableDesc();
4495  CHECK(td);
4496  auto column_descriptors =
4497  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4498  for (auto const& cd : column_descriptors) {
4499  if (IS_GEO_POLY(cd->columnType.get_type())) {
4500  auto rga = std::make_shared<RenderGroupAnalyzer>();
4501  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
4502  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4503  }
4504  }
4505  } else {
4506  fclose(p_file);
4507  throw std::runtime_error(
4508  "Render Group Assignment requested in CopyParams but disabled in Server "
4509  "Config. Set enable_assign_render_groups=true in Server Config to override.");
4510  }
4511  }
4512 
4513  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4514  loader->getTableDesc()->tableId};
4515  auto table_epochs = loader->getTableEpochs();
4516  Executor* executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID).get();
4517  {
4518  std::list<std::future<ImportStatus>> threads;
4519 
4520  // use a stack to track thread_ids which must not overlap among threads
4521  // because thread_id is used to index import_buffers_vec[]
4522  std::stack<size_t> stack_thread_ids;
4523  for (size_t i = 0; i < max_threads; i++) {
4524  stack_thread_ids.push(i);
4525  }
4526  // added for true row index on error
4527  size_t first_row_index_this_buffer = 0;
4528 
4529  while (size > 0) {
4530  unsigned int num_rows_this_buffer = 0;
4531  CHECK(scratch_buffer);
4532  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4533  scratch_buffer,
4534  size,
4535  copy_params,
4536  first_row_index_this_buffer,
4537  num_rows_this_buffer,
4538  p_file);
4539 
4540  // unput residual
4541  int nresidual = size - end_pos;
4542  std::unique_ptr<char[]> unbuf;
4543  if (nresidual > 0) {
4544  unbuf = std::make_unique<char[]>(nresidual);
4545  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4546  }
4547 
4548  // get a thread_id not in use
4549  auto thread_id = stack_thread_ids.top();
4550  stack_thread_ids.pop();
4551  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4552 
4553  threads.push_back(std::async(std::launch::async,
4554  import_thread_delimited,
4555  thread_id,
4556  this,
4557  std::move(scratch_buffer),
4558  begin_pos,
4559  end_pos,
4560  end_pos,
4561  columnIdToRenderGroupAnalyzerMap,
4562  first_row_index_this_buffer,
4563  session_info,
4564  executor));
4565 
4566  first_row_index_this_buffer += num_rows_this_buffer;
4567 
4568  current_pos += end_pos;
4569  scratch_buffer = std::make_unique<char[]>(alloc_size);
4570  CHECK(scratch_buffer);
4571  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4572  size = nresidual +
4573  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4574 
4575  begin_pos = 0;
4576  while (threads.size() > 0) {
4577  int nready = 0;
4578  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4579  it != threads.end();) {
4580  auto& p = *it;
4581  std::chrono::milliseconds span(0);
4582  if (p.wait_for(span) == std::future_status::ready) {
4583  auto ret_import_status = p.get();
4584  {
4585  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
4586  import_status_ += ret_import_status;
4587  if (ret_import_status.load_failed) {
4588  set_import_status(import_id, import_status_);
4589  }
4590  }
4591  // sum up current total file offsets
4592  size_t total_file_offset{0};
4593  if (decompressed) {
4594  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4595  for (const auto file_offset : file_offsets) {
4596  total_file_offset += file_offset;
4597  }
4598  }
4599  // estimate number of rows per current total file offset
4600  if (decompressed ? total_file_offset : current_pos) {
4601  import_status_.rows_estimated =
4602  (decompressed ? (float)total_file_size / total_file_offset
4603  : (float)file_size / current_pos) *
4604  import_status_.rows_completed;
4605  }
4606  VLOG(3) << "rows_completed " << import_status_.rows_completed
4607  << ", rows_estimated " << import_status_.rows_estimated
4608  << ", total_file_size " << total_file_size << ", total_file_offset "
4609  << total_file_offset;
4610  set_import_status(import_id, import_status_);
4611  // recall thread_id for reuse
4612  stack_thread_ids.push(ret_import_status.thread_id);
4613  threads.erase(it++);
4614  ++nready;
4615  } else {
4616  ++it;
4617  }
4618  }
4619 
4620  if (nready == 0) {
4621  std::this_thread::yield();
4622  }
4623 
4624  // on eof, wait all threads to finish
4625  if (0 == size) {
4626  continue;
4627  }
4628 
4629  // keep reading if any free thread slot
4630  // this is one of the major differences from the old threading model!
4631  if (threads.size() < max_threads) {
4632  break;
4633  }
4634  heavyai::shared_lock<heavyai::shared_mutex> read_lock(import_mutex_);
4635  if (import_status_.load_failed) {
4636  break;
4637  }
4638  }
4639  heavyai::lock_guard<heavyai::shared_mutex> write_lock(import_mutex_);
4640  if (import_status_.rows_rejected > copy_params.max_reject) {
4641  import_status_.load_failed = true;
4642  // todo use better message
4643  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4644  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4645  break;
4646  }
4648  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4649  break;
4650  }
4651  }
4652 
4653  // join dangling threads in case of LOG(ERROR) above
4654  for (auto& p : threads) {
4655  p.wait();
4656  }
4657  }
4658 
4659  checkpoint(table_epochs);
4660 
4661  fclose(p_file);
4662  p_file = nullptr;
4663  return import_status_;
4664 }
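/*
 * Editor's note (illustrative sketch, not part of the original source): the
 * stack of thread ids above is a small id pool that guarantees no two in-flight
 * workers share an index into import_buffers_vec:
 *
 *   std::stack<size_t> free_ids;                  // one slot per worker buffer
 *   for (size_t i = 0; i < max_threads; ++i) {
 *     free_ids.push(i);
 *   }
 *   size_t id = free_ids.top();                   // claim before launching
 *   free_ids.pop();
 *   // ... worker uses import_buffers_vec[id] exclusively ...
 *   free_ids.push(id);                            // recycle once its future is ready
 */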
4665 
4666 void Loader::checkpoint() {
4667  if (getTableDesc()->persistenceLevel ==
4668  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4669  // tables
4670  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4671  }
4672 }
4673 
4674 std::vector<Catalog_Namespace::TableEpochInfo> Loader::getTableEpochs() const {
4675  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4676  getTableDesc()->tableId);
4677 }
4678 
4679 void Loader::setTableEpochs(
4680  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4681  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4682 }
4683 
4684 /* static */
4685 Geospatial::GDAL::DataSourceUqPtr Importer::openGDALDataSource(
4686  const std::string& file_name,
4687  const CopyParams& copy_params) {
4688  Geospatial::GDAL::init();
4689  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
4690  copy_params.s3_endpoint,
4691  copy_params.s3_access_key,
4692  copy_params.s3_secret_key,
4693  copy_params.s3_session_token);
4694  if (copy_params.source_type != import_export::SourceType::kGeoFile) {
4695  throw std::runtime_error("Unexpected CopyParams.source_type (" +
4696  std::to_string(static_cast<int>(copy_params.source_type)) +
4697  ")");
4698  }
4699  return Geospatial::GDAL::openDataSource(file_name, import_export::SourceType::kGeoFile);
4700 }
4701 
4702 namespace {
4703 
4704 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4705  const Geospatial::GDAL::DataSourceUqPtr& poDS,
4706  const std::string& file_name) {
4707  // get layer with specified name, or default to first layer
4708  OGRLayer* poLayer = nullptr;
4709  if (geo_layer_name.size()) {
4710  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4711  if (poLayer == nullptr) {
4712  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4713  file_name);
4714  }
4715  } else {
4716  poLayer = poDS->GetLayer(0);
4717  if (poLayer == nullptr) {
4718  throw std::runtime_error("No layers found in " + file_name);
4719  }
4720  }
4721  return *poLayer;
4722 }
4723 
4724 } // namespace
4725 
4726 /* static */
4727 void Importer::readMetadataSampleGDAL(
4728  const std::string& file_name,
4729  const std::string& geo_column_name,
4730  std::map<std::string, std::vector<std::string>>& sample_data,
4731  int row_limit,
4732  const CopyParams& copy_params) {
4733  Geospatial::GDAL::DataSourceUqPtr datasource(
4734  openGDALDataSource(file_name, copy_params));
4735  if (datasource == nullptr) {
4736  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
4737  file_name);
4738  }
4739 
4740  OGRLayer& layer =
4741  getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);
4742 
4743  auto const* feature_defn = layer.GetLayerDefn();
4744  CHECK(feature_defn);
4745 
4746  // metadata columns?
4747  auto const metadata_column_infos =
4748  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4749 
4750  // get limited feature count
4751  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4752  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
4753  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4754 
4755  // prepare sample data map
4756  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4757  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4758  CHECK(column_name);
4759  sample_data[column_name] = {};
4760  }
4761  sample_data[geo_column_name] = {};
4762  for (auto const& mci : metadata_column_infos) {
4763  sample_data[mci.column_descriptor.columnName] = {};
4764  }
4765 
4766  // prepare to read
4767  layer.ResetReading();
4768 
4769  // read features (up to limited count)
4770  uint64_t feature_index{0u};
4771  while (feature_index < num_features) {
4772  // get (and take ownership of) feature
4773  Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
4774  if (!feature) {
4775  break;
4776  }
4777 
4778  // get feature geometry
4779  auto const* geometry = feature->GetGeometryRef();
4780  if (geometry == nullptr) {
4781  break;
4782  }
4783 
4784  // validate geom type (again?)
4785  switch (wkbFlatten(geometry->getGeometryType())) {
4786  case wkbPoint:
4787  case wkbMultiPoint:
4788  case wkbLineString:
4789  case wkbMultiLineString:
4790  case wkbPolygon:
4791  case wkbMultiPolygon:
4792  break;
4793  default:
4794  throw std::runtime_error("Unsupported geometry type: " +
4795  std::string(geometry->getGeometryName()));
4796  }
4797 
4798  // populate sample data for regular field columns
4799  for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4800  auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4801  sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4802  }
4803 
4804  // populate sample data for metadata columns?
4805  for (auto const& mci : metadata_column_infos) {
4806  sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4807  }
4808 
4809  // populate sample data for geo column with WKT string
4810  char* wkts = nullptr;
4811  geometry->exportToWkt(&wkts);
4812  CHECK(wkts);
4813  sample_data[geo_column_name].push_back(wkts);
4814  CPLFree(wkts);
4815 
4816  // next feature
4817  feature_index++;
4818  }
4819 }
4820 
4821 namespace {
4822 
4823 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4824  switch (ogr_type) {
4825  case OFTInteger:
4826  return std::make_pair(kINT, false);
4827  case OFTIntegerList:
4828  return std::make_pair(kINT, true);
4829 #if GDAL_VERSION_MAJOR > 1
4830  case OFTInteger64:
4831  return std::make_pair(kBIGINT, false);
4832  case OFTInteger64List:
4833  return std::make_pair(kBIGINT, true);
4834 #endif
4835  case OFTReal:
4836  return std::make_pair(kDOUBLE, false);
4837  case OFTRealList:
4838  return std::make_pair(kDOUBLE, true);
4839  case OFTString:
4840  return std::make_pair(kTEXT, false);
4841  case OFTStringList:
4842  return std::make_pair(kTEXT, true);
4843  case OFTDate:
4844  return std::make_pair(kDATE, false);
4845  case OFTTime:
4846  return std::make_pair(kTIME, false);
4847  case OFTDateTime:
4848  return std::make_pair(kTIMESTAMP, false);
4849  case OFTBinary:
4850  // Interpret binary blobs as byte arrays here
4851  // but actual import will store NULL as GDAL will not
4852  // extract the blob (OGRFeature::GetFieldAsString will
4853  // result in the import buffers having an empty string)
4854  return std::make_pair(kTINYINT, true);
4855  default:
4856  break;
4857  }
4858  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4859 }
4860 
4861 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4862  switch (ogr_type) {
4863  case wkbPoint:
4864  return kPOINT;
4865  case wkbMultiPoint:
4866  return kMULTIPOINT;
4867  case wkbLineString:
4868  return kLINESTRING;
4869  case wkbMultiLineString:
4870  return kMULTILINESTRING;
4871  case wkbPolygon:
4872  return kPOLYGON;
4873  case wkbMultiPolygon:
4874  return kMULTIPOLYGON;
4875  default:
4876  break;
4877  }
4878  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4879 }
4880 
4881 RasterImporter::PointType convert_raster_point_type(
4882  const import_export::RasterPointType raster_point_type) {
4883  switch (raster_point_type) {
4884  case import_export::RasterPointType::kNone:
4885  return RasterImporter::PointType::kNone;
4886  case import_export::RasterPointType::kAuto:
4887  return RasterImporter::PointType::kAuto;
4888  case import_export::RasterPointType::kSmallInt:
4889  return RasterImporter::PointType::kSmallInt;
4890  case import_export::RasterPointType::kInt:
4891  return RasterImporter::PointType::kInt;
4892  case import_export::RasterPointType::kFloat:
4893  return RasterImporter::PointType::kFloat;
4894  case import_export::RasterPointType::kDouble:
4895  return RasterImporter::PointType::kDouble;
4896  case import_export::RasterPointType::kPoint:
4897  return RasterImporter::PointType::kPoint;
4898  }
4899  UNREACHABLE();
4900  return RasterImporter::PointType::kNone;
4901 }
4902 
4903 RasterImporter::PointTransform convert_raster_point_transform(
4904  const import_export::RasterPointTransform raster_point_transform) {
4905  switch (raster_point_transform) {
4906  case import_export::RasterPointTransform::kNone:
4907  return RasterImporter::PointTransform::kNone;
4908  case import_export::RasterPointTransform::kAuto:
4909  return RasterImporter::PointTransform::kAuto;
4910  case import_export::RasterPointTransform::kFile:
4911  return RasterImporter::PointTransform::kFile;
4912  case import_export::RasterPointTransform::kWorld:
4913  return RasterImporter::PointTransform::kWorld;
4914  }
4915  UNREACHABLE();
4916  return RasterImporter::PointTransform::kNone;
4917 }
4918 
4919 } // namespace
4920 
4921 /* static */
4922 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4923  const std::string& file_name,
4924  const bool is_raster,
4925  const std::string& geo_column_name,
4926  const CopyParams& copy_params) {
4927  if (is_raster) {
4928  return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
4929  }
4930  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
4931 }
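/*
 * Editor's note (hypothetical usage, not part of the original source): a caller
 * detecting a schema from a raster file might look like this, assuming default
 * CopyParams and a made-up path and point column name:
 *
 *   CopyParams params;
 *   auto cds = Importer::gdalToColumnDescriptors(
 *       "/data/elevation.tif", true, "raster_point", params);  // true = is_raster
 *   for (auto const& cd : cds) {
 *     LOG(INFO) << cd.columnName;  // point column(s) followed by one column per band
 *   }
 */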
4932 
4933 /* static */
4934 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsRaster(
4935  const std::string& file_name,
4936  const std::string& geo_column_name,
4937  const CopyParams& copy_params) {
4938  // lazy init GDAL
4939  Geospatial::GDAL::init();
4940  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
4941  copy_params.s3_endpoint,
4942  copy_params.s3_access_key,
4943  copy_params.s3_secret_key,
4944  copy_params.s3_session_token);
4945 
4946  // prepare for metadata column
4947  auto metadata_column_infos =
4948  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
4949 
4950  // create a raster importer and do the detect
4951  RasterImporter raster_importer;
4952  raster_importer.detect(
4953  file_name,
4954  copy_params.raster_import_bands,
4955  copy_params.raster_import_dimensions,
4956  convert_raster_point_type(copy_params.raster_point_type),
4957  convert_raster_point_transform(copy_params.raster_point_transform),
4958  copy_params.raster_point_compute_angle,
4959  false,
4960  metadata_column_infos);
4961 
4962  // prepare to capture column descriptors
4963  std::list<ColumnDescriptor> cds;
4964 
4965  // get the point column info
4966  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
4967 
4968  // create the columns for the point in the specified type
4969  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
4970  ColumnDescriptor cd;
4971  cd.columnName = cd.sourceName = col_name;
4972  cd.columnType.set_type(sql_type);
4973  // hardwire other POINT attributes for now
4974  if (sql_type == kPOINT) {
4975  cd.columnType.set_subtype(kGEOMETRY);
4976  cd.columnType.set_input_srid(4326);
4977  cd.columnType.set_output_srid(4326);
4978  cd.columnType.set_compression(kENCODING_GEOINT);
4979  cd.columnType.set_comp_param(32);
4980  }
4981  cds.push_back(cd);
4982  }
4983 
4984  // get the names and types for the band column(s)
4985  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
4986 
4987  // add column descriptors for each band
4988  for (auto const& [band_name, sql_type] : band_names_and_types) {
4989  ColumnDescriptor cd;
4990  cd.columnName = cd.sourceName = band_name;
4991  cd.columnType.set_type(sql_type);
4992  cd.columnType.set_fixed_size();
4993  cds.push_back(cd);
4994  }
4995 
4996  // metadata columns?
4997  for (auto& mci : metadata_column_infos) {
4998  cds.push_back(std::move(mci.column_descriptor));
4999  }
5000 
5001  // return the results
5002  return cds;
5003 }
5004 
5005 /* static */
5006 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsGeo(
5007  const std::string& file_name,
5008  const std::string& geo_column_name,
5009  const CopyParams& copy_params) {
5010  std::list<ColumnDescriptor> cds;
5011 
5012  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5013  if (poDS == nullptr) {
5014  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5015  file_name);
5016  }
5017  if (poDS->GetLayerCount() == 0) {
5018  throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
5019  " has no layers");
5020  }
5021 
5022  OGRLayer& layer =
5023  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
5024 
5025  layer.ResetReading();
5026  // TODO(andrewseidl): support multiple features
5027  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
5028  if (poFeature == nullptr) {
5029  throw std::runtime_error("No features found in " + file_name);
5030  }
5031  // get fields as regular columns
5032  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5033  CHECK(poFDefn);
5034  int iField;
5035  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
5036  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5037  auto typePair = ogr_to_type(poFieldDefn->GetType());
5038  ColumnDescriptor cd;
5039  cd.columnName = poFieldDefn->GetNameRef();
5040  cd.sourceName = poFieldDefn->GetNameRef();
5041  SQLTypeInfo ti;
5042  if (typePair.second) {
5043  ti.set_type(kARRAY);
5044  ti.set_subtype(typePair.first);
5045  } else {
5046  ti.set_type(typePair.first);
5047  }
5048  if (typePair.first == kTEXT) {
5049  ti.set_compression(kENCODING_DICT);
5050  ti.set_comp_param(32);
5051  }
5052  ti.set_fixed_size();
5053  cd.columnType = ti;
5054  cds.push_back(cd);
5055  }
5056  // try getting the geo column type from the layer
5057  auto ogr_type = wkbFlatten(layer.GetGeomType());
5058  if (ogr_type == wkbUnknown) {
5059  // layer geo type unknown, so try reading from the first feature
5060  Geospatial::GDAL::FeatureUqPtr first_feature(layer.GetNextFeature());
5061  CHECK(first_feature);
5062  auto const* ogr_geometry = first_feature->GetGeometryRef();
5063  if (ogr_geometry) {
5064  ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
5065  } else {
5066  ogr_type = wkbNone;
5067  }
5068  }
5069  // do we have a geo column?
5070  if (ogr_type != wkbNone) {
5071  ColumnDescriptor cd;
5072  cd.columnName = geo_column_name;
5073  cd.sourceName = geo_column_name;
5074 
5075  // if exploding, override any collection type to child type
5076  if (copy_params.geo_explode_collections) {
5077  if (ogr_type == wkbMultiPolygon) {
5078  ogr_type = wkbPolygon;
5079  } else if (ogr_type == wkbMultiLineString) {
5080  ogr_type = wkbLineString;
5081  } else if (ogr_type == wkbMultiPoint) {
5082  ogr_type = wkbPoint;
5083  }
5084  }
5085 
5086  // convert to internal type
5087  // this will throw if the type is unsupported
5088  SQLTypes geoType = ogr_to_type(ogr_type);
5089 
5090  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
5091  if (!copy_params.geo_explode_collections) {
5092  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
5093  }
5094 
5095  // build full internal type
5096  SQLTypeInfo ti;
5097  ti.set_type(geoType);
5098  ti.set_subtype(copy_params.geo_coords_type);
5099  ti.set_input_srid(copy_params.geo_coords_srid);
5100  ti.set_output_srid(copy_params.geo_coords_srid);
5101  ti.set_compression(copy_params.geo_coords_encoding);
5102  ti.set_comp_param(copy_params.geo_coords_comp_param);
5103  cd.columnType = ti;
5104 
5105  cds.push_back(cd);
5106  }
5107 
5108  // metadata columns?
5109  auto metadata_column_infos =
5110  parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
5111  for (auto& mci : metadata_column_infos) {
5112  cds.push_back(std::move(mci.column_descriptor));
5113  }
5114 
5115  return cds;
5116 }
5117 
5118 bool Importer::gdalStatInternal(const std::string& path,
5119  const CopyParams& copy_params,
5120  bool also_dir) {
5121  // lazy init GDAL
5122  Geospatial::GDAL::init();
5123  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5124  copy_params.s3_endpoint,
5125  copy_params.s3_access_key,
5126  copy_params.s3_secret_key,
5127  copy_params.s3_session_token);
5128 
5129 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5130  // clear GDAL stat cache
5131  // without this, file existence will be cached, even if authentication changes
5132  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
5133  VSICurlClearCache();
5134 #endif
5135 
5136  // stat path
5137  VSIStatBufL sb;
5138  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5139  if (result < 0) {
5140  return false;
5141  }
5142 
5143  // exists?
5144  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5145  return true;
5146  } else if (VSI_ISREG(sb.st_mode)) {
5147  return true;
5148  }
5149  return false;
5150 }
5151 
5152 /* static */
5153 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
5154  return gdalStatInternal(path, copy_params, false);
5155 }
5156 
5157 /* static */
5158 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
5159  const CopyParams& copy_params) {
5160  return gdalStatInternal(path, copy_params, true);
5161 }
5162 
5163 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
5164  std::vector<std::string>& files) {
5165  // prepare to gather subdirectories
5166  std::vector<std::string> subdirectories;
5167 
5168  // get entries
5169  char** entries = VSIReadDir(archive_path.c_str());
5170  if (!entries) {
5171  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
5172  return;
5173  }
5174 
5175  // force scope
5176  {
5177  // request clean-up
5178  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
5179 
5180  // check all the entries
5181  int index = 0;
5182  while (true) {
5183  // get next entry, or drop out if there isn't one
5184  char* entry_c = entries[index++];
5185  if (!entry_c) {
5186  break;
5187  }
5188  std::string entry(entry_c);
5189 
5190  // ignore '.' and '..'
5191  if (entry == "." || entry == "..") {
5192  continue;
5193  }
5194 
5195  // build the full path
5196  std::string entry_path = archive_path + std::string("/") + entry;
5197 
5198  // is it a file or a sub-folder
5199  VSIStatBufL sb;
5200  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
5201  if (result < 0) {
5202  break;
5203  }
5204 
5205  if (VSI_ISDIR(sb.st_mode)) {
5206  // a directory that ends with .gdb could be a Geodatabase bundle
5207  // arguably dangerous to decide this purely by name, but any further
5208  // validation would be very complex especially at this scope
5209  if (boost::iends_with(entry_path, ".gdb")) {
5210  // add the directory as if it was a file and don't recurse into it
5211  files.push_back(entry_path);
5212  } else {
5213  // add subdirectory to be recursed into
5214  subdirectories.push_back(entry_path);
5215  }
5216  } else {
5217  // add this file
5218  files.push_back(entry_path);
5219  }
5220  }
5221  }
5222 
5223  // recurse into each subdirectory we found
5224  for (const auto& subdirectory : subdirectories) {
5225  gdalGatherFilesInArchiveRecursive(subdirectory, files);
5226  }
5227 }
5228 
5229 /* static */
5230 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
5231  const std::string& archive_path,
5232  const CopyParams& copy_params) {
5233  // lazy init GDAL
5234  Geospatial::GDAL::init();
5235  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5236  copy_params.s3_endpoint,
5237  copy_params.s3_access_key,
5238  copy_params.s3_secret_key,
5239  copy_params.s3_session_token);
5240 
5241  // prepare to gather files
5242  std::vector<std::string> files;
5243 
5244  // gather the files recursively
5245  gdalGatherFilesInArchiveRecursive(archive_path, files);
5246 
5247  // convert to relative paths inside archive
5248  for (auto& file : files) {
5249  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5250  }
5251 
5252  // done
5253  return files;
5254 }
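
// Worked example (hypothetical values) of the erase() above: with
//   archive_path = "/vsizip//tmp/data.zip"
// a gathered entry such as
//   "/vsizip//tmp/data.zip/shapes/roads.shp"
// loses its first archive_path.size() + 1 characters (the prefix plus the
// joining '/'), leaving the archive-relative path "shapes/roads.shp".
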
5255 
5256 /* static */
5257 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
5258  const std::string& file_name,
5259  const CopyParams& copy_params) {
5260  // lazy init GDAL
5261  Geospatial::GDAL::init();
5262  Geospatial::GDAL::setAuthorizationTokens(copy_params.s3_region,
5263  copy_params.s3_endpoint,
5264  copy_params.s3_access_key,
5265  copy_params.s3_secret_key,
5266  copy_params.s3_session_token);
5267 
5268  // prepare to gather layer info
5269  std::vector<GeoFileLayerInfo> layer_info;
5270 
5271  // open the data set
5272  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
5273  if (poDS == nullptr) {
5274  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5275  file_name);
5276  }
5277 
5278  // enumerate the layers
5279  for (auto&& poLayer : poDS->GetLayers()) {
5280  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
5281  // prepare to read this layer
5282  poLayer->ResetReading();
5283  // skip layer if empty
5284  if (poLayer->GetFeatureCount() > 0) {
5285  // first read layer geo type
5286  auto ogr_type = wkbFlatten(poLayer->GetGeomType());
5287  if (ogr_type == wkbUnknown) {
5288  // layer geo type unknown, so try reading from the first feature
5289  Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
5290  CHECK(first_feature);
5291  auto const* ogr_geometry = first_feature->GetGeometryRef();
5292  if (ogr_geometry) {
5293  ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
5294  } else {
5295  ogr_type = wkbNone;
5296  }
5297  }
5298  switch (ogr_type) {
5299  case wkbNone:
5300  // no geo
5301  contents = GeoFileLayerContents::NON_GEO;
5302  break;
5303  case wkbPoint:
5304  case wkbMultiPoint:
5305  case wkbLineString:
5306  case wkbMultiLineString:
5307  case wkbPolygon:
5308  case wkbMultiPolygon:
5309  // layer has supported geo
5310  contents = GeoFileLayerContents::GEO;
5311  break;
5312  default:
5313  // layer has unsupported geometry
5314  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
5315  break;
5316  }
5317  }
5318  // store info for this layer
5319  layer_info.emplace_back(poLayer->GetName(), contents);
5320  }
5321 
5322  // done
5323  return layer_info;
5324 }
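
// Editorial sketch, not part of the original file: filtering the result of
// gdalGetLayersInGeoFile() down to importable layers. Assumes GeoFileLayerInfo
// carries 'name' and 'contents' members, matching the emplace_back above; the
// function name is hypothetical.
static std::vector<std::string> example_importable_layers(
    const std::string& file_name,
    const CopyParams& copy_params) {
  std::vector<std::string> names;
  for (auto const& info : Importer::gdalGetLayersInGeoFile(file_name, copy_params)) {
    if (info.contents == Importer::GeoFileLayerContents::GEO) {
      names.push_back(info.name);  // layer has supported geometry
    }
  }
  return names;
}
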
5325 
5326 ImportStatus Importer::importGDAL(
5327  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
5328  const Catalog_Namespace::SessionInfo* session_info,
5329  const bool is_raster) {
5330  if (is_raster) {
5331  return importGDALRaster(session_info);
5332  }
5333  return importGDALGeo(columnNameToSourceNameMap, session_info);
5334 }
5335 
5336 ImportStatus Importer::importGDALGeo(
5337  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
5338  const Catalog_Namespace::SessionInfo* session_info) {
5339  // initial status
5340  set_import_status(import_id, import_status_);
5341  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_path, copy_params));
5342  if (poDS == nullptr) {
5343  throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
5344  file_path);
5345  }
5346 
5347  OGRLayer& layer =
5348  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
5349 
5350  // get the number of features in this layer
5351  size_t numFeatures = layer.GetFeatureCount();
5352 
5353  // build a map of metadata field (additional column) names to indices;
5354  // it will be passed to the worker threads
5355  FieldNameToIndexMapType fieldNameToIndexMap;
5356  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5357  CHECK(poFDefn);
5358  size_t numFields = poFDefn->GetFieldCount();
5359  for (size_t iField = 0; iField < numFields; iField++) {
5360  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5361  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5362  }
5363 
5364  // the geographic spatial reference we want to put everything in
5365  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5366  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5367 
5368 #if GDAL_VERSION_MAJOR >= 3
5369  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
5370  // this results in X and Y being transposed for angle-based
5371  // coordinate systems. This restores the previous behavior.
5372  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5373 #endif
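
  // Editorial note: a standalone illustration (hypothetical, commented out so
  // as not to alter this function) of the GDAL 3 behavior handled above.
  // Under the default strategy an EPSG:4326 SRS is interpreted as lat/lon,
  // which would swap what this importer stores as X (longitude) and
  // Y (latitude):
  //
  //   OGRSpatialReference srs;
  //   srs.importFromEPSG(4326);
  // #if GDAL_VERSION_MAJOR >= 3
  //   srs.SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);  // x=lon, y=lat
  // #endif
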
5374 
5375 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5376  // just one "thread"
5377  max_threads = 1;
5378 #else
5379  // how many threads to use
5380  max_threads = num_import_threads(copy_params.threads);
5381 #endif
5382  VLOG(1) << "GDAL import # threads: " << max_threads;
5383 
5384  // metadata columns?
5385  auto const metadata_column_infos =
5386  parse_add_metadata_columns(copy_params.add_metadata_columns, file_path);
5387 
5388  // geo table import is handled separately in both DBHandler and QueryRunner,
5389  // outside the normal SQL execution path,
5390  // so we explicitly enroll the import session here to allow interruption
5391  // while the geo table is being imported
5392  auto query_session = session_info ? session_info->get_session_id() : "";
5393  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5394  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
5395  auto is_session_already_registered = false;
5396  {
5397  heavyai::shared_lock<heavyai::shared_mutex> session_read_lock(
5398  executor->getSessionLock());
5399  is_session_already_registered =
5400  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5401  }
5402  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5403  !is_session_already_registered) {
5404  executor->enrollQuerySession(query_session,
5405  "IMPORT_GEO_TABLE",
5406  query_submitted_time,
5407  Executor::UNITARY_EXECUTOR_ID,
5408  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5409  }
5410  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5411  // reset the runtime query interrupt status
5412  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5413  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5414  }
5415  };
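
  // Editorial note: what the ScopeGuard above guarantees, in miniature
  // (hypothetical names, commented out so as not to alter this function).
  // The lambda runs when the guard is destroyed, on both the normal and the
  // throwing path, so an enrolled session status cannot leak:
  //
  //   {
  //     ScopeGuard on_exit = [] { /* clear session status */ };
  //     run_import_that_may_throw();
  //   }  // on_exit's lambda has run by this point, exception or not
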
5416 
5417  // make an import buffer for each thread
5418  CHECK_EQ(import_buffers_vec.size(), 0u);
5419  import_buffers_vec.resize(max_threads);
5420  for (size_t i = 0; i < max_threads; i++) {
5421  for (const auto cd : loader->get_column_descs()) {
5422  import_buffers_vec[i].emplace_back(
5423  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5424  }
5425  }
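
  // Editorial note: one TypedImportBuffer set per thread lets workers convert
  // and append column values without locking; coordination happens only when
  // a thread's buffers are handed to the loader.
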
5426 
5427  // make render group analyzers for each poly column
5428  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
5429  if (copy_params.geo_assign_render_groups) {
5430  if (g_enable_assign_render_groups) {
5431  auto& cat = loader->getCatalog();
5432  auto* td = loader->getTableDesc();
5433  CHECK(td);
5434  auto column_descriptors =
5435  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5436  for (auto const& cd : column_descriptors) {
5437  if (IS_GEO_POLY(cd->columnType.get_type())) {
5438  auto rga = std::make_shared<RenderGroupAnalyzer>();
5439  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
5440  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
5441  }
5442  }
5443  } else {
5444  throw std::runtime_error(
5445  "Render Group Assignment requested in CopyParams but disabled in Server "
5446  "Config. Set enable_assign_render_groups=true in Server Config to override.");
5447  }
5448  }
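
  // Editorial note: seeding each analyzer from existing table contents keeps
  // render group assignment for appended polygons consistent with the rows
  // already in the table, rather than restarting the numbering from zero.
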
5449 
5450 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5451  // threads
5452  std::list<std::future<ImportStatus>> threads;
5453 
5454  // use a stack to track thread_ids which must not overlap among threads
5455  // because thread_id is used to index import_buffers_vec[]
5456  std::stack<size_t> stack_thread_ids;
5457  for (size_t i = 0; i < max_threads; i++) {
5458  stack_thread_ids.push(i);
5459  }
5460 #endif
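
  // Editorial sketch (hypothetical, commented out so as not to alter this
  // function) of the pool discipline the stack implies: pop a unique id before
  // launching a worker, use it to select that worker's import buffers, and
  // push it back only after the worker's future has been consumed, so no two
  // live workers ever share a buffer:
  //
  //   size_t thread_id = stack_thread_ids.top();
  //   stack_thread_ids.pop();
  //   // ... launch worker on import_buffers_vec[thread_id], collect result ...
  //   stack_thread_ids.push(thread_id);
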
5461 
5462  // record the current table epochs (used to checkpoint or roll back the load)
5463  auto table_epochs = loader->getTableEpochs();
5464 
5465  // reset the layer
5466  layer.ResetReading();
5467 
5468  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5469 
5470  // make a features buffer for each thread
5471  std::vector<FeaturePtrVector> features(max_threads);
5472 
5473  // make one coordinate transformation for each thread, based on the first feature's SR
5474  std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5475  max_threads);
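
  // Editorial note: one transformation object per thread because OGR
  // coordinate transformation instances are not safe for concurrent use.
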
5476 
5477  // for each feature...
5478  size_t firstFeatureThisChunk = 0;