OmniSciDB  eb3a3d0a03
Importer.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file Importer.cpp
19  * @author Wei Hong <wei@mapd.com>
20  * @brief Functions for Importer class
21  */
22 
23 #include "ImportExport/Importer.h"
24 
25 #include <arrow/api.h>
26 #include <arrow/io/api.h>
27 #include <gdal.h>
28 #include <ogrsf_frmts.h>
29 #include <boost/algorithm/string.hpp>
30 #include <boost/dynamic_bitset.hpp>
31 #include <boost/filesystem.hpp>
32 #include <boost/geometry.hpp>
33 #include <boost/variant.hpp>
34 #include <csignal>
35 #include <cstdio>
36 #include <cstdlib>
37 #include <fstream>
38 #include <future>
39 #include <iomanip>
40 #include <list>
41 #include <memory>
42 #include <mutex>
43 #include <numeric>
44 #include <stack>
45 #include <stdexcept>
46 #include <thread>
47 #include <typeinfo>
48 #include <unordered_map>
49 #include <unordered_set>
50 #include <utility>
51 #include <vector>
52 
54 #include "Archive/S3Archive.h"
55 #include "ArrowImporter.h"
56 #include "Geospatial/Compression.h"
57 #include "Geospatial/GDAL.h"
58 #include "Geospatial/Transforms.h"
59 #include "Geospatial/Types.h"
61 #include "Logger/Logger.h"
63 #include "QueryEngine/Execute.h"
65 #include "RenderGroupAnalyzer.h"
66 #include "Shared/DateTimeParser.h"
67 #include "Shared/SqlTypesLayout.h"
68 #include "Shared/file_glob.h"
69 #include "Shared/import_helpers.h"
70 #include "Shared/likely.h"
71 #include "Shared/measure.h"
72 #include "Shared/misc.h"
73 #include "Shared/scope.h"
74 #include "Shared/shard_key.h"
75 #include "Shared/thread_count.h"
77 
78 #include "gen-cpp/OmniSci.h"
79 
80 #ifdef _WIN32
81 #include <fcntl.h>
82 #include <io.h>
83 #endif
84 
85 size_t g_max_import_threads =
86  32; // Max number of default import threads to use (num hardware threads will be used
87  // if lower, and can also be explicitly overridden in copy statement with threads
88  // option)
89 size_t g_archive_read_buf_size = 1 << 20;
90 
91 inline auto get_filesize(const std::string& file_path) {
92  boost::filesystem::path boost_file_path{file_path};
93  boost::system::error_code ec;
94  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
95  return ec ? 0 : filesize;
96 }
97 
98 namespace {
99 bool check_session_interrupted(const QuerySessionId& query_session, Executor* executor) {
100  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
101  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor->getSessionLock());
102  return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
103  }
104  return false;
105 }
106 
107 struct OGRDataSourceDeleter {
108  void operator()(OGRDataSource* datasource) {
109  if (datasource) {
110  GDALClose(datasource);
111  }
112  }
113 };
114 using OGRDataSourceUqPtr = std::unique_ptr<OGRDataSource, OGRDataSourceDeleter>;
115 
116 struct OGRFeatureDeleter {
117  void operator()(OGRFeature* feature) {
118  if (feature) {
119  OGRFeature::DestroyFeature(feature);
120  }
121  }
122 };
123 using OGRFeatureUqPtr = std::unique_ptr<OGRFeature, OGRFeatureDeleter>;
124 
125 struct OGRSpatialReferenceDeleter {
126  void operator()(OGRSpatialReference* ref) {
127  if (ref) {
128  OGRSpatialReference::DestroySpatialReference(ref);
129  }
130  }
131 };
132 using OGRSpatialReferenceUqPtr =
133  std::unique_ptr<OGRSpatialReference, OGRSpatialReferenceDeleter>;
134 
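// Illustrative sketch (not from the original file; assumes a GDAL 2.x+ build
// where OGRDataSource aliases GDALDataset): these deleters let unique_ptr
// release OGR/GDAL resources on every exit path, e.g.
//
//   OGRDataSourceUqPtr datasource(static_cast<OGRDataSource*>(
//       GDALOpenEx("input.geojson", GDAL_OF_VECTOR, nullptr, nullptr, nullptr)));
//   if (!datasource) {
//     // open failed; nothing to clean up
//   }
//   // GDALClose() runs automatically when datasource goes out of scope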
135 } // namespace
136 
137 // For logging std::vector<std::string> row.
138 namespace boost {
139 namespace log {
140 formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
141  out << '[';
142  for (size_t i = 0; i < row.size(); ++i) {
143  out << (i ? ", " : "") << row[i];
144  }
145  out << ']';
146  return out;
147 }
148 } // namespace log
149 } // namespace boost
150 
151 namespace import_export {
152 
153 using FieldNameToIndexMapType = std::map<std::string, size_t>;
154 using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
155 using RenderGroupAnalyzerMapType =
156  std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
157 using FeaturePtrVector = std::vector<OGRFeatureUqPtr>;
158 
159 #define DEBUG_TIMING false
160 #define DEBUG_RENDER_GROUP_ANALYZER 0
161 #define DEBUG_AWS_AUTHENTICATION 0
162 
163 #define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0
164 
165 static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;
166 
167 static mapd_shared_mutex status_mutex;
168 static std::map<std::string, ImportStatus> import_status_map;
169 
170 Importer::Importer(Catalog_Namespace::Catalog& c,
169 
171  const TableDescriptor* t,
172  const std::string& f,
173  const CopyParams& p)
174  : Importer(new Loader(c, t), f, p) {}
175 
176 Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
177  : DataStreamSink(p, f), loader(providedLoader) {
178  import_id = boost::filesystem::path(file_path).filename().string();
179  file_size = 0;
180  max_threads = 0;
181  p_file = nullptr;
182  buffer[0] = nullptr;
183  buffer[1] = nullptr;
184  // we may be overallocating a little more memory here due to dropping phy cols.
185  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
186  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
187  int i = 0;
188  bool has_array = false;
189  // TODO: replace this ugly way of skipping phy cols once isPhyGeo is defined
190  int skip_physical_cols = 0;
191  for (auto& p : loader->get_column_descs()) {
192  // phy geo columns can't be in input file
193  if (skip_physical_cols-- > 0) {
194  continue;
195  }
196  // neither are rowid or $deleted$
197  // note: columns can be added after rowid/$deleted$
198  if (p->isVirtualCol || p->isDeletedCol) {
199  continue;
200  }
201  skip_physical_cols = p->columnType.get_physical_cols();
202  if (p->columnType.get_type() == kARRAY) {
203  is_array.get()[i] = true;
204  has_array = true;
205  } else {
206  is_array.get()[i] = false;
207  }
208  ++i;
209  }
210  if (has_array) {
211  is_array_a = std::unique_ptr<bool[]>(is_array.release());
212  } else {
213  is_array_a = std::unique_ptr<bool[]>(nullptr);
214  }
215 }
216 
217 Importer::~Importer() {
218  if (p_file != nullptr) {
219  fclose(p_file);
220  }
221  if (buffer[0] != nullptr) {
222  free(buffer[0]);
223  }
224  if (buffer[1] != nullptr) {
225  free(buffer[1]);
226  }
227 }
228 
229 ImportStatus Importer::get_import_status(const std::string& import_id) {
230  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
231  return import_status_map.at(import_id);
232 }
233 
234 void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
235  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
236  is.end = std::chrono::steady_clock::now();
237  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
238  import_status_map[import_id] = is;
239 }
240 
241 static const std::string trim_space(const char* field, const size_t len) {
242  size_t i = 0;
243  size_t j = len;
244  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
245  i++;
246  }
247  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
248  j--;
249  }
250  return std::string(field + i, j - i);
251 }
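// Example (illustrative): trim_space only strips leading/trailing spaces and
// carriage returns, so trim_space("  42\r", 5) yields "42" while embedded
// whitespace is preserved.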
252 
253 Datum NullDatum(SQLTypeInfo& ti) {
254  Datum d;
255  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
256  switch (type) {
257  case kBOOLEAN:
258  d.boolval = inline_fixed_encoding_null_val(ti);
259  break;
260  case kBIGINT:
261  d.bigintval = inline_fixed_encoding_null_val(ti);
262  break;
263  case kINT:
264  d.intval = inline_fixed_encoding_null_val(ti);
265  break;
266  case kSMALLINT:
267  d.smallintval = inline_fixed_encoding_null_val(ti);
268  break;
269  case kTINYINT:
270  d.tinyintval = inline_fixed_encoding_null_val(ti);
271  break;
272  case kFLOAT:
273  d.floatval = NULL_FLOAT;
274  break;
275  case kDOUBLE:
276  d.doubleval = NULL_DOUBLE;
277  break;
278  case kTIME:
279  case kTIMESTAMP:
280  case kDATE:
281  d.bigintval = inline_fixed_encoding_null_val(ti);
282  break;
283  case kPOINT:
284  case kLINESTRING:
285  case kPOLYGON:
286  case kMULTIPOLYGON:
287  throw std::runtime_error("Internal error: geometry type in NullDatum.");
288  default:
289  throw std::runtime_error("Internal error: invalid type in NullDatum.");
290  }
291  return d;
292 }
293 
294 Datum NullArrayDatum(SQLTypeInfo& ti) {
293 
295  Datum d;
296  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
297  switch (type) {
298  case kBOOLEAN:
299  d.boolval = inline_fixed_encoding_null_array_val(ti);
300  break;
301  case kBIGINT:
302  d.bigintval = inline_fixed_encoding_null_array_val(ti);
303  break;
304  case kINT:
305  d.intval = inline_fixed_encoding_null_array_val(ti);
306  break;
307  case kSMALLINT:
308  d.smallintval = inline_fixed_encoding_null_array_val(ti);
309  break;
310  case kTINYINT:
311  d.tinyintval = inline_fixed_encoding_null_array_val(ti);
312  break;
313  case kFLOAT:
314  d.floatval = NULL_ARRAY_FLOAT;
315  break;
316  case kDOUBLE:
317  d.doubleval = NULL_ARRAY_DOUBLE;
318  break;
319  case kTIME:
320  case kTIMESTAMP:
321  case kDATE:
322  d.bigintval = inline_fixed_encoding_null_array_val(ti);
323  break;
324  case kPOINT:
325  case kLINESTRING:
326  case kPOLYGON:
327  case kMULTIPOLYGON:
328  throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
329  default:
330  throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
331  }
332  return d;
333 }
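// Note (illustrative): NullDatum/NullArrayDatum fill a Datum with the type's
// inline sentinel rather than using a separate null bitmap; for kINT, for
// example, inline_fixed_encoding_null_val() yields
// std::numeric_limits<int32_t>::min().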
334 
335 ArrayDatum StringToArray(const std::string& s,
336  const SQLTypeInfo& ti,
337  const CopyParams& copy_params) {
338  SQLTypeInfo elem_ti = ti.get_elem_type();
339  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
340  return ArrayDatum(0, NULL, true);
341  }
342  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
343  LOG(WARNING) << "Malformed array: " << s;
344  return ArrayDatum(0, NULL, true);
345  }
346  std::vector<std::string> elem_strs;
347  size_t last = 1;
348  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
349  i = s.find(copy_params.array_delim, last)) {
350  elem_strs.push_back(s.substr(last, i - last));
351  last = i + 1;
352  }
353  if (last + 1 <= s.size()) {
354  elem_strs.push_back(s.substr(last, s.size() - 1 - last));
355  }
356  if (elem_strs.size() == 1) {
357  auto str = elem_strs.front();
358  auto str_trimmed = trim_space(str.c_str(), str.length());
359  if (str_trimmed == "") {
360  elem_strs.clear(); // Empty array
361  }
362  }
363  if (!elem_ti.is_string()) {
364  size_t len = elem_strs.size() * elem_ti.get_size();
365  int8_t* buf = (int8_t*)checked_malloc(len);
366  int8_t* p = buf;
367  for (auto& es : elem_strs) {
368  auto e = trim_space(es.c_str(), es.length());
369  bool is_null = (e == copy_params.null_str) || e == "NULL";
370  if (!elem_ti.is_string() && e == "") {
371  is_null = true;
372  }
373  if (elem_ti.is_number() || elem_ti.is_time()) {
374  if (!isdigit(e[0]) && e[0] != '-') {
375  is_null = true;
376  }
377  }
378  Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
379  p = appendDatum(p, d, elem_ti);
380  }
381  return ArrayDatum(len, buf, false);
382  }
383  // must not be called for array of strings
384  CHECK(false);
385  return ArrayDatum(0, NULL, true);
386 }
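// Example (illustrative, assuming default CopyParams: array_begin '{',
// array_end '}', array_delim ','): a field "{1,2,3}" for a SMALLINT[] column
// is split into "1", "2", "3" and packed into a 6-byte buffer of three
// int16_t values; "{}" becomes an empty array and the null_str token a NULL
// array.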
387 
388 ArrayDatum NullArray(const SQLTypeInfo& ti) {
389  SQLTypeInfo elem_ti = ti.get_elem_type();
390  auto len = ti.get_size();
391 
392  if (elem_ti.is_string()) {
393  // must not be called for array of strings
394  CHECK(false);
395  return ArrayDatum(0, NULL, true);
396  }
397 
398  if (len > 0) {
399  // Compose a NULL fixlen array
400  int8_t* buf = (int8_t*)checked_malloc(len);
401  // First scalar is a NULL_ARRAY sentinel
402  Datum d = NullArrayDatum(elem_ti);
403  int8_t* p = appendDatum(buf, d, elem_ti);
404  // Rest is filled with normal NULL sentinels
405  Datum d0 = NullDatum(elem_ti);
406  while ((p - buf) < len) {
407  p = appendDatum(p, d0, elem_ti);
408  }
409  CHECK((p - buf) == len);
410  return ArrayDatum(len, buf, true);
411  }
412  // NULL varlen array
413  return ArrayDatum(0, NULL, true);
414 }
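// Note (illustrative): for a fixed-length array such as INT[2] (8 bytes),
// NullArray writes one NULL_ARRAY sentinel followed by plain NULL sentinels,
// so nullness is detectable from the first element alone; a varlen NULL
// array is simply a zero-length ArrayDatum with is_null = true.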
415 
416 ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
417  return NullArray(ti);
418 }
419 
420 ArrayDatum ImporterUtils::composeNullPointCoords(const SQLTypeInfo& coords_ti,
421  const SQLTypeInfo& geo_ti) {
422  if (geo_ti.get_compression() == kENCODING_GEOINT) {
423  CHECK(geo_ti.get_comp_param() == 32);
424  std::vector<double> null_point_coords = {NULL_ARRAY_DOUBLE, NULL_DOUBLE};
425  auto compressed_null_coords = Geospatial::compress_coords(null_point_coords, geo_ti);
426  const size_t len = compressed_null_coords.size();
427  int8_t* buf = (int8_t*)checked_malloc(len);
428  memcpy(buf, compressed_null_coords.data(), len);
429  return ArrayDatum(len, buf, false);
430  }
431  auto modified_ti = coords_ti;
432  modified_ti.set_subtype(kDOUBLE);
433  return ImporterUtils::composeNullArray(modified_ti);
434 }
435 
436 void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
437  const auto& arr = datum.val.arr_val;
438  for (const auto& elem_datum : arr) {
439  string_vec.push_back(elem_datum.val.str_val);
440  }
441 }
442 
443 Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
444  Datum d;
445  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
446  switch (type) {
447  case kBOOLEAN:
448  d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
449  break;
450  case kBIGINT:
451  d.bigintval =
452  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
453  break;
454  case kINT:
455  d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
456  break;
457  case kSMALLINT:
458  d.smallintval =
459  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
460  break;
461  case kTINYINT:
462  d.tinyintval =
463  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
464  break;
465  case kFLOAT:
466  d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
467  break;
468  case kDOUBLE:
469  d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
470  break;
471  case kTIME:
472  case kTIMESTAMP:
473  case kDATE:
474  d.bigintval =
475  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
476  break;
477  case kPOINT:
478  case kLINESTRING:
479  case kPOLYGON:
480  case kMULTIPOLYGON:
481  throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
482  default:
483  throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
484  }
485  return d;
486 }
487 
488 ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
489  SQLTypeInfo elem_ti = ti.get_elem_type();
490 
491  CHECK(!elem_ti.is_string());
492 
493  if (datum.is_null) {
494  return NullArray(ti);
495  }
496 
497  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
498  int8_t* buf = (int8_t*)checked_malloc(len);
499  int8_t* p = buf;
500  for (auto& e : datum.val.arr_val) {
501  p = appendDatum(p, TDatumToDatum(e, elem_ti), elem_ti);
502  }
503 
504  return ArrayDatum(len, buf, false);
505 }
506 
507 void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
508  CHECK(string_dict_);
509  std::vector<std::string_view> string_view_vec;
510  string_view_vec.reserve(string_vec.size());
511  for (const auto& str : string_vec) {
512  if (str.size() > StringDictionary::MAX_STRLEN) {
513  std::ostringstream oss;
514  oss << "while processing dictionary for column " << getColumnDesc()->columnName
515  << " a string was detected too long for encoding, string length = "
516  << str.size() << ", first 100 characters are '" << str.substr(0, 100) << "'";
517  throw std::runtime_error(oss.str());
518  }
519  string_view_vec.push_back(str);
520  }
521  try {
522  switch (column_desc_->columnType.get_size()) {
523  case 1:
524  string_dict_i8_buffer_->resize(string_view_vec.size());
525  string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
526  break;
527  case 2:
528  string_dict_i16_buffer_->resize(string_view_vec.size());
529  string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
530  break;
531  case 4:
532  string_dict_i32_buffer_->resize(string_view_vec.size());
533  string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
534  break;
535  default:
536  CHECK(false);
537  }
538  } catch (std::exception& e) {
539  std::ostringstream oss;
540  oss << "while processing dictionary for column " << getColumnDesc()->columnName
541  << " : " << e.what();
542  LOG(ERROR) << oss.str();
543  throw std::runtime_error(oss.str());
544  }
545 }
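// Note (illustrative): the switch on the column size above picks the
// dictionary id width, e.g. TEXT ENCODING DICT(8) stores 1-byte ids via
// string_dict_i8_buffer_, DICT(16) 2-byte ids and DICT(32) 4-byte ids;
// getOrAddBulk resolves or inserts the whole batch in one pass.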
546 
547 void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
548  const std::string_view val,
549  const bool is_null,
550  const CopyParams& copy_params) {
551  const auto type = cd->columnType.get_type();
552  switch (type) {
553  case kBOOLEAN: {
554  if (is_null) {
555  if (cd->columnType.get_notnull()) {
556  throw std::runtime_error("NULL for column " + cd->columnName);
557  }
558  addBoolean(inline_fixed_encoding_null_val(cd->columnType));
559  } else {
560  auto ti = cd->columnType;
561  Datum d = StringToDatum(val, ti);
562  addBoolean(static_cast<int8_t>(d.boolval));
563  }
564  break;
565  }
566  case kTINYINT: {
567  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
568  auto ti = cd->columnType;
569  Datum d = StringToDatum(val, ti);
570  addTinyint(d.tinyintval);
571  } else {
572  if (cd->columnType.get_notnull()) {
573  throw std::runtime_error("NULL for column " + cd->columnName);
574  }
575  addTinyint(inline_fixed_encoding_null_val(cd->columnType));
576  }
577  break;
578  }
579  case kSMALLINT: {
580  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
581  auto ti = cd->columnType;
582  Datum d = StringToDatum(val, ti);
583  addSmallint(d.smallintval);
584  } else {
585  if (cd->columnType.get_notnull()) {
586  throw std::runtime_error("NULL for column " + cd->columnName);
587  }
588  addSmallint(inline_fixed_encoding_null_val(cd->columnType));
589  }
590  break;
591  }
592  case kINT: {
593  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
594  auto ti = cd->columnType;
595  Datum d = StringToDatum(val, ti);
596  addInt(d.intval);
597  } else {
598  if (cd->columnType.get_notnull()) {
599  throw std::runtime_error("NULL for column " + cd->columnName);
600  }
601  addInt(inline_fixed_encoding_null_val(cd->columnType));
602  }
603  break;
604  }
605  case kBIGINT: {
606  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
607  auto ti = cd->columnType;
608  Datum d = StringToDatum(val, ti);
609  addBigint(d.bigintval);
610  } else {
611  if (cd->columnType.get_notnull()) {
612  throw std::runtime_error("NULL for column " + cd->columnName);
613  }
614  addBigint(inline_fixed_encoding_null_val(cd->columnType));
615  }
616  break;
617  }
618  case kDECIMAL:
619  case kNUMERIC: {
620  if (!is_null) {
621  auto ti = cd->columnType;
622  Datum d = StringToDatum(val, ti);
623  addBigint(d.bigintval);
624  } else {
625  if (cd->columnType.get_notnull()) {
626  throw std::runtime_error("NULL for column " + cd->columnName);
627  }
628  addBigint(inline_fixed_encoding_null_val(cd->columnType));
629  }
630  break;
631  }
632  case kFLOAT:
633  if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
634  addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
635  } else {
636  if (cd->columnType.get_notnull()) {
637  throw std::runtime_error("NULL for column " + cd->columnName);
638  }
639  addFloat(NULL_FLOAT);
640  }
641  break;
642  case kDOUBLE:
643  if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
644  addDouble(std::atof(std::string(val).c_str()));
645  } else {
646  if (cd->columnType.get_notnull()) {
647  throw std::runtime_error("NULL for column " + cd->columnName);
648  }
649  addDouble(NULL_DOUBLE);
650  }
651  break;
652  case kTEXT:
653  case kVARCHAR:
654  case kCHAR: {
655  // @TODO(wei) for now, use empty string for nulls
656  if (is_null) {
657  if (cd->columnType.get_notnull()) {
658  throw std::runtime_error("NULL for column " + cd->columnName);
659  }
660  addString(std::string());
661  } else {
662  if (val.length() > StringDictionary::MAX_STRLEN) {
663  throw std::runtime_error("String too long for column " + cd->columnName +
664  " was " + std::to_string(val.length()) + " max is " +
666  }
667  addString(val);
668  }
669  break;
670  }
671  case kTIME:
672  case kTIMESTAMP:
673  case kDATE:
674  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
675  SQLTypeInfo ti = cd->columnType;
676  Datum d = StringToDatum(val, ti);
677  addBigint(d.bigintval);
678  } else {
679  if (cd->columnType.get_notnull()) {
680  throw std::runtime_error("NULL for column " + cd->columnName);
681  }
682  addBigint(inline_fixed_encoding_null_val(cd->columnType));
683  }
684  break;
685  case kARRAY: {
686  if (is_null && cd->columnType.get_notnull()) {
687  throw std::runtime_error("NULL for column " + cd->columnName);
688  }
689  SQLTypeInfo ti = cd->columnType;
690  if (IS_STRING(ti.get_subtype())) {
691  std::vector<std::string> string_vec;
692  // Just parse string array, don't push it to buffer yet as we might throw
693  ImportHelpers::parseStringArray(
694  std::string(val), copy_params, string_vec);
695  if (!is_null) {
696  // TODO: add support for NULL string arrays
697  if (ti.get_size() > 0) {
698  auto sti = ti.get_elem_type();
699  size_t expected_size = ti.get_size() / sti.get_size();
700  size_t actual_size = string_vec.size();
701  if (actual_size != expected_size) {
702  throw std::runtime_error("Fixed length array column " + cd->columnName +
703  " expects " + std::to_string(expected_size) +
704  " values, received " +
705  std::to_string(actual_size));
706  }
707  }
708  addStringArray(string_vec);
709  } else {
710  if (ti.get_size() > 0) {
711  // TODO: remove once NULL fixlen arrays are allowed
712  throw std::runtime_error("Fixed length array column " + cd->columnName +
713  " currently cannot accept NULL arrays");
714  }
715  // TODO: add support for NULL string arrays, replace with addStringArray(),
716  // for now add whatever parseStringArray() outputs for NULLs ("NULL")
717  addStringArray(string_vec);
718  }
719  } else {
720  if (!is_null) {
721  ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
722  if (d.is_null) { // val could be "NULL"
723  addArray(NullArray(ti));
724  } else {
725  if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
726  throw std::runtime_error("Fixed length array for column " + cd->columnName +
727  " has incorrect length: " + std::string(val));
728  }
729  addArray(d);
730  }
731  } else {
732  addArray(NullArray(ti));
733  }
734  }
735  break;
736  }
737  case kPOINT:
738  case kLINESTRING:
739  case kPOLYGON:
740  case kMULTIPOLYGON:
741  addGeoString(val);
742  break;
743  default:
744  CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
745  }
746 }
747 
748 void TypedImportBuffer::pop_value() {
749  const auto type = column_desc_->columnType.is_decimal()
750  ? decimal_to_int_type(column_desc_->columnType)
751  : column_desc_->columnType.get_type();
752  switch (type) {
753  case kBOOLEAN:
754  bool_buffer_->pop_back();
755  break;
756  case kTINYINT:
757  tinyint_buffer_->pop_back();
758  break;
759  case kSMALLINT:
760  smallint_buffer_->pop_back();
761  break;
762  case kINT:
763  int_buffer_->pop_back();
764  break;
765  case kBIGINT:
766  bigint_buffer_->pop_back();
767  break;
768  case kFLOAT:
769  float_buffer_->pop_back();
770  break;
771  case kDOUBLE:
772  double_buffer_->pop_back();
773  break;
774  case kTEXT:
775  case kVARCHAR:
776  case kCHAR:
777  string_buffer_->pop_back();
778  break;
779  case kDATE:
780  case kTIME:
781  case kTIMESTAMP:
782  bigint_buffer_->pop_back();
783  break;
784  case kARRAY:
785  if (IS_STRING(column_desc_->columnType.get_subtype())) {
786  string_array_buffer_->pop_back();
787  } else {
788  array_buffer_->pop_back();
789  }
790  break;
791  case kPOINT:
792  case kLINESTRING:
793  case kPOLYGON:
794  case kMULTIPOLYGON:
795  geo_string_buffer_->pop_back();
796  break;
797  default:
798  CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
799  }
800 }
801 
802 struct GeoImportException : std::runtime_error {
803  using std::runtime_error::runtime_error;
804 };
805 
806 // appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
807 template <typename DATA_TYPE>
808 size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
809  const ColumnDescriptor* cd,
810  const Array& array,
811  std::vector<DATA_TYPE>& buffer,
812  const ArraySliceRange& slice_range,
813  import_export::BadRowsTracker* const bad_rows_tracker) {
814  auto data =
815  std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
816  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
817  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
818  if (bad_rows_tracker && cd->columnType.is_geometry()) {
819  f_add_geo_phy_cols = [&](const int64_t row) {
820  // Populate physical columns (ref. DBHandler::load_table)
821  std::vector<double> coords, bounds;
822  std::vector<int> ring_sizes, poly_rings;
823  int render_group = 0;
824  SQLTypeInfo ti;
825  // replace any unexpected exception from getGeoColumns or other
826  // on this path with a GeoImportException so that we won't
827  // over-push a null to the logical column...
828  try {
829  SQLTypeInfo import_ti{ti};
830  if (array.IsNull(row)) {
831  Geospatial::GeoTypesFactory::getNullGeoColumns(
832  import_ti, coords, bounds, ring_sizes, poly_rings, false);
833  } else {
834  arrow_throw_if<GeoImportException>(
835  !Geospatial::GeoTypesFactory::getGeoColumns(geo_string_buffer_->back(),
836  ti,
837  coords,
838  bounds,
839  ring_sizes,
840  poly_rings,
841  false),
842  error_context(cd, bad_rows_tracker) + "Invalid geometry");
843  arrow_throw_if<GeoImportException>(
844  cd->columnType.get_type() != ti.get_type(),
845  error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
846  }
847  auto col_idx_workpad = col_idx; // what a pitfall!!
848  Importer::set_geo_physical_import_buffer(
849  bad_rows_tracker->importer->getCatalog(),
850  cd,
852  col_idx_workpad,
853  coords,
854  bounds,
855  ring_sizes,
856  poly_rings,
857  render_group);
858  } catch (GeoImportException&) {
859  throw;
860  } catch (std::runtime_error& e) {
861  throw GeoImportException(e.what());
862  } catch (const std::exception& e) {
863  throw GeoImportException(e.what());
864  } catch (...) {
865  throw GeoImportException("unknown exception");
866  }
867  };
868  }
869  auto f_mark_a_bad_row = [&](const auto row) {
870  std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
871  bad_rows_tracker->rows.insert(row - slice_range.first);
872  };
873  buffer.reserve(slice_range.second - slice_range.first);
874  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
875  try {
876  *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
877  f_add_geo_phy_cols(row);
878  } catch (GeoImportException&) {
879  f_mark_a_bad_row(row);
880  } catch (ArrowImporterException&) {
881  // trace bad rows of each column; otherwise rethrow.
882  if (bad_rows_tracker) {
883  *data << nullptr;
884  f_mark_a_bad_row(row);
885  } else {
886  throw;
887  }
888  }
889  }
890  return buffer.size();
891 }
892 
893 size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
894  const Array& col,
895  const bool exact_type_match,
896  const ArraySliceRange& slice_range,
897  BadRowsTracker* const bad_rows_tracker) {
898  const auto type = cd->columnType.get_type();
899  if (cd->columnType.get_notnull()) {
900  // We can't have any null values for this column; to have them is an error
901  arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
902  }
903 
904  switch (type) {
905  case kBOOLEAN:
906  if (exact_type_match) {
907  arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
908  }
909  return convert_arrow_val_to_import_buffer<int8_t>(
910  cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
911  case kTINYINT:
912  if (exact_type_match) {
913  arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
914  }
915  return convert_arrow_val_to_import_buffer<int8_t>(
916  cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
917  case kSMALLINT:
918  if (exact_type_match) {
919  arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
920  }
921  return convert_arrow_val_to_import_buffer<int16_t>(
922  cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
923  case kINT:
924  if (exact_type_match) {
925  arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
926  }
927  return convert_arrow_val_to_import_buffer<int32_t>(
928  cd, col, *int_buffer_, slice_range, bad_rows_tracker);
929  case kBIGINT:
930  case kNUMERIC:
931  case kDECIMAL:
932  if (exact_type_match) {
933  arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
934  }
935  return convert_arrow_val_to_import_buffer<int64_t>(
936  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
937  case kFLOAT:
938  if (exact_type_match) {
939  arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
940  }
941  return convert_arrow_val_to_import_buffer<float>(
942  cd, col, *float_buffer_, slice_range, bad_rows_tracker);
943  case kDOUBLE:
944  if (exact_type_match) {
945  arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
946  }
947  return convert_arrow_val_to_import_buffer<double>(
948  cd, col, *double_buffer_, slice_range, bad_rows_tracker);
949  case kTEXT:
950  case kVARCHAR:
951  case kCHAR:
952  if (exact_type_match) {
953  arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
954  "Expected string type");
955  }
956  return convert_arrow_val_to_import_buffer<std::string>(
957  cd, col, *string_buffer_, slice_range, bad_rows_tracker);
958  case kTIME:
959  if (exact_type_match) {
960  arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
961  "Expected time32 or time64 type");
962  }
963  return convert_arrow_val_to_import_buffer<int64_t>(
964  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
965  case kTIMESTAMP:
966  if (exact_type_match) {
967  arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
968  }
969  return convert_arrow_val_to_import_buffer<int64_t>(
970  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
971  case kDATE:
972  if (exact_type_match) {
973  arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
974  "Expected date32 or date64 type");
975  }
976  return convert_arrow_val_to_import_buffer<int64_t>(
977  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
978  case kPOINT:
979  case kLINESTRING:
980  case kPOLYGON:
981  case kMULTIPOLYGON:
982  arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
983  "Expected string type");
985  cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
986  case kARRAY:
987  throw std::runtime_error("Arrow array appends not yet supported");
988  default:
989  throw std::runtime_error("Invalid Type");
990  }
991 }
992 
993 // this is exclusively used by load_table_binary_columnar
994 size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
995  size_t dataSize = 0;
996  if (cd->columnType.get_notnull()) {
997  // We can't have any null values for this column; to have them is an error
998  if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
999  throw std::runtime_error("NULL for column " + cd->columnName);
1000  }
1001  }
1002 
1003  switch (cd->columnType.get_type()) {
1004  case kBOOLEAN: {
1005  dataSize = col.data.int_col.size();
1006  bool_buffer_->reserve(dataSize);
1007  for (size_t i = 0; i < dataSize; i++) {
1008  if (col.nulls[i]) {
1009  bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1010  } else {
1011  bool_buffer_->push_back((int8_t)col.data.int_col[i]);
1012  }
1013  }
1014  break;
1015  }
1016  case kTINYINT: {
1017  dataSize = col.data.int_col.size();
1018  tinyint_buffer_->reserve(dataSize);
1019  for (size_t i = 0; i < dataSize; i++) {
1020  if (col.nulls[i]) {
1021  tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1022  } else {
1023  tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
1024  }
1025  }
1026  break;
1027  }
1028  case kSMALLINT: {
1029  dataSize = col.data.int_col.size();
1030  smallint_buffer_->reserve(dataSize);
1031  for (size_t i = 0; i < dataSize; i++) {
1032  if (col.nulls[i]) {
1033  smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1034  } else {
1035  smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
1036  }
1037  }
1038  break;
1039  }
1040  case kINT: {
1041  dataSize = col.data.int_col.size();
1042  int_buffer_->reserve(dataSize);
1043  for (size_t i = 0; i < dataSize; i++) {
1044  if (col.nulls[i]) {
1045  int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1046  } else {
1047  int_buffer_->push_back((int32_t)col.data.int_col[i]);
1048  }
1049  }
1050  break;
1051  }
1052  case kBIGINT:
1053  case kNUMERIC:
1054  case kDECIMAL: {
1055  dataSize = col.data.int_col.size();
1056  bigint_buffer_->reserve(dataSize);
1057  for (size_t i = 0; i < dataSize; i++) {
1058  if (col.nulls[i]) {
1059  bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1060  } else {
1061  bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
1062  }
1063  }
1064  break;
1065  }
1066  case kFLOAT: {
1067  dataSize = col.data.real_col.size();
1068  float_buffer_->reserve(dataSize);
1069  for (size_t i = 0; i < dataSize; i++) {
1070  if (col.nulls[i]) {
1071  float_buffer_->push_back(NULL_FLOAT);
1072  } else {
1073  float_buffer_->push_back((float)col.data.real_col[i]);
1074  }
1075  }
1076  break;
1077  }
1078  case kDOUBLE: {
1079  dataSize = col.data.real_col.size();
1080  double_buffer_->reserve(dataSize);
1081  for (size_t i = 0; i < dataSize; i++) {
1082  if (col.nulls[i]) {
1083  double_buffer_->push_back(NULL_DOUBLE);
1084  } else {
1085  double_buffer_->push_back((double)col.data.real_col[i]);
1086  }
1087  }
1088  break;
1089  }
1090  case kTEXT:
1091  case kVARCHAR:
1092  case kCHAR: {
1093  // TODO: for now, use empty string for nulls
1094  dataSize = col.data.str_col.size();
1095  string_buffer_->reserve(dataSize);
1096  for (size_t i = 0; i < dataSize; i++) {
1097  if (col.nulls[i]) {
1098  string_buffer_->push_back(std::string());
1099  } else {
1100  string_buffer_->push_back(col.data.str_col[i]);
1101  }
1102  }
1103  break;
1104  }
1105  case kTIME:
1106  case kTIMESTAMP:
1107  case kDATE: {
1108  dataSize = col.data.int_col.size();
1109  bigint_buffer_->reserve(dataSize);
1110  for (size_t i = 0; i < dataSize; i++) {
1111  if (col.nulls[i]) {
1112  bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1113  } else {
1114  bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
1115  }
1116  }
1117  break;
1118  }
1119  case kPOINT:
1120  case kLINESTRING:
1121  case kPOLYGON:
1122  case kMULTIPOLYGON: {
1123  dataSize = col.data.str_col.size();
1124  geo_string_buffer_->reserve(dataSize);
1125  for (size_t i = 0; i < dataSize; i++) {
1126  if (col.nulls[i]) {
1127  // TODO: add support for NULL geo
1128  geo_string_buffer_->push_back(std::string());
1129  } else {
1130  geo_string_buffer_->push_back(col.data.str_col[i]);
1131  }
1132  }
1133  break;
1134  }
1135  case kARRAY: {
1136  dataSize = col.data.arr_col.size();
1137  if (IS_STRING(cd->columnType.get_subtype())) {
1138  for (size_t i = 0; i < dataSize; i++) {
1139  std::vector<std::string>& string_vec = addStringArray();
1140  if (!col.nulls[i]) {
1141  size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
1142  for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
1143  string_vec.push_back(col.data.arr_col[i].data.str_col[str_idx]);
1144  }
1145  }
1146  }
1147  } else {
1148  auto elem_ti = cd->columnType.get_subtype();
1149  switch (elem_ti) {
1150  case kBOOLEAN: {
1151  for (size_t i = 0; i < dataSize; i++) {
1152  if (col.nulls[i]) {
1153  addArray(NullArray(cd->columnType));
1154  } else {
1155  size_t len = col.data.arr_col[i].data.int_col.size();
1156  size_t byteSize = len * sizeof(int8_t);
1157  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1158  int8_t* p = buf;
1159  for (size_t j = 0; j < len; ++j) {
1160  // Explicitly checking the item for null because
1161  // casting the null value (-128) to bool results in the
1162  // incorrect value 1.
1163  if (col.data.arr_col[i].nulls[j]) {
1164  *p = static_cast<int8_t>(
1165  inline_fixed_encoding_null_val(cd->columnType.get_elem_type()));
1166  } else {
1167  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
1168  }
1169  p += sizeof(bool);
1170  }
1171  addArray(ArrayDatum(byteSize, buf, false));
1172  }
1173  }
1174  break;
1175  }
1176  case kTINYINT: {
1177  for (size_t i = 0; i < dataSize; i++) {
1178  if (col.nulls[i]) {
1179  addArray(NullArray(cd->columnType));
1180  } else {
1181  size_t len = col.data.arr_col[i].data.int_col.size();
1182  size_t byteSize = len * sizeof(int8_t);
1183  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1184  int8_t* p = buf;
1185  for (size_t j = 0; j < len; ++j) {
1186  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
1187  p += sizeof(int8_t);
1188  }
1189  addArray(ArrayDatum(byteSize, buf, false));
1190  }
1191  }
1192  break;
1193  }
1194  case kSMALLINT: {
1195  for (size_t i = 0; i < dataSize; i++) {
1196  if (col.nulls[i]) {
1197  addArray(NullArray(cd->columnType));
1198  } else {
1199  size_t len = col.data.arr_col[i].data.int_col.size();
1200  size_t byteSize = len * sizeof(int16_t);
1201  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1202  int8_t* p = buf;
1203  for (size_t j = 0; j < len; ++j) {
1204  *(int16_t*)p =
1205  static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
1206  p += sizeof(int16_t);
1207  }
1208  addArray(ArrayDatum(byteSize, buf, false));
1209  }
1210  }
1211  break;
1212  }
1213  case kINT: {
1214  for (size_t i = 0; i < dataSize; i++) {
1215  if (col.nulls[i]) {
1216  addArray(NullArray(cd->columnType));
1217  } else {
1218  size_t len = col.data.arr_col[i].data.int_col.size();
1219  size_t byteSize = len * sizeof(int32_t);
1220  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1221  int8_t* p = buf;
1222  for (size_t j = 0; j < len; ++j) {
1223  *(int32_t*)p =
1224  static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
1225  p += sizeof(int32_t);
1226  }
1227  addArray(ArrayDatum(byteSize, buf, false));
1228  }
1229  }
1230  break;
1231  }
1232  case kBIGINT:
1233  case kNUMERIC:
1234  case kDECIMAL: {
1235  for (size_t i = 0; i < dataSize; i++) {
1236  if (col.nulls[i]) {
1237  addArray(NullArray(cd->columnType));
1238  } else {
1239  size_t len = col.data.arr_col[i].data.int_col.size();
1240  size_t byteSize = len * sizeof(int64_t);
1241  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1242  int8_t* p = buf;
1243  for (size_t j = 0; j < len; ++j) {
1244  *(int64_t*)p =
1245  static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1246  p += sizeof(int64_t);
1247  }
1248  addArray(ArrayDatum(byteSize, buf, false));
1249  }
1250  }
1251  break;
1252  }
1253  case kFLOAT: {
1254  for (size_t i = 0; i < dataSize; i++) {
1255  if (col.nulls[i]) {
1256  addArray(NullArray(cd->columnType));
1257  } else {
1258  size_t len = col.data.arr_col[i].data.real_col.size();
1259  size_t byteSize = len * sizeof(float);
1260  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1261  int8_t* p = buf;
1262  for (size_t j = 0; j < len; ++j) {
1263  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
1264  p += sizeof(float);
1265  }
1266  addArray(ArrayDatum(byteSize, buf, false));
1267  }
1268  }
1269  break;
1270  }
1271  case kDOUBLE: {
1272  for (size_t i = 0; i < dataSize; i++) {
1273  if (col.nulls[i]) {
1274  addArray(NullArray(cd->columnType));
1275  } else {
1276  size_t len = col.data.arr_col[i].data.real_col.size();
1277  size_t byteSize = len * sizeof(double);
1278  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1279  int8_t* p = buf;
1280  for (size_t j = 0; j < len; ++j) {
1281  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
1282  p += sizeof(double);
1283  }
1284  addArray(ArrayDatum(byteSize, buf, false));
1285  }
1286  }
1287  break;
1288  }
1289  case kTIME:
1290  case kTIMESTAMP:
1291  case kDATE: {
1292  for (size_t i = 0; i < dataSize; i++) {
1293  if (col.nulls[i]) {
1294  addArray(NullArray(cd->columnType));
1295  } else {
1296  size_t len = col.data.arr_col[i].data.int_col.size();
1297  size_t byteWidth = sizeof(int64_t);
1298  size_t byteSize = len * byteWidth;
1299  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1300  int8_t* p = buf;
1301  for (size_t j = 0; j < len; ++j) {
1302  *reinterpret_cast<int64_t*>(p) =
1303  static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1304  p += sizeof(int64_t);
1305  }
1306  addArray(ArrayDatum(byteSize, buf, false));
1307  }
1308  }
1309  break;
1310  }
1311  default:
1312  throw std::runtime_error("Invalid Array Type");
1313  }
1314  }
1315  break;
1316  }
1317  default:
1318  throw std::runtime_error("Invalid Type");
1319  }
1320  return dataSize;
1321 }
1322 
1323 void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
1324  const TDatum& datum,
1325  const bool is_null) {
1326  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
1327  : cd->columnType.get_type();
1328  switch (type) {
1329  case kBOOLEAN: {
1330  if (is_null) {
1331  if (cd->columnType.get_notnull()) {
1332  throw std::runtime_error("NULL for column " + cd->columnName);
1333  }
1334  addBoolean(inline_fixed_encoding_null_val(cd->columnType));
1335  } else {
1336  addBoolean((int8_t)datum.val.int_val);
1337  }
1338  break;
1339  }
1340  case kTINYINT:
1341  if (!is_null) {
1342  addTinyint((int8_t)datum.val.int_val);
1343  } else {
1344  if (cd->columnType.get_notnull()) {
1345  throw std::runtime_error("NULL for column " + cd->columnName);
1346  }
1347  addTinyint(inline_fixed_encoding_null_val(cd->columnType));
1348  }
1349  break;
1350  case kSMALLINT:
1351  if (!is_null) {
1352  addSmallint((int16_t)datum.val.int_val);
1353  } else {
1354  if (cd->columnType.get_notnull()) {
1355  throw std::runtime_error("NULL for column " + cd->columnName);
1356  }
1357  addSmallint(inline_fixed_encoding_null_val(cd->columnType));
1358  }
1359  break;
1360  case kINT:
1361  if (!is_null) {
1362  addInt((int32_t)datum.val.int_val);
1363  } else {
1364  if (cd->columnType.get_notnull()) {
1365  throw std::runtime_error("NULL for column " + cd->columnName);
1366  }
1367  addInt(inline_fixed_encoding_null_val(cd->columnType));
1368  }
1369  break;
1370  case kBIGINT:
1371  if (!is_null) {
1372  addBigint(datum.val.int_val);
1373  } else {
1374  if (cd->columnType.get_notnull()) {
1375  throw std::runtime_error("NULL for column " + cd->columnName);
1376  }
1377  addBigint(inline_fixed_encoding_null_val(cd->columnType));
1378  }
1379  break;
1380  case kFLOAT:
1381  if (!is_null) {
1382  addFloat((float)datum.val.real_val);
1383  } else {
1384  if (cd->columnType.get_notnull()) {
1385  throw std::runtime_error("NULL for column " + cd->columnName);
1386  }
1387  addFloat(NULL_FLOAT);
1388  }
1389  break;
1390  case kDOUBLE:
1391  if (!is_null) {
1392  addDouble(datum.val.real_val);
1393  } else {
1394  if (cd->columnType.get_notnull()) {
1395  throw std::runtime_error("NULL for column " + cd->columnName);
1396  }
1397  addDouble(NULL_DOUBLE);
1398  }
1399  break;
1400  case kTEXT:
1401  case kVARCHAR:
1402  case kCHAR: {
1403  // @TODO(wei) for now, use empty string for nulls
1404  if (is_null) {
1405  if (cd->columnType.get_notnull()) {
1406  throw std::runtime_error("NULL for column " + cd->columnName);
1407  }
1408  addString(std::string());
1409  } else {
1410  addString(datum.val.str_val);
1411  }
1412  break;
1413  }
1414  case kTIME:
1415  case kTIMESTAMP:
1416  case kDATE: {
1417  if (!is_null) {
1418  addBigint(datum.val.int_val);
1419  } else {
1420  if (cd->columnType.get_notnull()) {
1421  throw std::runtime_error("NULL for column " + cd->columnName);
1422  }
1423  addBigint(inline_fixed_encoding_null_val(cd->columnType));
1424  }
1425  break;
1426  }
1427  case kARRAY:
1428  if (is_null && cd->columnType.get_notnull()) {
1429  throw std::runtime_error("NULL for column " + cd->columnName);
1430  }
1431  if (IS_STRING(cd->columnType.get_subtype())) {
1432  std::vector<std::string>& string_vec = addStringArray();
1433  addBinaryStringArray(datum, string_vec);
1434  } else {
1435  if (!is_null) {
1436  addArray(TDatumToArrayDatum(datum, cd->columnType));
1437  } else {
1438  addArray(NullArray(cd->columnType));
1439  }
1440  }
1441  break;
1442  case kPOINT:
1443  case kLINESTRING:
1444  case kPOLYGON:
1445  case kMULTIPOLYGON:
1446  if (is_null) {
1447  if (cd->columnType.get_notnull()) {
1448  throw std::runtime_error("NULL for column " + cd->columnName);
1449  }
1450  addGeoString(std::string());
1451  } else {
1452  addGeoString(datum.val.str_val);
1453  }
1454  break;
1455  default:
1456  CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
1457  }
1458 }
1459 
1460 void TypedImportBuffer::addDefaultValues(const ColumnDescriptor* cd, size_t num_rows) {
1461  bool is_null = !cd->default_value.has_value();
1462  CHECK(!(is_null && cd->columnType.get_notnull()));
1463  const auto type = cd->columnType.get_type();
1464  auto ti = cd->columnType;
1465  auto val = cd->default_value.value_or("NULL");
1466  CopyParams cp;
1467  switch (type) {
1468  case kBOOLEAN: {
1469  if (!is_null) {
1470  bool_buffer_->resize(num_rows, StringToDatum(val, ti).boolval);
1471  } else {
1472  bool_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1473  }
1474  break;
1475  }
1476  case kTINYINT: {
1477  if (!is_null) {
1478  tinyint_buffer_->resize(num_rows, StringToDatum(val, ti).tinyintval);
1479  } else {
1480  tinyint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1481  }
1482  break;
1483  }
1484  case kSMALLINT: {
1485  if (!is_null) {
1486  smallint_buffer_->resize(num_rows, StringToDatum(val, ti).smallintval);
1487  } else {
1488  smallint_buffer_->resize(num_rows,
1489  inline_fixed_encoding_null_val(cd->columnType));
1490  }
1491  break;
1492  }
1493  case kINT: {
1494  if (!is_null) {
1495  int_buffer_->resize(num_rows, StringToDatum(val, ti).intval);
1496  } else {
1497  int_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1498  }
1499  break;
1500  }
1501  case kBIGINT: {
1502  if (!is_null) {
1503  bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
1504  } else {
1505  bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1506  }
1507  break;
1508  }
1509  case kDECIMAL:
1510  case kNUMERIC: {
1511  if (!is_null) {
1512  const auto converted_decimal_value = convert_decimal_value_to_scale(
1513  StringToDatum(val, ti).bigintval, ti, cd->columnType);
1514  bigint_buffer_->resize(num_rows, converted_decimal_value);
1515  } else {
1516  bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1517  }
1518  break;
1519  }
1520  case kFLOAT:
1521  if (!is_null) {
1522  float_buffer_->resize(num_rows,
1523  static_cast<float>(std::atof(std::string(val).c_str())));
1524  } else {
1525  float_buffer_->resize(num_rows, NULL_FLOAT);
1526  }
1527  break;
1528  case kDOUBLE:
1529  if (!is_null) {
1530  double_buffer_->resize(num_rows, std::atof(std::string(val).c_str()));
1531  } else {
1532  double_buffer_->resize(num_rows, NULL_DOUBLE);
1533  }
1534  break;
1535  case kTEXT:
1536  case kVARCHAR:
1537  case kCHAR: {
1538  if (is_null) {
1539  string_buffer_->resize(num_rows, "");
1540  } else {
1541  if (val.length() > StringDictionary::MAX_STRLEN) {
1542  throw std::runtime_error("String too long for column " + cd->columnName +
1543  " was " + std::to_string(val.length()) + " max is " +
1545  }
1546  string_buffer_->resize(num_rows, val);
1547  }
1548  break;
1549  }
1550  case kTIME:
1551  case kTIMESTAMP:
1552  case kDATE:
1553  if (!is_null) {
1554  bigint_buffer_->resize(num_rows, StringToDatum(val, ti).bigintval);
1555  } else {
1556  bigint_buffer_->resize(num_rows, inline_fixed_encoding_null_val(cd->columnType));
1557  }
1558  break;
1559  case kARRAY: {
1560  if (IS_STRING(ti.get_subtype())) {
1561  std::vector<std::string> string_vec;
1562  // Just parse string array, don't push it to buffer yet as we might throw
1563  ImportHelpers::parseStringArray(
1564  std::string(val), cp, string_vec);
1565  if (!is_null) {
1566  // TODO: add support for NULL string arrays
1567  if (ti.get_size() > 0) {
1568  auto sti = ti.get_elem_type();
1569  size_t expected_size = ti.get_size() / sti.get_size();
1570  size_t actual_size = string_vec.size();
1571  if (actual_size != expected_size) {
1572  throw std::runtime_error("Fixed length array column " + cd->columnName +
1573  " expects " + std::to_string(expected_size) +
1574  " values, received " +
1575  std::to_string(actual_size));
1576  }
1577  }
1578  string_array_buffer_->resize(num_rows, string_vec);
1579  } else {
1580  if (ti.get_size() > 0) {
1581  // TODO: remove once NULL fixlen arrays are allowed
1582  throw std::runtime_error("Fixed length array column " + cd->columnName +
1583  " currently cannot accept NULL arrays");
1584  }
1585  // TODO: add support for NULL string arrays, replace with addStringArray(),
1586  // for now add whatever parseStringArray() outputs for NULLs ("NULL")
1587  string_array_buffer_->resize(num_rows, string_vec);
1588  }
1589  } else {
1590  if (!is_null) {
1591  ArrayDatum d = StringToArray(std::string(val), ti, cp);
1592  if (d.is_null) { // val could be "NULL"
1593  array_buffer_->resize(num_rows, NullArray(ti));
1594  } else {
1595  if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
1596  throw std::runtime_error("Fixed length array for column " + cd->columnName +
1597  " has incorrect length: " + std::string(val));
1598  }
1599  array_buffer_->resize(num_rows, d);
1600  }
1601  } else {
1602  array_buffer_->resize(num_rows, NullArray(ti));
1603  }
1604  }
1605  break;
1606  }
1607  case kPOINT:
1608  case kLINESTRING:
1609  case kPOLYGON:
1610  case kMULTIPOLYGON:
1611  geo_string_buffer_->resize(num_rows, val);
1612  break;
1613  default:
1614  CHECK(false) << "TypedImportBuffer::addDefaultValues() does not support type "
1615  << type;
1616  }
1617 }
1618 
1619 bool importGeoFromLonLat(double lon,
1620  double lat,
1621  std::vector<double>& coords,
1622  SQLTypeInfo& ti) {
1623  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
1624  return false;
1625  }
1626  if (ti.transforms()) {
1627  Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
1628  if (!pt.transform(ti)) {
1629  return false;
1630  }
1631  pt.getColumns(coords);
1632  return true;
1633  }
1634  coords.push_back(lon);
1635  coords.push_back(lat);
1636  return true;
1637 }
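// Example (illustrative): with no SRID transform on ti,
// importGeoFromLonLat(-122.4, 37.8, coords, ti) appends {-122.4, 37.8} to
// coords; when ti carries a transform (e.g. 4326 -> 900913) the point is
// reprojected first, and non-finite input makes the call return false.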
1638 
1639 void Importer::set_geo_physical_import_buffer(
1640  const Catalog_Namespace::Catalog& catalog,
1641  const ColumnDescriptor* cd,
1642  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1643  size_t& col_idx,
1644  std::vector<double>& coords,
1645  std::vector<double>& bounds,
1646  std::vector<int>& ring_sizes,
1647  std::vector<int>& poly_rings,
1648  int render_group) {
1649  const auto col_ti = cd->columnType;
1650  const auto col_type = col_ti.get_type();
1651  auto columnId = cd->columnId;
1652  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1653  bool is_null_geo = false;
1654  bool is_null_point = false;
1655  if (!col_ti.get_notnull()) {
1656  // Check for NULL geo
1657  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1658  is_null_point = true;
1659  coords.clear();
1660  }
1661  is_null_geo = coords.empty();
1662  if (is_null_point) {
1663  coords.push_back(NULL_ARRAY_DOUBLE);
1664  coords.push_back(NULL_DOUBLE);
1665  // Treating POINT coords as notnull, need to store actual encoding
1666  // [un]compressed+[not]null
1667  is_null_geo = false;
1668  }
1669  }
1670  TDatum tdd_coords;
1671  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1672  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1673  // in a fixlen array, compressed and uncompressed.
1674  if (!is_null_geo) {
1675  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
1676  tdd_coords.val.arr_val.reserve(compressed_coords.size());
1677  for (auto cc : compressed_coords) {
1678  tdd_coords.val.arr_val.emplace_back();
1679  tdd_coords.val.arr_val.back().val.int_val = cc;
1680  }
1681  }
1682  tdd_coords.is_null = is_null_geo;
1683  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);
1684 
1685  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1686  // Create ring_sizes array value and add it to the physical column
1687  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1688  TDatum tdd_ring_sizes;
1689  tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1690  if (!is_null_geo) {
1691  for (auto ring_size : ring_sizes) {
1692  tdd_ring_sizes.val.arr_val.emplace_back();
1693  tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1694  }
1695  }
1696  tdd_ring_sizes.is_null = is_null_geo;
1697  import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1698  }
1699 
1700  if (col_type == kMULTIPOLYGON) {
1701  // Create poly_rings array value and add it to the physical column
1702  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1703  TDatum tdd_poly_rings;
1704  tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1705  if (!is_null_geo) {
1706  for (auto num_rings : poly_rings) {
1707  tdd_poly_rings.val.arr_val.emplace_back();
1708  tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1709  }
1710  }
1711  tdd_poly_rings.is_null = is_null_geo;
1712  import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
1713  }
1714 
1715  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1716  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1717  TDatum tdd_bounds;
1718  tdd_bounds.val.arr_val.reserve(bounds.size());
1719  if (!is_null_geo) {
1720  for (auto b : bounds) {
1721  tdd_bounds.val.arr_val.emplace_back();
1722  tdd_bounds.val.arr_val.back().val.real_val = b;
1723  }
1724  }
1725  tdd_bounds.is_null = is_null_geo;
1726  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
1727  }
1728 
1729  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1730  // Create render_group value and add it to the physical column
1731  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1732  TDatum td_render_group;
1733  td_render_group.val.int_val = render_group;
1734  td_render_group.is_null = is_null_geo;
1735  import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
1736  }
1737 }
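// Note (illustrative): the physical columns filled above directly follow the
// logical geo column, in order: coords (always), ring_sizes
// (POLYGON/MULTIPOLYGON), poly_rings (MULTIPOLYGON only), bounds
// (LINESTRING/POLYGON/MULTIPOLYGON) and render_group (POLYGON/MULTIPOLYGON);
// col_idx advances once per physical column added.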
1738 
1739 void Importer::set_geo_physical_import_buffer_columnar(
1740  const Catalog_Namespace::Catalog& catalog,
1741  const ColumnDescriptor* cd,
1742  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1743  size_t& col_idx,
1744  std::vector<std::vector<double>>& coords_column,
1745  std::vector<std::vector<double>>& bounds_column,
1746  std::vector<std::vector<int>>& ring_sizes_column,
1747  std::vector<std::vector<int>>& poly_rings_column,
1748  std::vector<int>& render_groups_column) {
1749  const auto col_ti = cd->columnType;
1750  const auto col_type = col_ti.get_type();
1751  auto columnId = cd->columnId;
1752 
1753  auto coords_row_count = coords_column.size();
1754  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1755  for (auto& coords : coords_column) {
1756  bool is_null_geo = false;
1757  bool is_null_point = false;
1758  if (!col_ti.get_notnull()) {
1759  // Check for NULL geo
1760  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1761  is_null_point = true;
1762  coords.clear();
1763  }
1764  is_null_geo = coords.empty();
1765  if (is_null_point) {
1766  coords.push_back(NULL_ARRAY_DOUBLE);
1767  coords.push_back(NULL_DOUBLE);
1768  // Treating POINT coords as notnull, need to store actual encoding
1769  // [un]compressed+[not]null
1770  is_null_geo = false;
1771  }
1772  }
1773  std::vector<TDatum> td_coords_data;
1774  if (!is_null_geo) {
1775  std::vector<uint8_t> compressed_coords =
1776  Geospatial::compress_coords(coords, col_ti);
1777  for (auto const& cc : compressed_coords) {
1778  TDatum td_byte;
1779  td_byte.val.int_val = cc;
1780  td_coords_data.push_back(td_byte);
1781  }
1782  }
1783  TDatum tdd_coords;
1784  tdd_coords.val.arr_val = td_coords_data;
1785  tdd_coords.is_null = is_null_geo;
1786  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
1787  }
1788  col_idx++;
1789 
1790  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1791  if (ring_sizes_column.size() != coords_row_count) {
1792  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1793  }
1794  // Create ring_sizes array value and add it to the physical column
1795  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1796  for (auto const& ring_sizes : ring_sizes_column) {
1797  bool is_null_geo = false;
1798  if (!col_ti.get_notnull()) {
1799  // Check for NULL geo
1800  is_null_geo = ring_sizes.empty();
1801  }
1802  std::vector<TDatum> td_ring_sizes;
1803  for (auto const& ring_size : ring_sizes) {
1804  TDatum td_ring_size;
1805  td_ring_size.val.int_val = ring_size;
1806  td_ring_sizes.push_back(td_ring_size);
1807  }
1808  TDatum tdd_ring_sizes;
1809  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1810  tdd_ring_sizes.is_null = is_null_geo;
1811  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
1812  }
1813  col_idx++;
1814  }
1815 
1816  if (col_type == kMULTIPOLYGON) {
1817  if (poly_rings_column.size() != coords_row_count) {
1818  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1819  }
1820  // Create poly_rings array value and add it to the physical column
1821  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1822  for (auto const& poly_rings : poly_rings_column) {
1823  bool is_null_geo = false;
1824  if (!col_ti.get_notnull()) {
1825  // Check for NULL geo
1826  is_null_geo = poly_rings.empty();
1827  }
1828  std::vector<TDatum> td_poly_rings;
1829  for (auto const& num_rings : poly_rings) {
1830  TDatum td_num_rings;
1831  td_num_rings.val.int_val = num_rings;
1832  td_poly_rings.push_back(td_num_rings);
1833  }
1834  TDatum tdd_poly_rings;
1835  tdd_poly_rings.val.arr_val = td_poly_rings;
1836  tdd_poly_rings.is_null = is_null_geo;
1837  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
1838  }
1839  col_idx++;
1840  }
1841 
1842  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1843  if (bounds_column.size() != coords_row_count) {
1844  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1845  }
1846  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1847  for (auto const& bounds : bounds_column) {
1848  bool is_null_geo = false;
1849  if (!col_ti.get_notnull()) {
1850  // Check for NULL geo
1851  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1852  }
1853  std::vector<TDatum> td_bounds_data;
1854  for (auto const& b : bounds) {
1855  TDatum td_double;
1856  td_double.val.real_val = b;
1857  td_bounds_data.push_back(td_double);
1858  }
1859  TDatum tdd_bounds;
1860  tdd_bounds.val.arr_val = td_bounds_data;
1861  tdd_bounds.is_null = is_null_geo;
1862  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
1863  }
1864  col_idx++;
1865  }
1866 
1867  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1868  // Create render_group value and add it to the physical column
1869  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1870  for (auto const& render_group : render_groups_column) {
1871  TDatum td_render_group;
1872  td_render_group.val.int_val = render_group;
1873  td_render_group.is_null = false;
1874  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
1875  }
1876  col_idx++;
1877  }
1878 }
1879 
1880 namespace {
1881 
1882 std::tuple<int, SQLTypes, std::string> explode_collections_step1(
1883  const std::list<const ColumnDescriptor*>& col_descs) {
1884  // validate the columns
1885  // for now we can only explode into a single destination column
1886  // which must be of the child type (POLYGON, LINESTRING, POINT)
1887  int collection_col_idx = -1;
1888  int col_idx = 0;
1889  std::string collection_col_name;
1890  SQLTypes collection_child_type = kNULLT;
1891  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1892  auto const& cd = *cd_it;
1893  auto const col_type = cd->columnType.get_type();
1894  if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
1895  if (collection_col_idx >= 0) {
1896  throw std::runtime_error(
1897  "Explode Collections: Found more than one destination column");
1898  }
1899  collection_col_idx = col_idx;
1900  collection_child_type = col_type;
1901  collection_col_name = cd->columnName;
1902  }
1903  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1904  ++cd_it;
1905  }
1906  col_idx++;
1907  }
1908  if (collection_col_idx < 0) {
1909  throw std::runtime_error(
1910  "Explode Collections: Failed to find a supported column type to explode "
1911  "into");
1912  }
1913  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
1914 }
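// Illustrative example: for a table declared as (id INT, poly POLYGON),
// explode_collections_step1 returns {1, kPOLYGON, "poly"}; step2 below then
// splits a MULTIPOLYGON source value into one imported row per child POLYGON.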
1915 
1916 int64_t explode_collections_step2(
1917  OGRGeometry* ogr_geometry,
1918  SQLTypes collection_child_type,
1919  const std::string& collection_col_name,
1920  size_t row_or_feature_idx,
1921  std::function<void(OGRGeometry*)> execute_import_lambda) {
1922  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1923  bool is_collection = false;
1924  switch (collection_child_type) {
1925  case kPOINT:
1926  switch (ogr_geometry_type) {
1927  case wkbMultiPoint:
1928  is_collection = true;
1929  break;
1930  case wkbPoint:
1931  break;
1932  default:
1933  throw std::runtime_error(
1934  "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1935  }
1936  break;
1937  case kLINESTRING:
1938  switch (ogr_geometry_type) {
1939  case wkbMultiLineString:
1940  is_collection = true;
1941  break;
1942  case wkbLineString:
1943  break;
1944  default:
1945  throw std::runtime_error(
1946  "Explode Collections: Source geo type must be MULTILINESTRING or "
1947  "LINESTRING");
1948  }
1949  break;
1950  case kPOLYGON:
1951  switch (ogr_geometry_type) {
1952  case wkbMultiPolygon:
1953  is_collection = true;
1954  break;
1955  case wkbPolygon:
1956  break;
1957  default:
1958  throw std::runtime_error(
1959  "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1960  }
1961  break;
1962  default:
1963  CHECK(false) << "Unsupported geo child type " << collection_child_type;
1964  }
1965 
1966  int64_t us = 0LL;
1967 
1968  // explode or just import
1969  if (is_collection) {
1970  // cast to collection
1971  OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1972  CHECK(collection_geometry);
1973 
1974 #if LOG_EXPLODE_COLLECTIONS
1975  // log number of children
1976  LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
1977  << collection_col_name << "' into " << collection_geometry->getNumGeometries()
1978  << " child rows";
1979 #endif
1980 
1981  // loop over children
1982  uint32_t child_geometry_count = 0;
1983  auto child_geometry_it = collection_geometry->begin();
1984  while (child_geometry_it != collection_geometry->end()) {
1985  // get and import this child
1986  OGRGeometry* import_geometry = *child_geometry_it;
1987  us += measure<std::chrono::microseconds>::execution(
1988  [&] { execute_import_lambda(import_geometry); });
1989 
1990  // next child
1991  child_geometry_it++;
1992  child_geometry_count++;
1993  }
1994  } else {
1995  // import non-collection row just once
1996  us = measure<std::chrono::microseconds>::execution(
1997  [&] { execute_import_lambda(ogr_geometry); });
1998  }
1999 
2000  // done
2001  return us;
2002 }
2003 
2004 } // namespace
2005 
2006 static ImportStatus import_thread_delimited(
2007  int thread_id,
2008  Importer* importer,
2009  std::unique_ptr<char[]> scratch_buffer,
2010  size_t begin_pos,
2011  size_t end_pos,
2012  size_t total_size,
2013  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2014  size_t first_row_index_this_buffer,
2015  const Catalog_Namespace::SessionInfo* session_info,
2016  Executor* executor) {
2017  ImportStatus thread_import_status;
2018  int64_t total_get_row_time_us = 0;
2019  int64_t total_str_to_val_time_us = 0;
2020  auto query_session = session_info ? session_info->get_session_id() : "";
2021  CHECK(scratch_buffer);
2022  auto buffer = scratch_buffer.get();
2023  auto load_ms = measure<>::execution([]() {});
2024 
2025  thread_import_status.thread_id = thread_id;
2026 
2027  auto ms = measure<>::execution([&]() {
2028  const CopyParams& copy_params = importer->get_copy_params();
2029  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2030  size_t begin =
2031  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
2032  const char* thread_buf = buffer + begin_pos + begin;
2033  const char* thread_buf_end = buffer + end_pos;
2034  const char* buf_end = buffer + total_size;
2035  bool try_single_thread = false;
2036  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2037  importer->get_import_buffers(thread_id);
2038  int64_t us = 0LL;
2039  int phys_cols = 0;
2040  int point_cols = 0;
2041  for (const auto cd : col_descs) {
2042  const auto& col_ti = cd->columnType;
2043  phys_cols += col_ti.get_physical_cols();
2044  if (cd->columnType.get_type() == kPOINT) {
2045  point_cols++;
2046  }
2047  }
2048  auto num_cols = col_descs.size() - phys_cols;
2049  for (const auto& p : import_buffers) {
2050  p->clear();
2051  }
2052  std::vector<std::string_view> row;
2053  size_t row_index_plus_one = 0;
2054  for (const char* p = thread_buf; p < thread_buf_end; p++) {
2055  row.clear();
2056  std::vector<std::unique_ptr<char[]>>
2057  tmp_buffers; // holds string w/ removed escape chars, etc
2058  if (DEBUG_TIMING) {
2059  us = measure<std::chrono::microseconds>::execution([&]() {
2060  p = delimited_parser::get_row(p,
2061  thread_buf_end,
2062  buf_end,
2063  copy_params,
2064  importer->get_is_array(),
2065  row,
2066  tmp_buffers,
2067  try_single_thread,
2068  true);
2069  });
2070  total_get_row_time_us += us;
2071  } else {
2072  p = delimited_parser::get_row(p,
2073  thread_buf_end,
2074  buf_end,
2075  copy_params,
2076  importer->get_is_array(),
2077  row,
2078  tmp_buffers,
2079  try_single_thread,
2080  true);
2081  }
2082  row_index_plus_one++;
2083  // Each POINT could consume two separate coords instead of a single WKT
2084  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
2085  thread_import_status.rows_rejected++;
2086  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
2087  << row.size() << "): " << shared::printContainer(row);
2088  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2089  break;
2090  }
2091  continue;
2092  }
2093 
2094  //
2095  // lambda for importing a row (perhaps multiple times if exploding a collection)
2096  //
2097 
2098  auto execute_import_row = [&](OGRGeometry* import_geometry) {
2099  size_t import_idx = 0;
2100  size_t col_idx = 0;
2101  try {
2102  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2103  auto cd = *cd_it;
2104  const auto& col_ti = cd->columnType;
2105 
2106  bool is_null =
2107  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
2108  // Note: default copy_params.null_str is "\N", but everyone uses "NULL".
2109  // So initially nullness may be missed and not passed to add_value,
2110  // which then might also check and still decide it's actually a NULL, e.g.
2111  // if kINT doesn't start with a digit or a '-' then it's considered NULL.
2112  // So "NULL" is not recognized as NULL but then it's not recognized as
2113  // a valid kINT, so it's a NULL after all.
2114  // Checking for "NULL" here too, as a widely accepted notation for NULL.
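// Illustrative example: with the default null_str "\N", a delimited row
// "7,\N,NULL" yields is_null == true for both the second and third fields,
// even though only "\N" matches copy_params.null_str exactly.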
2115 
2116  // Treating empty as NULL
2117  if (!cd->columnType.is_string() && row[import_idx].empty()) {
2118  is_null = true;
2119  }
2120 
2121  if (col_ti.get_physical_cols() == 0) {
2122  // not geo
2123 
2124  import_buffers[col_idx]->add_value(
2125  cd, row[import_idx], is_null, copy_params);
2126 
2127  // next
2128  ++import_idx;
2129  ++col_idx;
2130  } else {
2131  // geo
2132 
2133  // store null string in the base column
2134  import_buffers[col_idx]->add_value(
2135  cd, copy_params.null_str, true, copy_params);
2136 
2137  // WKT from string we're not storing
2138  auto const& geo_string = row[import_idx];
2139 
2140  // next
2141  ++import_idx;
2142  ++col_idx;
2143 
2144  SQLTypes col_type = col_ti.get_type();
2145  CHECK(IS_GEO(col_type));
2146 
2147  std::vector<double> coords;
2148  std::vector<double> bounds;
2149  std::vector<int> ring_sizes;
2150  std::vector<int> poly_rings;
2151  int render_group = 0;
2152 
2153  // if this is a POINT column, and the field is not null, and
2154  // looks like a scalar numeric value (and not a hex blob)
2155  // attempt to import two columns as lon/lat (or lat/lon)
2156  if (col_type == kPOINT && !is_null && geo_string.size() > 0 &&
2157  (geo_string[0] == '.' || isdigit(geo_string[0]) ||
2158  geo_string[0] == '-') &&
2159  geo_string.find_first_of("ABCDEFabcdef") == std::string::npos) {
2160  double lon = std::atof(std::string(geo_string).c_str());
2161  double lat = NAN;
2162  auto lat_str = row[import_idx];
2163  ++import_idx;
2164  if (lat_str.size() > 0 &&
2165  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
2166  lat = std::atof(std::string(lat_str).c_str());
2167  }
2168  // Swap coordinates if this table uses a reverse order: lat/lon
2169  if (!copy_params.lonlat) {
2170  std::swap(lat, lon);
2171  }
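// Illustrative example: with copy_params.lonlat == true, a field pair
// "-122.42,37.77" imports as POINT(-122.42 37.77); with lonlat == false the
// same pair is read as lat/lon and swapped here.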
2172  // TODO: should check if POINT column should have been declared with
2173  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
2174  // throw std::runtime_error("POINT column " + cd->columnName + " is
2175  // not WGS84, cannot insert lon/lat");
2176  // }
2177  SQLTypeInfo import_ti{col_ti};
2178  if (copy_params.file_type == FileType::DELIMITED &&
2179  import_ti.get_output_srid() == 4326) {
2180  auto srid0 = copy_params.source_srid;
2181  if (srid0 > 0) {
2182  // srid0 -> 4326 transform is requested on import
2183  import_ti.set_input_srid(srid0);
2184  }
2185  }
2186  if (!importGeoFromLonLat(lon, lat, coords, import_ti)) {
2187  throw std::runtime_error(
2188  "Cannot read lon/lat to insert into POINT column " +
2189  cd->columnName);
2190  }
2191  } else {
2192  // import it
2193  SQLTypeInfo import_ti{col_ti};
2194  if (copy_params.file_type == FileType::DELIMITED &&
2195  import_ti.get_output_srid() == 4326) {
2196  auto srid0 = copy_params.source_srid;
2197  if (srid0 > 0) {
2198  // srid0 -> 4326 transform is requested on import
2199  import_ti.set_input_srid(srid0);
2200  }
2201  }
2202  if (is_null) {
2203  if (col_ti.get_notnull()) {
2204  throw std::runtime_error("NULL geo for column " + cd->columnName);
2205  }
2206  Geospatial::GeoTypesFactory::getNullGeoColumns(
2207  import_ti,
2208  coords,
2209  bounds,
2210  ring_sizes,
2211  poly_rings,
2212  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2213  } else {
2214  if (import_geometry) {
2215  // geometry already exploded
2216  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2217  import_geometry,
2218  import_ti,
2219  coords,
2220  bounds,
2221  ring_sizes,
2222  poly_rings,
2223  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2224  std::string msg =
2225  "Failed to extract valid geometry from exploded row " +
2226  std::to_string(first_row_index_this_buffer +
2227  row_index_plus_one) +
2228  " for column " + cd->columnName;
2229  throw std::runtime_error(msg);
2230  }
2231  } else {
2232  // extract geometry directly from WKT
2233  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2234  std::string(geo_string),
2235  import_ti,
2236  coords,
2237  bounds,
2238  ring_sizes,
2239  poly_rings,
2240  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2241  std::string msg = "Failed to extract valid geometry from row " +
2242  std::to_string(first_row_index_this_buffer +
2243  row_index_plus_one) +
2244  " for column " + cd->columnName;
2245  throw std::runtime_error(msg);
2246  }
2247  }
2248 
2249  // validate types
2250  if (col_type != import_ti.get_type()) {
2251  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2252  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2253  col_type == SQLTypes::kMULTIPOLYGON)) {
2254  throw std::runtime_error(
2255  "Imported geometry doesn't match the type of column " +
2256  cd->columnName);
2257  }
2258  }
2259  }
2260 
2261  // assign render group?
2262  if (columnIdToRenderGroupAnalyzerMap.size()) {
2263  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2264  if (ring_sizes.size()) {
2265  // get a suitable render group for these poly coords
2266  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2267  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2268  render_group =
2269  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2270  } else {
2271  // empty poly
2272  render_group = -1;
2273  }
2274  }
2275  }
2276  }
2277 
2278  // import extracted geo
2279  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2280  cd,
2281  import_buffers,
2282  col_idx,
2283  coords,
2284  bounds,
2285  ring_sizes,
2286  poly_rings,
2287  render_group);
2288 
2289  // skip remaining physical columns
2290  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2291  ++cd_it;
2292  }
2293  }
2294  }
2295  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2296  check_session_interrupted(query_session, executor))) {
2297  thread_import_status.load_failed = true;
2298  thread_import_status.load_msg =
2299  "Table load was cancelled via Query Interrupt";
2300  return;
2301  }
2302  thread_import_status.rows_completed++;
2303  } catch (const std::exception& e) {
2304  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2305  import_buffers[col_idx_to_pop]->pop_value();
2306  }
2307  thread_import_status.rows_rejected++;
2308  LOG(ERROR) << "Input exception thrown: " << e.what()
2309  << ". Row discarded. Data: " << shared::printContainer(row);
2310  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2311  LOG(ERROR) << "Load was cancelled due to max reject rows being reached";
2312  thread_import_status.load_failed = true;
2313  thread_import_status.load_msg =
2314  "Load was cancelled due to max reject rows being reached";
2315  }
2316  }
2317  }; // End of lambda
2318 
2319  if (copy_params.geo_explode_collections) {
2320  // explode and import
2321  auto const [collection_col_idx, collection_child_type, collection_col_name] =
2322  explode_collections_step1(col_descs);
2323  // pull out the collection WKT or WKB hex
2324  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2325  auto const& collection_geo_string = row[collection_col_idx];
2326  // convert to OGR
2327  OGRGeometry* ogr_geometry = nullptr;
2328  ScopeGuard destroy_ogr_geometry = [&] {
2329  if (ogr_geometry) {
2330  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2331  }
2332  };
2333  ogr_geometry = Geospatial::GeoTypesFactory::createOGRGeometry(
2334  std::string(collection_geo_string));
2335  // do the explode and import
2336  us = explode_collections_step2(ogr_geometry,
2337  collection_child_type,
2338  collection_col_name,
2339  first_row_index_this_buffer + row_index_plus_one,
2340  execute_import_row);
2341  } else {
2342  // import non-collection row just once
2343  us = measure<std::chrono::microseconds>::execution(
2344  [&] { execute_import_row(nullptr); });
2345  }
2346 
2347  if (thread_import_status.load_failed) {
2348  break;
2349  }
2350  } // end thread
2351  total_str_to_val_time_us += us;
2352  if (!thread_import_status.load_failed && thread_import_status.rows_completed > 0) {
2353  load_ms = measure<>::execution([&]() {
2354  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2355  });
2356  }
2357  }); // end execution
2358 
2359  if (DEBUG_TIMING && !thread_import_status.load_failed &&
2360  thread_import_status.rows_completed > 0) {
2361  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2362  << thread_import_status.rows_completed << " rows inserted in "
2363  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2364  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2365  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2366  << "sec" << std::endl;
2367  }
2368 
2369  return thread_import_status;
2370 }
2371 
2372 class ColumnNotGeoError : public std::runtime_error {
2373  public:
2374  ColumnNotGeoError(const std::string& column_name)
2375  : std::runtime_error("Column '" + column_name + "' is not a geo column") {}
2376 };
2377 
2378 static ImportStatus import_thread_shapefile(
2379  int thread_id,
2380  Importer* importer,
2381  OGRSpatialReference* poGeographicSR,
2382  const FeaturePtrVector& features,
2383  size_t firstFeature,
2384  size_t numFeatures,
2385  const FieldNameToIndexMapType& fieldNameToIndexMap,
2386  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2387  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2388  const Catalog_Namespace::SessionInfo* session_info,
2389  Executor* executor) {
2390  ImportStatus thread_import_status;
2391  const CopyParams& copy_params = importer->get_copy_params();
2392  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2393  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2394  importer->get_import_buffers(thread_id);
2395  auto query_session = session_info ? session_info->get_session_id() : "";
2396  for (const auto& p : import_buffers) {
2397  p->clear();
2398  }
2399 
2400  auto convert_timer = timer_start();
2401 
2402  // we create this on the fly based on the first feature's SR
2403  std::unique_ptr<OGRCoordinateTransformation> coordinate_transformation;
2404  bool checked_transformation{false};
2405 
2406  // for all the features in this chunk...
2407  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2408  // ignore null features
2409  if (!features[iFeature]) {
2410  continue;
2411  }
2412 
2413  // get this feature's geometry
2414  // for geodatabase, we need to consider features with no geometry
2415  // as we still want to create a table, even if it has no geo column
2416  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2417  if (pGeometry) {
2418  // check if we need to make a CoordinateTransformation
2419  // we assume that all features in this chunk will have
2420  // the same source SR, so the CT will be valid for all
2421  // transforming to a reusable CT is faster than to an SR
2422  if (!checked_transformation) {
2423  // get the SR of the incoming geo
2424  auto geometry_sr = pGeometry->getSpatialReference();
2425  // if the SR is non-null and non-empty and different from what we want
2426  // we need to make a reusable CoordinateTransformation
2427  if (geometry_sr &&
2428 #if GDAL_VERSION_MAJOR >= 3
2429  !geometry_sr->IsEmpty() &&
2430 #endif
2431  !geometry_sr->IsSame(poGeographicSR)) {
2432  // validate the SR before trying to use it
2433  if (geometry_sr->Validate() != OGRERR_NONE) {
2434  throw std::runtime_error("Incoming geo has invalid Spatial Reference");
2435  }
2436  // create the OGRCoordinateTransformation that will be used for
2437  // all the features in this chunk
2438  if (coordinate_transformation == nullptr) {
2439  coordinate_transformation.reset(
2440  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR));
2441  if (coordinate_transformation == nullptr) {
2442  throw std::runtime_error(
2443  "Failed to create a GDAL CoordinateTransformation for incoming geo");
2444  }
2445  }
2446  }
2447  checked_transformation = true;
2448  }
2449 
2450  // if we have a transformation, use it
2451  if (coordinate_transformation) {
2452  pGeometry->transform(coordinate_transformation.get());
2453  }
2454  }
2455 
2456  //
2457  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2458  //
2459 
2460  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2461  size_t col_idx = 0;
2462  try {
2463  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2464  check_session_interrupted(query_session, executor))) {
2465  thread_import_status.load_failed = true;
2466  thread_import_status.load_msg = "Table load was cancelled via Query Interrupt";
2467  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
2468  }
2469  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2470  auto cd = *cd_it;
2471 
2472  // is this a geo column?
2473  const auto& col_ti = cd->columnType;
2474  if (col_ti.is_geometry()) {
2475  // Note that this assumes there is one and only one geo column in the
2476  // table. Currently, the importer only supports reading a single
2477  // geospatial feature from an input shapefile / geojson file, but this
2478  // code will need to be modified if that changes
2479  SQLTypes col_type = col_ti.get_type();
2480 
2481  // store null string in the base column
2482  import_buffers[col_idx]->add_value(
2483  cd, copy_params.null_str, true, copy_params);
2484  ++col_idx;
2485 
2486  // the data we now need to extract for the other columns
2487  std::vector<double> coords;
2488  std::vector<double> bounds;
2489  std::vector<int> ring_sizes;
2490  std::vector<int> poly_rings;
2491  int render_group = 0;
2492 
2493  // extract it
2494  SQLTypeInfo import_ti{col_ti};
2495  bool is_null_geo = !import_geometry;
2496  if (is_null_geo) {
2497  if (col_ti.get_notnull()) {
2498  throw std::runtime_error("NULL geo for column " + cd->columnName);
2499  }
2500  Geospatial::GeoTypesFactory::getNullGeoColumns(
2501  import_ti,
2502  coords,
2503  bounds,
2504  ring_sizes,
2505  poly_rings,
2506  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2507  } else {
2508  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2509  import_geometry,
2510  import_ti,
2511  coords,
2512  bounds,
2513  ring_sizes,
2514  poly_rings,
2515  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2516  std::string msg = "Failed to extract valid geometry from feature " +
2517  std::to_string(firstFeature + iFeature + 1) +
2518  " for column " + cd->columnName;
2519  throw std::runtime_error(msg);
2520  }
2521 
2522  // validate types
2523  if (col_type != import_ti.get_type()) {
2524  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2525  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2526  col_type == SQLTypes::kMULTIPOLYGON)) {
2527  throw std::runtime_error(
2528  "Imported geometry doesn't match the type of column " +
2529  cd->columnName);
2530  }
2531  }
2532  }
2533 
2534  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2535  if (ring_sizes.size()) {
2536  // get a suitable render group for these poly coords
2537  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2538  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2539  render_group = (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2540  } else {
2541  // empty poly
2542  render_group = -1;
2543  }
2544  }
2545 
2546  // create coords array value and add it to the physical column
2547  ++cd_it;
2548  auto cd_coords = *cd_it;
2549  std::vector<TDatum> td_coord_data;
2550  if (!is_null_geo) {
2551  std::vector<uint8_t> compressed_coords =
2552  Geospatial::compress_coords(coords, col_ti);
2553  for (auto cc : compressed_coords) {
2554  TDatum td_byte;
2555  td_byte.val.int_val = cc;
2556  td_coord_data.push_back(td_byte);
2557  }
2558  }
2559  TDatum tdd_coords;
2560  tdd_coords.val.arr_val = td_coord_data;
2561  tdd_coords.is_null = is_null_geo;
2562  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2563  ++col_idx;
2564 
2565  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2566  // Create ring_sizes array value and add it to the physical column
2567  ++cd_it;
2568  auto cd_ring_sizes = *cd_it;
2569  std::vector<TDatum> td_ring_sizes;
2570  if (!is_null_geo) {
2571  for (auto ring_size : ring_sizes) {
2572  TDatum td_ring_size;
2573  td_ring_size.val.int_val = ring_size;
2574  td_ring_sizes.push_back(td_ring_size);
2575  }
2576  }
2577  TDatum tdd_ring_sizes;
2578  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2579  tdd_ring_sizes.is_null = is_null_geo;
2580  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2581  ++col_idx;
2582  }
2583 
2584  if (col_type == kMULTIPOLYGON) {
2585  // Create poly_rings array value and add it to the physical column
2586  ++cd_it;
2587  auto cd_poly_rings = *cd_it;
2588  std::vector<TDatum> td_poly_rings;
2589  if (!is_null_geo) {
2590  for (auto num_rings : poly_rings) {
2591  TDatum td_num_rings;
2592  td_num_rings.val.int_val = num_rings;
2593  td_poly_rings.push_back(td_num_rings);
2594  }
2595  }
2596  TDatum tdd_poly_rings;
2597  tdd_poly_rings.val.arr_val = td_poly_rings;
2598  tdd_poly_rings.is_null = is_null_geo;
2599  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2600  ++col_idx;
2601  }
2602 
2603  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2604  col_type == kMULTIPOLYGON) {
2605  // Create bounds array value and add it to the physical column
2606  ++cd_it;
2607  auto cd_bounds = *cd_it;
2608  std::vector<TDatum> td_bounds_data;
2609  if (!is_null_geo) {
2610  for (auto b : bounds) {
2611  TDatum td_double;
2612  td_double.val.real_val = b;
2613  td_bounds_data.push_back(td_double);
2614  }
2615  }
2616  TDatum tdd_bounds;
2617  tdd_bounds.val.arr_val = td_bounds_data;
2618  tdd_bounds.is_null = is_null_geo;
2619  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2620  ++col_idx;
2621  }
2622 
2623  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2624  // Create render_group value and add it to the physical column
2625  ++cd_it;
2626  auto cd_render_group = *cd_it;
2627  TDatum td_render_group;
2628  td_render_group.val.int_val = render_group;
2629  td_render_group.is_null = is_null_geo;
2630  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2631  ++col_idx;
2632  }
2633  } else {
2634  // regular column
2635  // pull from GDAL metadata
2636  auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2637  CHECK(cit != columnNameToSourceNameMap.end());
2638  auto const& field_name = cit->second;
2639 
2640  auto const fit = fieldNameToIndexMap.find(field_name);
2641  if (fit == fieldNameToIndexMap.end()) {
2642  throw ColumnNotGeoError(cd->columnName);
2643  }
2644 
2645  auto const& field_index = fit->second;
2646  CHECK(field_index < fieldNameToIndexMap.size());
2647 
2648  auto const& feature = features[iFeature];
2649 
2650  auto field_defn = feature->GetFieldDefnRef(field_index);
2651  CHECK(field_defn);
2652 
2653  // OGRFeature::GetFieldAsString() can only return 80 characters
2654  // so for array columns, we are obliged to fetch the actual values
2655  // and construct the concatenated string ourselves
2656 
2657  std::string value_string;
2658  int array_index = 0, array_size = 0;
2659 
2660  auto stringify_numeric_list = [&](auto* values) {
2661  value_string = "{";
2662  while (array_index < array_size) {
2663  auto separator = (array_index > 0) ? "," : "";
2664  value_string += separator + std::to_string(values[array_index]);
2665  array_index++;
2666  }
2667  value_string += "}";
2668  };
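// Illustrative example: an OFTIntegerList field holding [1, 2, 3] becomes
// the string "{1,2,3}", the array literal form expected downstream by
// TypedImportBuffer::add_value() for array columns.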
2669 
2670  auto field_type = field_defn->GetType();
2671  switch (field_type) {
2672  case OFTInteger:
2673  case OFTInteger64:
2674  case OFTReal:
2675  case OFTString:
2676  case OFTBinary:
2677  case OFTDate:
2678  case OFTTime:
2679  case OFTDateTime: {
2680  value_string = feature->GetFieldAsString(field_index);
2681  } break;
2682  case OFTIntegerList: {
2683  auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2684  stringify_numeric_list(values);
2685  } break;
2686  case OFTInteger64List: {
2687  auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2688  stringify_numeric_list(values);
2689  } break;
2690  case OFTRealList: {
2691  auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2692  stringify_numeric_list(values);
2693  } break;
2694  case OFTStringList: {
2695  auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2696  value_string = "{";
2697  if (array_of_strings) {
2698  while (auto* this_string = array_of_strings[array_index]) {
2699  auto separator = (array_index > 0) ? "," : "";
2700  value_string += separator + std::string(this_string);
2701  array_index++;
2702  }
2703  }
2704  value_string += "}";
2705  } break;
2706  default:
2707  throw std::runtime_error("Unsupported geo file field type (" +
2708  std::to_string(static_cast<int>(field_type)) +
2709  ")");
2710  }
2711 
2712  static CopyParams default_copy_params;
2713  import_buffers[col_idx]->add_value(
2714  cd, value_string, false, default_copy_params);
2715  ++col_idx;
2716  }
2717  }
2718  thread_import_status.rows_completed++;
2719  } catch (QueryExecutionError& e) {
2720  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
2721  throw e;
2722  }
2723  } catch (ColumnNotGeoError& e) {
2724  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Aborting import.";
2725  throw std::runtime_error(e.what());
2726  } catch (const std::exception& e) {
2727  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2728  import_buffers[col_idx_to_pop]->pop_value();
2729  }
2730  thread_import_status.rows_rejected++;
2731  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2732  }
2733  };
2734 
2735  if (pGeometry && copy_params.geo_explode_collections) {
2736  // explode and import
2737  auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2738  explode_collections_step1(col_descs);
2739  explode_collections_step2(pGeometry,
2740  collection_child_type,
2741  collection_col_name,
2742  firstFeature + iFeature + 1,
2743  execute_import_feature);
2744  } else {
2745  // import non-collection or null feature just once
2746  execute_import_feature(pGeometry);
2747  }
2748  } // end features
2749 
2750  float convert_ms =
2751  float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2752  convert_timer)) /
2753  1000.0f;
2754 
2755  float load_ms = 0.0f;
2756  if (thread_import_status.rows_completed > 0) {
2757  auto load_timer = timer_start();
2758  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2759  load_ms =
2760  float(
2761  timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2762  load_timer)) /
2763  1000.0f;
2764  }
2765 
2766  if (DEBUG_TIMING && thread_import_status.rows_completed > 0) {
2767  LOG(INFO) << "DEBUG: Process " << convert_ms << "ms";
2768  LOG(INFO) << "DEBUG: Load " << load_ms << "ms";
2769  }
2770 
2771  thread_import_status.thread_id = thread_id;
2772 
2773  if (DEBUG_TIMING) {
2774  LOG(INFO) << "DEBUG: Total "
2775  << float(timer_stop<std::chrono::steady_clock::time_point,
2776  std::chrono::microseconds>(convert_timer)) /
2777  1000.0f
2778  << "ms";
2779  }
2780 
2781  return thread_import_status;
2782 }
2783 
2784 bool Loader::loadNoCheckpoint(
2785  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2786  size_t row_count,
2787  const Catalog_Namespace::SessionInfo* session_info) {
2788  return loadImpl(import_buffers, row_count, false, session_info);
2789 }
2790 
2791 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2792  size_t row_count,
2793  const Catalog_Namespace::SessionInfo* session_info) {
2794  return loadImpl(import_buffers, row_count, true, session_info);
2795 }
2796 
2797 namespace {
2798 
2799 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2800  const auto& ti = import_buffer.getTypeInfo();
2801  const int8_t* values_buffer{nullptr};
2802  if (ti.is_string()) {
2803  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2804  values_buffer = import_buffer.getStringDictBuffer();
2805  } else {
2806  values_buffer = import_buffer.getAsBytes();
2807  }
2808  CHECK(values_buffer);
2809  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2810  switch (logical_size) {
2811  case 1: {
2812  return values_buffer[index];
2813  }
2814  case 2: {
2815  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2816  }
2817  case 4: {
2818  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2819  }
2820  case 8: {
2821  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2822  }
2823  default:
2824  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2825  }
2826  UNREACHABLE();
2827  return 0;
2828 }
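// Note: for DICT-encoded TEXT shard keys, this reads the 32-bit string ID
// from the dictionary buffer (see addDictEncodedString() in
// distributeToShardsExistingColumns() below), so equal strings always map
// to the same shard.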
2829 
2830 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2831  const auto& ti = import_buffer.getTypeInfo();
2832  CHECK_EQ(kFLOAT, ti.get_type());
2833  const auto values_buffer = import_buffer.getAsBytes();
2834  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2835 }
2836 
2837 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2838  const auto& ti = import_buffer.getTypeInfo();
2839  CHECK_EQ(kDOUBLE, ti.get_type());
2840  const auto values_buffer = import_buffer.getAsBytes();
2841  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2842 }
2843 
2844 } // namespace
2845 
2846 void Loader::fillShardRow(const size_t row_index,
2847  OneShardBuffers& shard_output_buffers,
2848  const OneShardBuffers& import_buffers) {
2849  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2850  const auto& input_buffer = import_buffers[col_idx];
2851  const auto& col_ti = input_buffer->getTypeInfo();
2852  const auto type =
2853  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2854 
2855  switch (type) {
2856  case kBOOLEAN:
2857  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2858  break;
2859  case kTINYINT:
2860  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2861  break;
2862  case kSMALLINT:
2863  shard_output_buffers[col_idx]->addSmallint(
2864  int_value_at(*input_buffer, row_index));
2865  break;
2866  case kINT:
2867  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2868  break;
2869  case kBIGINT:
2870  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2871  break;
2872  case kFLOAT:
2873  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2874  break;
2875  case kDOUBLE:
2876  shard_output_buffers[col_idx]->addDouble(
2877  double_value_at(*input_buffer, row_index));
2878  break;
2879  case kTEXT:
2880  case kVARCHAR:
2881  case kCHAR: {
2882  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2883  shard_output_buffers[col_idx]->addString(
2884  (*input_buffer->getStringBuffer())[row_index]);
2885  break;
2886  }
2887  case kTIME:
2888  case kTIMESTAMP:
2889  case kDATE:
2890  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2891  break;
2892  case kARRAY:
2893  if (IS_STRING(col_ti.get_subtype())) {
2894  CHECK(input_buffer->getStringArrayBuffer());
2895  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2896  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2897  shard_output_buffers[col_idx]->addStringArray(input_arr);
2898  } else {
2899  shard_output_buffers[col_idx]->addArray(
2900  (*input_buffer->getArrayBuffer())[row_index]);
2901  }
2902  break;
2903  case kPOINT:
2904  case kLINESTRING:
2905  case kPOLYGON:
2906  case kMULTIPOLYGON: {
2907  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2908  shard_output_buffers[col_idx]->addGeoString(
2909  (*input_buffer->getGeoStringBuffer())[row_index]);
2910  break;
2911  }
2912  default:
2913  CHECK(false);
2914  }
2915  }
2916 }
2917 
2918 void Loader::distributeToShardsExistingColumns(
2919  std::vector<OneShardBuffers>& all_shard_import_buffers,
2920  std::vector<size_t>& all_shard_row_counts,
2921  const OneShardBuffers& import_buffers,
2922  const size_t row_count,
2923  const size_t shard_count,
2924  const Catalog_Namespace::SessionInfo* session_info) {
2925  int col_idx{0};
2926  const ColumnDescriptor* shard_col_desc{nullptr};
2927  for (const auto col_desc : column_descs_) {
2928  ++col_idx;
2929  if (col_idx == table_desc_->shardedColumnId) {
2930  shard_col_desc = col_desc;
2931  break;
2932  }
2933  }
2934  CHECK(shard_col_desc);
2935  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2936  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2937  const auto& shard_col_ti = shard_col_desc->columnType;
2938  CHECK(shard_col_ti.is_integer() ||
2939  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2940  shard_col_ti.is_time());
2941  if (shard_col_ti.is_string()) {
2942  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2943  CHECK(payloads_ptr);
2944  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2945  }
2946 
2947  for (size_t i = 0; i < row_count; ++i) {
2948  const size_t shard =
2949  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2950  auto& shard_output_buffers = all_shard_import_buffers[shard];
2951  fillShardRow(i, shard_output_buffers, import_buffers);
2952  ++all_shard_row_counts[shard];
2953  }
2954 }
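// Illustrative example: with shard_count == 4, a row whose shard-key value
// is 10 is routed to shard 10 % 4 == 2, assuming the modulo-based
// SHARD_FOR_KEY macro from Shared/shard_key.h.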
2955 
2956 void Loader::distributeToShardsNewColumns(
2957  std::vector<OneShardBuffers>& all_shard_import_buffers,
2958  std::vector<size_t>& all_shard_row_counts,
2959  const OneShardBuffers& import_buffers,
2960  const size_t row_count,
2961  const size_t shard_count,
2962  const Catalog_Namespace::SessionInfo* session_info) {
2963  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2964  CHECK(shard_tds.size() == shard_count);
2965 
2966  for (size_t shard = 0; shard < shard_count; ++shard) {
2967  auto& shard_output_buffers = all_shard_import_buffers[shard];
2968  if (row_count != 0) {
2969  fillShardRow(0, shard_output_buffers, import_buffers);
2970  }
2971  // when replicating a column, row count of a shard == replicate count of the column
2972  // on the shard
2973  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2974  }
2975 }
2976 
2977 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2978  std::vector<size_t>& all_shard_row_counts,
2979  const OneShardBuffers& import_buffers,
2980  const size_t row_count,
2981  const size_t shard_count,
2982  const Catalog_Namespace::SessionInfo* session_info) {
2983  all_shard_row_counts.resize(shard_count);
2984  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2985  all_shard_import_buffers.emplace_back();
2986  for (const auto& typed_import_buffer : import_buffers) {
2987  all_shard_import_buffers.back().emplace_back(
2988  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2989  typed_import_buffer->getStringDictionary()));
2990  }
2991  }
2992  CHECK_GT(row_count, 0);
2993  if (isAddingColumns()) {
2994  distributeToShardsNewColumns(all_shard_import_buffers,
2995  all_shard_row_counts,
2996  import_buffers,
2997  row_count,
2998  shard_count,
2999  session_info);
3000  } else {
3001  distributeToShardsExistingColumns(all_shard_import_buffers,
3002  all_shard_row_counts,
3003  import_buffers,
3004  row_count,
3005  shard_count,
3006  session_info);
3007  }
3008 }
3009 
3010 bool Loader::loadImpl(
3011  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3012  size_t row_count,
3013  bool checkpoint,
3014  const Catalog_Namespace::SessionInfo* session_info) {
3015  if (load_callback_) {
3016  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
3017  return load_callback_(import_buffers, data_blocks, row_count);
3018  }
3019  if (table_desc_->nShards) {
3020  std::vector<OneShardBuffers> all_shard_import_buffers;
3021  std::vector<size_t> all_shard_row_counts;
3022  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
3023  distributeToShards(all_shard_import_buffers,
3024  all_shard_row_counts,
3025  import_buffers,
3026  row_count,
3027  shard_tables.size(),
3028  session_info);
3029  bool success = true;
3030  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
3031  success = success && loadToShard(all_shard_import_buffers[shard_idx],
3032  all_shard_row_counts[shard_idx],
3033  shard_tables[shard_idx],
3034  checkpoint,
3035  session_info);
3036  }
3037  return success;
3038  }
3039  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
3040 }
3041 
3042 std::vector<DataBlockPtr> TypedImportBuffer::get_data_block_pointers(
3043  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
3044  std::vector<DataBlockPtr> result(import_buffers.size());
3045  std::vector<std::pair<const size_t, std::future<int8_t*>>>
3046  encoded_data_block_ptrs_futures;
3047  // make all async calls to string dictionary here and then continue execution
3048  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
3049  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
3050  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
3051  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
3052  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
3053 
3054  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
3055  buf_idx,
3056  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
3057  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
3058  return import_buffers[buf_idx]->getStringDictBuffer();
3059  })));
3060  }
3061  }
3062 
3063  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
3064  DataBlockPtr p;
3065  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
3066  import_buffers[buf_idx]->getTypeInfo().is_time() ||
3067  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
3068  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
3069  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
3070  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
3071  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
3072  p.stringsPtr = string_payload_ptr;
3073  } else {
3074  // This condition means we have a column which is a dictionary-ENCODED string.
3075  // We already made an async request above to obtain the encoded integer values,
3076  // so we should skip this iteration and continue.
3077  continue;
3078  }
3079  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
3080  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
3081  p.stringsPtr = geo_payload_ptr;
3082  } else {
3083  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
3084  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
3085  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
3086  import_buffers[buf_idx]->addDictEncodedStringArray(
3087  *import_buffers[buf_idx]->getStringArrayBuffer());
3088  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
3089  } else {
3090  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
3091  }
3092  }
3093  result[buf_idx] = p;
3094  }
3095 
3096  // wait for the async requests we made for string dictionary
3097  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
3098  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
3099  }
3100  return result;
3101 }
3102 
3103 bool Loader::loadToShard(
3104  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3105  size_t row_count,
3106  const TableDescriptor* shard_table,
3107  bool checkpoint,
3108  const Catalog_Namespace::SessionInfo* session_info) {
3109  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
3110  Fragmenter_Namespace::InsertData ins_data(insert_data_);
3111  ins_data.numRows = row_count;
3112  bool success = false;
3113  try {
3114  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
3115  } catch (std::exception& e) {
3116  std::ostringstream oss;
3117  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
3118  << e.what();
3119 
3120  LOG(ERROR) << oss.str();
3121  error_msg_ = oss.str();
3122  return success;
3123  }
3124  if (isAddingColumns()) {
3125  // when Adding columns we omit any columns except the ones being added
3126  ins_data.columnIds.clear();
3127  ins_data.is_default.clear();
3128  for (auto& buffer : import_buffers) {
3129  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
3130  ins_data.is_default.push_back(true);
3131  }
3132  } else {
3133  ins_data.is_default.resize(ins_data.columnIds.size(), false);
3134  }
3135  // release loader_lock so that in InsertOrderFragmenter::insertData
3136  // we can have multiple threads sort/shuffle InsertData
3137  loader_lock.unlock();
3138  success = true;
3139  {
3140  try {
3141  if (checkpoint) {
3142  shard_table->fragmenter->insertData(ins_data);
3143  } else {
3144  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
3145  }
3146  } catch (std::exception& e) {
3147  std::ostringstream oss;
3148  oss << "Fragmenter Insert Exception when processing Table "
3149  << shard_table->tableName << " issue was " << e.what();
3150 
3151  LOG(ERROR) << oss.str();
3152  loader_lock.lock();
3153  error_msg_ = oss.str();
3154  success = false;
3155  }
3156  }
3157  return success;
3158 }
3159 
3160 void Loader::dropColumns(const std::vector<int>& columnIds) {
3161  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
3162  if (table_desc_->nShards) {
3163  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
3164  }
3165  for (auto table_desc : table_descs) {
3166  table_desc->fragmenter->dropColumns(columnIds);
3167  }
3168 }
3169 
3170 void Loader::init(const bool use_catalog_locks) {
3171  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
3172  insert_data_.tableId = table_desc_->tableId;
3173  for (auto cd : column_descs_) {
3174  insert_data_.columnIds.push_back(cd->columnId);
3175  if (cd->columnType.get_compression() == kENCODING_DICT) {
3176  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3177  const auto dd = use_catalog_locks
3178  ? catalog_.getMetadataForDict(cd->columnType.get_comp_param())
3179  : catalog_.getMetadataForDictUnlocked(
3180  cd->columnType.get_comp_param(), true);
3181  CHECK(dd);
3182  dict_map_[cd->columnId] = dd->stringDict.get();
3183  }
3184  }
3185  insert_data_.numRows = 0;
3186 }
3187 
3188 void Detector::init() {
3189  detect_row_delimiter();
3190  split_raw_data();
3191  find_best_sqltypes_and_headers();
3192 }
3193 
3194 ImportStatus Detector::importDelimited(
3195  const std::string& file_path,
3196  const bool decompressed,
3197  const Catalog_Namespace::SessionInfo* session_info) {
3198  // we do not check interrupt status for this detection
3199  if (!p_file) {
3200  p_file = fopen(file_path.c_str(), "rb");
3201  }
3202  if (!p_file) {
3203  throw std::runtime_error("failed to open file '" + file_path +
3204  "': " + strerror(errno));
3205  }
3206 
3207  // somehow clang does not support ext/stdio_filebuf.h, so we hand-roll a
3208  // readline loop that honors the customized copy_params.line_delim...
3209  std::string line;
3210  line.reserve(1 * 1024 * 1024);
3211  auto end_time = std::chrono::steady_clock::now() +
3212  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
3213  try {
3214  while (!feof(p_file)) {
3215  int c;
3216  size_t n = 0;
3217  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
3218  if (n++ >= line.capacity()) {
3219  break;
3220  }
3221  line += c;
3222  }
3223  if (0 == n) {
3224  break;
3225  }
3226  // remember the first line, which is possibly a header line, to
3227  // ignore identical header line(s) in 2nd+ files of an archive;
3228  // otherwise, a 2nd+ header may be mistaken for an all-string row
3229  // and so skew the final column types.
3230  if (line1.empty()) {
3231  line1 = line;
3232  } else if (line == line1) {
3233  line.clear();
3234  continue;
3235  }
3236 
3237  raw_data += line;
3238  raw_data += copy_params.line_delim;
3239  line.clear();
3240  ++import_status_.rows_completed;
3241  if (std::chrono::steady_clock::now() > end_time) {
3242  if (import_status_.rows_completed > 10000) {
3243  break;
3244  }
3245  }
3246  }
3247  } catch (std::exception& e) {
3248  }
3249 
3250  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3251  import_status_.load_failed = true;
3252 
3253  fclose(p_file);
3254  p_file = nullptr;
3255  return import_status_;
3256 }
3257 
3258 void Detector::read_file() {
3259  // this becomes analogous to Importer::import()
3260  (void)DataStreamSink::archivePlumber(nullptr);
3261 }
3262 
3263 void Detector::detect_row_delimiter() {
3264  if (copy_params.delimiter == '\0') {
3265  copy_params.delimiter = ',';
3266  if (boost::filesystem::extension(file_path) == ".tsv") {
3267  copy_params.delimiter = '\t';
3268  }
3269  }
3270 }
3271 
3272 void Detector::split_raw_data() {
3273  const char* buf = raw_data.c_str();
3274  const char* buf_end = buf + raw_data.size();
3275  bool try_single_thread = false;
3276  for (const char* p = buf; p < buf_end; p++) {
3277  std::vector<std::string> row;
3278  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3279  p = delimited_parser::get_row(p,
3280  buf_end,
3281  buf_end,
3282  copy_params,
3283  nullptr,
3284  row,
3285  tmp_buffers,
3286  try_single_thread,
3287  true);
3288  raw_rows.push_back(row);
3289  if (try_single_thread) {
3290  break;
3291  }
3292  }
3293  if (try_single_thread) {
3294  copy_params.threads = 1;
3295  raw_rows.clear();
3296  for (const char* p = buf; p < buf_end; p++) {
3297  std::vector<std::string> row;
3298  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3299  p = delimited_parser::get_row(p,
3300  buf_end,
3301  buf_end,
3302  copy_params,
3303  nullptr,
3304  row,
3305  tmp_buffers,
3306  try_single_thread,
3307  true);
3308  raw_rows.push_back(row);
3309  }
3310  }
3311 }
3312 
3313 template <class T>
3314 bool try_cast(const std::string& str) {
3315  try {
3316  boost::lexical_cast<T>(str);
3317  } catch (const boost::bad_lexical_cast& e) {
3318  return false;
3319  }
3320  return true;
3321 }
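// Illustrative example: try_cast<int16_t>("70000") returns false (out of
// range for int16_t), while try_cast<int32_t>("70000") returns true, so
// detect_sqltype() below promotes such a value from kSMALLINT to kINT.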
3322 
3323 SQLTypes Detector::detect_sqltype(const std::string& str) {
3324  SQLTypes type = kTEXT;
3325  if (try_cast<double>(str)) {
3326  type = kDOUBLE;
3327  /*if (try_cast<bool>(str)) {
3328  type = kBOOLEAN;
3329  }*/
3330  if (try_cast<int16_t>(str)) {
3331  type = kSMALLINT;
3332  } else if (try_cast<int32_t>(str)) {
3333  type = kINT;
3334  } else if (try_cast<int64_t>(str)) {
3335  type = kBIGINT;
3336  } else if (try_cast<float>(str)) {
3337  type = kFLOAT;
3338  }
3339  }
3340 
3341  // check for geo types
3342  if (type == kTEXT) {
3343  // convert to upper case
3344  std::string str_upper_case = str;
3345  std::transform(
3346  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3347 
3348  // then test for leading words
3349  if (str_upper_case.find("POINT") == 0) {
3350  type = kPOINT;
3351  } else if (str_upper_case.find("LINESTRING") == 0) {
3352  type = kLINESTRING;
3353  } else if (str_upper_case.find("POLYGON") == 0) {
3354  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
3355  type = kMULTIPOLYGON;
3356  } else {
3357  type = kPOLYGON;
3358  }
3359  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
3360  type = kMULTIPOLYGON;
3361  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
3362  std::string::npos &&
3363  (str_upper_case.size() % 2) == 0) {
3364  // simple hex blob (two characters per byte, not uu-encode or base64)
3365  if (str_upper_case.size() >= 10) {
3366  // match WKB blobs for supported geometry types
3367  // the first byte specifies if the data is big-endian or little-endian
3368  // the next four bytes are the geometry type (1 = POINT etc.)
3369  // @TODO support eWKB, which has extra bits set in the geometry type
3370  auto first_five_bytes = str_upper_case.substr(0, 10);
3371  if (first_five_bytes == "0000000001" || first_five_bytes == "0101000000") {
3372  type = kPOINT;
3373  } else if (first_five_bytes == "0000000002" || first_five_bytes == "0102000000") {
3374  type = kLINESTRING;
3375  } else if (first_five_bytes == "0000000003" || first_five_bytes == "0103000000") {
3376  type = kPOLYGON;
3377  } else if (first_five_bytes == "0000000006" || first_five_bytes == "0106000000") {
3378  type = kMULTIPOLYGON;
3379  } else {
3380  // unsupported WKB type
3381  return type;
3382  }
3383  } else {
3384  // too short to be WKB
3385  return type;
3386  }
3387  }
3388  }
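// Illustrative example: "POINT (1 2)" is detected as kPOINT from its leading
// word; a hex string starting with "0101000000" (little-endian WKB type 1)
// is detected as a kPOINT blob; a leading "POLYGON" is promoted to
// kMULTIPOLYGON when PROMOTE_POLYGON_TO_MULTIPOLYGON is enabled.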
3389 
3390  // check for time types
3391  if (type == kTEXT) {
3392  // This won't match unix timestamp, since floats and ints were checked above.
3393  if (dateTimeParseOptional<kTIME>(str, 0)) {
3394  type = kTIME;
3395  } else if (dateTimeParseOptional<kTIMESTAMP>(str, 0)) {
3396  type = kTIMESTAMP;
3397  } else if (dateTimeParseOptional<kDATE>(str, 0)) {
3398  type = kDATE;
3399  }
3400  }
3401 
3402  return type;
3403 }
3404 
3405 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3406  std::vector<SQLTypes> types(row.size());
3407  for (size_t i = 0; i < row.size(); i++) {
3408  types[i] = detect_sqltype(row[i]);
3409  }
3410  return types;
3411 }
3412 
3413 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
3414  static std::array<int, kSQLTYPE_LAST> typeorder;
3415  typeorder[kCHAR] = 0;
3416  typeorder[kBOOLEAN] = 2;
3417  typeorder[kSMALLINT] = 3;
3418  typeorder[kINT] = 4;
3419  typeorder[kBIGINT] = 5;
3420  typeorder[kFLOAT] = 6;
3421  typeorder[kDOUBLE] = 7;
3422  typeorder[kTIMESTAMP] = 8;
3423  typeorder[kTIME] = 9;
3424  typeorder[kDATE] = 10;
3425  typeorder[kPOINT] = 11;
3426  typeorder[kLINESTRING] = 11;
3427  typeorder[kPOLYGON] = 11;
3428  typeorder[kMULTIPOLYGON] = 11;
3429  typeorder[kTEXT] = 12;
3430 
3431  // note: b < a instead of a < b because the map is ordered most to least restrictive
3432  return typeorder[b] < typeorder[a];
3433 }
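// Illustrative example: more_restrictive_sqltype(kTEXT, kINT) returns true
// (typeorder 4 < 12), so in find_best_sqltypes() a column that has already
// widened to kTEXT is never narrowed back to kINT by a later numeric value.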
3434 
3435 void Detector::find_best_sqltypes_and_headers() {
3436  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
3437  best_encodings =
3438  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3439  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3440  switch (copy_params.has_header) {
3441  case ImportHeaderRow::AUTODETECT:
3442  has_headers = detect_headers(head_types, best_sqltypes);
3443  if (has_headers) {
3444  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3445  } else {
3446  copy_params.has_header = ImportHeaderRow::NO_HEADER;
3447  }
3448  break;
3449  case ImportHeaderRow::NO_HEADER:
3450  has_headers = false;
3451  break;
3452  case ImportHeaderRow::HAS_HEADER:
3453  has_headers = true;
3454  break;
3455  }
3456 }
3457 
3458 std::vector<SQLTypes> Detector::find_best_sqltypes() {
3459  return find_best_sqltypes(raw_rows, copy_params);
3460 }
3461 
3462 std::vector<SQLTypes> Detector::find_best_sqltypes(
3463  const std::vector<std::vector<std::string>>& raw_rows,
3464  const CopyParams& copy_params) {
3465  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3466 }
3467 
3468 std::vector<SQLTypes> Detector::find_best_sqltypes(
3469  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3470  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3471  const CopyParams& copy_params) {
3472  if (raw_rows.size() < 1) {
3473  throw std::runtime_error("No rows found in: " +
3474  boost::filesystem::basename(file_path));
3475  }
3476  auto end_time = std::chrono::steady_clock::now() + timeout;
3477  size_t num_cols = raw_rows.front().size();
3478  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3479  std::vector<size_t> non_null_col_counts(num_cols, 0);
3480  for (auto row = row_begin; row != row_end; row++) {
3481  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3482  best_types.push_back(kCHAR);
3483  non_null_col_counts.push_back(0);
3484  }
3485  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3486  // do not count nulls
3487  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3488  continue;
3489  }
3490  SQLTypes t = detect_sqltype(row->at(col_idx));
3491  non_null_col_counts[col_idx]++;
3492  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3493  best_types[col_idx] = t;
3494  }
3495  }
3496  if (std::chrono::steady_clock::now() > end_time) {
3497  break;
3498  }
3499  }
3500  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3501  // if we don't have any non-null values for this column, make it TEXT to be
3502  // safe because that is the least restrictive type
3503  if (non_null_col_counts[col_idx] == 0) {
3504  best_types[col_idx] = kTEXT;
3505  }
3506  }
3507 
3508  return best_types;
3509 }
3510 
3511 std::vector<EncodingType> Detector::find_best_encodings(
3512  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3513  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3514  const std::vector<SQLTypes>& best_types) {
3515  if (raw_rows.size() < 1) {
3516  throw std::runtime_error("No rows found in: " +
3517  boost::filesystem::basename(file_path));
3518  }
3519  size_t num_cols = best_types.size();
3520  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3521  std::vector<size_t> num_rows_per_col(num_cols, 1);
3522  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3523  for (auto row = row_begin; row != row_end; row++) {
3524  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3525  if (IS_STRING(best_types[col_idx])) {
3526  count_set[col_idx].insert(row->at(col_idx));
3527  num_rows_per_col[col_idx]++;
3528  }
3529  }
3530  }
3531  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3532  if (IS_STRING(best_types[col_idx])) {
3533  float uniqueRatio =
3534  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3535  if (uniqueRatio < 0.75) {
3536  best_encodes[col_idx] = kENCODING_DICT;
3537  }
3538  }
3539  }
3540  return best_encodes;
3541 }
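// Illustrative example: a sampled string column with 1,000 rows and 100
// distinct values has a unique ratio of about 0.1 (< 0.75) and is assigned
// kENCODING_DICT; a nearly-unique column (ratio >= 0.75) stays kENCODING_NONE.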
3542 
3543 // detect_headers returns true if:
3544 // - all elements of the first argument are kTEXT
3545 // - there is at least one instance where tail_types is more restrictive than head_types
3546 // (ie, not kTEXT)
3547 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3548  const std::vector<SQLTypes>& tail_types) {
3549  if (head_types.size() != tail_types.size()) {
3550  return false;
3551  }
3552  bool has_headers = false;
3553  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3554  if (head_types[col_idx] != kTEXT) {
3555  return false;
3556  }
3557  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3558  }
3559  return has_headers;
3560 }
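// Worked example (not from this file): for a sample beginning "name,age" followed by
// "alice,34", "bob,27", ..., head_types is {kTEXT, kTEXT} while tail_types is
// {kTEXT, kINT}; every head type is kTEXT and at least one tail type is narrower, so
// detect_headers() returns true. If the second column held quoted words instead of
// numbers, both vectors would be all-kTEXT and no header row would be inferred.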
3561 
3562 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3563  n = std::min(n, raw_rows.size());
3564  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3565  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3566  raw_rows.begin() + n);
3567  return sample_rows;
3568 }
3569 
3570 std::vector<std::string> Detector::get_headers() {
3571  std::vector<std::string> headers(best_sqltypes.size());
3572  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3573  if (has_headers && i < raw_rows[0].size()) {
3574  headers[i] = raw_rows[0][i];
3575  } else {
3576  headers[i] = "column_" + std::to_string(i + 1);
3577  }
3578  }
3579  return headers;
3580 }
3581 
3582 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3583  size_t row_count,
3584  const Catalog_Namespace::SessionInfo* session_info) {
3585  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3586  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3587  import_status_.load_failed = true;
3588  import_status_.load_msg = loader->getErrorMessage();
3589  }
3590 }
3591 
3592 void Importer::checkpoint(
3593  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3594  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3595  mapd_lock_guard<mapd_shared_mutex> read_lock(import_mutex_);
3596  if (import_status_.load_failed) {
3597  // rollback to starting epoch - undo all the added records
3598  loader->setTableEpochs(table_epochs);
3599  } else {
3600  loader->checkpoint();
3601  }
3602  }
3603 
3604  if (loader->getTableDesc()->persistenceLevel ==
3605  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3606  // tables
3607  auto ms = measure<>::execution([&]() {
3608  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3609  if (!import_status_.load_failed) {
3610  for (auto& p : import_buffers_vec[0]) {
3611  if (!p->stringDictCheckpoint()) {
3612  LOG(ERROR) << "Checkpointing Dictionary for Column "
3613  << p->getColumnDesc()->columnName << " failed.";
3614  import_status_.load_failed = true;
3615  import_status_.load_msg = "Dictionary checkpoint failed";
3616  break;
3617  }
3618  }
3619  }
3620  });
3621  if (DEBUG_TIMING) {
3622  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3623  << std::endl;
3624  }
3625  }
3626 }
3627 
3628 ImportStatus DataStreamSink::archivePlumber(
3629  const Catalog_Namespace::SessionInfo* session_info) {
3630  // in the generalized import scheme, by the time we reach here file_path may
3631  // contain a single file path, a url, or a wildcard matching several file paths.
3632  // see CopyTableStmt::execute.
3633 
3634  std::vector<std::string> file_paths;
3635  try {
3636  auto file_paths_set = shared::glob_recursive_and_filter_local_files(
3638  file_paths = std::vector<std::string>(file_paths_set.begin(), file_paths_set.end());
3639  } catch (const shared::FileNotFoundException& e) {
3640  // After finding no matching files locally, file_path may still be an s3 url
3641  file_paths.push_back(file_path);
3642  }
3643 
3644  // sum up sizes of all local files -- only for local files. if
3645  // file_path is a s3 url, sizes will be obtained via S3Archive.
3646  for (const auto& file_path : file_paths) {
3647  total_file_size += get_filesize(file_path);
3648  }
3649 
3650 #ifdef ENABLE_IMPORT_PARQUET
3651  // s3 parquet goes down a different route because the files are read not with
3652  // libarchive but with the parquet api, and they need to be landed like .7z files.
3653  //
3654  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3655  // because, for example, spark sql users may specify an output url without a file
3656  // extension, like this:
3657  // df.write
3658  // .mode("overwrite")
3659  // .parquet("s3://bucket/folder/parquet/mydata")
3660  // without the parameter, the url is assumed to be plain or compressed csv files.
3661  // note: ORC and AVRO files would presumably follow a similar path to Parquet
3662  if (copy_params.file_type == FileType::PARQUET) {
3663  import_parquet(file_paths, session_info);
3664  } else
3665 #endif
3666  {
3667  import_compressed(file_paths, session_info);
3668  }
3669 
3670  return import_status_;
3671 }
3672 
3673 #ifdef ENABLE_IMPORT_PARQUET
3674 inline auto open_parquet_table(const std::string& file_path,
3675  std::shared_ptr<arrow::io::ReadableFile>& infile,
3676  std::unique_ptr<parquet::arrow::FileReader>& reader,
3677  std::shared_ptr<arrow::Table>& table) {
3678  using namespace parquet::arrow;
3679  auto file_result = arrow::io::ReadableFile::Open(file_path);
3680  PARQUET_THROW_NOT_OK(file_result.status());
3681  infile = file_result.ValueOrDie();
3682 
3683  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3684  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3685  const auto num_row_groups = reader->num_row_groups();
3686  const auto num_columns = table->num_columns();
3687  const auto num_rows = table->num_rows();
3688  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3689  << " columns in " << num_row_groups << " groups.";
3690  return std::make_tuple(num_row_groups, num_columns, num_rows);
3691 }
3692 
3693 void Detector::import_local_parquet(const std::string& file_path,
3694  const Catalog_Namespace::SessionInfo* session_info) {
3695  /*Skip interrupt checking in detector*/
3696  std::shared_ptr<arrow::io::ReadableFile> infile;
3697  std::unique_ptr<parquet::arrow::FileReader> reader;
3698  std::shared_ptr<arrow::Table> table;
3699  int num_row_groups, num_columns;
3700  int64_t num_rows;
3701  std::tie(num_row_groups, num_columns, num_rows) =
3702  open_parquet_table(file_path, infile, reader, table);
3703  // make up a header line if we don't have one yet
3704  if (0 == raw_data.size()) {
3705  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3706  copy_params.line_delim = '\n';
3707  copy_params.delimiter = ',';
3708  // must quote values to skip any embedded delimiter
3709  copy_params.quoted = true;
3710  copy_params.quote = '"';
3711  copy_params.escape = '"';
3712  for (int c = 0; c < num_columns; ++c) {
3713  if (c) {
3714  raw_data += copy_params.delimiter;
3715  }
3716  raw_data += table->ColumnNames().at(c);
3717  }
3719  }
3720  // make up raw data... row-wise...
3721  const ColumnDescriptor cd;
3722  for (int g = 0; g < num_row_groups; ++g) {
3723  // data is columnwise
3724  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3725  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3726  arrays.resize(num_columns);
3727  for (int c = 0; c < num_columns; ++c) {
3728  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3729  for (auto chunk : arrays[c]->chunks()) {
3730  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3731  }
3732  }
3733  for (int r = 0; r < num_rows; ++r) {
3734  for (int c = 0; c < num_columns; ++c) {
3735  std::vector<std::string> buffer;
3736  for (auto chunk : arrays[c]->chunks()) {
3737  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3738  if (c) {
3739  raw_data += copy_params.delimiter;
3740  }
3741  if (!chunk->IsNull(r)) {
3742  raw_data += copy_params.quote;
3743  raw_data += boost::replace_all_copy(
3744  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3745  raw_data += copy_params.quote;
3746  }
3747  }
3748  }
3749  raw_data += copy_params.line_delim;
3750  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3751  if (++import_status_.rows_completed >= 10000) {
3752  // as if load truncated
3753  import_status_.load_failed = true;
3754  import_status_.load_msg = "Detector processed 10000 records";
3755  return;
3756  }
3757  }
3758  }
3759 }
3760 
3761 template <typename DATA_TYPE>
3762 auto TypedImportBuffer::del_values(
3763  std::vector<DATA_TYPE>& buffer,
3764  import_export::BadRowsTracker* const bad_rows_tracker) {
3765  const auto old_size = buffer.size();
3766  // erase backward to minimize memory movement overhead
3767  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3768  ++rit) {
3769  buffer.erase(buffer.begin() + *rit);
3770  }
3771  return std::make_tuple(old_size, buffer.size());
3772 }
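// Illustrative sketch (not from this file): erasing by descending index keeps the
// not-yet-erased indices valid and avoids shifting elements that are about to be
// erased anyway. Assuming `rows` is an ascending ordered set of row indices, as in
// BadRowsTracker:
#if 0
#include <cstddef>
#include <set>
#include <vector>

void erase_rows(std::vector<int>& buffer, const std::set<std::size_t>& rows) {
  for (auto rit = rows.crbegin(); rit != rows.crend(); ++rit) {
    buffer.erase(buffer.begin() + *rit);  // indices below *rit are unaffected
  }
}
#endif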
3773 
3774 auto TypedImportBuffer::del_values(const SQLTypes type,
3775  BadRowsTracker* const bad_rows_tracker) {
3776  switch (type) {
3777  case kBOOLEAN:
3778  return del_values(*bool_buffer_, bad_rows_tracker);
3779  case kTINYINT:
3780  return del_values(*tinyint_buffer_, bad_rows_tracker);
3781  case kSMALLINT:
3782  return del_values(*smallint_buffer_, bad_rows_tracker);
3783  case kINT:
3784  return del_values(*int_buffer_, bad_rows_tracker);
3785  case kBIGINT:
3786  case kNUMERIC:
3787  case kDECIMAL:
3788  case kTIME:
3789  case kTIMESTAMP:
3790  case kDATE:
3791  return del_values(*bigint_buffer_, bad_rows_tracker);
3792  case kFLOAT:
3793  return del_values(*float_buffer_, bad_rows_tracker);
3794  case kDOUBLE:
3795  return del_values(*double_buffer_, bad_rows_tracker);
3796  case kTEXT:
3797  case kVARCHAR:
3798  case kCHAR:
3799  return del_values(*string_buffer_, bad_rows_tracker);
3800  case kPOINT:
3801  case kLINESTRING:
3802  case kPOLYGON:
3803  case kMULTIPOLYGON:
3804  return del_values(*geo_string_buffer_, bad_rows_tracker);
3805  case kARRAY:
3806  return del_values(*array_buffer_, bad_rows_tracker);
3807  default:
3808  throw std::runtime_error("Invalid Type");
3809  }
3810 }
3811 
3812 void Importer::import_local_parquet(const std::string& file_path,
3813  const Catalog_Namespace::SessionInfo* session_info) {
3814  std::shared_ptr<arrow::io::ReadableFile> infile;
3815  std::unique_ptr<parquet::arrow::FileReader> reader;
3816  std::shared_ptr<arrow::Table> table;
3817  int num_row_groups, num_columns;
3818  int64_t nrow_in_file;
3819  std::tie(num_row_groups, num_columns, nrow_in_file) =
3820  open_parquet_table(file_path, infile, reader, table);
3821  // column_list has no $deleted
3822  const auto& column_list = get_column_descs();
3823  // for now geo columns expect a wkt or wkb hex string
3824  std::vector<const ColumnDescriptor*> cds;
3825  Executor* executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID).get();
3826  int num_physical_cols = 0;
3827  for (auto& cd : column_list) {
3828  cds.push_back(cd);
3829  num_physical_cols += cd->columnType.get_physical_cols();
3830  }
3831  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3832  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3833  std::to_string(num_columns) + " columns in file vs " +
3834  std::to_string(column_list.size() - num_physical_cols) +
3835  " columns in table.");
3836  // slice each group to import slower columns faster, eg. geo or string
3837  max_threads = copy_params.threads
3838  ? static_cast<size_t>(copy_params.threads)
3839  : std::min(static_cast<size_t>(cpu_threads()), g_max_import_threads);
3840  VLOG(1) << "Parquet import # threads: " << max_threads;
3841  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3842  // init row estimate for this file
3843  const auto filesize = get_filesize(file_path);
3844  size_t nrow_completed{0};
3845  file_offsets.push_back(0);
3846  // map logic column index to physical column index
3847  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3848  int physical_col_idx = 0;
3849  for (int i = 0; i < logic_col_idx; ++i) {
3850  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3851  }
3852  return physical_col_idx;
3853  };
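// Worked example (not from this file): for a table (a INT, poly POLYGON, b INT),
// where the POLYGON column owns, say, 3 physical helper columns, the lambda above
// maps logical indices 0, 1, 2 to physical indices 0, 1, 5 -- each step advances
// past one logical column plus however many physical columns it owns.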
3854  // load a file = nested iteration of row groups, row slices and logical columns
3855  auto query_session = session_info ? session_info->get_session_id() : "";
3856  auto ms_load_a_file = measure<>::execution([&]() {
3857  for (int row_group = 0; row_group < num_row_groups; ++row_group) {
3858  {
3859  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
3860  if (import_status_.load_failed) {
3861  break;
3862  }
3863  }
3864  // a sliced row group will be handled like a (logical) parquet file, with
3865  // an entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
3866  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3867  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3868  import_status_.load_failed = true;
3869  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3870  }
3871  import_buffers_vec.resize(num_slices);
3872  for (int slice = 0; slice < num_slices; slice++) {
3873  import_buffers_vec[slice].clear();
3874  for (const auto cd : cds) {
3875  import_buffers_vec[slice].emplace_back(
3876  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3877  }
3878  }
3879  /*
3880  * A caveat here: Parquet files and arrow data are imported column-wise.
3881  * Unlike row-wise csv imports, an error on any row of any column would
3882  * force us to give up the entire row group across all columns, unless we
3883  * trace erroneous rows in individual columns so that we can union the bad
3884  * rows and drop them from the corresponding import_buffers_vec; otherwise
3885  * we could easily exceed the maximum number of truncated rows even with
3886  * very sparse errors in the files.
3887  */
3888  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3889  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3890  auto& bad_rows_tracker = bad_rows_trackers[slice];
3891  bad_rows_tracker.file_name = file_path;
3892  bad_rows_tracker.row_group = slice;
3893  bad_rows_tracker.importer = this;
3894  }
3895  // process arrow arrays to import buffers
3896  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3897  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3898  const auto cd = cds[physical_col_idx];
3899  std::shared_ptr<arrow::ChunkedArray> array;
3900  PARQUET_THROW_NOT_OK(
3901  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3902  const size_t array_size = array->length();
3903  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3904  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3905  for (int slice = 0; slice < num_slices; ++slice) {
3906  if (UNLIKELY(check_session_interrupted(query_session, executor))) {
3907  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3908  import_status_.load_failed = true;
3909  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3910  }
3911  thread_controller.startThread([&, slice] {
3912  const auto slice_offset = slice % num_slices;
3913  ArraySliceRange slice_range(
3914  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3915  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3916  auto& bad_rows_tracker = bad_rows_trackers[slice];
3917  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3918  import_buffer->import_buffers = &import_buffers_vec[slice];
3919  import_buffer->col_idx = physical_col_idx + 1;
3920  for (auto chunk : array->chunks()) {
3921  import_buffer->add_arrow_values(
3922  cd, *chunk, false, slice_range, &bad_rows_tracker);
3923  }
3924  });
3925  }
3926  thread_controller.finish();
3927  }
3928  std::vector<size_t> nrow_in_slice_raw(num_slices);
3929  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3930  // trim bad rows from import buffers
3931  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3932  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3933  const auto cd = cds[physical_col_idx];
3934  for (int slice = 0; slice < num_slices; ++slice) {
3935  auto& bad_rows_tracker = bad_rows_trackers[slice];
3936  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3937  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3938  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3939  }
3940  }
3941  // flush slices of this row group to chunks
3942  for (int slice = 0; slice < num_slices; ++slice) {
3943  load(import_buffers_vec[slice],
3944  nrow_in_slice_successfully_loaded[slice],
3945  session_info);
3946  }
3947  // update import stats
3948  const auto nrow_original =
3949  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3950  const auto nrow_imported =
3951  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3952  nrow_in_slice_successfully_loaded.end(),
3953  0);
3954  const auto nrow_dropped = nrow_original - nrow_imported;
3955  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3956  << " rows, drop " << nrow_dropped << " rows.";
3957  {
3958  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3959  import_status_.rows_completed += nrow_imported;
3960  import_status_.rows_rejected += nrow_dropped;
3961  if (import_status_.rows_rejected > copy_params.max_reject) {
3962  import_status_.load_failed = true;
3963  import_status_.load_msg = "Maximum (" + std::to_string(copy_params.max_reject) +
3964  ") rows rejected exceeded. Halting load.";
3965  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3966  << ") rows rejected exceeded. Halting load.";
3967  }
3968  }
3969  // row estimate
3970  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3971  nrow_completed += nrow_imported;
3972  file_offsets.back() =
3973  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3974  // sum up current total file offsets
3975  const auto total_file_offset =
3976  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
3977  // estimate number of rows per current total file offset
3978  if (total_file_offset) {
3979  import_status_.rows_estimated =
3980  (float)total_file_size / total_file_offset * import_status_.rows_completed;
3981  VLOG(3) << "rows_completed " << import_status_.rows_completed
3982  << ", rows_estimated " << import_status_.rows_estimated
3983  << ", total_file_size " << total_file_size << ", total_file_offset "
3984  << total_file_offset;
3985  }
3986  }
3987  });
3988  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
3989  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
3990 }
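// Illustrative sketch (not from this file): the slicing above is plain ceiling
// division followed by clamping. For array_size = 10 and num_slices = 4,
// slice_size = (10 + 4 - 1) / 4 = 3, giving ranges [0,3), [3,6), [6,9), [9,10) --
// together they cover the row group exactly once. As a self-contained check:
#if 0
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// assumes n >= 1; trailing ranges may be empty when n > array_size
std::vector<std::pair<std::size_t, std::size_t>> make_slices(std::size_t array_size,
                                                             std::size_t n) {
  const std::size_t slice_size = (array_size + n - 1) / n;  // ceil(array_size / n)
  std::vector<std::pair<std::size_t, std::size_t>> ranges;
  for (std::size_t s = 0; s < n; ++s) {
    ranges.emplace_back(std::min(s * slice_size, array_size),
                        std::min((s + 1) * slice_size, array_size));
  }
  return ranges;
}
#endif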
3991 
3992 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
3993  const Catalog_Namespace::SessionInfo* session_info) {
3994  auto importer = dynamic_cast<Importer*>(this);
3995  auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
3996  : std::vector<Catalog_Namespace::TableEpochInfo>{};
3997  try {
3998  std::exception_ptr teptr;
3999  // file_paths may contain one local file path, a list of local file paths,
4000  // or an s3/hdfs/... url that may translate to one or more remote object keys.
4001  for (auto const& file_path : file_paths) {
4002  std::map<int, std::string> url_parts;
4003  Archive::parse_url(file_path, url_parts);
4004 
4005  // for an s3 url we need to know the object keys it comprises
4006  std::vector<std::string> objkeys;
4007  std::unique_ptr<S3ParquetArchive> us3arch;
4008  if ("s3" == url_parts[2]) {
4009 #ifdef HAVE_AWS_S3
4010  us3arch.reset(new S3ParquetArchive(file_path,
4018  us3arch->init_for_read();
4019  total_file_size += us3arch->get_total_file_size();
4020  objkeys = us3arch->get_objkeys();
4021 #else
4022  throw std::runtime_error("AWS S3 support not available");
4023 #endif // HAVE_AWS_S3
4024  } else {
4025  objkeys.emplace_back(file_path);
4026  }
4027 
4028  // each object key of an s3 url needs to be landed before
4029  // being imported, just as with a 'local file'.
4030  for (auto const& objkey : objkeys) {
4031  try {
4032  auto file_path =
4033  us3arch
4034  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
4035  : objkey;
4036  import_local_parquet(file_path, session_info);
4037  if (us3arch) {
4038  us3arch->vacuum(objkey);
4039  }
4040  } catch (...) {
4041  if (us3arch) {
4042  us3arch->vacuum(objkey);
4043  }
4044  throw;
4045  }
4046  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4047  if (import_status_.load_failed) {
4048  break;
4049  }
4050  }
4051  }
4052  // rethrow any exception that happened before this point
4053  if (teptr) {
4054  std::rethrow_exception(teptr);
4055  }
4056  } catch (const shared::NoRegexFilterMatchException& e) {
4057  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
4058  import_status_.load_failed = true;
4059  import_status_.load_msg = e.what();
4060  throw e;
4061  } catch (const std::exception& e) {
4062  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
4063  import_status_.load_failed = true;
4064  import_status_.load_msg = e.what();
4065  }
4066 
4067  if (importer) {
4068  importer->checkpoint(table_epochs);
4069  }
4070 }
4071 #endif // ENABLE_IMPORT_PARQUET
4072 
4073 void DataStreamSink::import_compressed(
4074  std::vector<std::string>& file_paths,
4075  const Catalog_Namespace::SessionInfo* session_info) {
4076  // a new requirement is to have one single input stream into
4077  // Importer::importDelimited, so the pipe-related machinery
4078  // needs to live in the outermost block.
4079  int fd[2];
4080 #ifdef _WIN32
4081  // For some reason, when folly is used to create the pipe, the reader
4082  // reads nothing.
4083  auto pipe_res =
4084  _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
4085 #else
4086  auto pipe_res = pipe(fd);
4087 #endif
4088  if (pipe_res < 0) {
4089  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
4090  }
4091 #ifndef _WIN32
4092  signal(SIGPIPE, SIG_IGN);
4093 #endif
4094 
4095  std::exception_ptr teptr;
4096  // create a thread to read uncompressed byte stream out of pipe and
4097  // feed into importDelimited()
4098  ImportStatus ret1;
4099  auto th_pipe_reader = std::thread([&]() {
4100  try {
4101  // importDelimited will read from FILE* p_file
4102  if (0 == (p_file = fdopen(fd[0], "r"))) {
4103  throw std::runtime_error(std::string("failed to open a pipe: ") +
4104  strerror(errno));
4105  }
4106 
4107  // in the future, depending on the data types in this uncompressed stream,
4108  // it could be fed into other functions such as importParquet, etc.
4109  ret1 = importDelimited(file_path, true, session_info);
4110 
4111  } catch (...) {
4112  if (!teptr) { // no replace
4113  teptr = std::current_exception();
4114  }
4115  }
4116 
4117  if (p_file) {
4118  fclose(p_file);
4119  }
4120  p_file = 0;
4121  });
4122 
4123  // create a thread to iterate over all files (in all archives) and
4124  // forward the uncompressed byte stream to fd[1], which is
4125  // then fed into importDelimited, importParquet, etc.
4126  auto th_pipe_writer = std::thread([&]() {
4127  std::unique_ptr<S3Archive> us3arch;
4128  bool stop = false;
4129  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
4130  try {
4131  auto file_path = file_paths[fi];
4132  std::unique_ptr<Archive> uarch;
4133  std::map<int, std::string> url_parts;
4134  Archive::parse_url(file_path, url_parts);
4135  const std::string S3_objkey_url_scheme = "s3ok";
4136  if ("file" == url_parts[2] || "" == url_parts[2]) {
4137  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4138  } else if ("s3" == url_parts[2]) {
4139 #ifdef HAVE_AWS_S3
4140  // construct an S3Archive with a shared s3client.
4141  // should be safe, since s3 urls carry no wildcards
4142  us3arch.reset(new S3Archive(file_path,
4150  us3arch->init_for_read();
4151  total_file_size += us3arch->get_total_file_size();
4152  // don't land all files here; land them one by one in the following iterations
4153  for (const auto& objkey : us3arch->get_objkeys()) {
4154  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
4155  }
4156  continue;
4157 #else
4158  throw std::runtime_error("AWS S3 support not available");
4159 #endif // HAVE_AWS_S3
4160  } else if (S3_objkey_url_scheme == url_parts[2]) {
4161 #ifdef HAVE_AWS_S3
4162  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
4163  auto file_path =
4164  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
4165  if (0 == file_path.size()) {
4166  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
4167  }
4168  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4169  // file not removed until file closed
4170  us3arch->vacuum(objkey);
4171 #else
4172  throw std::runtime_error("AWS S3 support not available");
4173 #endif // HAVE_AWS_S3
4174  }
4175 #if 0 // TODO(ppan): implement and enable any other archive class
4176  else
4177  if ("hdfs" == url_parts[2])
4178  uarch.reset(new HdfsArchive(file_path));
4179 #endif
4180  else {
4181  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
4182  }
4183 
4184  // init the archive for read
4185  auto& arch = *uarch;
4186 
4187  // by this point, the archive at the url should be ready to be read,
4188  // unarchived, and uncompressed by libarchive into a csv byte stream for the pipe
4189  const void* buf;
4190  size_t size;
4191  bool just_saw_archive_header;
4192  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
4193  bool first_text_header_skipped = false;
4194  // start reading uncompressed bytes of this archive from libarchive
4195  // note! this archive may contain more than one file!
4196  file_offsets.push_back(0);
4197  size_t num_block_read = 0;
4198  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
4199  bool insert_line_delim_after_this_file = false;
4200  while (!stop) {
4201  int64_t offset{-1};
4202  auto ok = arch.read_data_block(&buf, &size, &offset);
4203  // can't use (uncompressed) size, so track (max) file offset.
4204  // also we want to capture offset even on e.o.f.
4205  if (offset > 0) {
4206  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4207  file_offsets.back() = offset;
4208  }
4209  if (!ok) {
4210  break;
4211  }
4212  // one subtle point here: we now concatenate all files into a single
4213  // FILE stream, with which we call importDelimited only once. this
4214  // would lead it to assume that only one header line comes with this
4215  // 'single' stream, while in fact we may have one header line for
4216  // each of the files.
4217  // so we need to skip header lines here instead of in importDelimited.
4218  const char* buf2 = (const char*)buf;
4219  int size2 = size;
4220  if (copy_params.has_header != ImportHeaderRow::NO_HEADER &&
4221  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
4222  while (size2-- > 0) {
4223  if (*buf2++ == copy_params.line_delim) {
4224  break;
4225  }
4226  }
4227  if (size2 <= 0) {
4228  LOG(WARNING) << "No line delimiter in block." << std::endl;
4229  } else {
4230  just_saw_archive_header = false;
4231  first_text_header_skipped = true;
4232  }
4233  }
4234  // On very rare occasions the write pipe somehow operates in a mode similar
4235  // to non-blocking, even though pipe(fds) should behave like pipe2(fds, 0),
4236  // i.e. blocking mode. With such unreliable blocking behavior, a possible fix
4237  // is to loop writing until no bytes are left; otherwise we get the annoying
4238  // `failed to write pipe: Success`...
4239  if (size2 > 0) {
4240  int nremaining = size2;
4241  while (nremaining > 0) {
4242  // try to write the entire remainder of the buffer to the pipe
4243  int nwritten = write(fd[1], buf2, nremaining);
4244  // how did we do?
4245  if (nwritten < 0) {
4246  // something bad happened
4247  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4248  // ignore these, assume nothing written, try again
4249  nwritten = 0;
4250  } else {
4251  // a real error
4252  throw std::runtime_error(
4253  std::string("failed or interrupted write to pipe: ") +
4254  strerror(errno));
4255  }
4256  } else if (nwritten == nremaining) {
4257  // we wrote everything; we're done
4258  break;
4259  }
4260  // only wrote some (or nothing), try again
4261  nremaining -= nwritten;
4262  buf2 += nwritten;
4263  // no exception when too many rejected
4264  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4265  if (import_status_.load_failed) {
4266  stop = true;
4267  break;
4268  }
4269  }
4270  // check that this file (buf for size) ended with a line delim
4271  if (size > 0) {
4272  const char* plast = static_cast<const char*>(buf) + (size - 1);
4273  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4274  }
4275  }
4276  ++num_block_read;
4277  }
4278 
4279  // if that file didn't end with a line delim, we insert one here to terminate
4280  // that file's stream. use a loop, for the same reason as above
4281  if (insert_line_delim_after_this_file) {
4282  while (true) {
4283  // write the delim char to the pipe
4284  int nwritten = write(fd[1], &copy_params.line_delim, 1);
4285  // how did we do?
4286  if (nwritten < 0) {
4287  // something bad happened
4288  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4289  // ignore these, assume nothing written, try again
4290  nwritten = 0;
4291  } else {
4292  // a real error
4293  throw std::runtime_error(
4294  std::string("failed or interrupted write to pipe: ") +
4295  strerror(errno));
4296  }
4297  } else if (nwritten == 1) {
4298  // we wrote it; we're done
4299  break;
4300  }
4301  }
4302  }
4303  }
4304  } catch (...) {
4305  // when an import is aborted because of too many data errors, or because a
4306  // detection pass has ended, any exception thrown by the s3 sdk or libarchive
4307  // is okay and should be suppressed.
4308  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4310  break;
4311  }
4312  if (import_status_.rows_completed > 0) {
4313  if (nullptr != dynamic_cast<Detector*>(this)) {
4314  break;
4315  }
4316  }
4317  if (!teptr) { // no replace
4318  teptr = std::current_exception();
4319  }
4320  break;
4321  }
4322  }
4323  // close writer end
4324  close(fd[1]);
4325  });
4326 
4327  th_pipe_reader.join();
4328  th_pipe_writer.join();
4329 
4330  // rethrow any exception that happened before this point
4331  if (teptr) {
4332  std::rethrow_exception(teptr);
4333  }
4334 }
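// Illustrative sketch (not from this file): the structure above is a classic
// single-producer pipe -- one thread pushes decompressed bytes into fd[1] while the
// other wraps fd[0] in a FILE* and parses it as if it were one ordinary local file.
// A minimal POSIX-only skeleton of the same shape:
#if 0
#include <cstdio>
#include <thread>
#include <unistd.h>

void pipe_pattern() {
  int fd[2];
  if (pipe(fd) < 0) {
    return;
  }
  std::thread reader([&] {
    FILE* in = fdopen(fd[0], "r");  // consumer parses the byte stream
    for (int c; (c = std::fgetc(in)) != EOF;) {
      /* parse delimited data here */
    }
    std::fclose(in);  // also closes fd[0]
  });
  std::thread writer([&] {
    const char data[] = "a,b\n1,2\n";  // stand-in for uncompressed archive bytes
    write(fd[1], data, sizeof(data) - 1);
    close(fd[1]);  // signals EOF to the reader
  });
  writer.join();
  reader.join();
}
#endif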
4335 
4336 ImportStatus Importer::import(const Catalog_Namespace::SessionInfo* session_info) {
4337  return DataStreamSink::archivePlumber(session_info);
4338 }
4339 
4340 ImportStatus Importer::importDelimited(
4341  const std::string& file_path,
4342  const bool decompressed,
4343  const Catalog_Namespace::SessionInfo* session_info) {
4345  auto query_session = session_info ? session_info->get_session_id() : "";
4346 
4347  if (!p_file) {
4348  p_file = fopen(file_path.c_str(), "rb");
4349  }
4350  if (!p_file) {
4351  throw std::runtime_error("failed to open file '" + file_path +
4352  "': " + strerror(errno));
4353  }
4354 
4355  if (!decompressed) {
4356  (void)fseek(p_file, 0, SEEK_END);
4357  file_size = ftell(p_file);
4358  }
4359 
4360  if (copy_params.threads == 0) {
4361  max_threads = std::min(static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF)),
4362  g_max_import_threads);
4363  } else {
4364  max_threads = static_cast<size_t>(copy_params.threads);
4365  }
4366  VLOG(1) << "Delimited import # threads: " << max_threads;
4367 
4368  // deal with small files
4369  size_t alloc_size = copy_params.buffer_size;
4370  if (!decompressed && file_size < alloc_size) {
4371  alloc_size = file_size;
4372  }
4373 
4374  for (size_t i = 0; i < max_threads; i++) {
4375  import_buffers_vec.emplace_back();
4376  for (const auto cd : loader->get_column_descs()) {
4377  import_buffers_vec[i].emplace_back(
4378  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4379  }
4380  }
4381 
4382  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4383  size_t current_pos = 0;
4384  size_t end_pos;
4385  size_t begin_pos = 0;
4386 
4387  (void)fseek(p_file, current_pos, SEEK_SET);
4388  size_t size =
4389  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4390 
4391  // make render group analyzers for each poly column
4392  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4393  if (copy_params.geo_assign_render_groups) {
4394  auto& cat = loader->getCatalog();
4395  auto* td = loader->getTableDesc();
4396  CHECK(td);
4397  auto column_descriptors =
4398  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4399  for (auto const& cd : column_descriptors) {
4400  if (IS_GEO_POLY(cd->columnType.get_type())) {
4401  auto rga = std::make_shared<RenderGroupAnalyzer>();
4402  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
4403  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4404  }
4405  }
4406  }
4407 
4408  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4409  loader->getTableDesc()->tableId};
4410  auto table_epochs = loader->getTableEpochs();
4411  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
4412  {
4413  std::list<std::future<ImportStatus>> threads;
4414 
4415  // use a stack to track thread_ids which must not overlap among threads
4416  // because thread_id is used to index import_buffers_vec[]
4417  std::stack<size_t> stack_thread_ids;
4418  for (size_t i = 0; i < max_threads; i++) {
4419  stack_thread_ids.push(i);
4420  }
4421  // added for true row index on error
4422  size_t first_row_index_this_buffer = 0;
4423 
4424  while (size > 0) {
4425  unsigned int num_rows_this_buffer = 0;
4426  CHECK(scratch_buffer);
4427  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4428  scratch_buffer,
4429  size,
4430  copy_params,
4431  first_row_index_this_buffer,
4432  num_rows_this_buffer,
4433  p_file);
4434 
4435  // unput residual
4436  int nresidual = size - end_pos;
4437  std::unique_ptr<char[]> unbuf;
4438  if (nresidual > 0) {
4439  unbuf = std::make_unique<char[]>(nresidual);
4440  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4441  }
4442 
4443  // get a thread_id not in use
4444  auto thread_id = stack_thread_ids.top();
4445  stack_thread_ids.pop();
4446  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4447 
4448  threads.push_back(std::async(std::launch::async,
4449  import_thread_delimited,
4450  thread_id,
4451  this,
4452  std::move(scratch_buffer),
4453  begin_pos,
4454  end_pos,
4455  end_pos,
4456  columnIdToRenderGroupAnalyzerMap,
4457  first_row_index_this_buffer,
4458  session_info,
4459  executor));
4460 
4461  first_row_index_this_buffer += num_rows_this_buffer;
4462 
4463  current_pos += end_pos;
4464  scratch_buffer = std::make_unique<char[]>(alloc_size);
4465  CHECK(scratch_buffer);
4466  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4467  size = nresidual +
4468  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4469 
4470  begin_pos = 0;
4471  while (threads.size() > 0) {
4472  int nready = 0;
4473  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4474  it != threads.end();) {
4475  auto& p = *it;
4476  std::chrono::milliseconds span(0);
4477  if (p.wait_for(span) == std::future_status::ready) {
4478  auto ret_import_status = p.get();
4479  {
4480  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
4481  import_status_ += ret_import_status;
4482  if (ret_import_status.load_failed) {
4483  set_import_status(import_id, import_status_);
4484  }
4485  }
4486  // sum up current total file offsets
4487  size_t total_file_offset{0};
4488  if (decompressed) {
4489  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4490  for (const auto file_offset : file_offsets) {
4491  total_file_offset += file_offset;
4492  }
4493  }
4494  // estimate number of rows per current total file offset
4495  if (decompressed ? total_file_offset : current_pos) {
4496  import_status_.rows_estimated =
4497  (decompressed ? (float)total_file_size / total_file_offset
4498  : (float)file_size / current_pos) *
4499  import_status_.rows_completed;
4500  }
4501  VLOG(3) << "rows_completed " << import_status_.rows_completed
4502  << ", rows_estimated " << import_status_.rows_estimated
4503  << ", total_file_size " << total_file_size << ", total_file_offset "
4504  << total_file_offset;
4506  // recall thread_id for reuse
4507  stack_thread_ids.push(ret_import_status.thread_id);
4508  threads.erase(it++);
4509  ++nready;
4510  } else {
4511  ++it;
4512  }
4513  }
4514 
4515  if (nready == 0) {
4516  std::this_thread::yield();
4517  }
4518 
4519  // on eof, wait all threads to finish
4520  if (0 == size) {
4521  continue;
4522  }
4523 
4524  // keep reading if any free thread slot
4525  // this is one of the major difference from old threading model !!
4526  if (threads.size() < max_threads) {
4527  break;
4528  }
4529  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4530  if (import_status_.load_failed) {
4531  break;
4532  }
4533  }
4534  mapd_unique_lock<mapd_shared_mutex> write_lock(import_mutex_);
4535  if (import_status_.rows_rejected > copy_params.max_reject) {
4536  import_status_.load_failed = true;
4537  // todo use better message
4538  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4539  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4540  break;
4541  }
4542  if (import_status_.load_failed) {
4543  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4544  break;
4545  }
4546  }
4547 
4548  // join dangling threads in case of LOG(ERROR) above
4549  for (auto& p : threads) {
4550  p.wait();
4551  }
4552  }
4553 
4554  checkpoint(table_epochs);
4555 
4556  fclose(p_file);
4557  p_file = nullptr;
4558  return import_status_;
4559 }
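// Note (not from this file): the scratch-buffer handling above is a standard
// chunked-reader pattern -- parse only up to the last complete row boundary
// (end_pos), copy the nresidual unconsumed bytes to the front of a fresh buffer,
// then top the buffer up from the file, so no row is ever split across two parse
// calls even though rows freely straddle read boundaries.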
4560 
4561 void Loader::checkpoint() {
4562  if (getTableDesc()->persistenceLevel ==
4563  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4564  // tables
4565  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4566  }
4567 }
4568 
4569 std::vector<Catalog_Namespace::TableEpochInfo> Loader::getTableEpochs() const {
4570  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4571  getTableDesc()->tableId);
4572 }
4573 
4574 void Loader::setTableEpochs(
4575  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4576  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4577 }
4578 
4579 /* static */
4580 void Importer::setGDALAuthorizationTokens(const CopyParams& copy_params) {
4581  // for now we only support S3
4582  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4583  // only set if non-empty to allow GDAL defaults to persist
4584  // explicitly clear if empty to revert to default and not reuse a previous session's
4585  // keys
4586  if (copy_params.s3_region.size()) {
4587 #if DEBUG_AWS_AUTHENTICATION
4588  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4589 #endif
4590  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4591  } else {
4592 #if DEBUG_AWS_AUTHENTICATION
4593  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4594 #endif
4595  CPLSetConfigOption("AWS_REGION", nullptr);
4596  }
4597  if (copy_params.s3_endpoint.size()) {
4598 #if DEBUG_AWS_AUTHENTICATION
4599  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4600 #endif
4601  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4602  } else {
4603 #if DEBUG_AWS_AUTHENTICATION
4604  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4605 #endif
4606  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4607  }
4608  if (copy_params.s3_access_key.size()) {
4609 #if DEBUG_AWS_AUTHENTICATION
4610  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4611  << "'";
4612 #endif
4613  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4614  } else {
4615 #if DEBUG_AWS_AUTHENTICATION
4616  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4617 #endif
4618  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4619  }
4620  if (copy_params.s3_secret_key.size()) {
4621 #if DEBUG_AWS_AUTHENTICATION
4622  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4623  << "'";
4624 #endif
4625  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4626  } else {
4627 #if DEBUG_AWS_AUTHENTICATION
4628  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4629 #endif
4630  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4631  }
4632 
4633 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4634  // if we haven't set keys, we need to disable signed access
4635  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4636 #if DEBUG_AWS_AUTHENTICATION
4637  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4638 #endif
4639  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4640  } else {
4641 #if DEBUG_AWS_AUTHENTICATION
4642  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4643 #endif
4644  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4645  }
4646 #endif
4647 }
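// Illustrative sketch (not from this file): CPLSetConfigOption() is GDAL's global
// key/value configuration store, and passing nullptr clears a key back to its
// default. The same mechanism drives any /vsis3/ access, e.g. for an anonymous read
// of a public bucket (the values here are hypothetical):
#if 0
#include <cpl_conv.h>

void configure_anonymous_s3() {
  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");  // unsigned requests
  CPLSetConfigOption("AWS_REGION", "us-west-1");     // hypothetical region
}
#endif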
4648 
4649 /* static */
4650 OGRDataSource* Importer::openGDALDataset(const std::string& file_name,
4651  const CopyParams& copy_params) {
4652  // lazy init GDAL
4653  Geospatial::GDAL::init();
4654 
4655  // set authorization tokens
4656  setGDALAuthorizationTokens(copy_params);
4657 
4658  // open the file
4659  OGRDataSource* poDS;
4660 #if GDAL_VERSION_MAJOR == 1
4661  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4662 #else
4663  poDS = (OGRDataSource*)GDALOpenEx(
4664  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4665  if (poDS == nullptr) {
4666  poDS = (OGRDataSource*)GDALOpenEx(
4667  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4668  if (poDS) {
4669  LOG(INFO) << "openGDALDataset had to open as read-only";
4670  }
4671  }
4672 #endif
4673  if (poDS == nullptr) {
4674  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4675  }
4676  // NOTE(adb): If extending this function, refactor to ensure any errors will not
4677  // result in a memory leak if GDAL successfully opened the input dataset.
4678  return poDS;
4679 }
4680 
4681 namespace {
4682 
4683 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4684  const OGRDataSourceUqPtr& poDS,
4685  const std::string& file_name) {
4686  // get layer with specified name, or default to first layer
4687  OGRLayer* poLayer = nullptr;
4688  if (geo_layer_name.size()) {
4689  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4690  if (poLayer == nullptr) {
4691  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4692  file_name);
4693  }
4694  } else {
4695  poLayer = poDS->GetLayer(0);
4696  if (poLayer == nullptr) {
4697  throw std::runtime_error("No layers found in " + file_name);
4698  }
4699  }
4700  return *poLayer;
4701 }
4702 
4703 } // namespace
4704 
4705 /* static */
4706 void Importer::readMetadataSampleGDAL(
4707  const std::string& file_name,
4708  const std::string& geo_column_name,
4709  std::map<std::string, std::vector<std::string>>& metadata,
4710  int rowLimit,
4711  const CopyParams& copy_params) {
4712  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4713  if (poDS == nullptr) {
4714  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4715  file_name);
4716  }
4717 
4718  OGRLayer& layer =
4719  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4720 
4721  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4722  CHECK(poFDefn);
4723 
4724  // the return type of GetFeatureCount() differs between GDAL 1.x (int32_t) and 2.x (int64_t)
4725  auto nFeats = layer.GetFeatureCount();
4726  size_t numFeatures =
4727  std::max(static_cast<decltype(nFeats)>(0),
4728  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4729  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4730  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4731  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4732  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4733  }
4734  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4735  layer.ResetReading();
4736  size_t iFeature = 0;
4737  while (iFeature < numFeatures) {
4738  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4739  if (!poFeature) {
4740  break;
4741  }
4742 
4743  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4744  if (poGeometry != nullptr) {
4745  // validate geom type (again?)
4746  switch (wkbFlatten(poGeometry->getGeometryType())) {
4747  case wkbPoint:
4748  case wkbLineString:
4749  case wkbPolygon:
4750  case wkbMultiPolygon:
4751  break;
4752  case wkbMultiPoint:
4753  case wkbMultiLineString:
4754  // supported if geo_explode_collections is specified
4755  if (!copy_params.geo_explode_collections) {
4756  throw std::runtime_error("Unsupported geometry type: " +
4757  std::string(poGeometry->getGeometryName()));
4758  }
4759  break;
4760  default:
4761  throw std::runtime_error("Unsupported geometry type: " +
4762  std::string(poGeometry->getGeometryName()));
4763  }
4764 
4765  // populate metadata for regular fields
4766  for (auto i : metadata) {
4767  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4768  if (iField >= 0) { // geom is -1
4769  metadata[i.first].at(iFeature) =
4770  std::string(poFeature->GetFieldAsString(iField));
4771  }
4772  }
4773 
4774  // populate metadata for geo column with WKT string
4775  char* wkts = nullptr;
4776  poGeometry->exportToWkt(&wkts);
4777  CHECK(wkts);
4778  metadata[geo_column_name].at(iFeature) = wkts;
4779  CPLFree(wkts);
4780  }
4781  iFeature++;
4782  }
4783 }
4784 
4785 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4786  switch (ogr_type) {
4787  case OFTInteger:
4788  return std::make_pair(kINT, false);
4789  case OFTIntegerList:
4790  return std::make_pair(kINT, true);
4791 #if GDAL_VERSION_MAJOR > 1
4792  case OFTInteger64:
4793  return std::make_pair(kBIGINT, false);
4794  case OFTInteger64List:
4795  return std::make_pair(kBIGINT, true);
4796 #endif
4797  case OFTReal:
4798  return std::make_pair(kDOUBLE, false);
4799  case OFTRealList:
4800  return std::make_pair(kDOUBLE, true);
4801  case OFTString:
4802  return std::make_pair(kTEXT, false);
4803  case OFTStringList:
4804  return std::make_pair(kTEXT, true);
4805  case OFTDate:
4806  return std::make_pair(kDATE, false);
4807  case OFTTime:
4808  return std::make_pair(kTIME, false);
4809  case OFTDateTime:
4810  return std::make_pair(kTIMESTAMP, false);
4811  case OFTBinary:
4812  // Interpret binary blobs as byte arrays here
4813  // but actual import will store NULL as GDAL will not
4814  // extract the blob (OGRFeature::GetFieldAsString will
4815  // result in the import buffers having an empty string)
4816  return std::make_pair(kTINYINT, true);
4817  default:
4818  break;
4819  }
4820  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4821 }
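// Worked example (not from this file): ogr_to_type(OFTRealList) yields
// {kDOUBLE, true}, which gdalToColumnDescriptors() below turns into an ARRAY column
// with subtype kDOUBLE, while ogr_to_type(OFTString) yields {kTEXT, false}, a plain
// TEXT column that then gets dictionary encoding.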
4822 
4823 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4824  switch (ogr_type) {
4825  case wkbPoint:
4826  return kPOINT;
4827  case wkbLineString:
4828  return kLINESTRING;
4829  case wkbPolygon:
4830  return kPOLYGON;
4831  case wkbMultiPolygon:
4832  return kMULTIPOLYGON;
4833  default:
4834  break;
4835  }
4836  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4837 }
4838 
4839 /* static */
4840 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4841  const std::string& file_name,
4842  const std::string& geo_column_name,
4843  const CopyParams& copy_params) {
4844  std::list<ColumnDescriptor> cds;
4845 
4846  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4847  if (poDS == nullptr) {
4848  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4849  file_name);
4850  }
4851 
4852  OGRLayer& layer =
4853  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4854 
4855  layer.ResetReading();
4856  // TODO(andrewseidl): support multiple features
4857  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4858  if (poFeature == nullptr) {
4859  throw std::runtime_error("No features found in " + file_name);
4860  }
4861  // get fields as regular columns
4862  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4863  CHECK(poFDefn);
4864  int iField;
4865  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4866  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4867  auto typePair = ogr_to_type(poFieldDefn->GetType());
4868  ColumnDescriptor cd;
4869  cd.columnName = poFieldDefn->GetNameRef();
4870  cd.sourceName = poFieldDefn->GetNameRef();
4871  SQLTypeInfo ti;
4872  if (typePair.second) {
4873  ti.set_type(kARRAY);
4874  ti.set_subtype(typePair.first);
4875  } else {
4876  ti.set_type(typePair.first);
4877  }
4878  if (typePair.first == kTEXT) {
4879  ti.set_compression(kENCODING_DICT);
4880  ti.set_comp_param(32);
4881  }
4882  ti.set_fixed_size();
4883  cd.columnType = ti;
4884  cds.push_back(cd);
4885  }
4886  // get geo column, if any
4887  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4888  if (poGeometry) {
4889  ColumnDescriptor cd;
4890  cd.columnName = geo_column_name;
4891  cd.sourceName = geo_column_name;
4892 
4893  // get GDAL type
4894  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4895 
4896  // if exploding, override any collection type to child type
4897  if (copy_params.geo_explode_collections) {
4898  if (ogr_type == wkbMultiPolygon) {
4899  ogr_type = wkbPolygon;
4900  } else if (ogr_type == wkbMultiLineString) {
4901  ogr_type = wkbLineString;
4902  } else if (ogr_type == wkbMultiPoint) {
4903  ogr_type = wkbPoint;
4904  }
4905  }
4906 
4907  // convert to internal type
4908  SQLTypes geoType = ogr_to_type(ogr_type);
4909 
4910  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4911  if (!copy_params.geo_explode_collections) {
4912  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4913  }
4914 
4915  // build full internal type
4916  SQLTypeInfo ti;
4917  ti.set_type(geoType);
4918  ti.set_subtype(copy_params.geo_coords_type);
4919  ti.set_input_srid(copy_params.geo_coords_srid);
4920  ti.set_output_srid(copy_params.geo_coords_srid);
4921  ti.set_compression(copy_params.geo_coords_encoding);
4922  ti.set_comp_param(copy_params.geo_coords_comp_param);
4923  cd.columnType = ti;
4924 
4925  cds.push_back(cd);
4926  }
4927  return cds;
4928 }
4929 
4930 bool Importer::gdalStatInternal(const std::string& path,
4931  const CopyParams& copy_params,
4932  bool also_dir) {
4933  // lazy init GDAL
4934  Geospatial::GDAL::init();
4935 
4936  // set authorization tokens
4937  setGDALAuthorizationTokens(copy_params);
4938 
4939 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4940  // clear GDAL stat cache
4941  // without this, file existence will be cached, even if authentication changes
4942  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4943  VSICurlClearCache();
4944 #endif
4945 
4946  // stat path
4947  VSIStatBufL sb;
4948  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4949  if (result < 0) {
4950  return false;
4951  }
4952 
4953  // exists?
4954  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4955  return true;
4956  } else if (VSI_ISREG(sb.st_mode)) {
4957  return true;
4958  }
4959  return false;
4960 }
4961 
4962 /* static */
4963 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
4964  return gdalStatInternal(path, copy_params, false);
4965 }
4966 
4967 /* static */
4968 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
4969  const CopyParams& copy_params) {
4970  return gdalStatInternal(path, copy_params, true);
4971 }
4972 
4973 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
4974  std::vector<std::string>& files) {
4975  // prepare to gather subdirectories
4976  std::vector<std::string> subdirectories;
4977 
4978  // get entries
4979  char** entries = VSIReadDir(archive_path.c_str());
4980  if (!entries) {
4981  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
4982  return;
4983  }
4984 
4985  // force scope
4986  {
4987  // request clean-up
4988  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
4989 
4990  // check all the entries
4991  int index = 0;
4992  while (true) {
4993  // get next entry, or drop out if there isn't one
4994  char* entry_c = entries[index++];
4995  if (!entry_c) {
4996  break;
4997  }
4998  std::string entry(entry_c);
4999 
5000  // ignore '.' and '..'
5001  if (entry == "." || entry == "..") {
5002  continue;
5003  }
5004 
5005  // build the full path
5006  std::string entry_path = archive_path + std::string("/") + entry;
5007 
5008  // is it a file or a sub-folder
5009  VSIStatBufL sb;
5010  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
5011  if (result < 0) {
5012  break;
5013  }
5014 
5015  if (VSI_ISDIR(sb.st_mode)) {
5016  // a directory that ends with .gdb could be a Geodatabase bundle.
5017  // it's arguably dangerous to decide this purely by name, but any further
5018  // validation would be very complex, especially at this scope
5019  if (boost::iends_with(entry_path, ".gdb")) {
5020  // add the directory as if it was a file and don't recurse into it
5021  files.push_back(entry_path);
5022  } else {
5023  // add subdirectory to be recursed into
5024  subdirectories.push_back(entry_path);
5025  }
5026  } else {
5027  // add this file
5028  files.push_back(entry_path);
5029  }
5030  }
5031  }
5032 
5033  // recurse into each subdirectory we found
5034  for (const auto& subdirectory : subdirectories) {
5035  gdalGatherFilesInArchiveRecursive(subdirectory, files);
5036  }
5037 }
5038 
5039 /* static */
5040 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
5041  const std::string& archive_path,
5042  const CopyParams& copy_params) {
5043  // lazy init GDAL
5044  Geospatial::GDAL::init();
5045 
5046  // set authorization tokens
5047  setGDALAuthorizationTokens(copy_params);
5048 
5049  // prepare to gather files
5050  std::vector<std::string> files;
5051 
5052  // gather the files recursively
5053  gdalGatherFilesInArchiveRecursive(archive_path, files);
5054 
5055  // convert to relative paths inside archive
5056  for (auto& file : files) {
5057  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
5058  }
5059 
5060  // done
5061  return files;
5062 }
5063 
5064 /* static */
5065 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
5066  const std::string& file_name,
5067  const CopyParams& copy_params) {
5068  // lazy init GDAL
5069  Geospatial::GDAL::init();
5070 
5071  // set authorization tokens
5072  setGDALAuthorizationTokens(copy_params);
5073 
5074  // prepare to gather layer info
5075  std::vector<GeoFileLayerInfo> layer_info;
5076 
5077  // open the data set
5078  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
5079  if (poDS == nullptr) {
5080  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
5081  file_name);
5082  }
5083 
5084  // enumerate the layers
5085  for (auto&& poLayer : poDS->GetLayers()) {
5086  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
5087  // prepare to read this layer
5088  poLayer->ResetReading();
5089  // skip layer if empty
5090  if (poLayer->GetFeatureCount() > 0) {
5091  // get first feature
5092  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
5093  CHECK(first_feature);
5094  // check feature for geometry
5095  const OGRGeometry* geometry = first_feature->GetGeometryRef();
5096  if (!geometry) {
5097  // layer has no geometry
5098  contents = GeoFileLayerContents::NON_GEO;
5099  } else {
5100  // check the geometry type
5101  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
5102  switch (wkbFlatten(geometry_type)) {
5103  case wkbPoint:
5104  case wkbLineString:
5105  case wkbPolygon:
5106  case wkbMultiPolygon:
5107  // layer has supported geo
5108  contents = GeoFileLayerContents::GEO;
5109  break;
5110  case wkbMultiPoint:
5111  case wkbMultiLineString:
5112  // supported if geo_explode_collections is specified
5113  contents = copy_params.geo_explode_collections
5114  ? GeoFileLayerContents::GEO
5115  : GeoFileLayerContents::UNSUPPORTED_GEO;
5116  break;
5117  default:
5118  // layer has unsupported geometry
5119  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
5120  break;
5121  }
5122  }
5123  }
5124  // store info for this layer
5125  layer_info.emplace_back(poLayer->GetName(), contents);
5126  }
5127 
5128  // done
5129  return layer_info;
5130 }
5131 
5132 void Importer::importGDAL(ColumnNameToSourceNameMapType columnNameToSourceNameMap,
5133  const Catalog_Namespace::SessionInfo* session_info) {
5134  // initial status
5135  set_import_status(import_id, import_status_);
5136  OGRDataSourceUqPtr poDS(openGDALDataset(file_path, copy_params));
5137  if (poDS == nullptr) {
5138  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
5139  file_path);
5140  }
5141 
5142  OGRLayer& layer =
5143  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
5144 
5145  // get the number of features in this layer
5146  size_t numFeatures = layer.GetFeatureCount();
5147 
5148  // build map of metadata field (additional columns) name to index
5149  // use shared_ptr since we need to pass it to the worker
5150  FieldNameToIndexMapType fieldNameToIndexMap;
5151  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5152  CHECK(poFDefn);
5153  size_t numFields = poFDefn->GetFieldCount();
5154  for (size_t iField = 0; iField < numFields; iField++) {
5155  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5156  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5157  }
5158 
5159  // the geographic spatial reference we want to put everything in
5160  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
5161  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5162 
5163 #if GDAL_VERSION_MAJOR >= 3
5164  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order,
5165  // which results in X and Y being transposed for angle-based
5166  // coordinate systems. This restores the previous behavior.
5167  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5168 #endif
5169 
5170 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5171  // just one "thread"
5172  max_threads = 1;
5173 #else
5174  // how many threads to use
5175  if (copy_params.threads == 0) {
5176  max_threads = std::min(static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF)),
5177  g_max_import_threads);
5178  } else {
5179  max_threads = copy_params.threads;
5180  }
5181 #endif
5182 
5183  VLOG(1) << "GDAL import # threads: " << max_threads;
5184 
5185  // geo table import is handled specially in both DBHandler and QueryRunner,
5186  // on a separate path from normal SQL execution,
5187  // so we explicitly enroll the import session here to allow interruption
5188  // while importing a geo table
5189  auto query_session = session_info ? session_info->get_session_id() : "";
5190  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
5191  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
5192  auto is_session_already_registered = false;
5193  {
5194  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor->getSessionLock());
5195  is_session_already_registered =
5196  executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5197  }
5198  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
5199  !is_session_already_registered) {
5200  executor->enrollQuerySession(query_session,
5201  "IMPORT_GEO_TABLE",
5202  query_submitted_time,
5203  Executor::UNITARY_EXECUTOR_ID,
5204  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5205  }
5206  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5207  // reset the runtime query interrupt status
5208  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
5209  executor->clearQuerySessionStatus(query_session, query_submitted_time);
5210  }
5211  };
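
// --- Illustrative sketch (editorial; not from the original source) ----------
// The RAII idiom behind the clearInterruptStatus guard above (the real
// ScopeGuard lives in Shared/scope.h). The stored callable runs when the
// guard is destroyed, so the session status is cleared on every exit path
// from importGDAL, including exceptions. SimpleScopeGuard is a hypothetical
// stand-in, not the project's implementation.
#include <utility>

template <typename F>
class SimpleScopeGuard {
 public:
  explicit SimpleScopeGuard(F f) : f_(std::move(f)) {}
  ~SimpleScopeGuard() { f_(); }  // cleanup fires on scope exit
  SimpleScopeGuard(const SimpleScopeGuard&) = delete;
  SimpleScopeGuard& operator=(const SimpleScopeGuard&) = delete;

 private:
  F f_;
};

// usage sketch:
//   SimpleScopeGuard guard{[&] { /* clear session status */ }};
// ----------------------------------------------------------------------------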
5212 
5213  // make an import buffer for each thread
5214  CHECK_EQ(import_buffers_vec.size(), 0u);
5215  import_buffers_vec.resize(max_threads);
5216  for (size_t i = 0; i < max_threads; i++) {
5217  for (const auto cd : loader->get_column_descs()) {
5218  import_buffers_vec[i].emplace_back(
5219  new TypedImportBuffer(cd, loader->getStringDict(cd)));
5220  }
5221  }
5222 
5223  // make render group analyzers for each poly column
5224  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
5225  if (copy_params.geo_assign_render_groups) {
5226  auto& cat = loader->getCatalog();
5227  auto* td = loader->getTableDesc();
5228  CHECK(td);
5229  auto column_descriptors =
5230  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5231  for (auto const& cd : column_descriptors) {
5232  if (IS_GEO_POLY(cd->columnType.get_type())) {
5233  auto rga = std::make_shared<RenderGroupAnalyzer>();
5234  rga->seedFromExistingTableContents(cat, td->tableName, cd->columnName);
5235  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
5236  }
5237  }
5238  }
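
// --- Illustrative sketch (editorial; not from the original source) ----------
// The shape of the per-column analyzer map built above: one analyzer per
// polygon column, seeded once from the existing table contents, then shared
// read-only by all worker threads via shared_ptr. PolyAnalyzer and seed()
// are hypothetical stand-ins for RenderGroupAnalyzer.
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct PolyAnalyzer {
  void seed(const std::string& table, const std::string& column) { /* ... */ }
};

std::map<int, std::shared_ptr<PolyAnalyzer>> build_analyzer_map(
    const std::vector<std::pair<int, std::string>>& poly_columns,
    const std::string& table_name) {
  std::map<int, std::shared_ptr<PolyAnalyzer>> analyzers;
  for (const auto& [column_id, column_name] : poly_columns) {
    auto analyzer = std::make_shared<PolyAnalyzer>();
    analyzer->seed(table_name, column_name);  // one-time seeding, then shared
    analyzers[column_id] = analyzer;
  }
  return analyzers;
}
// ----------------------------------------------------------------------------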
5239 
5240 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5241  // threads
5242  std::list<std::future<ImportStatus>> threads;
5243 
5244  // use a stack to track thread_ids which must not overlap among threads
5245  // because thread_id is used to index import_buffers_vec[]
5246  std::stack<size_t> stack_thread_ids;
5247  for (size_t i = 0; i < max_threads; i++) {
5248  stack_thread_ids.push(i);
5249  }
5250 #endif
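
// --- Illustrative sketch (editorial; not from the original source) ----------
// The thread-slot pool idiom used above, as a small helper. Each worker is
// keyed by an integer id so it can write to import_buffers_vec[id] without
// locking; ids are checked out before a task launches and returned when its
// future is harvested. ThreadIdPool is a hypothetical helper.
#include <cstddef>
#include <stack>

class ThreadIdPool {
 public:
  explicit ThreadIdPool(size_t num_slots) {
    for (size_t i = 0; i < num_slots; i++) {
      free_ids_.push(i);
    }
  }
  bool empty() const { return free_ids_.empty(); }
  size_t acquire() {  // precondition: !empty()
    const size_t id = free_ids_.top();
    free_ids_.pop();
    return id;
  }
  void release(size_t id) { free_ids_.push(id); }

 private:
  std::stack<size_t> free_ids_;
};
// ----------------------------------------------------------------------------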
5251 
5252  // checkpoint the table
5253  auto table_epochs = loader->getTableEpochs();
5254 
5255  // reset the layer
5256  layer.ResetReading();
5257 
5258  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5259 
5260  // make a features buffer for each thread
5261  std::vector<FeaturePtrVector> features(max_threads);
5262 
5263  // for each feature...
5264  size_t firstFeatureThisChunk = 0;
5265  while (firstFeatureThisChunk < numFeatures) {
5266  // how many features this chunk
5267  size_t numFeaturesThisChunk =
5268  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5269 
5270 // get a thread_id not in use
5271 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5272  size_t thread_id = 0;
5273 #else
5274  auto thread_id = stack_thread_ids.top();
5275  stack_thread_ids.pop();
5276  CHECK(thread_id < max_threads);
5277 #endif
5278 
5279  // fill features buffer for new thread
5280  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5281  features[thread_id].emplace_back(layer.GetNextFeature());
5282  }
5283 
5284 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5285  // call worker function directly
5286  auto ret_import_status = import_thread_shapefile(0,
5287  this,
5288  poGeographicSR.get(),
5289  std::move(features[thread_id]),
5290  firstFeatureThisChunk,
5291  numFeaturesThisChunk,
5292  fieldNameToIndexMap,
5293  columnNameToSourceNameMap,
5294  columnIdToRenderGroupAnalyzerMap,
5295  session_info,
5296  executor.get());
5297  import_status_ += ret_import_status;
5298  import_status_.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
5299  import_status_.rows_completed;
5300  set_import_status(import_id, import_status_);
5301 #else
5302  // fire up that thread to import this geometry
5303  threads.push_back(std::async(std::launch::async,
5304  import_thread_shapefile,
5305  thread_id,
5306  this,
5307  poGeographicSR.get(),
5308  std::move(features[thread_id]),
5309  firstFeatureThisChunk,
5310  numFeaturesThisChunk,
5311  fieldNameToIndexMap,
5312  columnNameToSourceNameMap,
5313  columnIdToRenderGroupAnalyzerMap,
5314  session_info,
5315  executor.get()));
5316 
5317  // let the threads run
5318  while (threads.size() > 0) {
5319  int nready = 0;
5320  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5321  it != threads.end();) {
5322  auto& p = *it;
5323  std::chrono::milliseconds span(
5324  0); //(std::distance(it, threads.end()) == 1? 1: 0);
5325  if (p.wait_for(span) == std::future_status::ready) {
5326  auto ret_import_status = p.get();
5327  {
5328  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
5329  import_status_ += ret_import_status;
5330  import_status_.rows_estimated =
5331  ((float)firstFeatureThisChunk / (float)numFeatures) *
5332  import_status_.rows_completed;
5333  set_import_status(import_id, import_status_);
5334  if (import_status_.load_failed) {
5335  break;
5336  }
5337  }
5338  // recall thread_id for reuse
5339  stack_thread_ids.push(ret_import_status.thread_id);
5340 
5341  threads.erase(it++);
5342  ++nready;
5343  } else {
5344  ++it;
5345  }
5346  }
5347 
5348  if (nready == 0) {
5349  std::this_thread::yield();
5350  }
5351 
5352  // keep reading if there is any free thread slot
5353  // this is one of the major differences from the old threading model!
5354  if (threads.size() < max_threads) {
5355  break;
5356  }
5357  }
5358 #endif
5359 
5360  // check whether the load should be halted
5361 
5362  mapd_unique_lock<mapd_shared_mutex> write_lock(import_mutex_);
5363  if (import_status_.rows_rejected > copy_params.max_reject) {
5364  import_status_.load_failed = true;
5365  // todo use better message
5366  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
5367  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
5368  break;
5369  }
5370  if (import_status_.load_failed) {
5371  LOG(ERROR) << "A call to the Loader failed in GDAL. Please review the logs for "
5372  "more details";
5373  break;
5374  }
5375 
5376  firstFeatureThisChunk += numFeaturesThisChunk;
5377  }
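
// --- Illustrative sketch (editorial; not from the original source) ----------
// The produce/harvest pattern of the chunk loop above, reduced to its core:
// launch up to a fixed number of async tasks, poll their futures with a
// zero-timeout wait_for, reap the finished ones, and only yield when every
// slot is busy. do_work, kSlots, and num_chunks are placeholders.
#include <chrono>
#include <future>
#include <list>
#include <thread>

void run_chunks(int num_chunks) {
  constexpr size_t kSlots = 4;
  auto do_work = [](int chunk) { return chunk; };  // stand-in worker
  std::list<std::future<int>> pending;
  for (int chunk = 0; chunk < num_chunks;) {
    if (pending.size() < kSlots) {
      pending.push_back(std::async(std::launch::async, do_work, chunk++));
    }
    for (auto it = pending.begin(); it != pending.end();) {
      if (it->wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) {
        it->get();  // harvest the result (and rethrow any worker exception)
        it = pending.erase(it);
      } else {
        ++it;
      }
    }
    if (pending.size() >= kSlots) {
      std::this_thread::yield();  // all slots busy; give workers CPU time
    }
  }
  for (auto& f : pending) {
    f.get();  // drain any stragglers
  }
}
// ----------------------------------------------------------------------------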
5378 
5379 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5380  // wait for any remaining threads
5381  if (threads.size()) {
5382  for (auto& p : threads) {
5383  // wait for the thread
5384  p.wait();
5385  // get the result and update the final import status
5386  auto ret_import_status = p.get();
5387  import_status_ += ret_import_status;