OmniSciDB  0264ff685a
Importer.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * @file Importer.cpp
 * @author Wei Hong <wei@mapd.com>
 * @brief Functions for Importer class
 */

#include "ImportExport/Importer.h"

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <gdal.h>
#include <ogrsf_frmts.h>
#include <boost/algorithm/string.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/filesystem.hpp>
#include <boost/geometry.hpp>
#include <boost/variant.hpp>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <future>
#include <iomanip>
#include <list>
#include <memory>
#include <mutex>
#include <numeric>
#include <stack>
#include <stdexcept>
#include <thread>
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "Archive/S3Archive.h"
#include "ArrowImporter.h"
#include "Geospatial/Compression.h"
#include "Geospatial/GDAL.h"
#include "Geospatial/Transforms.h"
#include "Geospatial/Types.h"
#include "Logger/Logger.h"
#include "Shared/DateTimeParser.h"
#include "Shared/SqlTypesLayout.h"
#include "Shared/import_helpers.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/scope.h"
#include "Shared/shard_key.h"
#include "Shared/thread_count.h"

#include "gen-cpp/OmniSci.h"

size_t g_max_import_threads =
    32;  // Max number of default import threads to use (num hardware threads will be
         // used if lower, and can also be explicitly overridden in the COPY statement
         // with the threads option)
size_t g_archive_read_buf_size = 1 << 20;

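// Returns the size in bytes of the file at file_path, or 0 if it cannot be
// stat'ed (the boost::filesystem error code is swallowed).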
inline auto get_filesize(const std::string& file_path) {
  boost::filesystem::path boost_file_path{file_path};
  boost::system::error_code ec;
  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
  return ec ? 0 : filesize;
}

namespace {
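
// Custom deleters so that objects allocated by GDAL/OGR can be held in
// std::unique_ptr (RAII); each forwards to the library's own destroy call.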
struct OGRDataSourceDeleter {
  void operator()(OGRDataSource* datasource) {
    if (datasource) {
      GDALClose(datasource);
    }
  }
};
using OGRDataSourceUqPtr = std::unique_ptr<OGRDataSource, OGRDataSourceDeleter>;

struct OGRFeatureDeleter {
  void operator()(OGRFeature* feature) {
    if (feature) {
      OGRFeature::DestroyFeature(feature);
    }
  }
};
using OGRFeatureUqPtr = std::unique_ptr<OGRFeature, OGRFeatureDeleter>;

struct OGRSpatialReferenceDeleter {
  void operator()(OGRSpatialReference* ref) {
    if (ref) {
      OGRSpatialReference::DestroySpatialReference(ref);
    }
  }
};
using OGRSpatialReferenceUqPtr =
    std::unique_ptr<OGRSpatialReference, OGRSpatialReferenceDeleter>;

}  // namespace

// For logging std::vector<std::string> row.
namespace boost {
namespace log {
formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
  out << '[';
  for (size_t i = 0; i < row.size(); ++i) {
    out << (i ? ", " : "") << row[i];
  }
  out << ']';
  return out;
}
}  // namespace log
}  // namespace boost

namespace import_export {

using FieldNameToIndexMapType = std::map<std::string, size_t>;
using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
using ColumnIdToRenderGroupAnalyzerMapType =
    std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
using FeaturePtrVector = std::vector<OGRFeatureUqPtr>;

#define DEBUG_TIMING false
#define DEBUG_RENDER_GROUP_ANALYZER 0
#define DEBUG_AWS_AUTHENTICATION 0

#define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0

static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;

static mapd_shared_mutex status_mutex;
static std::map<std::string, ImportStatus> import_status_map;

Importer::Importer(Catalog_Namespace::Catalog& c,
                   const TableDescriptor* t,
                   const std::string& f,
                   const CopyParams& p)
    : Importer(new Loader(c, t), f, p) {}

Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
    : DataStreamSink(p, f), loader(providedLoader) {
  import_id = boost::filesystem::path(file_path).filename().string();
  file_size = 0;
  max_threads = 0;
  p_file = nullptr;
  buffer[0] = nullptr;
  buffer[1] = nullptr;
  // We may be overallocating a little memory here due to dropping physical columns;
  // that's harmless because iteration of the array is not supposed to go OOB.
  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
  int i = 0;
  bool has_array = false;
  // TODO: replace this ugly way of skipping physical columns once isPhyGeo is defined
  int skip_physical_cols = 0;
  for (auto& p : loader->get_column_descs()) {
    // physical geo columns can't be in the input file
    if (skip_physical_cols-- > 0) {
      continue;
    }
    // neither can rowid or $deleted$
    // note: columns can be added after rowid/$deleted$
    if (p->isVirtualCol || p->isDeletedCol) {
      continue;
    }
    skip_physical_cols = p->columnType.get_physical_cols();
    if (p->columnType.get_type() == kARRAY) {
      is_array.get()[i] = true;
      has_array = true;
    } else {
      is_array.get()[i] = false;
    }
    ++i;
  }
  if (has_array) {
    is_array_a = std::unique_ptr<bool[]>(is_array.release());
  } else {
    is_array_a = std::unique_ptr<bool[]>(nullptr);
  }
}

Importer::~Importer() {
  if (p_file != nullptr) {
    fclose(p_file);
  }
  if (buffer[0] != nullptr) {
    free(buffer[0]);
  }
  if (buffer[1] != nullptr) {
    free(buffer[1]);
  }
}

ImportStatus Importer::get_import_status(const std::string& import_id) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
  return import_status_map.at(import_id);
}

void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
  is.end = std::chrono::steady_clock::now();
  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
  import_status_map[import_id] = is;
}

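// Strips leading and trailing spaces and carriage returns from a field of known
// length, returning the trimmed value as a std::string.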
static const std::string trim_space(const char* field, const size_t len) {
  size_t i = 0;
  size_t j = len;
  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
    i++;
  }
  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
    j--;
  }
  return std::string(field + i, j - i);
}

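// Builds a Datum carrying the in-band NULL sentinel for a scalar type (decimals
// are first mapped to their underlying integer type); geo types have no scalar
// NULL representation and throw.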
Datum NullDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullDatum.");
  }
  return d;
}

Datum NullArrayDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_ARRAY_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_ARRAY_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
  }
  return d;
}

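// Parses a delimited array literal (bracketed by copy_params.array_begin/array_end
// and separated by copy_params.array_delim, e.g. "{1,2,3}" with brace/comma
// settings) into a packed ArrayDatum of fixed-size elements. NULL markers and
// malformed input yield a null ArrayDatum. Must not be called for string arrays.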
ArrayDatum StringToArray(const std::string& s,
                         const SQLTypeInfo& ti,
                         const CopyParams& copy_params) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
    return ArrayDatum(0, NULL, true);
  }
  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
    LOG(WARNING) << "Malformed array: " << s;
    return ArrayDatum(0, NULL, true);
  }
  std::vector<std::string> elem_strs;
  size_t last = 1;
  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
       i = s.find(copy_params.array_delim, last)) {
    elem_strs.push_back(s.substr(last, i - last));
    last = i + 1;
  }
  if (last + 1 <= s.size()) {
    elem_strs.push_back(s.substr(last, s.size() - 1 - last));
  }
  if (elem_strs.size() == 1) {
    auto str = elem_strs.front();
    auto str_trimmed = trim_space(str.c_str(), str.length());
    if (str_trimmed == "") {
      elem_strs.clear();  // Empty array
    }
  }
  if (!elem_ti.is_string()) {
    size_t len = elem_strs.size() * elem_ti.get_size();
    int8_t* buf = (int8_t*)checked_malloc(len);
    int8_t* p = buf;
    for (auto& es : elem_strs) {
      auto e = trim_space(es.c_str(), es.length());
      bool is_null = (e == copy_params.null_str) || e == "NULL";
      if (!elem_ti.is_string() && e == "") {
        is_null = true;
      }
      if (elem_ti.is_number() || elem_ti.is_time()) {
        if (!isdigit(e[0]) && e[0] != '-') {
          is_null = true;
        }
      }
      Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
      p = appendDatum(p, d, elem_ti);
    }
    return ArrayDatum(len, buf, false);
  }
  // must not be called for array of strings
  CHECK(false);
  return ArrayDatum(0, NULL, true);
}

ArrayDatum NullArray(const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  auto len = ti.get_size();

  if (elem_ti.is_string()) {
    // must not be called for array of strings
    CHECK(false);
    return ArrayDatum(0, NULL, true);
  }

  if (len > 0) {
    // Compose a NULL fixlen array
    int8_t* buf = (int8_t*)checked_malloc(len);
    // First scalar is a NULL_ARRAY sentinel
    Datum d = NullArrayDatum(elem_ti);
    int8_t* p = appendDatum(buf, d, elem_ti);
    // Rest is filled with normal NULL sentinels
    Datum d0 = NullDatum(elem_ti);
    while ((p - buf) < len) {
      p = appendDatum(p, d0, elem_ti);
    }
    CHECK((p - buf) == len);
    return ArrayDatum(len, buf, true);
  }
  // NULL varlen array
  return ArrayDatum(0, NULL, true);
}

ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
  return NullArray(ti);
}

void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
  const auto& arr = datum.val.arr_val;
  for (const auto& elem_datum : arr) {
    string_vec.push_back(elem_datum.val.str_val);
  }
}

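// Converts a Thrift TDatum into an internal Datum, substituting the
// type-appropriate NULL sentinel when the TDatum is flagged null.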
Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kBIGINT:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kINT:
      d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kSMALLINT:
      d.smallintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kTINYINT:
      d.tinyintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kFLOAT:
      d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
      break;
    case kDOUBLE:
      d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
  }
  return d;
}

ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();

  CHECK(!elem_ti.is_string());

  if (datum.is_null) {
    return NullArray(ti);
  }

  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
  int8_t* buf = (int8_t*)checked_malloc(len);
  int8_t* p = buf;
  for (auto& e : datum.val.arr_val) {
    p = appendDatum(p, TDatumToDatum(e, elem_ti), elem_ti);
  }

  return ArrayDatum(len, buf, false);
}

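// Bulk-encodes a batch of strings through the column's string dictionary,
// writing the resulting ids into the 1-, 2-, or 4-byte buffer matching the
// column's storage width.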
void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
  CHECK(string_dict_);
  std::vector<std::string_view> string_view_vec;
  string_view_vec.reserve(string_vec.size());
  for (const auto& str : string_vec) {
    if (str.size() > StringDictionary::MAX_STRLEN) {
      throw std::runtime_error("String too long for dictionary encoding.");
    }
    string_view_vec.push_back(str);
  }
  switch (column_desc_->columnType.get_size()) {
    case 1:
      string_dict_i8_buffer_->resize(string_view_vec.size());
      string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
      break;
    case 2:
      string_dict_i16_buffer_->resize(string_view_vec.size());
      string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
      break;
    case 4:
      string_dict_i32_buffer_->resize(string_view_vec.size());
      string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
      break;
    default:
      CHECK(false);
  }
}

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const std::string_view val,
                                  const bool is_null,
                                  const CopyParams& copy_params,
                                  const int64_t replicate_count) {
  set_replicate_count(replicate_count);
  const auto type = cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBoolean(static_cast<int8_t>(d.boolval));
      }
      break;
    }
    case kTINYINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addTinyint(d.tinyintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addSmallint(d.smallintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addInt(d.intval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        SQLTypeInfo ti(kNUMERIC, 0, 0, false);
        Datum d = StringToDatum(val, ti);
        const auto converted_decimal_value =
            convert_decimal_value_to_scale(d.bigintval, ti, cd->columnType);
        addBigint(converted_decimal_value);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addDouble(std::atof(std::string(val).c_str()));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        if (val.length() > StringDictionary::MAX_STRLEN) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(StringDictionary::MAX_STRLEN));
        }
        addString(val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      SQLTypeInfo ti = cd->columnType;
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImportHelpers::parseStringArray(std::string(val), copy_params, string_vec);
        if (!is_null) {
          // TODO: add support for NULL string arrays
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          addStringArray(string_vec);
        } else {
          if (ti.get_size() > 0) {
            // TODO: remove once NULL fixlen arrays are allowed
            throw std::runtime_error("Fixed length array column " + cd->columnName +
                                     " currently cannot accept NULL arrays");
          }
          // TODO: add support for NULL string arrays, replace with addStringArray(),
          // for now add whatever parseStringArray() outputs for NULLs ("NULL")
          addStringArray(string_vec);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
          if (d.is_null) {  // val could be "NULL"
            addArray(NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + std::string(val));
            }
            addArray(d);
          }
        } else {
          addArray(NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      addGeoString(val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

void TypedImportBuffer::pop_value() {
  const auto type = column_desc_->columnType.is_decimal()
                        ? decimal_to_int_type(column_desc_->columnType)
                        : column_desc_->columnType.get_type();
  switch (type) {
    case kBOOLEAN:
      bool_buffer_->pop_back();
      break;
    case kTINYINT:
      tinyint_buffer_->pop_back();
      break;
    case kSMALLINT:
      smallint_buffer_->pop_back();
      break;
    case kINT:
      int_buffer_->pop_back();
      break;
    case kBIGINT:
      bigint_buffer_->pop_back();
      break;
    case kFLOAT:
      float_buffer_->pop_back();
      break;
    case kDOUBLE:
      double_buffer_->pop_back();
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      string_buffer_->pop_back();
      break;
    case kDATE:
    case kTIME:
    case kTIMESTAMP:
      bigint_buffer_->pop_back();
      break;
    case kARRAY:
      if (IS_STRING(column_desc_->columnType.get_subtype())) {
        string_array_buffer_->pop_back();
      } else {
        array_buffer_->pop_back();
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->pop_back();
      break;
    default:
      CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
  }
}

struct GeoImportException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
template <typename DATA_TYPE>
size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
    const ColumnDescriptor* cd,
    const Array& array,
    std::vector<DATA_TYPE>& buffer,
    const ArraySliceRange& slice_range,
    import_export::BadRowsTracker* const bad_rows_tracker) {
  auto data =
      std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
  if (bad_rows_tracker && cd->columnType.is_geometry()) {
    f_add_geo_phy_cols = [&](const int64_t row) {
      // Populate physical columns (ref. DBHandler::load_table)
      std::vector<double> coords, bounds;
      std::vector<int> ring_sizes, poly_rings;
      int render_group = 0;
      SQLTypeInfo ti;
      // replace any unexpected exception from getGeoColumns or others on this path
      // with a GeoImportException so that we won't over-push a null to the logical
      // column...
      try {
        SQLTypeInfo import_ti{ti};
        if (array.IsNull(row)) {
          Geospatial::GeoTypesFactory::getNullGeoColumns(
              import_ti, coords, bounds, ring_sizes, poly_rings, false);
        } else {
          arrow_throw_if<GeoImportException>(
              !Geospatial::GeoTypesFactory::getGeoColumns(geo_string_buffer_->back(),
                                                          ti,
                                                          coords,
                                                          bounds,
                                                          ring_sizes,
                                                          poly_rings,
                                                          false),
              error_context(cd, bad_rows_tracker) + "Invalid geometry");
          arrow_throw_if<GeoImportException>(
              cd->columnType.get_type() != ti.get_type(),
              error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
        }
        auto col_idx_workpad = col_idx;  // what a pitfall!!
        Importer::set_geo_physical_import_buffer(
            bad_rows_tracker->importer->getCatalog(),
            cd,
            *import_buffers,
            col_idx_workpad,
            coords,
            bounds,
            ring_sizes,
            poly_rings,
            render_group);
      } catch (GeoImportException&) {
        throw;
      } catch (std::runtime_error& e) {
        throw GeoImportException(e.what());
      } catch (const std::exception& e) {
        throw GeoImportException(e.what());
      } catch (...) {
        throw GeoImportException("unknown exception");
      }
    };
  }
  auto f_mark_a_bad_row = [&](const auto row) {
    std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
    bad_rows_tracker->rows.insert(row - slice_range.first);
  };
  buffer.reserve(slice_range.second - slice_range.first);
  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
    try {
      *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
      f_add_geo_phy_cols(row);
    } catch (GeoImportException&) {
      f_mark_a_bad_row(row);
    } catch (ArrowImporterException&) {
      // trace bad rows of each column; otherwise rethrow.
      if (bad_rows_tracker) {
        *data << nullptr;
        f_mark_a_bad_row(row);
      } else {
        throw;
      }
    }
  }
  return buffer.size();
}

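// Appends a slice of an Arrow column to the typed buffer matching the column's
// SQL type, optionally requiring an exact Arrow type match; returns the
// resulting buffer size.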
size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
                                           const Array& col,
                                           const bool exact_type_match,
                                           const ArraySliceRange& slice_range,
                                           BadRowsTracker* const bad_rows_tracker) {
  const auto type = cd->columnType.get_type();
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
  }

  switch (type) {
    case kBOOLEAN:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
    case kTINYINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
    case kSMALLINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
    case kINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *int_buffer_, slice_range, bad_rows_tracker);
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kFLOAT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *float_buffer_, slice_range, bad_rows_tracker);
    case kDOUBLE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *double_buffer_, slice_range, bad_rows_tracker);
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                       "Expected string type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *string_buffer_, slice_range, bad_rows_tracker);
    case kTIME:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
                       "Expected time32 or time64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kTIMESTAMP:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kDATE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
                       "Expected date32 or date64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                     "Expected string type");
      return convert_arrow_val_to_import_buffer(
          cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
    case kARRAY:
      throw std::runtime_error("Arrow array appends not yet supported");
    default:
      throw std::runtime_error("Invalid Type");
  }
}

// this is exclusively used by load_table_binary_columnar
size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
  size_t dataSize = 0;
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
      throw std::runtime_error("NULL for column " + cd->columnName);
    }
  }

  switch (cd->columnType.get_type()) {
    case kBOOLEAN: {
      dataSize = col.data.int_col.size();
      bool_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bool_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kTINYINT: {
      dataSize = col.data.int_col.size();
      tinyint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kSMALLINT: {
      dataSize = col.data.int_col.size();
      smallint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kINT: {
      dataSize = col.data.int_col.size();
      int_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          int_buffer_->push_back((int32_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kFLOAT: {
      dataSize = col.data.real_col.size();
      float_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          float_buffer_->push_back(NULL_FLOAT);
        } else {
          float_buffer_->push_back((float)col.data.real_col[i]);
        }
      }
      break;
    }
    case kDOUBLE: {
      dataSize = col.data.real_col.size();
      double_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          double_buffer_->push_back(NULL_DOUBLE);
        } else {
          double_buffer_->push_back((double)col.data.real_col[i]);
        }
      }
      break;
    }
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // TODO: for now, use empty string for nulls
      dataSize = col.data.str_col.size();
      string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          string_buffer_->push_back(std::string());
        } else {
          string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON: {
      dataSize = col.data.str_col.size();
      geo_string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          // TODO: add support for NULL geo
          geo_string_buffer_->push_back(std::string());
        } else {
          geo_string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kARRAY: {
      dataSize = col.data.arr_col.size();
      if (IS_STRING(cd->columnType.get_subtype())) {
        for (size_t i = 0; i < dataSize; i++) {
          std::vector<std::string>& string_vec = addStringArray();
          if (!col.nulls[i]) {
            size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
            for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
              string_vec.push_back(col.data.arr_col[i].data.str_col[str_idx]);
            }
          }
        }
      } else {
        auto elem_ti = cd->columnType.get_subtype();
        switch (elem_ti) {
          case kBOOLEAN: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(bool);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTINYINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int8_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kSMALLINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int16_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int16_t*)p =
                      static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int16_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int32_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int32_t*)p =
                      static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int32_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kBIGINT:
          case kNUMERIC:
          case kDECIMAL: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int64_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int64_t*)p =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kFLOAT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(float);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(float);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kDOUBLE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(double);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(double);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTIME:
          case kTIMESTAMP:
          case kDATE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteWidth = sizeof(int64_t);
                size_t byteSize = len * byteWidth;
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *reinterpret_cast<int64_t*>(p) =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          default:
            throw std::runtime_error("Invalid Array Type");
        }
      }
      break;
    }
    default:
      throw std::runtime_error("Invalid Type");
  }
  return dataSize;
}

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const TDatum& datum,
                                  const bool is_null,
                                  const int64_t replicate_count) {
  set_replicate_count(replicate_count);
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        addBoolean((int8_t)datum.val.int_val);
      }
      break;
    }
    case kTINYINT:
      if (!is_null) {
        addTinyint((int8_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kSMALLINT:
      if (!is_null) {
        addSmallint((int16_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kINT:
      if (!is_null) {
        addInt((int32_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kBIGINT:
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kFLOAT:
      if (!is_null) {
        addFloat((float)datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        addDouble(datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        addString(datum.val.str_val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kARRAY:
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      if (IS_STRING(cd->columnType.get_subtype())) {
        std::vector<std::string>& string_vec = addStringArray();
        addBinaryStringArray(datum, string_vec);
      } else {
        if (!is_null) {
          addArray(TDatumToArrayDatum(datum, cd->columnType));
        } else {
          addArray(NullArray(cd->columnType));
        }
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addGeoString(std::string());
      } else {
        addGeoString(datum.val.str_val);
      }
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

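// Builds POINT coords from a lon/lat pair, applying the column's spatial
// transform when one is defined; returns false for non-finite input or a
// failed transform.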
bool importGeoFromLonLat(double lon,
                         double lat,
                         std::vector<double>& coords,
                         SQLTypeInfo& ti) {
  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
    return false;
  }
  if (ti.transforms()) {
    Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
    if (!pt.transform(ti)) {
      return false;
    }
    pt.getColumns(coords);
    return true;
  }
  coords.push_back(lon);
  coords.push_back(lat);
  return true;
}

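// Fans a parsed geo value out into the geo column's physical columns: coords
// always, then ring_sizes/poly_rings/bounds/render_group as required by the
// geo type, following catalog column-id order.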
void Importer::set_geo_physical_import_buffer(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<double>& coords,
    std::vector<double>& bounds,
    std::vector<int>& ring_sizes,
    std::vector<int>& poly_rings,
    int render_group,
    const int64_t replicate_count) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  bool is_null_geo = false;
  bool is_null_point = false;
  if (!col_ti.get_notnull()) {
    // Check for NULL geo
    if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
      is_null_point = true;
      coords.clear();
    }
    is_null_geo = coords.empty();
    if (is_null_point) {
      coords.push_back(NULL_ARRAY_DOUBLE);
      coords.push_back(NULL_DOUBLE);
      // Treating POINT coords as notnull, need to store actual encoding
      // [un]compressed+[not]null
      is_null_geo = false;
    }
  }
  TDatum tdd_coords;
  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
  // One exception - NULL POINT geo: coords need to be processed to encode nullness
  // in a fixlen array, compressed and uncompressed.
  if (!is_null_geo) {
    std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
    tdd_coords.val.arr_val.reserve(compressed_coords.size());
    for (auto cc : compressed_coords) {
      tdd_coords.val.arr_val.emplace_back();
      tdd_coords.val.arr_val.back().val.int_val = cc;
    }
  }
  tdd_coords.is_null = is_null_geo;
  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false, replicate_count);

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_ring_sizes;
    tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
    if (!is_null_geo) {
      for (auto ring_size : ring_sizes) {
        tdd_ring_sizes.val.arr_val.emplace_back();
        tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
      }
    }
    tdd_ring_sizes.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(
        cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
  }

  if (col_type == kMULTIPOLYGON) {
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_poly_rings;
    tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
    if (!is_null_geo) {
      for (auto num_rings : poly_rings) {
        tdd_poly_rings.val.arr_val.emplace_back();
        tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
      }
    }
    tdd_poly_rings.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(
        cd_poly_rings, tdd_poly_rings, false, replicate_count);
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_bounds;
    tdd_bounds.val.arr_val.reserve(bounds.size());
    if (!is_null_geo) {
      for (auto b : bounds) {
        tdd_bounds.val.arr_val.emplace_back();
        tdd_bounds.val.arr_val.back().val.real_val = b;
      }
    }
    tdd_bounds.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(
        cd_render_group, td_render_group, false, replicate_count);
  }
}

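// Columnar counterpart of set_geo_physical_import_buffer: processes whole
// columns of parsed geo values at once, checking that every physical column
// has a row count matching the coords column.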
void Importer::set_geo_physical_import_buffer_columnar(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<std::vector<double>>& coords_column,
    std::vector<std::vector<double>>& bounds_column,
    std::vector<std::vector<int>>& ring_sizes_column,
    std::vector<std::vector<int>>& poly_rings_column,
    int render_group,
    const int64_t replicate_count) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;

  auto coords_row_count = coords_column.size();
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  for (auto coords : coords_column) {
    bool is_null_geo = false;
    bool is_null_point = false;
    if (!col_ti.get_notnull()) {
      // Check for NULL geo
      if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
        is_null_point = true;
        coords.clear();
      }
      is_null_geo = coords.empty();
      if (is_null_point) {
        coords.push_back(NULL_ARRAY_DOUBLE);
        coords.push_back(NULL_DOUBLE);
        // Treating POINT coords as notnull, need to store actual encoding
        // [un]compressed+[not]null
        is_null_geo = false;
      }
    }
    std::vector<TDatum> td_coords_data;
    if (!is_null_geo) {
      std::vector<uint8_t> compressed_coords =
          Geospatial::compress_coords(coords, col_ti);
      for (auto cc : compressed_coords) {
        TDatum td_byte;
        td_byte.val.int_val = cc;
        td_coords_data.push_back(td_byte);
      }
    }
    TDatum tdd_coords;
    tdd_coords.val.arr_val = td_coords_data;
    tdd_coords.is_null = is_null_geo;
    import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false, replicate_count);
  }
  col_idx++;

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (ring_sizes_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
    }
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto ring_sizes : ring_sizes_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = ring_sizes.empty();
      }
      std::vector<TDatum> td_ring_sizes;
      for (auto ring_size : ring_sizes) {
        TDatum td_ring_size;
        td_ring_size.val.int_val = ring_size;
        td_ring_sizes.push_back(td_ring_size);
      }
      TDatum tdd_ring_sizes;
      tdd_ring_sizes.val.arr_val = td_ring_sizes;
      tdd_ring_sizes.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(
          cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kMULTIPOLYGON) {
    if (poly_rings_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
    }
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto poly_rings : poly_rings_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = poly_rings.empty();
      }
      std::vector<TDatum> td_poly_rings;
      for (auto num_rings : poly_rings) {
        TDatum td_num_rings;
        td_num_rings.val.int_val = num_rings;
        td_poly_rings.push_back(td_num_rings);
      }
      TDatum tdd_poly_rings;
      tdd_poly_rings.val.arr_val = td_poly_rings;
      tdd_poly_rings.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(
          cd_poly_rings, tdd_poly_rings, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (bounds_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: bounds column size mismatch";
    }
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto bounds : bounds_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
      }
      std::vector<TDatum> td_bounds_data;
      for (auto b : bounds) {
        TDatum td_double;
        td_double.val.real_val = b;
        td_bounds_data.push_back(td_double);
      }
      TDatum tdd_bounds;
      tdd_bounds.val.arr_val = td_bounds_data;
      tdd_bounds.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = false;
    for (decltype(coords_row_count) i = 0; i < coords_row_count; i++) {
      import_buffers[col_idx]->add_value(
          cd_render_group, td_render_group, false, replicate_count);
    }
    col_idx++;
  }
}

namespace {
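
// "Explode collections" support: a MULTIPOINT/MULTILINESTRING/MULTIPOLYGON
// source can be imported into a column of the corresponding child type by
// emitting one row per child geometry. Step 1 locates the single destination
// column; step 2 invokes the per-row import lambda once per child geometry
// (or once for a non-collection geo) and returns the time spent in microseconds.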
std::tuple<int, SQLTypes, std::string> explode_collections_step1(
    const std::list<const ColumnDescriptor*>& col_descs) {
  // validate the columns
  // for now we can only explode into a single destination column
  // which must be of the child type (POLYGON, LINESTRING, POINT)
  int collection_col_idx = -1;
  int col_idx = 0;
  std::string collection_col_name;
  SQLTypes collection_child_type = kNULLT;
  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
    auto const& cd = *cd_it;
    auto const col_type = cd->columnType.get_type();
    if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
      if (collection_col_idx >= 0) {
        throw std::runtime_error(
            "Explode Collections: Found more than one destination column");
      }
      collection_col_idx = col_idx;
      collection_child_type = col_type;
      collection_col_name = cd->columnName;
    }
    for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
      ++cd_it;
    }
    col_idx++;
  }
  if (collection_col_idx < 0) {
    throw std::runtime_error(
        "Explode Collections: Failed to find a supported column type to explode "
        "into");
  }
  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
}

int64_t explode_collections_step2(
    OGRGeometry* ogr_geometry,
    SQLTypes collection_child_type,
    const std::string& collection_col_name,
    size_t row_or_feature_idx,
    std::function<void(OGRGeometry*)> execute_import_lambda) {
  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
  bool is_collection = false;
  switch (collection_child_type) {
    case kPOINT:
      switch (ogr_geometry_type) {
        case wkbMultiPoint:
          is_collection = true;
          break;
        case wkbPoint:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTIPOINT or POINT");
      }
      break;
    case kLINESTRING:
      switch (ogr_geometry_type) {
        case wkbMultiLineString:
          is_collection = true;
          break;
        case wkbLineString:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTILINESTRING or "
              "LINESTRING");
      }
      break;
    case kPOLYGON:
      switch (ogr_geometry_type) {
        case wkbMultiPolygon:
          is_collection = true;
          break;
        case wkbPolygon:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
      }
      break;
    default:
      CHECK(false) << "Unsupported geo child type " << collection_child_type;
  }

  int64_t us = 0LL;

  // explode or just import
  if (is_collection) {
    // cast to collection
    OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
    CHECK(collection_geometry);

#if LOG_EXPLODE_COLLECTIONS
    // log number of children
    LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
              << collection_col_name << "' into "
              << collection_geometry->getNumGeometries() << " child rows";
#endif

    // loop over children
    uint32_t child_geometry_count = 0;
    auto child_geometry_it = collection_geometry->begin();
    while (child_geometry_it != collection_geometry->end()) {
      // get and import this child
      OGRGeometry* import_geometry = *child_geometry_it;
      us += measure<std::chrono::microseconds>::execution(
          [&] { execute_import_lambda(import_geometry); });

      // next child
      child_geometry_it++;
      child_geometry_count++;
    }
  } else {
    // import non-collection row just once
    us = measure<std::chrono::microseconds>::execution(
        [&] { execute_import_lambda(ogr_geometry); });
  }

  // done
  return us;
}

}  // namespace

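// Worker body for multi-threaded delimited import: locates the first complete
// row in its slice of the read buffer, then parses each row, converts field
// values into the per-thread TypedImportBuffers, and tracks rejected rows and
// parse/convert timings.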
1810  int thread_id,
1811  Importer* importer,
1812  std::unique_ptr<char[]> scratch_buffer,
1813  size_t begin_pos,
1814  size_t end_pos,
1815  size_t total_size,
1816  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
1817  size_t first_row_index_this_buffer) {
1819  int64_t total_get_row_time_us = 0;
1820  int64_t total_str_to_val_time_us = 0;
1821  CHECK(scratch_buffer);
1822  auto buffer = scratch_buffer.get();
1823  auto load_ms = measure<>::execution([]() {});
1824  auto ms = measure<>::execution([&]() {
1825  const CopyParams& copy_params = importer->get_copy_params();
1826  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
1827  size_t begin =
1828  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
1829  const char* thread_buf = buffer + begin_pos + begin;
1830  const char* thread_buf_end = buffer + end_pos;
1831  const char* buf_end = buffer + total_size;
1832  bool try_single_thread = false;
1833  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
1834  importer->get_import_buffers(thread_id);
1835  auto us = 0ULL;
1836  int phys_cols = 0;
1837  int point_cols = 0;
1838  for (const auto cd : col_descs) {
1839  const auto& col_ti = cd->columnType;
1840  phys_cols += col_ti.get_physical_cols();
1841  if (cd->columnType.get_type() == kPOINT) {
1842  point_cols++;
1843  }
1844  }
1845  auto num_cols = col_descs.size() - phys_cols;
1846  for (const auto& p : import_buffers) {
1847  p->clear();
1848  }
1849  std::vector<std::string_view> row;
1850  size_t row_index_plus_one = 0;
1851  for (const char* p = thread_buf; p < thread_buf_end; p++) {
1852  row.clear();
1853  std::vector<std::unique_ptr<char[]>>
1854  tmp_buffers; // holds string w/ removed escape chars, etc
1855  if (DEBUG_TIMING) {
1856  us = measure<std::chrono::microseconds>::execution([&]() {
1857  p = delimited_parser::get_row(p,
1858  thread_buf_end,
1859  buf_end,
1860  copy_params,
1861  importer->get_is_array(),
1862  row,
1863  tmp_buffers,
1864  try_single_thread);
1865  });
1866  total_get_row_time_us += us;
1867  } else {
1868  p = delimited_parser::get_row(p,
1869  thread_buf_end,
1870  buf_end,
1871  copy_params,
1872  importer->get_is_array(),
1873  row,
1874  tmp_buffers,
1875  try_single_thread);
1876  }
1877  row_index_plus_one++;
1878  // Each POINT could consume two separate coords instead of a single WKT
1879  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
1880  import_status.rows_rejected++;
1881  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
1882  << row.size() << "): " << shared::printContainer(row);
1883  if (import_status.rows_rejected > copy_params.max_reject) {
1884  break;
1885  }
1886  continue;
1887  }
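 // The width check above accepts any row with between num_cols and
 // num_cols + point_cols fields. For example (illustrative schema), a table
 // (id INT, location POINT) has num_cols = 2 and point_cols = 1, so both
 // "1,POINT(1 2)" (2 fields) and "1,-122.4,37.8" (3 fields, a lon/lat pair)
 // are accepted, while "1" or "1,a,b,c" are rejected.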
1888 
1889  //
1890  // lambda for importing a row (perhaps multiple times if exploding a collection)
1891  //
1892 
1893  auto execute_import_row = [&](OGRGeometry* import_geometry) {
1894  size_t import_idx = 0;
1895  size_t col_idx = 0;
1896  try {
1897  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1898  auto cd = *cd_it;
1899  const auto& col_ti = cd->columnType;
1900 
1901  bool is_null =
1902  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
1903  // Note: the default copy_params.null_str is "\N", but everyone uses "NULL".
1904  // So nullness may initially be missed here and not passed to add_value,
1905  // which then performs its own check and may still decide the value is NULL,
1906  // e.g. a kINT that doesn't start with a digit or a '-' is treated as NULL.
1907  // In that case "NULL" is not recognized as NULL here, but it is also not a
1908  // valid kINT, so it ends up NULL after all.
1909  // We check for "NULL" here too, as a widely accepted notation for NULL.
1910 
1911  // Treating empty as NULL
1912  if (!cd->columnType.is_string() && row[import_idx].empty()) {
1913  is_null = true;
1914  }
1915 
1916  if (col_ti.get_physical_cols() == 0) {
1917  // not geo
1918 
1919  import_buffers[col_idx]->add_value(
1920  cd, row[import_idx], is_null, copy_params);
1921 
1922  // next
1923  ++import_idx;
1924  ++col_idx;
1925  } else {
1926  // geo
1927 
1928  // store null string in the base column
1929  import_buffers[col_idx]->add_value(
1930  cd, copy_params.null_str, true, copy_params);
1931 
1932  // the WKT (or WKB hex) string that we're not storing in the base column
1933  auto const& geo_string = row[import_idx];
1934 
1935  // next
1936  ++import_idx;
1937  ++col_idx;
1938 
1939  SQLTypes col_type = col_ti.get_type();
1940  CHECK(IS_GEO(col_type));
1941 
1942  std::vector<double> coords;
1943  std::vector<double> bounds;
1944  std::vector<int> ring_sizes;
1945  std::vector<int> poly_rings;
1946  int render_group = 0;
1947 
1948  // if this is a POINT column, and the field is not null, and
1949  // looks like a scalar numeric value (and not a hex blob)
1950  // attempt to import two columns as lon/lat (or lat/lon)
1951  if (col_type == kPOINT && !is_null && geo_string.size() > 0 &&
1952  (geo_string[0] == '.' || isdigit(geo_string[0]) ||
1953  geo_string[0] == '-') &&
1954  geo_string.find_first_of("ABCDEFabcdef") == std::string::npos) {
1955  double lon = std::atof(std::string(geo_string).c_str());
1956  double lat = NAN;
1957  auto lat_str = row[import_idx];
1958  ++import_idx;
1959  if (lat_str.size() > 0 &&
1960  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
1961  lat = std::atof(std::string(lat_str).c_str());
1962  }
1963  // Swap coordinates if this table uses a reverse order: lat/lon
1964  if (!copy_params.lonlat) {
1965  std::swap(lat, lon);
1966  }
1967  // TODO: should check if POINT column should have been declared with
1968  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
1969  // throw std::runtime_error("POINT column " + cd->columnName + " is
1970  // not WGS84, cannot insert lon/lat");
1971  // }
1972  SQLTypeInfo import_ti{col_ti};
1973  if (copy_params.file_type == FileType::DELIMITED &&
1974  import_ti.get_output_srid() == 4326) {
1975  auto srid0 = copy_params.source_srid;
1976  if (srid0 > 0) {
1977  // srid0 -> 4326 transform is requested on import
1978  import_ti.set_input_srid(srid0);
1979  }
1980  }
1981  if (!importGeoFromLonLat(lon, lat, coords, import_ti)) {
1982  throw std::runtime_error(
1983  "Cannot read lon/lat to insert into POINT column " +
1984  cd->columnName);
1985  }
1986  } else {
1987  // import it
1988  SQLTypeInfo import_ti{col_ti};
1989  if (copy_params.file_type == FileType::DELIMITED &&
1990  import_ti.get_output_srid() == 4326) {
1991  auto srid0 = copy_params.source_srid;
1992  if (srid0 > 0) {
1993  // srid0 -> 4326 transform is requested on import
1994  import_ti.set_input_srid(srid0);
1995  }
1996  }
1997  if (is_null) {
1998  if (col_ti.get_notnull()) {
1999  throw std::runtime_error("NULL geo for column " + cd->columnName);
2000  }
2001  Geospatial::GeoTypesFactory::getNullGeoColumns(
2002  import_ti,
2003  coords,
2004  bounds,
2005  ring_sizes,
2006  poly_rings,
2007  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2008  } else {
2009  if (import_geometry) {
2010  // geometry already exploded
2011  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2012  import_geometry,
2013  import_ti,
2014  coords,
2015  bounds,
2016  ring_sizes,
2017  poly_rings,
2018  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2019  std::string msg =
2020  "Failed to extract valid geometry from exploded row " +
2021  std::to_string(first_row_index_this_buffer +
2022  row_index_plus_one) +
2023  " for column " + cd->columnName;
2024  throw std::runtime_error(msg);
2025  }
2026  } else {
2027  // extract geometry directly from WKT
2028  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2029  std::string(geo_string),
2030  import_ti,
2031  coords,
2032  bounds,
2033  ring_sizes,
2034  poly_rings,
2035  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2036  std::string msg = "Failed to extract valid geometry from row " +
2037  std::to_string(first_row_index_this_buffer +
2038  row_index_plus_one) +
2039  " for column " + cd->columnName;
2040  throw std::runtime_error(msg);
2041  }
2042  }
2043 
2044  // validate types
2045  if (col_type != import_ti.get_type()) {
2046  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2047  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2048  col_type == SQLTypes::kMULTIPOLYGON)) {
2049  throw std::runtime_error(
2050  "Imported geometry doesn't match the type of column " +
2051  cd->columnName);
2052  }
2053  }
2054  }
2055 
2056  // assign render group?
2057  if (columnIdToRenderGroupAnalyzerMap.size()) {
2058  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2059  if (ring_sizes.size()) {
2060  // get a suitable render group for these poly coords
2061  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2062  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2063  render_group =
2064  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2065  } else {
2066  // empty poly
2067  render_group = -1;
2068  }
2069  }
2070  }
2071  }
2072 
2073  // import extracted geo
2074  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2075  cd,
2076  import_buffers,
2077  col_idx,
2078  coords,
2079  bounds,
2080  ring_sizes,
2081  poly_rings,
2082  render_group);
2083 
2084  // skip remaining physical columns
2085  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2086  ++cd_it;
2087  }
2088  }
2089  }
2090  import_status.rows_completed++;
2091  } catch (const std::exception& e) {
2092  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2093  import_buffers[col_idx_to_pop]->pop_value();
2094  }
2095  import_status.rows_rejected++;
2096  LOG(ERROR) << "Input exception thrown: " << e.what()
2097  << ". Row discarded. Data: " << shared::printContainer(row);
2098  }
2099  };
2100 
2101  if (copy_params.geo_explode_collections) {
2102  // explode and import
2103  auto const [collection_col_idx, collection_child_type, collection_col_name] =
2104  explode_collections_step1(col_descs);
2105  // pull out the collection WKT or WKB hex
2106  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2107  auto const& collection_geo_string = row[collection_col_idx];
2108  // convert to OGR
2109  OGRGeometry* ogr_geometry = nullptr;
2110  ScopeGuard destroy_ogr_geometry = [&] {
2111  if (ogr_geometry) {
2112  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2113  }
2114  };
2115  ogr_geometry = Geospatial::GeoTypesFactory::createOGRGeometry(
2116  std::string(collection_geo_string));
2117  // do the explode and import
2118  us = explode_collections_step2(ogr_geometry,
2119  collection_child_type,
2120  collection_col_name,
2121  first_row_index_this_buffer + row_index_plus_one,
2122  execute_import_row);
2123  } else {
2124  // import non-collection row just once
2125  us = measure<std::chrono::microseconds>::execution(
2126  [&] { execute_import_row(nullptr); });
2127  }
2128  total_str_to_val_time_us += us;
2129  } // end thread
2130  if (import_status.rows_completed > 0) {
2131  load_ms = measure<>::execution(
2132  [&]() { importer->load(import_buffers, import_status.rows_completed); });
2133  }
2134  });
2135  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2136  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2137  << import_status.rows_completed << " rows inserted in "
2138  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2139  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2140  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2141  << "sec" << std::endl;
2142  }
2143 
2144  import_status.thread_id = thread_id;
2145  // LOG(INFO) << " return " << import_status.thread_id << std::endl;
2146 
2147  return import_status;
2148 }
2149 
2150 static ImportStatus import_thread_shapefile(
2151  int thread_id,
2152  Importer* importer,
2153  OGRSpatialReference* poGeographicSR,
2154  const FeaturePtrVector& features,
2155  size_t firstFeature,
2156  size_t numFeatures,
2157  const FieldNameToIndexMapType& fieldNameToIndexMap,
2158  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2159  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap) {
2160  ImportStatus import_status;
2161  const CopyParams& copy_params = importer->get_copy_params();
2162  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2163  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2164  importer->get_import_buffers(thread_id);
2165 
2166  for (const auto& p : import_buffers) {
2167  p->clear();
2168  }
2169 
2170  auto convert_timer = timer_start();
2171 
2172  // we create this on the fly based on the first feature's SR
2173  std::unique_ptr<OGRCoordinateTransformation> coordinate_transformation;
2174 
2175  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2176  if (!features[iFeature]) {
2177  continue;
2178  }
2179 
2180  // get this feature's geometry
2181  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2182  if (pGeometry) {
2183  // for geodatabase, we need to consider features with no geometry
2184  // as we still want to create a table, even if it has no geo column
2185 
2186  // transform it
2187  // avoid GDAL error if not transformable
2188  auto geometry_sr = pGeometry->getSpatialReference();
2189  if (geometry_sr) {
2190  // create an OGRCoordinateTransformation (CT) on the fly
2191  // we must assume that all geo in this file will have
2192  // the same source SR, so the CT will be valid for all
2193  // transforming to a reusable CT is faster than to an SR
2194  if (coordinate_transformation == nullptr) {
2195  coordinate_transformation.reset(
2196  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR));
2197  if (coordinate_transformation == nullptr) {
2198  throw std::runtime_error(
2199  "Failed to create a GDAL CoordinateTransformation for incoming geo");
2200  }
2201  }
2202  pGeometry->transform(coordinate_transformation.get());
2203  }
2204  }
2205 
2206  //
2207  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2208  //
2209 
2210  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2211  size_t col_idx = 0;
2212  try {
2213  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2214  auto cd = *cd_it;
2215 
2216  // is this a geo column?
2217  const auto& col_ti = cd->columnType;
2218  if (col_ti.is_geometry()) {
2219  // Note that this assumes there is one and only one geo column in the table.
2220  // Currently, the importer only supports reading a single geospatial feature
2221  // from an input shapefile / geojson file, but this code will need to be
2222  // modified if that changes
2223  SQLTypes col_type = col_ti.get_type();
2224 
2225  // store null string in the base column
2226  import_buffers[col_idx]->add_value(
2227  cd, copy_params.null_str, true, copy_params);
2228  ++col_idx;
2229 
2230  // the data we now need to extract for the other columns
2231  std::vector<double> coords;
2232  std::vector<double> bounds;
2233  std::vector<int> ring_sizes;
2234  std::vector<int> poly_rings;
2235  int render_group = 0;
2236 
2237  // extract it
2238  SQLTypeInfo import_ti{col_ti};
2239  bool is_null_geo = !import_geometry;
2240  if (is_null_geo) {
2241  if (col_ti.get_notnull()) {
2242  throw std::runtime_error("NULL geo for column " + cd->columnName);
2243  }
2244  Geospatial::GeoTypesFactory::getNullGeoColumns(
2245  import_ti,
2246  coords,
2247  bounds,
2248  ring_sizes,
2249  poly_rings,
2250  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2251  } else {
2252  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2253  import_geometry,
2254  import_ti,
2255  coords,
2256  bounds,
2257  ring_sizes,
2258  poly_rings,
2259  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2260  std::string msg = "Failed to extract valid geometry from feature " +
2261  std::to_string(firstFeature + iFeature + 1) +
2262  " for column " + cd->columnName;
2263  throw std::runtime_error(msg);
2264  }
2265 
2266  // validate types
2267  if (col_type != import_ti.get_type()) {
2268  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2269  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2270  col_type == SQLTypes::kMULTIPOLYGON)) {
2271  throw std::runtime_error(
2272  "Imported geometry doesn't match the type of column " +
2273  cd->columnName);
2274  }
2275  }
2276  }
2277 
2278  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2279  if (ring_sizes.size()) {
2280  // get a suitable render group for these poly coords
2281  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2282  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2283  render_group = (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2284  } else {
2285  // empty poly
2286  render_group = -1;
2287  }
2288  }
2289 
2290  // create coords array value and add it to the physical column
2291  ++cd_it;
2292  auto cd_coords = *cd_it;
2293  std::vector<TDatum> td_coord_data;
2294  if (!is_null_geo) {
2295  std::vector<uint8_t> compressed_coords =
2296  Geospatial::compress_coords(coords, col_ti);
2297  for (auto cc : compressed_coords) {
2298  TDatum td_byte;
2299  td_byte.val.int_val = cc;
2300  td_coord_data.push_back(td_byte);
2301  }
2302  }
2303  TDatum tdd_coords;
2304  tdd_coords.val.arr_val = td_coord_data;
2305  tdd_coords.is_null = is_null_geo;
2306  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2307  ++col_idx;
2308 
2309  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2310  // Create ring_sizes array value and add it to the physical column
2311  ++cd_it;
2312  auto cd_ring_sizes = *cd_it;
2313  std::vector<TDatum> td_ring_sizes;
2314  if (!is_null_geo) {
2315  for (auto ring_size : ring_sizes) {
2316  TDatum td_ring_size;
2317  td_ring_size.val.int_val = ring_size;
2318  td_ring_sizes.push_back(td_ring_size);
2319  }
2320  }
2321  TDatum tdd_ring_sizes;
2322  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2323  tdd_ring_sizes.is_null = is_null_geo;
2324  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2325  ++col_idx;
2326  }
2327 
2328  if (col_type == kMULTIPOLYGON) {
2329  // Create poly_rings array value and add it to the physical column
2330  ++cd_it;
2331  auto cd_poly_rings = *cd_it;
2332  std::vector<TDatum> td_poly_rings;
2333  if (!is_null_geo) {
2334  for (auto num_rings : poly_rings) {
2335  TDatum td_num_rings;
2336  td_num_rings.val.int_val = num_rings;
2337  td_poly_rings.push_back(td_num_rings);
2338  }
2339  }
2340  TDatum tdd_poly_rings;
2341  tdd_poly_rings.val.arr_val = td_poly_rings;
2342  tdd_poly_rings.is_null = is_null_geo;
2343  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2344  ++col_idx;
2345  }
2346 
2347  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2348  col_type == kMULTIPOLYGON) {
2349  // Create bounds array value and add it to the physical column
2350  ++cd_it;
2351  auto cd_bounds = *cd_it;
2352  std::vector<TDatum> td_bounds_data;
2353  if (!is_null_geo) {
2354  for (auto b : bounds) {
2355  TDatum td_double;
2356  td_double.val.real_val = b;
2357  td_bounds_data.push_back(td_double);
2358  }
2359  }
2360  TDatum tdd_bounds;
2361  tdd_bounds.val.arr_val = td_bounds_data;
2362  tdd_bounds.is_null = is_null_geo;
2363  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2364  ++col_idx;
2365  }
2366 
2367  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2368  // Create render_group value and add it to the physical column
2369  ++cd_it;
2370  auto cd_render_group = *cd_it;
2371  TDatum td_render_group;
2372  td_render_group.val.int_val = render_group;
2373  td_render_group.is_null = is_null_geo;
2374  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2375  ++col_idx;
2376  }
2377  } else {
2378  // regular column
2379  // pull from GDAL metadata
2380  auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2381  CHECK(cit != columnNameToSourceNameMap.end());
2382  auto const& field_name = cit->second;
2383 
2384  auto const fit = fieldNameToIndexMap.find(field_name);
2385  CHECK(fit != fieldNameToIndexMap.end());
2386  auto const& field_index = fit->second;
2387  CHECK(field_index < fieldNameToIndexMap.size());
2388 
2389  auto const& feature = features[iFeature];
2390 
2391  auto field_defn = feature->GetFieldDefnRef(field_index);
2392  CHECK(field_defn);
2393 
2394  // OGRFeature::GetFieldAsString() can only return 80 characters
2395  // so for array columns, we are obliged to fetch the actual values
2396  // and construct the concatenated string ourselves
2397 
2398  std::string value_string;
2399  int array_index = 0, array_size = 0;
2400 
2401  auto stringify_numeric_list = [&](auto* values) {
2402  value_string = "{";
2403  while (array_index < array_size) {
2404  auto separator = (array_index > 0) ? "," : "";
2405  value_string += separator + std::to_string(values[array_index]);
2406  array_index++;
2407  }
2408  value_string += "}";
2409  };
2410 
2411  auto field_type = field_defn->GetType();
2412  switch (field_type) {
2413  case OFTInteger:
2414  case OFTInteger64:
2415  case OFTReal:
2416  case OFTString:
2417  case OFTBinary:
2418  case OFTDate:
2419  case OFTTime:
2420  case OFTDateTime: {
2421  value_string = feature->GetFieldAsString(field_index);
2422  } break;
2423  case OFTIntegerList: {
2424  auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2425  stringify_numeric_list(values);
2426  } break;
2427  case OFTInteger64List: {
2428  auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2429  stringify_numeric_list(values);
2430  } break;
2431  case OFTRealList: {
2432  auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2433  stringify_numeric_list(values);
2434  } break;
2435  case OFTStringList: {
2436  auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2437  value_string = "{";
2438  if (array_of_strings) {
2439  while (auto* this_string = array_of_strings[array_index]) {
2440  auto separator = (array_index > 0) ? "," : "";
2441  value_string += separator + std::string(this_string);
2442  array_index++;
2443  }
2444  }
2445  value_string += "}";
2446  } break;
2447  default:
2448  throw std::runtime_error("Unsupported geo file field type (" +
2449  std::to_string(static_cast<int>(field_type)) +
2450  ")");
2451  }
2452 
2453  static CopyParams default_copy_params;
2454  import_buffers[col_idx]->add_value(
2455  cd, value_string, false, default_copy_params);
2456  ++col_idx;
2457  }
2458  }
2459  import_status.rows_completed++;
2460  } catch (const std::exception& e) {
2461  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2462  import_buffers[col_idx_to_pop]->pop_value();
2463  }
2464  import_status.rows_rejected++;
2465  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2466  }
2467  };
2468 
2469  if (pGeometry && copy_params.geo_explode_collections) {
2470  // explode and import
2471  auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2472  explode_collections_step1(col_descs);
2473  explode_collections_step2(pGeometry,
2474  collection_child_type,
2475  collection_col_name,
2476  firstFeature + iFeature + 1,
2477  execute_import_feature);
2478  } else {
2479  // import non-collection or null feature just once
2480  execute_import_feature(pGeometry);
2481  }
2482  } // end features
2483 
2484  float convert_ms =
2485  float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2486  convert_timer)) /
2487  1000.0f;
2488 
2489  float load_ms = 0.0f;
2490  if (import_status.rows_completed > 0) {
2491  auto load_timer = timer_start();
2492  importer->load(import_buffers, import_status.rows_completed);
2493  load_ms =
2494  float(
2495  timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2496  load_timer)) /
2497  1000.0f;
2498  }
2499 
2500  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2501  LOG(INFO) << "DEBUG: Process " << convert_ms << "ms";
2502  LOG(INFO) << "DEBUG: Load " << load_ms << "ms";
2503  }
2504 
2505  import_status.thread_id = thread_id;
2506 
2507  if (DEBUG_TIMING) {
2508  LOG(INFO) << "DEBUG: Total "
2509  << float(timer_stop<std::chrono::steady_clock::time_point,
2510  std::chrono::microseconds>(convert_timer)) /
2511  1000.0f
2512  << "ms";
2513  }
2514 
2515  return import_status;
2516 }
2517 
2518 bool Loader::loadNoCheckpoint(
2519  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2520  size_t row_count) {
2521  return loadImpl(import_buffers, row_count, false);
2522 }
2523 
2524 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2525  size_t row_count) {
2526  return loadImpl(import_buffers, row_count, true);
2527 }
2528 
2529 namespace {
2530 
2531 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2532  const auto& ti = import_buffer.getTypeInfo();
2533  const int8_t* values_buffer{nullptr};
2534  if (ti.is_string()) {
2535  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2536  values_buffer = import_buffer.getStringDictBuffer();
2537  } else {
2538  values_buffer = import_buffer.getAsBytes();
2539  }
2540  CHECK(values_buffer);
2541  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2542  switch (logical_size) {
2543  case 1: {
2544  return values_buffer[index];
2545  }
2546  case 2: {
2547  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2548  }
2549  case 4: {
2550  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2551  }
2552  case 8: {
2553  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2554  }
2555  default:
2556  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2557  }
2558  UNREACHABLE();
2559  return 0;
2560 }
2561 
2562 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2563  const auto& ti = import_buffer.getTypeInfo();
2564  CHECK_EQ(kFLOAT, ti.get_type());
2565  const auto values_buffer = import_buffer.getAsBytes();
2566  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2567 }
2568 
2569 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2570  const auto& ti = import_buffer.getTypeInfo();
2571  CHECK_EQ(kDOUBLE, ti.get_type());
2572  const auto values_buffer = import_buffer.getAsBytes();
2573  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2574 }
2575 
2576 } // namespace
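 // A minimal sketch of how these helpers feed the shard router below.
 // SHARD_FOR_KEY comes from Shared/shard_key.h; the modulo shown in the
 // comment is illustrative of its intent, not its exact definition:
 //
 //   const int64_t key = int_value_at(*shard_column_input_buffer, i);
 //   const size_t shard = SHARD_FOR_KEY(key, shard_count);  // ~ key % shard_count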
2577 
2578 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2579  std::vector<size_t>& all_shard_row_counts,
2580  const OneShardBuffers& import_buffers,
2581  const size_t row_count,
2582  const size_t shard_count) {
2583  all_shard_row_counts.resize(shard_count);
2584  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2585  all_shard_import_buffers.emplace_back();
2586  for (const auto& typed_import_buffer : import_buffers) {
2587  all_shard_import_buffers.back().emplace_back(
2588  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2589  typed_import_buffer->getStringDictionary()));
2590  }
2591  }
2592  CHECK_GT(table_desc_->shardedColumnId, 0);
2593  int col_idx{0};
2594  const ColumnDescriptor* shard_col_desc{nullptr};
2595  for (const auto col_desc : column_descs_) {
2596  ++col_idx;
2597  if (col_idx == table_desc_->shardedColumnId) {
2598  shard_col_desc = col_desc;
2599  break;
2600  }
2601  }
2602  CHECK(shard_col_desc);
2603  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2604  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2605  const auto& shard_col_ti = shard_col_desc->columnType;
2606  CHECK(shard_col_ti.is_integer() ||
2607  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2608  shard_col_ti.is_time());
2609  if (shard_col_ti.is_string()) {
2610  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2611  CHECK(payloads_ptr);
2612  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2613  }
2614 
2615  // for each replicated (alter-added) column, the number of rows in a shard is
2616  // inferred from that of the sharding column, not simply evenly distributed.
2617  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2618  // Here the loop count is overloaded. For normal imports, we loop thru all
2619  // input values (rows), so the loop count is the number of input rows.
2620  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2621  // all shards, so the loop count is the number of shards.
2622  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2623  for (size_t i = 0; i < loop_count; ++i) {
2624  const size_t shard =
2625  getReplicating()
2626  ? i
2627  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2628  auto& shard_output_buffers = all_shard_import_buffers[shard];
2629 
2630  // when replicating a column, populate its 'rows' to all shards only once,
2631  // and fetch the value from the first (and only) row
2632  const auto row_index = getReplicating() ? 0 : i;
2633 
2634  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2635  const auto& input_buffer = import_buffers[col_idx];
2636  const auto& col_ti = input_buffer->getTypeInfo();
2637  const auto type =
2638  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2639 
2640  // for a replicated (added) column, populate rows_per_shard with the per-shard
2641  // replicate count; bypass non-replicated columns.
2642  if (getReplicating()) {
2643  if (input_buffer->get_replicate_count() > 0) {
2644  shard_output_buffers[col_idx]->set_replicate_count(
2645  shard_tds[shard]->fragmenter->getNumRows());
2646  } else {
2647  continue;
2648  }
2649  }
2650 
2651  switch (type) {
2652  case kBOOLEAN:
2653  shard_output_buffers[col_idx]->addBoolean(
2654  int_value_at(*input_buffer, row_index));
2655  break;
2656  case kTINYINT:
2657  shard_output_buffers[col_idx]->addTinyint(
2658  int_value_at(*input_buffer, row_index));
2659  break;
2660  case kSMALLINT:
2661  shard_output_buffers[col_idx]->addSmallint(
2662  int_value_at(*input_buffer, row_index));
2663  break;
2664  case kINT:
2665  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2666  break;
2667  case kBIGINT:
2668  shard_output_buffers[col_idx]->addBigint(
2669  int_value_at(*input_buffer, row_index));
2670  break;
2671  case kFLOAT:
2672  shard_output_buffers[col_idx]->addFloat(
2673  float_value_at(*input_buffer, row_index));
2674  break;
2675  case kDOUBLE:
2676  shard_output_buffers[col_idx]->addDouble(
2677  double_value_at(*input_buffer, row_index));
2678  break;
2679  case kTEXT:
2680  case kVARCHAR:
2681  case kCHAR: {
2682  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2683  shard_output_buffers[col_idx]->addString(
2684  (*input_buffer->getStringBuffer())[row_index]);
2685  break;
2686  }
2687  case kTIME:
2688  case kTIMESTAMP:
2689  case kDATE:
2690  shard_output_buffers[col_idx]->addBigint(
2691  int_value_at(*input_buffer, row_index));
2692  break;
2693  case kARRAY:
2694  if (IS_STRING(col_ti.get_subtype())) {
2695  CHECK(input_buffer->getStringArrayBuffer());
2696  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2697  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2698  shard_output_buffers[col_idx]->addStringArray(input_arr);
2699  } else {
2700  shard_output_buffers[col_idx]->addArray(
2701  (*input_buffer->getArrayBuffer())[row_index]);
2702  }
2703  break;
2704  case kPOINT:
2705  case kLINESTRING:
2706  case kPOLYGON:
2707  case kMULTIPOLYGON: {
2708  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2709  shard_output_buffers[col_idx]->addGeoString(
2710  (*input_buffer->getGeoStringBuffer())[row_index]);
2711  break;
2712  }
2713  default:
2714  CHECK(false);
2715  }
2716  }
2717  ++all_shard_row_counts[shard];
2718  // when replicating a column, the row count of a shard equals the replicate
2719  // count of the column on that shard
2720  if (getReplicating()) {
2721  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2722  }
2723  }
2724 }
2725 
2726 bool Loader::loadImpl(
2727  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2728  size_t row_count,
2729  bool checkpoint) {
2730  if (load_callback_) {
2731  auto data_blocks = get_data_block_pointers(import_buffers);
2732  return load_callback_(import_buffers, data_blocks, row_count);
2733  }
2734  if (table_desc_->nShards) {
2735  std::vector<OneShardBuffers> all_shard_import_buffers;
2736  std::vector<size_t> all_shard_row_counts;
2737  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2738  distributeToShards(all_shard_import_buffers,
2739  all_shard_row_counts,
2740  import_buffers,
2741  row_count,
2742  shard_tables.size());
2743  bool success = true;
2744  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2745  if (!all_shard_row_counts[shard_idx]) {
2746  continue;
2747  }
2748  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2749  all_shard_row_counts[shard_idx],
2750  shard_tables[shard_idx],
2751  checkpoint);
2752  }
2753  return success;
2754  }
2755  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2756 }
2757 
2758 std::vector<DataBlockPtr> Loader::get_data_block_pointers(
2759  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2760  std::vector<DataBlockPtr> result(import_buffers.size());
2761  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2762  encoded_data_block_ptrs_futures;
2763  // make all async calls to string dictionary here and then continue execution
2764  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2765  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2766  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2767  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2768  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2769 
2770  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2771  buf_idx,
2772  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2773  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2774  return import_buffers[buf_idx]->getStringDictBuffer();
2775  })));
2776  }
2777  }
2778 
2779  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2780  DataBlockPtr p;
2781  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2782  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2783  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2784  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2785  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2786  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2787  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2788  p.stringsPtr = string_payload_ptr;
2789  } else {
2790  // This column is a dictionary-ENCODED string. We already made an async
2791  // request above to obtain the encoded integer values, so skip this
2792  // iteration and continue.
2793  continue;
2794  }
2795  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2796  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2797  p.stringsPtr = geo_payload_ptr;
2798  } else {
2799  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2800  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2801  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2802  import_buffers[buf_idx]->addDictEncodedStringArray(
2803  *import_buffers[buf_idx]->getStringArrayBuffer());
2804  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2805  } else {
2806  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2807  }
2808  }
2809  result[buf_idx] = p;
2810  }
2811 
2812  // wait for the async requests we made for string dictionary
2813  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2814  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2815  }
2816  return result;
2817 }
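 // The pattern above, in miniature (a sketch, not the production code): one
 // std::async task per dictionary-encoded string column does the string ->
 // int32 translation while the other columns are pointed at directly, and
 // the futures are joined before the pointers are handed out:
 //
 //   std::future<int8_t*> f = std::async(std::launch::async, [&buf] {
 //     buf.addDictEncodedString(*buf.getStringBuffer());
 //     return buf.getStringDictBuffer();
 //   });
 //   // ... fill in the non-encoded columns ...
 //   result[buf_idx].numbersPtr = f.get();  // join before returning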
2818 
2819 bool Loader::loadToShard(
2820  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2821  size_t row_count,
2822  const TableDescriptor* shard_table,
2823  bool checkpoint) {
2824  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2825  // patch insert_data with new column
2826  if (this->getReplicating()) {
2827  for (const auto& import_buff : import_buffers) {
2828  insert_data_.replicate_count = import_buff->get_replicate_count();
2829  }
2830  }
2831 
2832  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2833  ins_data.numRows = row_count;
2834  bool success = true;
2835 
2836  ins_data.data = get_data_block_pointers(import_buffers);
2837 
2838  for (const auto& import_buffer : import_buffers) {
2839  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2840  }
2841 
2842  // release loader_lock so that in InsertOrderFragmenter::insertData
2843  // we can have multiple threads sort/shuffle InsertData
2844  loader_lock.unlock();
2845 
2846  {
2847  try {
2848  if (checkpoint) {
2849  shard_table->fragmenter->insertData(ins_data);
2850  } else {
2851  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2852  }
2853  } catch (std::exception& e) {
2854  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2855  success = false;
2856  }
2857  }
2858  return success;
2859 }
2860 
2861 void Loader::dropColumns(const std::vector<int>& columnIds) {
2862  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
2863  if (table_desc_->nShards) {
2864  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
2865  }
2866  for (auto table_desc : table_descs) {
2867  table_desc->fragmenter->dropColumns(columnIds);
2868  }
2869 }
2870 
2871 void Loader::init(const bool use_catalog_locks) {
2872  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2873  insert_data_.tableId = table_desc_->tableId;
2874  for (auto cd : column_descs_) {
2875  insert_data_.columnIds.push_back(cd->columnId);
2876  if (cd->columnType.get_compression() == kENCODING_DICT) {
2877  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2878  const auto dd = use_catalog_locks
2879  ? catalog_.getMetadataForDict(cd->columnType.get_comp_param())
2880  : catalog_.getMetadataForDictUnlocked(
2881  cd->columnType.get_comp_param(), true);
2882  CHECK(dd);
2883  dict_map_[cd->columnId] = dd->stringDict.get();
2884  }
2885  }
2886  insert_data_.numRows = 0;
2887 }
2888 
2889 void Detector::init() {
2890  detect_row_delimiter();
2891  split_raw_data();
2892  find_best_sqltypes_and_headers();
2893 }
2894 
2895 ImportStatus Detector::importDelimited(const std::string& file_path,
2896  const bool decompressed) {
2897  if (!p_file) {
2898  p_file = fopen(file_path.c_str(), "rb");
2899  }
2900  if (!p_file) {
2901  throw std::runtime_error("failed to open file '" + file_path +
2902  "': " + strerror(errno));
2903  }
2904 
2905  // somehow clang does not support ext/stdio_filebuf.h, so we
2906  // need to roll our own readline with the customized copy_params.line_delim...
2907  std::string line;
2908  line.reserve(1 * 1024 * 1024);
2909  auto end_time = std::chrono::steady_clock::now() +
2910  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
2911  try {
2912  while (!feof(p_file)) {
2913  int c;
2914  size_t n = 0;
2915  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
2916  if (n++ >= line.capacity()) {
2917  break;
2918  }
2919  line += c;
2920  }
2921  if (0 == n) {
2922  break;
2923  }
2924  // remember the first line, which is possibly a header line, to
2925  // ignore identical header line(s) in second and subsequent files of an
2926  // archive; otherwise, a later header may be mistaken for an all-string row
2927  // and end up determining the final column types.
2928  if (line1.empty()) {
2929  line1 = line;
2930  } else if (line == line1) {
2931  line.clear();
2932  continue;
2933  }
2934 
2935  raw_data += line;
2936  raw_data += copy_params.line_delim;
2937  line.clear();
2938  ++import_status.rows_completed;
2939  if (std::chrono::steady_clock::now() > end_time) {
2940  if (import_status.rows_completed > 10000) {
2941  break;
2942  }
2943  }
2944  }
2945  } catch (std::exception& e) {
2946  }
2947 
2948  // as if load truncated
2949  import_status.load_truncated = true;
2950  load_failed = true;
2951 
2952  fclose(p_file);
2953  p_file = nullptr;
2954  return import_status;
2955 }
2956 
2957 void Detector::read_file() {
2958  // this becomes analogous to Importer::import()
2959  importDelimited(file_path, true);
2960 }
2961 
2962 void Detector::detect_row_delimiter() {
2963  if (copy_params.delimiter == '\0') {
2964  copy_params.delimiter = ',';
2965  if (boost::filesystem::extension(file_path) == ".tsv") {
2966  copy_params.delimiter = '\t';
2967  }
2968  }
2969 }
2970 
2971 void Detector::split_raw_data() {
2972  const char* buf = raw_data.c_str();
2973  const char* buf_end = buf + raw_data.size();
2974  bool try_single_thread = false;
2975  for (const char* p = buf; p < buf_end; p++) {
2976  std::vector<std::string> row;
2977  std::vector<std::unique_ptr<char[]>> tmp_buffers;
2978  p = delimited_parser::get_row(
2979  p, buf_end, buf_end, copy_params, nullptr, row, tmp_buffers, try_single_thread);
2980  raw_rows.push_back(row);
2981  if (try_single_thread) {
2982  break;
2983  }
2984  }
2985  if (try_single_thread) {
2986  copy_params.threads = 1;
2987  raw_rows.clear();
2988  for (const char* p = buf; p < buf_end; p++) {
2989  std::vector<std::string> row;
2990  std::vector<std::unique_ptr<char[]>> tmp_buffers;
2991  p = delimited_parser::get_row(
2992  p, buf_end, buf_end, copy_params, nullptr, row, tmp_buffers, try_single_thread);
2993  raw_rows.push_back(row);
2994  }
2995  }
2996 }
2997 
2998 template <class T>
2999 bool try_cast(const std::string& str) {
3000  try {
3001  boost::lexical_cast<T>(str);
3002  } catch (const boost::bad_lexical_cast& e) {
3003  return false;
3004  }
3005  return true;
3006 }
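 // Examples of the resulting behavior (standard boost::lexical_cast
 // semantics, which reject any unconsumed trailing characters):
 //   try_cast<int32_t>("42")   -> true
 //   try_cast<int32_t>("4.2")  -> false (the ".2" is not consumed)
 //   try_cast<double>("4.2")   -> true
 //   try_cast<double>("forty") -> false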
3007 
3008 SQLTypes Detector::detect_sqltype(const std::string& str) {
3009  SQLTypes type = kTEXT;
3010  if (try_cast<double>(str)) {
3011  type = kDOUBLE;
3012  /*if (try_cast<bool>(str)) {
3013  type = kBOOLEAN;
3014  }*/
3015  if (try_cast<int16_t>(str)) {
3016  type = kSMALLINT;
3017  } else if (try_cast<int32_t>(str)) {
3018  type = kINT;
3019  } else if (try_cast<int64_t>(str)) {
3020  type = kBIGINT;
3021  } else if (try_cast<float>(str)) {
3022  type = kFLOAT;
3023  }
3024  }
3025 
3026  // check for geo types
3027  if (type == kTEXT) {
3028  // convert to upper case
3029  std::string str_upper_case = str;
3030  std::transform(
3031  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3032 
3033  // then test for leading words
3034  if (str_upper_case.find("POINT") == 0) {
3035  type = kPOINT;
3036  } else if (str_upper_case.find("LINESTRING") == 0) {
3037  type = kLINESTRING;
3038  } else if (str_upper_case.find("POLYGON") == 0) {
3039  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
3040  type = kMULTIPOLYGON;
3041  } else {
3042  type = kPOLYGON;
3043  }
3044  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
3045  type = kMULTIPOLYGON;
3046  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
3047  std::string::npos &&
3048  (str_upper_case.size() % 2) == 0) {
3049  // simple hex blob (two characters per byte, not uu-encode or base64)
3050  if (str_upper_case.size() >= 10) {
3051  // match WKB blobs for supported geometry types
3052  // the first byte specifies if the data is big-endian or little-endian
3053  // the next four bytes are the geometry type (1 = POINT etc.)
3054  // @TODO support eWKB, which has extra bits set in the geometry type
3055  auto first_five_bytes = str_upper_case.substr(0, 10);
3056  if (first_five_bytes == "0000000001" || first_five_bytes == "0101000000") {
3057  type = kPOINT;
3058  } else if (first_five_bytes == "0000000002" || first_five_bytes == "0102000000") {
3059  type = kLINESTRING;
3060  } else if (first_five_bytes == "0000000003" || first_five_bytes == "0103000000") {
3061  type = kPOLYGON;
3062  } else if (first_five_bytes == "0000000006" || first_five_bytes == "0106000000") {
3063  type = kMULTIPOLYGON;
3064  } else {
3065  // unsupported WKB type
3066  return type;
3067  }
3068  } else {
3069  // too short to be WKB
3070  return type;
3071  }
3072  }
3073  }
3074 
3075  // check for time types
3076  if (type == kTEXT) {
3077  // This won't match unix timestamp, since floats and ints were checked above.
3078  if (dateTimeParseOptional<kTIME>(str, 0)) {
3079  type = kTIME;
3080  } else if (dateTimeParseOptional<kTIMESTAMP>(str, 0)) {
3081  type = kTIMESTAMP;
3082  } else if (dateTimeParseOptional<kDATE>(str, 0)) {
3083  type = kDATE;
3084  }
3085  }
3086 
3087  return type;
3088 }
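 // Worked example of the WKB prefix match above: the hex string
 // "0101000000..." decodes as byte 0x01 (little-endian byte order) followed
 // by the four-byte geometry type 0x00000001 (POINT), so it detects as
 // kPOINT; "0000000001..." is the big-endian spelling of the same type.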
3089 
3090 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3091  std::vector<SQLTypes> types(row.size());
3092  for (size_t i = 0; i < row.size(); i++) {
3093  types[i] = detect_sqltype(row[i]);
3094  }
3095  return types;
3096 }
3097 
3098 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
3099  static std::array<int, kSQLTYPE_LAST> typeorder;
3100  typeorder[kCHAR] = 0;
3101  typeorder[kBOOLEAN] = 2;
3102  typeorder[kSMALLINT] = 3;
3103  typeorder[kINT] = 4;
3104  typeorder[kBIGINT] = 5;
3105  typeorder[kFLOAT] = 6;
3106  typeorder[kDOUBLE] = 7;
3107  typeorder[kTIMESTAMP] = 8;
3108  typeorder[kTIME] = 9;
3109  typeorder[kDATE] = 10;
3110  typeorder[kPOINT] = 11;
3111  typeorder[kLINESTRING] = 11;
3112  typeorder[kPOLYGON] = 11;
3113  typeorder[kMULTIPOLYGON] = 11;
3114  typeorder[kTEXT] = 12;
3115 
3116  // note: b < a instead of a < b because the map is ordered most to least restrictive
3117  return typeorder[b] < typeorder[a];
3118 }
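 // Example: more_restrictive_sqltype(kSMALLINT, kDOUBLE) returns false
 // (typeorder[kDOUBLE] = 7 is not less than typeorder[kSMALLINT] = 3), so
 // find_best_sqltypes below widens a kSMALLINT column to kDOUBLE; whereas
 // more_restrictive_sqltype(kTEXT, kDOUBLE) returns true, so a column that
 // has seen kTEXT never narrows back to a numeric type.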
3119 
3120 void Detector::find_best_sqltypes_and_headers() {
3121  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
3122  best_encodings =
3123  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3124  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3125  switch (copy_params.has_header) {
3126  case ImportHeaderRow::AUTODETECT:
3127  has_headers = detect_headers(head_types, best_sqltypes);
3128  if (has_headers) {
3129  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3130  } else {
3131  copy_params.has_header = ImportHeaderRow::NO_HEADER;
3132  }
3133  break;
3134  case ImportHeaderRow::NO_HEADER:
3135  has_headers = false;
3136  break;
3137  case ImportHeaderRow::HAS_HEADER:
3138  has_headers = true;
3139  break;
3140  }
3141 }
3142 
3143 void Detector::find_best_sqltypes() {
3144  best_sqltypes = find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3145 }
3146 
3147 std::vector<SQLTypes> Detector::find_best_sqltypes(
3148  const std::vector<std::vector<std::string>>& raw_rows,
3149  const CopyParams& copy_params) {
3150  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3151 }
3152 
3153 std::vector<SQLTypes> Detector::find_best_sqltypes(
3154  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3155  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3156  const CopyParams& copy_params) {
3157  if (raw_rows.size() < 1) {
3158  throw std::runtime_error("No rows found in: " +
3159  boost::filesystem::basename(file_path));
3160  }
3161  auto end_time = std::chrono::steady_clock::now() + timeout;
3162  size_t num_cols = raw_rows.front().size();
3163  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3164  std::vector<size_t> non_null_col_counts(num_cols, 0);
3165  for (auto row = row_begin; row != row_end; row++) {
3166  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3167  best_types.push_back(kCHAR);
3168  non_null_col_counts.push_back(0);
3169  }
3170  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3171  // do not count nulls
3172  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3173  continue;
3174  }
3175  SQLTypes t = detect_sqltype(row->at(col_idx));
3176  non_null_col_counts[col_idx]++;
3177  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3178  best_types[col_idx] = t;
3179  }
3180  }
3181  if (std::chrono::steady_clock::now() > end_time) {
3182  break;
3183  }
3184  }
3185  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3186  // if we don't have any non-null values for this column, make it text to be
3187  // safe, because that is the least restrictive type
3188  if (non_null_col_counts[col_idx] == 0) {
3189  best_types[col_idx] = kTEXT;
3190  }
3191  }
3192 
3193  return best_types;
3194 }
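 // Example: a column whose non-null samples are "12", "3.5" and "hello"
 // starts at kCHAR, widens to kSMALLINT on "12", to kFLOAT on "3.5" (plain
 // decimals detect as kFLOAT via detect_sqltype), and finally to kTEXT on
 // "hello"; a column containing only nulls falls through to kTEXT above.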
3195 
3196 std::vector<EncodingType> Detector::find_best_encodings(
3197  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3198  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3199  const std::vector<SQLTypes>& best_types) {
3200  if (raw_rows.size() < 1) {
3201  throw std::runtime_error("No rows found in: " +
3202  boost::filesystem::basename(file_path));
3203  }
3204  size_t num_cols = best_types.size();
3205  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3206  std::vector<size_t> num_rows_per_col(num_cols, 1);
3207  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3208  for (auto row = row_begin; row != row_end; row++) {
3209  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3210  if (IS_STRING(best_types[col_idx])) {
3211  count_set[col_idx].insert(row->at(col_idx));
3212  num_rows_per_col[col_idx]++;
3213  }
3214  }
3215  }
3216  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3217  if (IS_STRING(best_types[col_idx])) {
3218  float uniqueRatio =
3219  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3220  if (uniqueRatio < 0.75) {
3221  best_encodes[col_idx] = kENCODING_DICT;
3222  }
3223  }
3224  }
3225  return best_encodes;
3226 }
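 // Example of the heuristic above: 1000 sampled rows with 100 distinct city
 // names give uniqueRatio ~ 0.1 < 0.75, so the column gets kENCODING_DICT;
 // a nearly-unique column such as a UUID (ratio ~ 1.0) stays kENCODING_NONE.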
3227 
3228 // detect_headers returns true if:
3229 // - all elements of the first argument are kTEXT
3230 // - there is at least one instance where tail_types is more restrictive than head_types
3231 // (ie, not kTEXT)
3232 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3233  const std::vector<SQLTypes>& tail_types) {
3234  if (head_types.size() != tail_types.size()) {
3235  return false;
3236  }
3237  bool has_headers = false;
3238  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3239  if (head_types[col_idx] != kTEXT) {
3240  return false;
3241  }
3242  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3243  }
3244  return has_headers;
3245 }
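 // Example: head_types {kTEXT, kTEXT} with tail_types {kINT, kTEXT} returns
 // true (an all-text first row over a numeric body must be a header);
 // head_types {kINT, kTEXT} returns false immediately; and {kTEXT, kTEXT}
 // over an all-kTEXT body returns false, since the two are indistinguishable.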
3246 
3247 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3248  n = std::min(n, raw_rows.size());
3249  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3250  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3251  raw_rows.begin() + n);
3252  return sample_rows;
3253 }
3254 
3255 std::vector<std::string> Detector::get_headers() {
3256  std::vector<std::string> headers(best_sqltypes.size());
3257  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3258  if (has_headers && i < raw_rows[0].size()) {
3259  headers[i] = raw_rows[0][i];
3260  } else {
3261  headers[i] = "column_" + std::to_string(i + 1);
3262  }
3263  }
3264  return headers;
3265 }
3266 
3267 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3268  size_t row_count) {
3269  if (!loader->loadNoCheckpoint(import_buffers, row_count)) {
3270  load_failed = true;
3271  }
3272 }
3273 
3275  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3276  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3277  if (load_failed) {
3278  // rollback to starting epoch - undo all the added records
3279  loader->setTableEpochs(table_epochs);
3280  } else {
3281  loader->checkpoint();
3282  }
3283  }
3284 
3285  if (loader->getTableDesc()->persistenceLevel ==
3286  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3287  auto ms = measure<>::execution([&]() {
3288  if (!load_failed) {
3289  for (auto& p : import_buffers_vec[0]) {
3290  if (!p->stringDictCheckpoint()) {
3291  LOG(ERROR) << "Checkpointing Dictionary for Column "
3292  << p->getColumnDesc()->columnName << " failed.";
3293  load_failed = true;
3294  break;
3295  }
3296  }
3297  }
3298  });
3299  if (DEBUG_TIMING) {
3300  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3301  << std::endl;
3302  }
3303  }
3304 }
3305 
3306 ImportStatus DataStreamSink::archivePlumber() {
3307  // in generalized importing scheme, reaching here file_path may
3308  // contain a file path, a url or a wildcard of file paths.
3309  // see CopyTableStmt::execute.
3310  auto file_paths = omnisci::glob(file_path);
3311  if (file_paths.size() == 0) {
3312  file_paths.push_back(file_path);
3313  }
3314 
3315  // sum up the sizes of all local files -- only for local files. if
3316  // file_path is an s3 url, sizes will be obtained via S3Archive.
3317  for (const auto& file_path : file_paths) {
3318  total_file_size += get_filesize(file_path);
3319  }
3320 
3321 #ifdef ENABLE_IMPORT_PARQUET
3322  // s3 parquet goes a different route because the files do not use libarchive
3323  // but the parquet api, and they need to be landed like .7z files.
3324  //
3325  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3326  // because, for example, spark sql users may specify an output url w/o a file
3327  // extension like this:
3328  // df.write
3329  // .mode("overwrite")
3330  // .parquet("s3://bucket/folder/parquet/mydata")
3331  // without the parameter, it means plain or compressed csv files.
3332  // note: .ORC and AVRO files should follow a similar path to Parquet?
3333  if (copy_params.file_type == FileType::PARQUET) {
3334  import_parquet(file_paths);
3335  } else
3336 #endif
3337  {
3338  import_compressed(file_paths);
3339  }
3340  return import_status;
3341 }
3342 
3343 #ifdef ENABLE_IMPORT_PARQUET
3344 inline auto open_parquet_table(const std::string& file_path,
3345  std::shared_ptr<arrow::io::ReadableFile>& infile,
3346  std::unique_ptr<parquet::arrow::FileReader>& reader,
3347  std::shared_ptr<arrow::Table>& table) {
3348  using namespace parquet::arrow;
3349  auto file_result = arrow::io::ReadableFile::Open(file_path);
3350  PARQUET_THROW_NOT_OK(file_result.status());
3351  infile = file_result.ValueOrDie();
3352 
3353  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3354  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3355  const auto num_row_groups = reader->num_row_groups();
3356  const auto num_columns = table->num_columns();
3357  const auto num_rows = table->num_rows();
3358  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3359  << " columns in " << num_row_groups << " groups.";
3360  return std::make_tuple(num_row_groups, num_columns, num_rows);
3361 }
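 // Typical call pattern (a sketch; the caller keeps all three out-parameters
 // alive for as long as it reads from the table):
 //
 //   std::shared_ptr<arrow::io::ReadableFile> infile;
 //   std::unique_ptr<parquet::arrow::FileReader> reader;
 //   std::shared_ptr<arrow::Table> table;
 //   auto [num_groups, num_cols, num_rows] =
 //       open_parquet_table("data.parquet", infile, reader, table);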
3362 
3363 void Detector::import_local_parquet(const std::string& file_path) {
3364  std::shared_ptr<arrow::io::ReadableFile> infile;
3365  std::unique_ptr<parquet::arrow::FileReader> reader;
3366  std::shared_ptr<arrow::Table> table;
3367  int num_row_groups, num_columns;
3368  int64_t num_rows;
3369  std::tie(num_row_groups, num_columns, num_rows) =
3370  open_parquet_table(file_path, infile, reader, table);
3371  // make up a header line if we don't have one yet
3372  if (0 == raw_data.size()) {
3373  has_headers = true;
3374  copy_params.line_delim = '\n';
3375  copy_params.delimiter = ',';
3376  // must quote values to skip any embedded delimiter
3377  copy_params.quoted = true;
3378  copy_params.quote = '"';
3379  copy_params.escape = '"';
3380  for (int c = 0; c < num_columns; ++c) {
3381  if (c) {
3382  raw_data += copy_params.delimiter;
3383  }
3384  raw_data += table->ColumnNames().at(c);
3385  }
3386  raw_data += copy_params.line_delim;
3387  }
3388  // make up raw data... row-wise...
3389  const ColumnDescriptor cd;
3390  for (int g = 0; g < num_row_groups; ++g) {
3391  // data is columnwise
3392  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3393  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3394  arrays.resize(num_columns);
3395  for (int c = 0; c < num_columns; ++c) {
3396  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3397  for (auto chunk : arrays[c]->chunks()) {
3398  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3399  }
3400  }
3401  for (int r = 0; r < num_rows; ++r) {
3402  for (int c = 0; c < num_columns; ++c) {
3403  std::vector<std::string> buffer;
3404  for (auto chunk : arrays[c]->chunks()) {
3405  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3406  if (c) {
3407  raw_data += copy_params.delimiter;
3408  }
3409  if (!chunk->IsNull(r)) {
3410  raw_data += copy_params.quote;
3411  raw_data += boost::replace_all_copy(
3412  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3413  raw_data += copy_params.quote;
3414  }
3415  }
3416  }
3417  raw_data += copy_params.line_delim;
3418  if (++import_status.rows_completed >= 10000) {
3419  // as if load truncated
3420  import_status.load_truncated = true;
3421  load_failed = true;
3422  return;
3423  }
3424  }
3425  }
3426 }
3427 
3428 template <typename DATA_TYPE>
3429 auto TypedImportBuffer::del_values(
3430  std::vector<DATA_TYPE>& buffer,
3431  import_export::BadRowsTracker* const bad_rows_tracker) {
3432  const auto old_size = buffer.size();
3433  // erase backward to minimize memory movement overhead
3434  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3435  ++rit) {
3436  buffer.erase(buffer.begin() + *rit);
3437  }
3438  return std::make_tuple(old_size, buffer.size());
3439 }
3440 
3441 auto TypedImportBuffer::del_values(const SQLTypes type,
3442  BadRowsTracker* const bad_rows_tracker) {
3443  switch (type) {
3444  case kBOOLEAN:
3445  return del_values(*bool_buffer_, bad_rows_tracker);
3446  case kTINYINT:
3447  return del_values(*tinyint_buffer_, bad_rows_tracker);
3448  case kSMALLINT:
3449  return del_values(*smallint_buffer_, bad_rows_tracker);
3450  case kINT:
3451  return del_values(*int_buffer_, bad_rows_tracker);
3452  case kBIGINT:
3453  case kNUMERIC:
3454  case kDECIMAL:
3455  case kTIME:
3456  case kTIMESTAMP:
3457  case kDATE:
3458  return del_values(*bigint_buffer_, bad_rows_tracker);
3459  case kFLOAT:
3460  return del_values(*float_buffer_, bad_rows_tracker);
3461  case kDOUBLE:
3462  return del_values(*double_buffer_, bad_rows_tracker);
3463  case kTEXT:
3464  case kVARCHAR:
3465  case kCHAR:
3466  return del_values(*string_buffer_, bad_rows_tracker);
3467  case kPOINT:
3468  case kLINESTRING:
3469  case kPOLYGON:
3470  case kMULTIPOLYGON:
3471  return del_values(*geo_string_buffer_, bad_rows_tracker);
3472  case kARRAY:
3473  return del_values(*array_buffer_, bad_rows_tracker);
3474  default:
3475  throw std::runtime_error("Invalid Type");
3476  }
3477 }
3478 
3479 void Importer::import_local_parquet(const std::string& file_path) {
3480  std::shared_ptr<arrow::io::ReadableFile> infile;
3481  std::unique_ptr<parquet::arrow::FileReader> reader;
3482  std::shared_ptr<arrow::Table> table;
3483  int num_row_groups, num_columns;
3484  int64_t nrow_in_file;
3485  std::tie(num_row_groups, num_columns, nrow_in_file) =
3486  open_parquet_table(file_path, infile, reader, table);
3487  // column_list has no $deleted
3488  const auto& column_list = get_column_descs();
3489  // for now geo columns expect a wkt or wkb hex string
3490  std::vector<const ColumnDescriptor*> cds;
3491  int num_physical_cols = 0;
3492  for (auto& cd : column_list) {
3493  cds.push_back(cd);
3494  num_physical_cols += cd->columnType.get_physical_cols();
3495  }
3496  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3497  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3498  std::to_string(num_columns) + " columns in file vs " +
3499  std::to_string(column_list.size() - num_physical_cols) +
3500  " columns in table.");
3501  // slice each row group so that slower columns (e.g. geo or string) import faster
3502  max_threads = copy_params.threads
3503  ? static_cast<size_t>(copy_params.threads)
3504  : std::min(static_cast<size_t>(cpu_threads()), g_max_import_threads);
3505  VLOG(1) << "Parquet import # threads: " << max_threads;
3506  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3507  // init row estimate for this file
3508  const auto filesize = get_filesize(file_path);
3509  size_t nrow_completed{0};
3510  file_offsets.push_back(0);
3511  // map logic column index to physical column index
3512  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3513  int physical_col_idx = 0;
3514  for (int i = 0; i < logic_col_idx; ++i) {
3515  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3516  }
3517  return physical_col_idx;
3518  };
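 // e.g. (illustrative) for columns [int a, POINT p, int b], where a POINT
 // carries one extra physical (coords) column, cds is laid out as
 // [a, p, p_coords, b], so logic_col_idx 2 ("b") resolves to physical_col_idx 3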
3519  // load a file = nested iteration of row groups, row slices and logical columns
3520  auto ms_load_a_file = measure<>::execution([&]() {
3521  for (int row_group = 0; row_group < num_row_groups && !load_failed; ++row_group) {
3522  // a sliced row group will be handled like a (logical) parquet file, with
3523  // an entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
3524  import_buffers_vec.resize(num_slices);
3525  for (int slice = 0; slice < num_slices; slice++) {
3526  import_buffers_vec[slice].clear();
3527  for (const auto cd : cds) {
3528  import_buffers_vec[slice].emplace_back(
3529  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3530  }
3531  }
3532  /*
3533  * A caveat here: Parquet files and arrow data are imported column-wise.
3534  * Unlike row-wise csv imports, an error on any row of any column forces
3535  * us to give up the entire row group across all columns, unless we trace
3536  * erroneous rows in individual columns so that we can union the bad rows
3537  * and drop them from the corresponding import_buffers_vec; otherwise, we
3538  * may easily exceed the maximum number of truncated rows even with very
3539  * sparse errors in the files.
3540  */
3541  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3542  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3543  auto& bad_rows_tracker = bad_rows_trackers[slice];
3544  bad_rows_tracker.file_name = file_path;
3545  bad_rows_tracker.row_group = slice;
3546  bad_rows_tracker.importer = this;
3547  }
3548  // process arrow arrays to import buffers
3549  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3550  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3551  const auto cd = cds[physical_col_idx];
3552  std::shared_ptr<arrow::ChunkedArray> array;
3553  PARQUET_THROW_NOT_OK(
3554  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3555  const size_t array_size = array->length();
3556  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
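 // slice_size is ceil(array_size / num_slices): e.g. array_size = 10 and
 // num_slices = 4 gives slice_size = 3 and slice ranges [0,3) [3,6) [6,9) [9,10)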
3557  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3558  for (int slice = 0; slice < num_slices; ++slice) {
3559  thread_controller.startThread([&, slice] {
3560  const auto slice_offset = slice % num_slices;
3561  ArraySliceRange slice_range(
3562  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3563  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3564  auto& bad_rows_tracker = bad_rows_trackers[slice];
3565  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3566  import_buffer->import_buffers = &import_buffers_vec[slice];
3567  import_buffer->col_idx = physical_col_idx + 1;
3568  for (auto chunk : array->chunks()) {
3569  import_buffer->add_arrow_values(
3570  cd, *chunk, false, slice_range, &bad_rows_tracker);
3571  }
3572  });
3573  }
3574  thread_controller.finish();
3575  }
3576  std::vector<size_t> nrow_in_slice_raw(num_slices);
3577  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3578  // trim bad rows from import buffers
3579  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3580  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3581  const auto cd = cds[physical_col_idx];
3582  for (int slice = 0; slice < num_slices; ++slice) {
3583  auto& bad_rows_tracker = bad_rows_trackers[slice];
3584  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3585  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3586  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3587  }
3588  }
3589  // flush slices of this row group to chunks
3590  for (int slice = 0; slice < num_slices; ++slice) {
3591  load(import_buffers_vec[slice], nrow_in_slice_successfully_loaded[slice]);
3592  }
3593  // update import stats
3594  const auto nrow_original =
3595  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3596  const auto nrow_imported =
3597  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3598  nrow_in_slice_successfully_loaded.end(),
3599  0);
3600  const auto nrow_dropped = nrow_original - nrow_imported;
3601  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3602  << " rows, drop " << nrow_dropped << " rows.";
3603  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3604  import_status.rows_completed += nrow_imported;
3605  import_status.rows_rejected += nrow_dropped;
3606  if (import_status.rows_rejected > copy_params.max_reject) {
3607  import_status.load_truncated = true;
3608  load_failed = true;
3609  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3610  << ") rows rejected exceeded. Halting load.";
3611  }
3612  // row estimate
3613  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3614  nrow_completed += nrow_imported;
3615  file_offsets.back() =
3616  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3617  // sum up current total file offsets
3618  const auto total_file_offset =
3619  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
3620  // estimate number of rows per current total file offset
3621  if (total_file_offset) {
3622  import_status.rows_estimated =
3623  (float)total_file_size / total_file_offset * import_status.rows_completed;
3624  VLOG(3) << "rows_completed " << import_status.rows_completed
3625  << ", rows_estimated " << import_status.rows_estimated
3626  << ", total_file_size " << total_file_size << ", total_file_offset "
3627  << total_file_offset;
3628  }
3629  }
3630  });
3631  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
3632  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
3633 }
3634 
3635 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths) {
3636  auto importer = dynamic_cast<Importer*>(this);
3637  auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
3638  : std::vector<Catalog_Namespace::TableEpochInfo>{};
3639  try {
3640  std::exception_ptr teptr;
3641  // file_paths may contain one local file path, a list of local file paths,
3642  // or an s3/hdfs/... url that may translate to one or more remote object keys.
3643  for (auto const& file_path : file_paths) {
3644  std::map<int, std::string> url_parts;
3645  Archive::parse_url(file_path, url_parts);
3646 
3647  // for an s3 url we need to know the object keys that it comprises
3648  std::vector<std::string> objkeys;
3649  std::unique_ptr<S3ParquetArchive> us3arch;
3650  if ("s3" == url_parts[2]) {
3651 #ifdef HAVE_AWS_S3
3652  us3arch.reset(new S3ParquetArchive(file_path,
3653  copy_params.s3_access_key,
3654  copy_params.s3_secret_key,
3655  copy_params.s3_region,
3656  copy_params.s3_endpoint,
3657  copy_params.plain_text));
3658  us3arch->init_for_read();
3659  total_file_size += us3arch->get_total_file_size();
3660  objkeys = us3arch->get_objkeys();
3661 #else
3662  throw std::runtime_error("AWS S3 support not available");
3663 #endif // HAVE_AWS_S3
3664  } else {
3665  objkeys.emplace_back(file_path);
3666  }
3667 
3668  // each obj key of an s3 url must be landed before being
3669  // imported like a 'local file'.
3670  for (auto const& objkey : objkeys) {
3671  try {
3672  auto file_path =
3673  us3arch
3674  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
3675  : objkey;
3676  import_local_parquet(file_path);
3677  if (us3arch) {
3678  us3arch->vacuum(objkey);
3679  }
3680  } catch (...) {
3681  if (us3arch) {
3682  us3arch->vacuum(objkey);
3683  }
3684  throw;
3685  }
3686  if (import_status.load_truncated) {
3687  break;
3688  }
3689  }
3690  }
3691  // rethrow any exception that happened before this point
3692  if (teptr) {
3693  std::rethrow_exception(teptr);
3694  }
3695  } catch (...) {
3696  load_failed = true;
3697  if (!import_status.load_truncated) {
3698  throw;
3699  }
3700  }
3701 
3702  if (importer) {
3703  importer->checkpoint(table_epochs);
3704  }
3705 }
3706 #endif // ENABLE_IMPORT_PARQUET
3707 
3708 void DataStreamSink::import_compressed(std::vector<std::string>& file_paths) {
3709 #ifdef _MSC_VER
3710  throw std::runtime_error("CSV Import not yet supported on Windows.");
3711 #else
3712  // a new requirement is to have one single input stream into
3713  // Importer::importDelimited, so the pipe-related setup needs to
3714  // live in the outermost block.
3715  int fd[2];
3716  if (pipe(fd) < 0) {
3717  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
3718  }
3719  signal(SIGPIPE, SIG_IGN);
3720 
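 // A minimal self-contained sketch of the producer/consumer pipe pattern used
 // below (POSIX only; the payload and thread names are illustrative):
#if 0
 int fds[2];
 if (pipe(fds) == 0) {
   std::thread producer([&] {
     const char csv[] = "a,b\n1,2\n";
     write(fds[1], csv, sizeof(csv) - 1);  // stand-in for decompressed bytes
     close(fds[1]);                        // signals EOF to the reader
   });
   std::thread consumer([&] {
     FILE* in = fdopen(fds[0], "r");
     char line[128];
     while (fgets(line, sizeof(line), in)) {
       // parse one delimited row
     }
     fclose(in);
   });
   producer.join();
   consumer.join();
 }
#endif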
3721  std::exception_ptr teptr;
3722  // create a thread to read uncompressed byte stream out of pipe and
3723  // feed into importDelimited()
3724  ImportStatus ret;
3725  auto th_pipe_reader = std::thread([&]() {
3726  try {
3727  // importDelimited will read from FILE* p_file
3728  if (0 == (p_file = fdopen(fd[0], "r"))) {
3729  throw std::runtime_error(std::string("failed to open a pipe: ") +
3730  strerror(errno));
3731  }
3732 
3733  // in the future, depending on the data types of this uncompressed stream,
3734  // it could be fed into other functions such as importParquet, etc.
3735  ret = importDelimited(file_path, true);
3736  } catch (...) {
3737  if (!teptr) { // no replace
3738  teptr = std::current_exception();
3739  }
3740  }
3741 
3742  if (p_file) {
3743  fclose(p_file);
3744  }
3745  p_file = 0;
3746  });
3747 
3748  // create a thread to iterate all files (in all archives) and
3749  // forward the uncompressed byte stream to fd[1] which is
3750  // then fed into importDelimited, importParquet, etc.
3751  auto th_pipe_writer = std::thread([&]() {
3752  std::unique_ptr<S3Archive> us3arch;
3753  bool stop = false;
3754  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
3755  try {
3756  auto file_path = file_paths[fi];
3757  std::unique_ptr<Archive> uarch;
3758  std::map<int, std::string> url_parts;
3759  Archive::parse_url(file_path, url_parts);
3760  const std::string S3_objkey_url_scheme = "s3ok";
3761  if ("file" == url_parts[2] || "" == url_parts[2]) {
3762  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3763  } else if ("s3" == url_parts[2]) {
3764 #ifdef HAVE_AWS_S3
3765  // construct an S3Archive with a shared s3 client.
3766  // should be safe b/c there is no wildcard with an s3 url
3767  us3arch.reset(new S3Archive(file_path,
3768  copy_params.s3_access_key,
3769  copy_params.s3_secret_key,
3770  copy_params.s3_region,
3771  copy_params.s3_endpoint,
3772  copy_params.plain_text));
3773  us3arch->init_for_read();
3774  total_file_size += us3arch->get_total_file_size();
3775  // don't land all files here; land them one by one in the following iterations
3776  for (const auto& objkey : us3arch->get_objkeys()) {
3777  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
3778  }
3779  continue;
3780 #else
3781  throw std::runtime_error("AWS S3 support not available");
3782 #endif // HAVE_AWS_S3
3783  } else if (S3_objkey_url_scheme == url_parts[2]) {
3784 #ifdef HAVE_AWS_S3
3785  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
3786  auto file_path =
3787  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
3788  if (0 == file_path.size()) {
3789  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
3790  }
3791  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3792  // file not removed until file closed
3793  us3arch->vacuum(objkey);
3794 #else
3795  throw std::runtime_error("AWS S3 support not available");
3796 #endif // HAVE_AWS_S3
3797  }
3798 #if 0 // TODO(ppan): implement and enable any other archive class
3799  else
3800  if ("hdfs" == url_parts[2])
3801  uarch.reset(new HdfsArchive(file_path));
3802 #endif
3803  else {
3804  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
3805  }
3806 
3807  // init the archive for read
3808  auto& arch = *uarch;
3809 
3810  // at this point, the archive at the url should be ready to be read, unarchived
3811  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
3812  const void* buf;
3813  size_t size;
3814  bool just_saw_archive_header;
3815  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
3816  bool first_text_header_skipped = false;
3817  // start reading uncompressed bytes of this archive from libarchive
3818  // note! this archive may contain more than one file!
3819  file_offsets.push_back(0);
3820  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
3821  bool insert_line_delim_after_this_file = false;
3822  while (!stop) {
3823  int64_t offset{-1};
3824  auto ok = arch.read_data_block(&buf, &size, &offset);
3825  // can't use (uncompressed) size, so track (max) file offset.
3826  // also we want to capture offset even on e.o.f.
3827  if (offset > 0) {
3828  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3829  file_offsets.back() = offset;
3830  }
3831  if (!ok) {
3832  break;
3833  }
3834  // one subtle point: we concatenate all files into a single FILE
3835  // stream and call importDelimited only once. this would lead it to
3836  // assume that only one header line comes with this 'single' stream,
3837  // while we may actually have one header line for each of the files.
3838  // so we need to skip the header lines here instead of in
3839  // importDelimited.
3840  const char* buf2 = (const char*)buf;
3841  int size2 = size;
3842  if (copy_params.has_header != import_export::ImportHeaderRow::NO_HEADER &&
3843  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
3844  while (size2-- > 0) {
3845  if (*buf2++ == copy_params.line_delim) {
3846  break;
3847  }
3848  }
3849  if (size2 <= 0) {
3850  LOG(WARNING) << "No line delimiter in block." << std::endl;
3851  } else {
3852  just_saw_archive_header = false;
3853  first_text_header_skipped = true;
3854  }
3855  }
3856  // On very rare occasions the write pipe somehow operates in a mode similar to
3857  // non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which means
3858  // blocking mode. On such an unreliable blocking mode, a possible fix is to
3859  // loop writing until no bytes are left, otherwise we get the annoying `failed
3860  // to write pipe: Success`...
3861  if (size2 > 0) {
3862  int nremaining = size2;
3863  while (nremaining > 0) {
3864  // try to write the entire remainder of the buffer to the pipe
3865  int nwritten = write(fd[1], buf2, nremaining);
3866  // how did we do?
3867  if (nwritten < 0) {
3868  // something bad happened
3869  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3870  // ignore these, assume nothing written, try again
3871  nwritten = 0;
3872  } else {
3873  // a real error
3874  throw std::runtime_error(
3875  std::string("failed or interrupted write to pipe: ") +
3876  strerror(errno));
3877  }
3878  } else if (nwritten == nremaining) {
3879  // we wrote everything; we're done
3880  break;
3881  }
3882  // only wrote some (or nothing), try again
3883  nremaining -= nwritten;
3884  buf2 += nwritten;
3885  // no exception when too many rejected
3886  // @simon.eves how would this get set? from the other thread? mutex
3887  // needed?
3888  if (import_status.load_truncated) {
3889  stop = true;
3890  break;
3891  }
3892  }
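 // e.g. writing a large block can take several iterations if the pipe's
 // kernel buffer (typically 64 KB on Linux) fills up; each retry resumes
 // at buf2 (already advanced by nwritten) with nremaining bytes left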
3893  // check that this file (buf for size) ended with a line delim
3894  if (size > 0) {
3895  const char* plast = static_cast<const char*>(buf) + (size - 1);
3896  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
3897  }
3898  }
3899  }
3900  // if that file didn't end with a line delim, we insert one here to terminate
3901  // that file's stream. use a loop for the same reason as above.
3902  if (insert_line_delim_after_this_file) {
3903  while (true) {
3904  // write the delim char to the pipe
3905  int nwritten = write(fd[1], &copy_params.line_delim, 1);
3906  // how did we do?
3907  if (nwritten < 0) {
3908  // something bad happened
3909  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3910  // ignore these, assume nothing written, try again
3911  nwritten = 0;
3912  } else {
3913  // a real error
3914  throw std::runtime_error(
3915  std::string("failed or interrupted write to pipe: ") +
3916  strerror(errno));
3917  }
3918  } else if (nwritten == 1) {
3919  // we wrote it; we're done
3920  break;
3921  }
3922  }
3923  }
3924  }
3925  } catch (...) {
3926  // when import is aborted because too many data errors or because end of a
3927  // detection, any exception thrown by s3 sdk or libarchive is okay and should be
3928  // suppressed.
3929  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
3930  if (import_status.load_truncated) {
3931  break;
3932  }
3933  if (import_status.rows_completed > 0) {
3934  if (nullptr != dynamic_cast<Detector*>(this)) {
3935  break;
3936  }
3937  }
3938  if (!teptr) { // no replace
3939  teptr = std::current_exception();
3940  }
3941  break;
3942  }
3943  }
3944  // close writer end
3945  close(fd[1]);
3946  });
3947 
3948  th_pipe_reader.join();
3949  th_pipe_writer.join();
3950 
3951  // rethrow any exception that happened before this point
3952  if (teptr) {
3953  std::rethrow_exception(teptr);
3954  }
3955 #endif
3956 }
3957 
3958 ImportStatus Importer::import() {
3959  return DataStreamSink::archivePlumber();
3960 }
3961 
3962 ImportStatus Importer::importDelimited(const std::string& file_path,
3963  const bool decompressed) {
3964  bool load_truncated = false;
3965  set_import_status(import_id, import_status);
3966 
3967  if (!p_file) {
3968  p_file = fopen(file_path.c_str(), "rb");
3969  }
3970  if (!p_file) {
3971  throw std::runtime_error("failed to open file '" + file_path +
3972  "': " + strerror(errno));
3973  }
3974 
3975  if (!decompressed) {
3976  (void)fseek(p_file, 0, SEEK_END);
3977  file_size = ftell(p_file);
3978  }
3979 
3980  if (copy_params.threads == 0) {
3981  max_threads = std::min(static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF)),
3982  g_max_import_threads);
3983  } else {
3984  max_threads = static_cast<size_t>(copy_params.threads);
3985  }
3986  VLOG(1) << "Delimited import # threads: " << max_threads;
3987 
3988  // deal with small files
3989  size_t alloc_size = copy_params.buffer_size;
3990  if (!decompressed && file_size < alloc_size) {
3991  alloc_size = file_size;
3992  }
3993 
3994  for (size_t i = 0; i < max_threads; i++) {
3995  import_buffers_vec.emplace_back();
3996  for (const auto cd : loader->get_column_descs()) {
3997  import_buffers_vec[i].emplace_back(
3998  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
3999  }
4000  }
4001 
4002  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4003  size_t current_pos = 0;
4004  size_t end_pos;
4005  size_t begin_pos = 0;
4006 
4007  (void)fseek(p_file, current_pos, SEEK_SET);
4008  size_t size =
4009  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4010 
4011  // make render group analyzers for each poly column
4012  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4013  if (copy_params.geo_assign_render_groups) {
4014  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4015  loader->getTableDesc()->tableId, false, false, false);
4016  for (auto cd : columnDescriptors) {
4017  SQLTypes ct = cd->columnType.get_type();
4018  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4019  auto rga = std::make_shared<RenderGroupAnalyzer>();
4020  rga->seedFromExistingTableContents(loader, cd->columnName);
4021  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4022  }
4023  }
4024  }
4025 
4026  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4027  loader->getTableDesc()->tableId};
4028  auto table_epochs = loader->getTableEpochs();
4029  {
4030  std::list<std::future<ImportStatus>> threads;
4031 
4032  // use a stack to track thread_ids which must not overlap among threads
4033  // because thread_id is used to index import_buffers_vec[]
4034  std::stack<size_t> stack_thread_ids;
4035  for (size_t i = 0; i < max_threads; i++) {
4036  stack_thread_ids.push(i);
4037  }
4038  // added for true row index on error
4039  size_t first_row_index_this_buffer = 0;
4040 
4041  while (size > 0) {
4042  unsigned int num_rows_this_buffer = 0;
4043  CHECK(scratch_buffer);
4044  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4045  scratch_buffer,
4046  size,
4047  copy_params,
4048  first_row_index_this_buffer,
4049  num_rows_this_buffer,
4050  p_file);
4051 
4052  // unput residual
4053  int nresidual = size - end_pos;
4054  std::unique_ptr<char[]> unbuf;
4055  if (nresidual > 0) {
4056  unbuf = std::make_unique<char[]>(nresidual);
4057  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4058  }
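 // worked example (illustrative): if this read yields "a,b\nc,d\ne," and
 // find_row_end_pos() returns end_pos = 8 (just past "c,d\n"), the residual
 // "e," (nresidual = 2) is saved here and re-planted at the front of the
 // next scratch_buffer so the next fread() completes the row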
4059 
4060  // get a thread_id not in use
4061  auto thread_id = stack_thread_ids.top();
4062  stack_thread_ids.pop();
4063  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4064 
4065  threads.push_back(std::async(std::launch::async,
4066  import_thread_delimited,
4067  thread_id,
4068  this,
4069  std::move(scratch_buffer),
4070  begin_pos,
4071  end_pos,
4072  end_pos,
4073  columnIdToRenderGroupAnalyzerMap,
4074  first_row_index_this_buffer));
4075 
4076  first_row_index_this_buffer += num_rows_this_buffer;
4077 
4078  current_pos += end_pos;
4079  scratch_buffer = std::make_unique<char[]>(alloc_size);
4080  CHECK(scratch_buffer);
4081  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4082  size = nresidual +
4083  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4084 
4085  begin_pos = 0;
4086 
4087  while (threads.size() > 0) {
4088  int nready = 0;
4089  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4090  it != threads.end();) {
4091  auto& p = *it;
4092  std::chrono::milliseconds span(
4093  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4094  if (p.wait_for(span) == std::future_status::ready) {
4095  auto ret_import_status = p.get();
4096  import_status += ret_import_status;
4097  // sum up current total file offsets
4098  size_t total_file_offset{0};
4099  if (decompressed) {
4100  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4101  for (const auto file_offset : file_offsets) {
4102  total_file_offset += file_offset;
4103  }
4104  }
4105  // estimate number of rows per current total file offset
4106  if (decompressed ? total_file_offset : current_pos) {
4107  import_status.rows_estimated =
4108  (decompressed ? (float)total_file_size / total_file_offset
4109  : (float)file_size / current_pos) *
4110  import_status.rows_completed;
4111  }
4112  VLOG(3) << "rows_completed " << import_status.rows_completed
4113  << ", rows_estimated " << import_status.rows_estimated
4114  << ", total_file_size " << total_file_size << ", total_file_offset "
4115  << total_file_offset;
4116  set_import_status(import_id, import_status);
4117  // recall thread_id for reuse
4118  stack_thread_ids.push(ret_import_status.thread_id);
4119  threads.erase(it++);
4120  ++nready;
4121  } else {
4122  ++it;
4123  }
4124  }
4125 
4126  if (nready == 0) {
4127  std::this_thread::yield();
4128  }
4129 
4130  // on eof, wait for all threads to finish
4131  if (0 == size) {
4132  continue;
4133  }
4134 
4135  // keep reading if there is any free thread slot
4136  // this is one of the major differences from the old threading model !!
4137  if (threads.size() < max_threads) {
4138  break;
4139  }
4140  }
4141 
4142  if (import_status.rows_rejected > copy_params.max_reject) {
4143  load_truncated = true;
4144  load_failed = true;
4145  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4146  break;
4147  }
4148  if (load_failed) {
4149  load_truncated = true;
4150  LOG(ERROR) << "A call to Loader::load failed. Please review the logs for "
4151  "more details";
4152  break;
4153  }
4154  }
4155 
4156  // join dangling threads in case of LOG(ERROR) above
4157  for (auto& p : threads) {
4158  p.wait();
4159  }
4160  }
4161 
4162  checkpoint(table_epochs);
4163 
4164  // must set import_status.load_truncated before closing this end of the pipe;
4165  // otherwise, the thread on the other end would throw an unwanted 'write()'
4166  // exception
4167  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
4168  import_status.load_truncated = load_truncated;
4169 
4170  fclose(p_file);
4171  p_file = nullptr;
4172  return import_status;
4173 }
4174 
4175 void Loader::checkpoint() {
4176  if (getTableDesc()->persistenceLevel ==
4177  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4178  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4179  }
4180 }
4181 
4182 std::vector<Catalog_Namespace::TableEpochInfo> Loader::getTableEpochs() const {
4183  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4184  getTableDesc()->tableId);
4185 }
4186 
4187 void Loader::setTableEpochs(
4188  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4189  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4190 }
4191 
4192 /* static */
4193 void Importer::setGDALAuthorizationTokens(const CopyParams& copy_params) {
4194  // for now we only support S3
4195  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4196  // only set if non-empty to allow GDAL defaults to persist
4197  // explicitly clear if empty to revert to default and not reuse a previous session's
4198  // keys
4199  if (copy_params.s3_region.size()) {
4200 #if DEBUG_AWS_AUTHENTICATION
4201  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4202 #endif
4203  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4204  } else {
4205 #if DEBUG_AWS_AUTHENTICATION
4206  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4207 #endif
4208  CPLSetConfigOption("AWS_REGION", nullptr);
4209  }
4210  if (copy_params.s3_endpoint.size()) {
4211 #if DEBUG_AWS_AUTHENTICATION
4212  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4213 #endif
4214  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4215  } else {
4216 #if DEBUG_AWS_AUTHENTICATION
4217  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4218 #endif
4219  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4220  }
4221  if (copy_params.s3_access_key.size()) {
4222 #if DEBUG_AWS_AUTHENTICATION
4223  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4224  << "'";
4225 #endif
4226  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4227  } else {
4228 #if DEBUG_AWS_AUTHENTICATION
4229  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4230 #endif
4231  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4232  }
4233  if (copy_params.s3_secret_key.size()) {
4234 #if DEBUG_AWS_AUTHENTICATION
4235  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4236  << "'";
4237 #endif
4238  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4239  } else {
4240 #if DEBUG_AWS_AUTHENTICATION
4241  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4242 #endif
4243  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4244  }
4245 
4246 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4247  // if we haven't set keys, we need to disable signed access
4248  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4249 #if DEBUG_AWS_AUTHENTICATION
4250  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4251 #endif
4252  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4253  } else {
4254 #if DEBUG_AWS_AUTHENTICATION
4255  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4256 #endif
4257  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4258  }
4259 #endif
4260 }
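// A minimal sketch of the CPLSetConfigOption() mechanism used above (the
// option value is illustrative; options apply process-wide for the session):
#if 0
void set_region_sketch() {
  CPLSetConfigOption("AWS_REGION", "us-west-1");  // override for this process
  CPLSetConfigOption("AWS_REGION", nullptr);      // revert to GDAL's default
}
#endif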
4261 
4262 /* static */
4263 OGRDataSource* Importer::openGDALDataset(const std::string& file_name,
4264  const CopyParams& copy_params) {
4265  // lazy init GDAL
4266  Geospatial::GDAL::init();
4267 
4268  // set authorization tokens
4269  setGDALAuthorizationTokens(copy_params);
4270 
4271  // open the file
4272  OGRDataSource* poDS;
4273 #if GDAL_VERSION_MAJOR == 1
4274  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4275 #else
4276  poDS = (OGRDataSource*)GDALOpenEx(
4277  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4278  if (poDS == nullptr) {
4279  poDS = (OGRDataSource*)GDALOpenEx(
4280  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4281  if (poDS) {
4282  LOG(INFO) << "openGDALDataset had to open as read-only";
4283  }
4284  }
4285 #endif
4286  if (poDS == nullptr) {
4287  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4288  }
4289  // NOTE(adb): If extending this function, refactor to ensure any errors will not result
4290  // in a memory leak if GDAL successfully opened the input dataset.
4291  return poDS;
4292 }
4293 
4294 namespace {
4295 
4296 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4297  const OGRDataSourceUqPtr& poDS,
4298  const std::string& file_name) {
4299  // get layer with specified name, or default to first layer
4300  OGRLayer* poLayer = nullptr;
4301  if (geo_layer_name.size()) {
4302  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4303  if (poLayer == nullptr) {
4304  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4305  file_name);
4306  }
4307  } else {
4308  poLayer = poDS->GetLayer(0);
4309  if (poLayer == nullptr) {
4310  throw std::runtime_error("No layers found in " + file_name);
4311  }
4312  }
4313  return *poLayer;
4314 }
4315 
4316 } // namespace
4317 
4318 /* static */
4319 void Importer::readMetadataSampleGDAL(
4320  const std::string& file_name,
4321  const std::string& geo_column_name,
4322  std::map<std::string, std::vector<std::string>>& metadata,
4323  int rowLimit,
4324  const CopyParams& copy_params) {
4325  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4326  if (poDS == nullptr) {
4327  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4328  file_name);
4329  }
4330 
4331  OGRLayer& layer =
4332  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4333 
4334  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4335  CHECK(poFDefn);
4336 
4337  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4338  auto nFeats = layer.GetFeatureCount();
4339  size_t numFeatures =
4340  std::max(static_cast<decltype(nFeats)>(0),
4341  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4342  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4343  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4344  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4345  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4346  }
4347  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4348  layer.ResetReading();
4349  size_t iFeature = 0;
4350  while (iFeature < numFeatures) {
4351  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4352  if (!poFeature) {
4353  break;
4354  }
4355 
4356  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4357  if (poGeometry != nullptr) {
4358  // validate geom type (again?)
4359  switch (wkbFlatten(poGeometry->getGeometryType())) {
4360  case wkbPoint:
4361  case wkbLineString:
4362  case wkbPolygon:
4363  case wkbMultiPolygon:
4364  break;
4365  case wkbMultiPoint:
4366  case wkbMultiLineString:
4367  // supported if geo_explode_collections is specified
4368  if (!copy_params.geo_explode_collections) {
4369  throw std::runtime_error("Unsupported geometry type: " +
4370  std::string(poGeometry->getGeometryName()));
4371  }
4372  break;
4373  default:
4374  throw std::runtime_error("Unsupported geometry type: " +
4375  std::string(poGeometry->getGeometryName()));
4376  }
4377 
4378  // populate metadata for regular fields
4379  for (auto i : metadata) {
4380  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4381  if (iField >= 0) { // geom is -1
4382  metadata[i.first].at(iFeature) =
4383  std::string(poFeature->GetFieldAsString(iField));
4384  }
4385  }
4386 
4387  // populate metadata for geo column with WKT string
4388  char* wkts = nullptr;
4389  poGeometry->exportToWkt(&wkts);
4390  CHECK(wkts);
4391  metadata[geo_column_name].at(iFeature) = wkts;
4392  CPLFree(wkts);
4393  }
4394  iFeature++;
4395  }
4396 }
4397 
4398 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4399  switch (ogr_type) {
4400  case OFTInteger:
4401  return std::make_pair(kINT, false);
4402  case OFTIntegerList:
4403  return std::make_pair(kINT, true);
4404 #if GDAL_VERSION_MAJOR > 1
4405  case OFTInteger64:
4406  return std::make_pair(kBIGINT, false);
4407  case OFTInteger64List:
4408  return std::make_pair(kBIGINT, true);
4409 #endif
4410  case OFTReal:
4411  return std::make_pair(kDOUBLE, false);
4412  case OFTRealList:
4413  return std::make_pair(kDOUBLE, true);
4414  case OFTString:
4415  return std::make_pair(kTEXT, false);
4416  case OFTStringList:
4417  return std::make_pair(kTEXT, true);
4418  case OFTDate:
4419  return std::make_pair(kDATE, false);
4420  case OFTTime:
4421  return std::make_pair(kTIME, false);
4422  case OFTDateTime:
4423  return std::make_pair(kTIMESTAMP, false);
4424  case OFTBinary:
4425  // Interpret binary blobs as byte arrays here
4426  // but actual import will store NULL as GDAL will not
4427  // extract the blob (OGRFeature::GetFieldAsString will
4428  // result in the import buffers having an empty string)
4429  return std::make_pair(kTINYINT, true);
4430  default:
4431  break;
4432  }
4433  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4434 }
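// e.g. an OFTRealList field returns (kDOUBLE, true); the caller below uses the
// bool to build an ARRAY column with the paired SQL type as its subtype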
4435 
4436 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4437  switch (ogr_type) {
4438  case wkbPoint:
4439  return kPOINT;
4440  case wkbLineString:
4441  return kLINESTRING;
4442  case wkbPolygon:
4443  return kPOLYGON;
4444  case wkbMultiPolygon:
4445  return kMULTIPOLYGON;
4446  default:
4447  break;
4448  }
4449  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4450 }
4451 
4452 /* static */
4453 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4454  const std::string& file_name,
4455  const std::string& geo_column_name,
4456  const CopyParams& copy_params) {
4457  std::list<ColumnDescriptor> cds;
4458 
4459  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4460  if (poDS == nullptr) {
4461  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4462  file_name);
4463  }
4464 
4465  OGRLayer& layer =
4466  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4467 
4468  layer.ResetReading();
4469  // TODO(andrewseidl): support multiple features
4470  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4471  if (poFeature == nullptr) {
4472  throw std::runtime_error("No features found in " + file_name);
4473  }
4474  // get fields as regular columns
4475  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4476  CHECK(poFDefn);
4477  int iField;
4478  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4479  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4480  auto typePair = ogr_to_type(poFieldDefn->GetType());
4481  ColumnDescriptor cd;
4482  cd.columnName = poFieldDefn->GetNameRef();
4483  cd.sourceName = poFieldDefn->GetNameRef();
4484  SQLTypeInfo ti;
4485  if (typePair.second) {
4486  ti.set_type(kARRAY);
4487  ti.set_subtype(typePair.first);
4488  } else {
4489  ti.set_type(typePair.first);
4490  }
4491  if (typePair.first == kTEXT) {
4492  ti.set_compression(kENCODING_DICT);
4493  ti.set_comp_param(32);
4494  }
4495  ti.set_fixed_size();
4496  cd.columnType = ti;
4497  cds.push_back(cd);
4498  }
4499  // get geo column, if any
4500  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4501  if (poGeometry) {
4502  ColumnDescriptor cd;
4503  cd.columnName = geo_column_name;
4504  cd.sourceName = geo_column_name;
4505 
4506  // get GDAL type
4507  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4508 
4509  // if exploding, override any collection type to child type
4510  if (copy_params.geo_explode_collections) {
4511  if (ogr_type == wkbMultiPolygon) {
4512  ogr_type = wkbPolygon;
4513  } else if (ogr_type == wkbMultiLineString) {
4514  ogr_type = wkbLineString;
4515  } else if (ogr_type == wkbMultiPoint) {
4516  ogr_type = wkbPoint;
4517  }
4518  }
4519 
4520  // convert to internal type
4521  SQLTypes geoType = ogr_to_type(ogr_type);
4522 
4523  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4524  if (PROMOTE_POLYGON_TO_MULTIPOLYGON && !copy_params.geo_explode_collections) {
4525  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4526  }
4527 
4528  // build full internal type
4529  SQLTypeInfo ti;
4530  ti.set_type(geoType);
4531  ti.set_subtype(copy_params.geo_coords_type);
4532  ti.set_input_srid(copy_params.geo_coords_srid);
4533  ti.set_output_srid(copy_params.geo_coords_srid);
4534  ti.set_compression(copy_params.geo_coords_encoding);
4535  ti.set_comp_param(copy_params.geo_coords_comp_param);
4536  cd.columnType = ti;
4537 
4538  cds.push_back(cd);
4539  }
4540  return cds;
4541 }
4542 
4543 bool Importer::gdalStatInternal(const std::string& path,
4544  const CopyParams& copy_params,
4545  bool also_dir) {
4546  // lazy init GDAL
4547  Geospatial::GDAL::init();
4548 
4549  // set authorization tokens
4550  setGDALAuthorizationTokens(copy_params);
4551 
4552 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4553  // clear GDAL stat cache
4554  // without this, file existence will be cached, even if authentication changes
4555  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4556  VSICurlClearCache();
4557 #endif
4558 
4559  // stat path
4560  VSIStatBufL sb;
4561  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4562  if (result < 0) {
4563  return false;
4564  }
4565 
4566  // exists?
4567  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4568  return true;
4569  } else if (VSI_ISREG(sb.st_mode)) {
4570  return true;
4571  }
4572  return false;
4573 }
4574 
4575 /* static */
4576 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
4577  return gdalStatInternal(path, copy_params, false);
4578 }
4579 
4580 /* static */
4581 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
4582  const CopyParams& copy_params) {
4583  return gdalStatInternal(path, copy_params, true);
4584 }
4585 
4586 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
4587  std::vector<std::string>& files) {
4588  // prepare to gather subdirectories
4589  std::vector<std::string> subdirectories;
4590 
4591  // get entries
4592  char** entries = VSIReadDir(archive_path.c_str());
4593  if (!entries) {
4594  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
4595  return;
4596  }
4597 
4598  // force scope
4599  {
4600  // request clean-up
4601  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
4602 
4603  // check all the entries
4604  int index = 0;
4605  while (true) {
4606  // get next entry, or drop out if there isn't one
4607  char* entry_c = entries[index++];
4608  if (!entry_c) {
4609  break;
4610  }
4611  std::string entry(entry_c);
4612 
4613  // ignore '.' and '..'
4614  if (entry == "." || entry == "..") {
4615  continue;
4616  }
4617 
4618  // build the full path
4619  std::string entry_path = archive_path + std::string("/") + entry;
4620 
4621  // is it a file or a sub-folder
4622  VSIStatBufL sb;
4623  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
4624  if (result < 0) {
4625  break;
4626  }
4627 
4628  if (VSI_ISDIR(sb.st_mode)) {
4629  // a directory that ends with .gdb could be a Geodatabase bundle
4630  // arguably dangerous to decide this purely by name, but any further
4631  // validation would be very complex especially at this scope
4632  if (boost::iends_with(entry_path, ".gdb")) {
4633  // add the directory as if it was a file and don't recurse into it
4634  files.push_back(entry_path);
4635  } else {
4636  // add subdirectory to be recursed into
4637  subdirectories.push_back(entry_path);
4638  }
4639  } else {
4640  // add this file
4641  files.push_back(entry_path);
4642  }
4643  }
4644  }
4645 
4646  // recurse into each subdirectory we found
4647  for (const auto& subdirectory : subdirectories) {
4648  gdalGatherFilesInArchiveRecursive(subdirectory, files);
4649  }
4650 }
4651 
4652 /* static */
4653 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
4654  const std::string& archive_path,
4655  const CopyParams& copy_params) {
4656  // lazy init GDAL
4657  Geospatial::GDAL::init();
4658 
4659  // set authorization tokens
4660  setGDALAuthorizationTokens(copy_params);
4661 
4662  // prepare to gather files
4663  std::vector<std::string> files;
4664 
4665  // gather the files recursively
4666  gdalGatherFilesInArchiveRecursive(archive_path, files);
4667 
4668  // convert to relative paths inside archive
4669  for (auto& file : files) {
4670  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4671  }
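 // e.g. (illustrative) with archive_path "/vsizip/bundle.zip", a gathered
 // entry "/vsizip/bundle.zip/shapes/roads.shp" becomes "shapes/roads.shp"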
4672 
4673  // done
4674  return files;
4675 }
4676 
4677 /* static */
4678 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
4679  const std::string& file_name,
4680  const CopyParams& copy_params) {
4681  // lazy init GDAL
4682  Geospatial::GDAL::init();
4683 
4684  // set authorization tokens
4685  setGDALAuthorizationTokens(copy_params);
4686 
4687  // prepare to gather layer info
4688  std::vector<GeoFileLayerInfo> layer_info;
4689 
4690  // open the data set
4691  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4692  if (poDS == nullptr) {
4693  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4694  file_name);
4695  }
4696 
4697  // enumerate the layers
4698  for (auto&& poLayer : poDS->GetLayers()) {
4699  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
4700  // prepare to read this layer
4701  poLayer->ResetReading();
4702  // skip layer if empty
4703  if (poLayer->GetFeatureCount() > 0) {
4704  // get first feature
4705  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4706  CHECK(first_feature);
4707  // check feature for geometry
4708  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4709  if (!geometry) {
4710  // layer has no geometry
4711  contents = GeoFileLayerContents::NON_GEO;
4712  } else {
4713  // check the geometry type
4714  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4715  switch (wkbFlatten(geometry_type)) {
4716  case wkbPoint:
4717  case wkbLineString:
4718  case wkbPolygon:
4719  case wkbMultiPolygon:
4720  // layer has supported geo
4721  contents = GeoFileLayerContents::GEO;
4722  break;
4723  case wkbMultiPoint:
4724  case wkbMultiLineString:
4725  // supported if geo_explode_collections is specified
4726  contents = copy_params.geo_explode_collections
4727  ? GeoFileLayerContents::GEO
4728  : GeoFileLayerContents::UNSUPPORTED_GEO;
4729  break;
4730  default:
4731  // layer has unsupported geometry
4732  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
4733  break;
4734  }
4735  }
4736  }
4737  // store info for this layer
4738  layer_info.emplace_back(poLayer->GetName(), contents);
4739  }
4740 
4741  // done
4742  return layer_info;
4743 }
4744 
4745 ImportStatus Importer::importGDAL(
4746  ColumnNameToSourceNameMapType columnNameToSourceNameMap) {
4747  // initial status
4748  bool load_truncated = false;
4749  set_import_status(import_id, import_status);
4750 
4751  OGRDataSourceUqPtr poDS(openGDALDataset(file_path, copy_params));
4752  if (poDS == nullptr) {
4753  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4754  file_path);
4755  }
4756 
4757  OGRLayer& layer =
4758  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
4759 
4760  // get the number of features in this layer
4761  size_t numFeatures = layer.GetFeatureCount();
4762 
4763  // build map of metadata field (additional columns) name to index
4764  // use shared_ptr since we need to pass it to the worker
4765  FieldNameToIndexMapType fieldNameToIndexMap;
4766  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4767  CHECK(poFDefn);
4768  size_t numFields = poFDefn->GetFieldCount();
4769  for (size_t iField = 0; iField < numFields; iField++) {
4770  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4771  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4772  }
4773 
4774  // the geographic spatial reference we want to put everything in
4775  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4776  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4777 
4778 #if GDAL_VERSION_MAJOR >= 3
4779  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
4780  // this results in X and Y being transposed for angle-based
4781  // coordinate systems. This restores the previous behavior.
4782  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4783 #endif
4784 
4785 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4786  // just one "thread"
4787  max_threads = 1;
4788 #else
4789  // how many threads to use
4790  if (copy_params.threads == 0) {
4791  max_threads = std::min(static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF)),
4792  g_max_import_threads);
4793  } else {
4794  max_threads = static_cast<size_t>(copy_params.threads);
4795  }
4796 #endif
4797 
4798  VLOG(1) << "GDAL import # threads: " << max_threads;
4799 
4800  // make an import buffer for each thread
4801  CHECK_EQ(import_buffers_vec.size(), 0u);
4802  import_buffers_vec.resize(max_threads);
4803  for (size_t i = 0; i < max_threads; i++) {
4804  for (const auto cd : loader->get_column_descs()) {
4805  import_buffers_vec[i].emplace_back(
4806  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4807  }
4808  }
4809 
4810  // make render group analyzers for each poly column
4811  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4812  if (copy_params.geo_assign_render_groups) {
4813  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4814  loader->getTableDesc()->tableId, false, false, false);
4815  for (auto cd : columnDescriptors) {
4816  SQLTypes ct = cd->columnType.get_type();
4817  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4818  auto rga = std::make_shared<RenderGroupAnalyzer>();
4819  rga->seedFromExistingTableContents(loader, cd->columnName);
4820  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4821  }
4822  }
4823  }
4824 
4825 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4826  // threads
4827  std::list<std::future<ImportStatus>> threads;
4828 
4829  // use a stack to track thread_ids which must not overlap among threads
4830  // because thread_id is used to index import_buffers_vec[]
4831  std::stack<size_t> stack_thread_ids;
4832  for (size_t i = 0; i < max_threads; i++) {
4833  stack_thread_ids.push(i);
4834  }
4835 #endif
4836 
4837  // checkpoint the table
4838  auto table_epochs = loader->getTableEpochs();
4839 
4840  // reset the layer
4841  layer.ResetReading();
4842 
4843  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
4844 
4845  // make a features buffer for each thread
4846  std::vector<FeaturePtrVector> features(max_threads);
4847 
4848  // for each feature...
4849  size_t firstFeatureThisChunk = 0;
4850  while (firstFeatureThisChunk < numFeatures) {
4851  // how many features this chunk
4852  size_t numFeaturesThisChunk =
4853  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
4854 
4855 // get a thread_id not in use
4856 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4857  size_t thread_id = 0;
4858 #else
4859  auto thread_id = stack_thread_ids.top();
4860  stack_thread_ids.pop();
4861  CHECK(thread_id < max_threads);
4862 #endif
4863 
4864  // fill features buffer for new thread
4865  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
4866  features[thread_id].emplace_back(layer.GetNextFeature());
4867  }
4868 
4869 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4870  // call worker function directly
4871  auto ret_import_status = import_thread_shapefile(0,
4872  this,
4873  poGeographicSR.get(),
4874  std::move(features[thread_id]),
4875  firstFeatureThisChunk,
4876  numFeaturesThisChunk,
4877  fieldNameToIndexMap,
4878  columnNameToSourceNameMap,
4879  columnIdToRenderGroupAnalyzerMap);
4880  import_status += ret_import_status;
4881  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
4882  import_status.rows_completed;
4883  set_import_status(import_id, import_status);
4884 #else
4885  // fire up that thread to import this geometry
4886  threads.push_back(std::async(std::launch::async,
4887  import_thread_shapefile,
4888  thread_id,
4889  this,
4890  poGeographicSR.get(),
4891  std::move(features[thread_id]),
4892  firstFeatureThisChunk,
4893  numFeaturesThisChunk,
4894  fieldNameToIndexMap,
4895  columnNameToSourceNameMap,
4896  columnIdToRenderGroupAnalyzerMap));
4897 
4898  // let the threads run
4899  while (threads.size() > 0) {
4900  int nready = 0;
4901  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4902  it != threads.end();) {
4903  auto& p = *it;
4904  std::chrono::milliseconds span(
4905  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4906  if (p.wait_for(span) == std::future_status::ready) {
4907  auto ret_import_status = p.get();
4908  import_status += ret_import_status;
4909  import_status.rows_estimated =
4910  ((float)firstFeatureThisChunk / (float)numFeatures) *
4911  import_status.rows_completed;
4912  set_import_status(import_id, import_status);
4913 
4914  // recall thread_id for reuse
4915  stack_thread_ids.push(ret_import_status.thread_id);
4916 
4917  threads.erase(it++);
4918  ++nready;
4919  } else {
4920  ++it;
4921  }
4922  }
4923 
4924  if (nready == 0) {
4925  std::this_thread::yield();
4926  }
4927 
4928  // keep reading if there is any free thread slot
4929  // this is one of the major differences from the old threading model !!
4930  if (threads.size() < max_threads) {
4931  break;
4932  }
4933  }
4934 #endif
4935 
4936  // out of rows?
4937  if (import_status.rows_rejected > copy_params.max_reject) {
4938  load_truncated = true;
4939  load_failed = true;
4940  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4941  break;
4942  }
4943 
4944  // failed?
4945  if (load_failed) {
4946  load_truncated = true;
4947  LOG(ERROR)
4948  << "A call to the Loader::load failed, Please review the logs for more details";
4949  break;
4950  }
4951 
4952  firstFeatureThisChunk += numFeaturesThisChunk;
4953  }
4954 
4955 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4956  // wait for any remaining threads
4957  if (threads.size()) {
4958  for (auto& p : threads) {
4959  // wait for the thread
4960  p.wait();
4961  // get the result and update the final import status
4962  auto ret_import_status = p.get();
4963  import_status += ret_import_status;
4964  import_status.rows_estimated = import_status.rows_completed;
4965  set_import_status(import_id, import_status);
4966  }
4967  }
4968 #endif
4969 
4970  checkpoint(table_epochs);
4971 
4972  // must set import_status.load_truncated before closing this end of the pipe;
4973  // otherwise, the thread on the other end would throw an unwanted 'write()'
4974  // exception
4975  import_status.load_truncated = load_truncated;
4976  return import_status;
4977 }
4978 
4979 //
4980 // class RenderGroupAnalyzer
4981 //
4982 
4983 void RenderGroupAnalyzer::seedFromExistingTableContents(
4984  const std::unique_ptr<Loader>& loader,
4985  const std::string& geoColumnBaseName) {
4986  // start timer
4987  auto seedTimer = timer_start();
4988 
4989  // start with a fresh tree
4990  _rtree = nullptr;
4991  _numRenderGroups = 0;
4992 
4993  // get the table descriptor
4994  const auto& cat = loader->getCatalog();
4995  if (loader->getTableDesc()->storageType == StorageType::FOREIGN_TABLE) {
4996  if (DEBUG_RENDER_GROUP_ANALYZER) {
4997  LOG(INFO) << "DEBUG: Table is a foreign table";
4998  }
4999  _rtree = std::make_unique<RTree>();
5000  CHECK(_rtree);
5001  return;
5002  }
5003 
5004  const std::string& tableName = loader->getTableDesc()->tableName;
5005  const auto td = cat.getMetadataForTable(tableName);
5006  CHECK(td);
5007  CHECK(td->fragmenter);
5008 
5009  // if the table is empty, just make an empty tree
5010  if (td->fragmenter->getFragmentsForQuery().getPhysicalNumTuples() == 0) {
5011  if (DEBUG_RENDER_GROUP_ANALYZER) {
5012  LOG(INFO) << "DEBUG: Table is empty!";
5013  }
5014  _rtree = std::make_unique<RTree>();
5015  CHECK(_rtree);
5016  return;
5017  }
5018 
5019  // no seeding possible without these two columns
5020  const auto cd_bounds =
5021  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_bounds");
5022  const auto cd_render_group =
5023  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_render_group");
5024  if (!cd_bounds || !cd_render_group) {
5025  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5026  " doesn't have bounds or render_group columns!");
5027  }
5028 
5029  // and validate their types
5030  if (cd_bounds->columnType.get_type() != kARRAY ||
5031  cd_bounds->columnType.get_subtype() != kDOUBLE) {
5032  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5033  " bounds column is wrong type!");
5034  }
5035  if (cd_render_group->columnType.get_type() != kINT) {
5036  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5037  " render_group column is wrong type!");
5038  }
5039 
5040  // get chunk accessor table
5041  auto chunkAccessorTable = getChunkAccessorTable(
5042  cat, td, {geoColumnBaseName + "_bounds", geoColumnBaseName + "_render_group"});
5043  const auto table_count = std::get<0>(chunkAccessorTable.back());
5044 
5045  if (DEBUG_RENDER_GROUP_ANALYZER) {
5046  LOG(INFO) << "DEBUG: Scanning existing table geo column set '" << geoColumnBaseName
5047  << "'";
5048  }
5049 
5050  std::vector<Node> nodes;
5051  try {
5052  nodes.resize(table_count);
5053  } catch (const std::exception& e) {
5054  throw std::runtime_error("RenderGroupAnalyzer failed to reserve memory for " +
5055  std::to_string(table_count) + " rows");
5056  }
5057 
5058  for (size_t row = 0; row < table_count; row++) {
5059  ArrayDatum ad;
5060  VarlenDatum vd;
5061  bool is_end;
5062 
5063  // get ChunkIters and fragment row offset
5064  size_t rowOffset = 0;
5065  auto& chunkIters = getChunkItersAndRowOffset(chunkAccessorTable, row, rowOffset);
5066  auto& boundsChunkIter = chunkIters[0];
5067  auto& renderGroupChunkIter = chunkIters[1];
5068 
5069  // get bounds values
5070  ChunkIter_get_nth(&boundsChunkIter, row - rowOffset, &ad, &is_end);
5071  CHECK(!is_end);
5072  CHECK(ad.pointer);
5073  int numBounds = (int)(ad.length / sizeof(double));
5074  CHECK(numBounds == 4);
5075 
5076  // convert to bounding box
5077  double* bounds = reinterpret_cast<double*>(ad.pointer);
5078  BoundingBox bounding_box;
5079  boost::geometry::assign_inverse(bounding_box);
5080  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
5081  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5082 
5083  // get render group
5084  ChunkIter_get_nth(&renderGroupChunkIter, row - rowOffset, false, &vd, &is_end);
5085  CHECK(!is_end);
5086  CHECK(vd.pointer);
5087  int renderGroup = *reinterpret_cast<int32_t*>(vd.pointer);
5088 
5089  // skip rows with invalid render groups (e.g. EMPTY geometry)
5090  if (renderGroup < 0) {
5091  continue;
5092  }
5093 
5094  // store
5095  nodes[row] = std::make_pair(bounding_box, renderGroup);
5096 
5097  // how many render groups do we have now?
5098  if (renderGroup >= _numRenderGroups) {
5099  _numRenderGroups = renderGroup + 1;
5100  }
5101 
5102  if (DEBUG_RENDER_GROUP_ANALYZER) {
5103  LOG(INFO) << "DEBUG: Existing row " << row << " has Render Group " << renderGroup;
5104  }
5105  }
5106 
5107  // bulk-load the tree
5108  auto bulk_load_timer = timer_start();
5109  _rtree = std::make_unique<RTree>(nodes);
5110  CHECK(_rtree);
5111  LOG(INFO) << "Scanning render groups of poly column '" << geoColumnBaseName
5112  << "' of table '" << tableName << "' took " << timer_stop(seedTimer) << "ms ("
5113  << timer_stop(bulk_load_timer) << " ms for tree)";
5114 
5115  if (DEBUG_RENDER_GROUP_ANALYZER) {
5116  LOG(INFO) << "DEBUG: Done! Now have " << _numRenderGroups << " Render Groups";
5117  }
5118 }
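/*
 * Illustrative sketch (not part of Importer.cpp): how the four doubles in a
 * "_bounds" array become an rtree box, as done in the scan loop above.
 * boost::geometry::assign_inverse() initializes the box "inside out" (min
 * corner > max corner), so the first expand() with any point makes it valid;
 * expanding with the stored corners (min_x, min_y) and (max_x, max_y)
 * reproduces the exact envelope. The helper name below is hypothetical.
 */
#include <boost/geometry.hpp>

namespace bounds_sketch {
namespace bg = boost::geometry;
using Point = bg::model::point<double, 2, bg::cs::cartesian>;
using Box = bg::model::box<Point>;

inline Box make_box_from_bounds(const double* bounds /* [minx,miny,maxx,maxy] */) {
  Box box;
  bg::assign_inverse(box);                       // start with an empty (inverted) box
  bg::expand(box, Point(bounds[0], bounds[1]));  // lower-left corner
  bg::expand(box, Point(bounds[2], bounds[3]));  // upper-right corner
  return box;
}
}  // namespace bounds_sketch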
5119 
5120 int RenderGroupAnalyzer::insertBoundsAndReturnRenderGroup(
5121  const std::vector<double>& bounds) {
5122  // validate
5123  CHECK(bounds.size() == 4);
5124 
5125  // get bounds
5126  BoundingBox bounding_box;
5127  boost::geometry::assign_inverse(bounding_box);
5128  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
5129  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5130 
5131  // remainder under mutex to allow this to be multi-threaded
5132  std::lock_guard<std::mutex> guard(_rtreeMutex);
5133 
5134  // get the intersecting nodes
5135  std::vector<Node> intersects;
5136  _rtree->query(boost::geometry::index::intersects(bounding_box),
5137  std::back_inserter(intersects));
5138 
5139  // build bitset of render groups of the intersecting rectangles
5140  // clear bit means available, allows use of find_first()
5141  boost::dynamic_bitset<> bits(_numRenderGroups);
5142  bits.set();
5143  for (const auto& intersection : intersects) {
5144  CHECK(intersection.second < _numRenderGroups);
5145  bits.reset(intersection.second);
5146  }
5147 
5148  // find first available group
5149  int firstAvailableRenderGroup = 0;
5150  size_t firstSetBit = bits.find_first();
5151  if (firstSetBit == boost::dynamic_bitset<>::npos) {
5152  // all known groups represented, add a new one
5153  firstAvailableRenderGroup = _numRenderGroups;
5154  _numRenderGroups++;
5155  } else {
5156  firstAvailableRenderGroup = (int)firstSetBit;
5157  }
5158 
5159  // insert new node
5160  _rtree->insert(std::make_pair(bounding_box, firstAvailableRenderGroup));
5161 
5162  // return it
5163  return firstAvailableRenderGroup;
5164 }
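/*
 * Illustrative sketch (not part of Importer.cpp): insertBoundsAndReturnRenderGroup()
 * above is a greedy "first free color" assignment over the graph of
 * intersecting bounding boxes. Stripped of the class state and mutex, the
 * core technique looks like this; all names in the sketch are hypothetical,
 * only the boost primitives match the real code.
 */
#include <boost/dynamic_bitset.hpp>
#include <boost/geometry.hpp>
#include <boost/geometry/index/rtree.hpp>
#include <iterator>
#include <utility>
#include <vector>

namespace render_group_sketch {
namespace bg = boost::geometry;
namespace bgi = boost::geometry::index;
using Point = bg::model::point<double, 2, bg::cs::cartesian>;
using Box = bg::model::box<Point>;
using Node = std::pair<Box, int>;  // bounding box -> assigned group
using RTree = bgi::rtree<Node, bgi::quadratic<16>>;

inline int assign_render_group(RTree& rtree, int& num_groups, const Box& box) {
  // find everything the new box overlaps
  std::vector<Node> intersects;
  rtree.query(bgi::intersects(box), std::back_inserter(intersects));

  // set bit == group still available; clear the group of every overlap
  boost::dynamic_bitset<> available(num_groups);
  available.set();
  for (const auto& node : intersects) {
    available.reset(node.second);
  }

  // pick the lowest available group, or open a new one if none is free
  const auto first_free = available.find_first();
  const int group = (first_free == boost::dynamic_bitset<>::npos)
                        ? num_groups++
                        : static_cast<int>(first_free);
  rtree.insert(std::make_pair(box, group));
  return group;
}
}  // namespace render_group_sketch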
5165 
5166 std::vector<std::unique_ptr<TypedImportBuffer>> setup_column_loaders(
5167  const TableDescriptor* td,
5168  Loader* loader) {
5169  CHECK(td);
5170  auto col_descs = loader->get_column_descs();
5171 
5172  std::vector<std::unique_ptr<TypedImportBuffer>> import_buffers;
5173  for (auto cd : col_descs) {
5174  import_buffers.emplace_back(
5175  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
5176  }
5177 
5178  return import_buffers;
5179 }
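 // Illustrative usage (not part of Importer.cpp): a typical caller pairs
 // setup_column_loaders() with Loader::load(); the buffer-filling step is
 // elided here since it depends on the column types (see
 // TypedImportBuffer::add_value earlier in this file).
 //
 //   auto import_buffers = setup_column_loaders(td, loader);
 //   // ... append row_count values to each per-column buffer ...
 //   loader->load(import_buffers, row_count);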
5180 
5181 } // namespace import_export