OmniSciDB  ab4938a6a3
Importer.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * @file Importer.cpp
 * @author Wei Hong <wei@mapd.com>
 * @brief Functions for Importer class
 */

#include "Import/Importer.h"

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <gdal.h>
#include <ogrsf_frmts.h>
#include <unistd.h>
#include <boost/algorithm/string.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/filesystem.hpp>
#include <boost/variant.hpp>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <future>
#include <iomanip>
#include <list>
#include <memory>
#include <mutex>
#include <numeric>
#include <stack>
#include <stdexcept>
#include <thread>
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "Archive/S3Archive.h"
#include "ArrowImporter.h"
#include "Shared/Logger.h"
#include "Shared/SqlTypesLayout.h"
#include "Shared/geo_compression.h"
#include "Shared/geo_types.h"
#include "Shared/geosupport.h"
#include "Shared/import_helpers.h"
#include "Shared/mapd_glob.h"
#include "Shared/mapdpath.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/scope.h"
#include "Shared/shard_key.h"
#include "Shared/thread_count.h"

#include "gen-cpp/OmniSci.h"

size_t g_archive_read_buf_size = 1 << 20;

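// Returns the size of the given file in bytes, or 0 if the size could not be
// determined (the boost error code is checked instead of letting it throw).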
inline auto get_filesize(const std::string& file_path) {
  boost::filesystem::path boost_file_path{file_path};
  boost::system::error_code ec;
  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
  return ec ? 0 : filesize;
}

namespace {

struct OGRDataSourceDeleter {
  void operator()(OGRDataSource* datasource) {
    if (datasource) {
      GDALClose(datasource);
    }
  }
};
using OGRDataSourceUqPtr = std::unique_ptr<OGRDataSource, OGRDataSourceDeleter>;

struct OGRFeatureDeleter {
  void operator()(OGRFeature* feature) {
    if (feature) {
      OGRFeature::DestroyFeature(feature);
    }
  }
};
using OGRFeatureUqPtr = std::unique_ptr<OGRFeature, OGRFeatureDeleter>;

struct OGRSpatialReferenceDeleter {
  void operator()(OGRSpatialReference* ref) {
    if (ref) {
      OGRSpatialReference::DestroySpatialReference(ref);
    }
  }
};
using OGRSpatialReferenceUqPtr =
    std::unique_ptr<OGRSpatialReference, OGRSpatialReferenceDeleter>;

}  // namespace

// For logging std::vector<std::string> row.
namespace boost {
namespace log {
formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
  out << '[';
  for (size_t i = 0; i < row.size(); ++i) {
    out << (i ? ", " : "") << row[i];
  }
  out << ']';
  return out;
}
}  // namespace log
}  // namespace boost

namespace Importer_NS {

using FieldNameToIndexMapType = std::map<std::string, size_t>;
using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
using ColumnIdToRenderGroupAnalyzerMapType =
    std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
using FeaturePtrVector = std::vector<OGRFeatureUqPtr>;

#define DEBUG_TIMING false
#define DEBUG_RENDER_GROUP_ANALYZER 0
#define DEBUG_AWS_AUTHENTICATION 0

#define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0

static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;

static mapd_shared_mutex status_mutex;
static std::map<std::string, ImportStatus> import_status_map;

Importer::Importer(Catalog_Namespace::Catalog& c,
                   const TableDescriptor* t,
                   const std::string& f,
                   const CopyParams& p)
    : Importer(new Loader(c, t), f, p) {}

Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
    : DataStreamSink(p, f), loader(providedLoader) {
  import_id = boost::filesystem::path(file_path).filename().string();
  file_size = 0;
  max_threads = 0;
  p_file = nullptr;
  buffer[0] = nullptr;
  buffer[1] = nullptr;
  // we may be overallocating a little more memory here due to dropping phy cols.
  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
  int i = 0;
  bool has_array = false;
  // TODO: replace this ugly way of skipping phy cols once isPhyGeo is defined
  int skip_physical_cols = 0;
  for (auto& p : loader->get_column_descs()) {
    // phy geo columns can't be in input file
    if (skip_physical_cols-- > 0) {
      continue;
    }
    // neither are rowid or $deleted$
    // note: columns can be added after rowid/$deleted$
    if (p->isVirtualCol || p->isDeletedCol) {
      continue;
    }
    skip_physical_cols = p->columnType.get_physical_cols();
    if (p->columnType.get_type() == kARRAY) {
      is_array.get()[i] = true;
      has_array = true;
    } else {
      is_array.get()[i] = false;
    }
    ++i;
  }
  if (has_array) {
    is_array_a = std::unique_ptr<bool[]>(is_array.release());
  } else {
    is_array_a = std::unique_ptr<bool[]>(nullptr);
  }
}

Importer::~Importer() {
  if (p_file != nullptr) {
    fclose(p_file);
  }
  if (buffer[0] != nullptr) {
    free(buffer[0]);
  }
  if (buffer[1] != nullptr) {
    free(buffer[1]);
  }
}

ImportStatus Importer::get_import_status(const std::string& import_id) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
  return import_status_map.at(import_id);
}

void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
  is.end = std::chrono::steady_clock::now();
  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
  import_status_map[import_id] = is;
}

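// Returns a copy of the field with leading and trailing spaces and carriage
// returns removed, e.g. " 42\r" becomes "42"; the source buffer is untouched.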
static const std::string trim_space(const char* field, const size_t len) {
  size_t i = 0;
  size_t j = len;
  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
    i++;
  }
  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
    j--;
  }
  return std::string(field + i, j - i);
}

Datum NullDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullDatum.");
  }
  return d;
}

Datum NullArrayDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_ARRAY_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_ARRAY_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
  }
  return d;
}

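// Parses a delimited array literal into a packed ArrayDatum of fixed-size
// elements, e.g. "{1,2,3}" with the default CopyParams braces and comma
// delimiter. The null string, "NULL", empty, or malformed input yields a NULL
// array datum. Must not be called for arrays of strings.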
ArrayDatum StringToArray(const std::string& s,
                         const SQLTypeInfo& ti,
                         const CopyParams& copy_params) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
    return ArrayDatum(0, NULL, true);
  }
  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
    LOG(WARNING) << "Malformed array: " << s;
    return ArrayDatum(0, NULL, true);
  }
  std::vector<std::string> elem_strs;
  size_t last = 1;
  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
       i = s.find(copy_params.array_delim, last)) {
    elem_strs.push_back(s.substr(last, i - last));
    last = i + 1;
  }
  if (last + 1 <= s.size()) {
    elem_strs.push_back(s.substr(last, s.size() - 1 - last));
  }
  if (elem_strs.size() == 1) {
    auto str = elem_strs.front();
    auto str_trimmed = trim_space(str.c_str(), str.length());
    if (str_trimmed == "") {
      elem_strs.clear();  // Empty array
    }
  }
  if (!elem_ti.is_string()) {
    size_t len = elem_strs.size() * elem_ti.get_size();
    int8_t* buf = (int8_t*)checked_malloc(len);
    int8_t* p = buf;
    for (auto& es : elem_strs) {
      auto e = trim_space(es.c_str(), es.length());
      bool is_null = (e == copy_params.null_str) || e == "NULL";
      if (!elem_ti.is_string() && e == "") {
        is_null = true;
      }
      if (elem_ti.is_number() || elem_ti.is_time()) {
        if (!isdigit(e[0]) && e[0] != '-') {
          is_null = true;
        }
      }
      Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
      p = appendDatum(p, d, elem_ti);
    }
    return ArrayDatum(len, buf, false);
  }
  // must not be called for array of strings
  CHECK(false);
  return ArrayDatum(0, NULL, true);
}

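// Composes a NULL ArrayDatum for the given array type: a sentinel-filled
// fixed-length buffer, or an empty datum flagged null for varlen arrays.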
ArrayDatum NullArray(const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  auto len = ti.get_size();

  if (elem_ti.is_string()) {
    // must not be called for array of strings
    CHECK(false);
    return ArrayDatum(0, NULL, true);
  }

  if (len > 0) {
    // Compose a NULL fixlen array
    int8_t* buf = (int8_t*)checked_malloc(len);
    // First scalar is a NULL_ARRAY sentinel
    Datum d = NullArrayDatum(elem_ti);
    int8_t* p = appendDatum(buf, d, elem_ti);
    // Rest is filled with normal NULL sentinels
    Datum d0 = NullDatum(elem_ti);
    while ((p - buf) < len) {
      p = appendDatum(p, d0, elem_ti);
    }
    CHECK((p - buf) == len);
    return ArrayDatum(len, buf, true);
  }
  // NULL varlen array
  return ArrayDatum(0, NULL, true);
}

ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
  return NullArray(ti);
}

void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
  const auto& arr = datum.val.arr_val;
  for (const auto& elem_datum : arr) {
    string_vec.push_back(elem_datum.val.str_val);
  }
}

Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kBIGINT:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kINT:
      d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kSMALLINT:
      d.smallintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kTINYINT:
      d.tinyintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kFLOAT:
      d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
      break;
    case kDOUBLE:
      d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
  }
  return d;
}

ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();

  CHECK(!elem_ti.is_string());

  if (datum.is_null) {
    return NullArray(ti);
  }

  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
  int8_t* buf = (int8_t*)checked_malloc(len);
  int8_t* p = buf;
  for (auto& e : datum.val.arr_val) {
    p = appendDatum(p, TDatumToDatum(e, elem_ti), elem_ti);
  }

  return ArrayDatum(len, buf, false);
}

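// Bulk-encodes a batch of strings through the column's string dictionary,
// writing the ids into the 1-, 2- or 4-byte buffer matching the column width.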
void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
  CHECK(string_dict_);
  std::vector<std::string_view> string_view_vec;
  string_view_vec.reserve(string_vec.size());
  for (const auto& str : string_vec) {
    if (str.size() > StringDictionary::MAX_STRLEN) {
      throw std::runtime_error("String too long for dictionary encoding.");
    }
    string_view_vec.push_back(str);
  }
  switch (column_desc_->columnType.get_size()) {
    case 1:
      string_dict_i8_buffer_->resize(string_view_vec.size());
      string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
      break;
    case 2:
      string_dict_i16_buffer_->resize(string_view_vec.size());
      string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
      break;
    case 4:
      string_dict_i32_buffer_->resize(string_view_vec.size());
      string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
      break;
    default:
      CHECK(false);
  }
}

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const std::string_view val,
                                  const bool is_null,
                                  const CopyParams& copy_params,
                                  const int64_t replicate_count) {
  set_replicate_count(replicate_count);
  const auto type = cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBoolean(static_cast<int8_t>(d.boolval));
      }
      break;
    }
    case kTINYINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addTinyint(d.tinyintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addSmallint(d.smallintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addInt(d.intval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        SQLTypeInfo ti(kNUMERIC, 0, 0, false);
        Datum d = StringToDatum(val, ti);
        const auto converted_decimal_value =
            convert_decimal_value_to_scale(d.bigintval, ti, cd->columnType);
        addBigint(converted_decimal_value);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addDouble(std::atof(std::string(val).c_str()));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        if (val.length() > StringDictionary::MAX_STRLEN) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(StringDictionary::MAX_STRLEN));
        }
        addString(val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      SQLTypeInfo ti = cd->columnType;
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImportHelpers::parseStringArray(std::string(val), copy_params, string_vec);
        if (!is_null) {
          // TODO: add support for NULL string arrays
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          addStringArray(string_vec);
        } else {
          if (ti.get_size() > 0) {
            // TODO: remove once NULL fixlen arrays are allowed
            throw std::runtime_error("Fixed length array column " + cd->columnName +
                                     " currently cannot accept NULL arrays");
          }
          // TODO: add support for NULL string arrays, replace with addStringArray(),
          // for now add whatever parseStringArray() outputs for NULLs ("NULL")
          addStringArray(string_vec);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
          if (d.is_null) {  // val could be "NULL"
            addArray(NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + std::string(val));
            }
            addArray(d);
          }
        } else {
          addArray(NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      addGeoString(val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

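// Removes the most recently added value from whichever typed buffer backs this
// column; used to roll back a partially imported row after a failure.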
void TypedImportBuffer::pop_value() {
  const auto type = column_desc_->columnType.is_decimal()
                        ? decimal_to_int_type(column_desc_->columnType)
                        : column_desc_->columnType.get_type();
  switch (type) {
    case kBOOLEAN:
      bool_buffer_->pop_back();
      break;
    case kTINYINT:
      tinyint_buffer_->pop_back();
      break;
    case kSMALLINT:
      smallint_buffer_->pop_back();
      break;
    case kINT:
      int_buffer_->pop_back();
      break;
    case kBIGINT:
      bigint_buffer_->pop_back();
      break;
    case kFLOAT:
      float_buffer_->pop_back();
      break;
    case kDOUBLE:
      double_buffer_->pop_back();
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      string_buffer_->pop_back();
      break;
    case kDATE:
    case kTIME:
    case kTIMESTAMP:
      bigint_buffer_->pop_back();
      break;
    case kARRAY:
      if (IS_STRING(column_desc_->columnType.get_subtype())) {
        string_array_buffer_->pop_back();
      } else {
        array_buffer_->pop_back();
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->pop_back();
      break;
    default:
      CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
  }
}

struct GeoImportException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
template <typename DATA_TYPE>
size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
    const ColumnDescriptor* cd,
    const Array& array,
    std::vector<DATA_TYPE>& buffer,
    const ArraySliceRange& slice_range,
    Importer_NS::BadRowsTracker* const bad_rows_tracker) {
  auto data =
      std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
  if (bad_rows_tracker && cd->columnType.is_geometry()) {
    f_add_geo_phy_cols = [&](const int64_t row) {
      // Populate physical columns (ref. DBHandler::load_table)
      std::vector<double> coords, bounds;
      std::vector<int> ring_sizes, poly_rings;
      int render_group = 0;
      SQLTypeInfo ti;
      // replace any unexpected exception from getGeoColumns or other
      // on this path with a GeoImportException so that we won't over-push
      // a null to the logical column...
      try {
        SQLTypeInfo import_ti{ti};
        if (array.IsNull(row)) {
          Geo_namespace::GeoTypesFactory::getNullGeoColumns(
              import_ti, coords, bounds, ring_sizes, poly_rings, false);
        } else {
          arrow_throw_if<GeoImportException>(
              !Geo_namespace::GeoTypesFactory::getGeoColumns(geo_string_buffer_->back(),
                                                             ti,
                                                             coords,
                                                             bounds,
                                                             ring_sizes,
                                                             poly_rings,
                                                             false),
              error_context(cd, bad_rows_tracker) + "Invalid geometry");
          arrow_throw_if<GeoImportException>(
              cd->columnType.get_type() != ti.get_type(),
              error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
        }
        auto col_idx_workpad = col_idx;  // what a pitfall!!
        Importer::set_geo_physical_import_buffer(
            bad_rows_tracker->importer->getCatalog(),
            cd,
            *import_buffers,
            col_idx_workpad,
            coords,
            bounds,
            ring_sizes,
            poly_rings,
            render_group);
      } catch (GeoImportException&) {
        throw;
      } catch (std::runtime_error& e) {
        throw GeoImportException(e.what());
      } catch (const std::exception& e) {
        throw GeoImportException(e.what());
      } catch (...) {
        throw GeoImportException("unknown exception");
      }
    };
  }
  auto f_mark_a_bad_row = [&](const auto row) {
    std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
    bad_rows_tracker->rows.insert(row - slice_range.first);
  };
  buffer.reserve(slice_range.second - slice_range.first);
  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
    try {
      *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
      f_add_geo_phy_cols(row);
    } catch (GeoImportException&) {
      f_mark_a_bad_row(row);
    } catch (ArrowImporterException&) {
      // trace bad rows of each column; otherwise rethrow.
      if (bad_rows_tracker) {
        *data << nullptr;
        f_mark_a_bad_row(row);
      } else {
        throw;
      }
    }
  }
  return buffer.size();
}

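// Routes an Arrow column slice to the typed buffer matching the target column
// type, optionally requiring an exact Arrow-to-OmniSci type correspondence.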
size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
                                           const Array& col,
                                           const bool exact_type_match,
                                           const ArraySliceRange& slice_range,
                                           BadRowsTracker* const bad_rows_tracker) {
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
  }

  switch (type) {
    case kBOOLEAN:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
    case kTINYINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
    case kSMALLINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
    case kINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *int_buffer_, slice_range, bad_rows_tracker);
    case kBIGINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kFLOAT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *float_buffer_, slice_range, bad_rows_tracker);
    case kDOUBLE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *double_buffer_, slice_range, bad_rows_tracker);
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                       "Expected string type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *string_buffer_, slice_range, bad_rows_tracker);
    case kTIME:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
                       "Expected time32 or time64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kTIMESTAMP:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kDATE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
                       "Expected date32 or date64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                     "Expected string type");
      return convert_arrow_val_to_import_buffer(
          cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
    case kARRAY:
      throw std::runtime_error("Arrow array appends not yet supported");
    default:
      throw std::runtime_error("Invalid Type");
  }
}

// this is exclusively used by load_table_binary_columnar
size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
  size_t dataSize = 0;
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
      throw std::runtime_error("NULL for column " + cd->columnName);
    }
  }

  switch (cd->columnType.get_type()) {
    case kBOOLEAN: {
      dataSize = col.data.int_col.size();
      bool_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bool_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kTINYINT: {
      dataSize = col.data.int_col.size();
      tinyint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kSMALLINT: {
      dataSize = col.data.int_col.size();
      smallint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kINT: {
      dataSize = col.data.int_col.size();
      int_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          int_buffer_->push_back((int32_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kFLOAT: {
      dataSize = col.data.real_col.size();
      float_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          float_buffer_->push_back(NULL_FLOAT);
        } else {
          float_buffer_->push_back((float)col.data.real_col[i]);
        }
      }
      break;
    }
    case kDOUBLE: {
      dataSize = col.data.real_col.size();
      double_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          double_buffer_->push_back(NULL_DOUBLE);
        } else {
          double_buffer_->push_back((double)col.data.real_col[i]);
        }
      }
      break;
    }
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // TODO: for now, use empty string for nulls
      dataSize = col.data.str_col.size();
      string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          string_buffer_->push_back(std::string());
        } else {
          string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON: {
      dataSize = col.data.str_col.size();
      geo_string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          // TODO: add support for NULL geo
          geo_string_buffer_->push_back(std::string());
        } else {
          geo_string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kARRAY: {
      dataSize = col.data.arr_col.size();
      if (IS_STRING(cd->columnType.get_subtype())) {
        for (size_t i = 0; i < dataSize; i++) {
          std::vector<std::string>& string_vec = addStringArray();
          if (!col.nulls[i]) {
            size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
            for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
              string_vec.push_back(col.data.arr_col[i].data.str_col[str_idx]);
            }
          }
        }
      } else {
        auto elem_ti = cd->columnType.get_subtype();
        switch (elem_ti) {
          case kBOOLEAN: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(bool);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTINYINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int8_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kSMALLINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int16_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int16_t*)p =
                      static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int16_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int32_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int32_t*)p =
                      static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int32_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kBIGINT:
          case kNUMERIC:
          case kDECIMAL: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int64_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int64_t*)p =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kFLOAT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(float);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(float);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kDOUBLE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(double);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(double);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTIME:
          case kTIMESTAMP:
          case kDATE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteWidth = sizeof(int64_t);
                size_t byteSize = len * byteWidth;
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *reinterpret_cast<int64_t*>(p) =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          default:
            throw std::runtime_error("Invalid Array Type");
        }
      }
      break;
    }
    default:
      throw std::runtime_error("Invalid Type");
  }
  return dataSize;
}

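// Appends a single Thrift TDatum to this buffer, storing the appropriate NULL
// sentinel (or throwing, for NOT NULL columns) when is_null is set.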
void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const TDatum& datum,
                                  const bool is_null,
                                  const int64_t replicate_count) {
  set_replicate_count(replicate_count);
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        addBoolean((int8_t)datum.val.int_val);
      }
      break;
    }
    case kTINYINT:
      if (!is_null) {
        addTinyint((int8_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kSMALLINT:
      if (!is_null) {
        addSmallint((int16_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kINT:
      if (!is_null) {
        addInt((int32_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kBIGINT:
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kFLOAT:
      if (!is_null) {
        addFloat((float)datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        addDouble(datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        addString(datum.val.str_val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kARRAY:
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      if (IS_STRING(cd->columnType.get_subtype())) {
        std::vector<std::string>& string_vec = addStringArray();
        addBinaryStringArray(datum, string_vec);
      } else {
        if (!is_null) {
          addArray(TDatumToArrayDatum(datum, cd->columnType));
        } else {
          addArray(NullArray(cd->columnType));
        }
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addGeoString(std::string());
      } else {
        addGeoString(datum.val.str_val);
      }
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

bool importGeoFromLonLat(double lon, double lat, std::vector<double>& coords) {
  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
    return false;
  }
  // we don't need to do any coordinate-system transformation
  // here (yet) so we don't need to use any OGR API or types
  // just use the values directly (assumed to be in 4326)
  coords.push_back(lon);
  coords.push_back(lat);
  return true;
}

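// Fills the physical columns backing a geo column for one row. The physical
// columns follow the logical geo column in order: coords, then ring_sizes for
// (MULTI)POLYGON, poly_rings for MULTIPOLYGON, bounds for all non-POINT types,
// and finally render_group for (MULTI)POLYGON.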
void Importer::set_geo_physical_import_buffer(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<double>& coords,
    std::vector<double>& bounds,
    std::vector<int>& ring_sizes,
    std::vector<int>& poly_rings,
    int render_group,
    const int64_t replicate_count) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  bool is_null_geo = false;
  bool is_null_point = false;
  if (!col_ti.get_notnull()) {
    // Check for NULL geo
    if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
      is_null_point = true;
      coords.clear();
    }
    is_null_geo = coords.empty();
    if (is_null_point) {
      coords.push_back(NULL_ARRAY_DOUBLE);
      coords.push_back(NULL_DOUBLE);
      // Treating POINT coords as notnull, need to store actual encoding
      // [un]compressed+[not]null
      is_null_geo = false;
    }
  }
  std::vector<TDatum> td_coords_data;
  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
  // One exception - NULL POINT geo: coords need to be processed to encode nullness
  // in a fixlen array, compressed and uncompressed.
  if (!is_null_geo) {
    std::vector<uint8_t> compressed_coords = geospatial::compress_coords(coords, col_ti);
    for (auto cc : compressed_coords) {
      TDatum td_byte;
      td_byte.val.int_val = cc;
      td_coords_data.push_back(td_byte);
    }
  }
  TDatum tdd_coords;
  tdd_coords.val.arr_val = td_coords_data;
  tdd_coords.is_null = is_null_geo;
  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false, replicate_count);

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    std::vector<TDatum> td_ring_sizes;
    if (!is_null_geo) {
      for (auto ring_size : ring_sizes) {
        TDatum td_ring_size;
        td_ring_size.val.int_val = ring_size;
        td_ring_sizes.push_back(td_ring_size);
      }
    }
    TDatum tdd_ring_sizes;
    tdd_ring_sizes.val.arr_val = td_ring_sizes;
    tdd_ring_sizes.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(
        cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
  }

  if (col_type == kMULTIPOLYGON) {
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    std::vector<TDatum> td_poly_rings;
    if (!is_null_geo) {
      for (auto num_rings : poly_rings) {
        TDatum td_num_rings;
        td_num_rings.val.int_val = num_rings;
        td_poly_rings.push_back(td_num_rings);
      }
    }
    TDatum tdd_poly_rings;
    tdd_poly_rings.val.arr_val = td_poly_rings;
    tdd_poly_rings.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(
        cd_poly_rings, tdd_poly_rings, false, replicate_count);
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    std::vector<TDatum> td_bounds_data;
    if (!is_null_geo) {
      for (auto b : bounds) {
        TDatum td_double;
        td_double.val.real_val = b;
        td_bounds_data.push_back(td_double);
      }
    }
    TDatum tdd_bounds;
    tdd_bounds.val.arr_val = td_bounds_data;
    tdd_bounds.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(
        cd_render_group, td_render_group, false, replicate_count);
  }
}

void Importer::set_geo_physical_import_buffer_columnar(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<std::vector<double>>& coords_column,
    std::vector<std::vector<double>>& bounds_column,
    std::vector<std::vector<int>>& ring_sizes_column,
    std::vector<std::vector<int>>& poly_rings_column,
    int render_group,
    const int64_t replicate_count) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;

  auto coords_row_count = coords_column.size();
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  for (auto coords : coords_column) {
    bool is_null_geo = false;
    bool is_null_point = false;
    if (!col_ti.get_notnull()) {
      // Check for NULL geo
      if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
        is_null_point = true;
        coords.clear();
      }
      is_null_geo = coords.empty();
      if (is_null_point) {
        coords.push_back(NULL_ARRAY_DOUBLE);
        coords.push_back(NULL_DOUBLE);
        // Treating POINT coords as notnull, need to store actual encoding
        // [un]compressed+[not]null
        is_null_geo = false;
      }
    }
    std::vector<TDatum> td_coords_data;
    if (!is_null_geo) {
      std::vector<uint8_t> compressed_coords =
          geospatial::compress_coords(coords, col_ti);
      for (auto cc : compressed_coords) {
        TDatum td_byte;
        td_byte.val.int_val = cc;
        td_coords_data.push_back(td_byte);
      }
    }
    TDatum tdd_coords;
    tdd_coords.val.arr_val = td_coords_data;
    tdd_coords.is_null = is_null_geo;
    import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false, replicate_count);
  }
  col_idx++;

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (ring_sizes_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
    }
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto ring_sizes : ring_sizes_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = ring_sizes.empty();
      }
      std::vector<TDatum> td_ring_sizes;
      for (auto ring_size : ring_sizes) {
        TDatum td_ring_size;
        td_ring_size.val.int_val = ring_size;
        td_ring_sizes.push_back(td_ring_size);
      }
      TDatum tdd_ring_sizes;
      tdd_ring_sizes.val.arr_val = td_ring_sizes;
      tdd_ring_sizes.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(
          cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kMULTIPOLYGON) {
    if (poly_rings_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
    }
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto poly_rings : poly_rings_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = poly_rings.empty();
      }
      std::vector<TDatum> td_poly_rings;
      for (auto num_rings : poly_rings) {
        TDatum td_num_rings;
        td_num_rings.val.int_val = num_rings;
        td_poly_rings.push_back(td_num_rings);
      }
      TDatum tdd_poly_rings;
      tdd_poly_rings.val.arr_val = td_poly_rings;
      tdd_poly_rings.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(
          cd_poly_rings, tdd_poly_rings, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (bounds_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: bounds column size mismatch";
    }
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto bounds : bounds_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
      }
      std::vector<TDatum> td_bounds_data;
      for (auto b : bounds) {
        TDatum td_double;
        td_double.val.real_val = b;
        td_bounds_data.push_back(td_double);
      }
      TDatum tdd_bounds;
      tdd_bounds.val.arr_val = td_bounds_data;
      tdd_bounds.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = false;
    for (decltype(coords_row_count) i = 0; i < coords_row_count; i++) {
      import_buffers[col_idx]->add_value(
          cd_render_group, td_render_group, false, replicate_count);
    }
    col_idx++;
  }
}

namespace {

std::tuple<int, SQLTypes, std::string> explode_collections_step1(
    const std::list<const ColumnDescriptor*>& col_descs) {
  // validate the columns
  // for now we can only explode into a single destination column
  // which must be of the child type (POLYGON, LINESTRING, POINT)
  int collection_col_idx = -1;
  int col_idx = 0;
  std::string collection_col_name;
  SQLTypes collection_child_type = kNULLT;
  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
    auto const& cd = *cd_it;
    auto const col_type = cd->columnType.get_type();
    if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
      if (collection_col_idx >= 0) {
        throw std::runtime_error(
            "Explode Collections: Found more than one destination column");
      }
      collection_col_idx = col_idx;
      collection_child_type = col_type;
      collection_col_name = cd->columnName;
    }
    for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
      ++cd_it;
    }
    col_idx++;
  }
  if (collection_col_idx < 0) {
    throw std::runtime_error(
        "Explode Collections: Failed to find a supported column type to explode "
        "into");
  }
  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
}

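// Given a parsed OGR geometry, either imports it once or, when it is a MULTI*
// collection matching the destination child type, imports each child geometry
// as a separate row. Returns the microseconds spent in the import lambda.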
int64_t explode_collections_step2(
    OGRGeometry* ogr_geometry,
    SQLTypes collection_child_type,
    const std::string& collection_col_name,
    size_t row_or_feature_idx,
    std::function<void(OGRGeometry*)> execute_import_lambda) {
  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
  bool is_collection = false;
  switch (collection_child_type) {
    case kPOINT:
      switch (ogr_geometry_type) {
        case wkbMultiPoint:
          is_collection = true;
          break;
        case wkbPoint:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTIPOINT or POINT");
      }
      break;
    case kLINESTRING:
      switch (ogr_geometry_type) {
        case wkbMultiLineString:
          is_collection = true;
          break;
        case wkbLineString:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTILINESTRING or "
              "LINESTRING");
      }
      break;
    case kPOLYGON:
      switch (ogr_geometry_type) {
        case wkbMultiPolygon:
          is_collection = true;
          break;
        case wkbPolygon:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
      }
      break;
    default:
      CHECK(false) << "Unsupported geo child type " << collection_child_type;
  }

  int64_t us = 0LL;

  // explode or just import
  if (is_collection) {
    // cast to collection
    OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
    CHECK(collection_geometry);

#if LOG_EXPLODE_COLLECTIONS
    // log number of children
    LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
              << collection_col_name << "' into "
              << collection_geometry->getNumGeometries() << " child rows";
#endif

    // loop over children
    uint32_t child_geometry_count = 0;
    auto child_geometry_it = collection_geometry->begin();
    while (child_geometry_it != collection_geometry->end()) {
      // get and import this child
      OGRGeometry* import_geometry = *child_geometry_it;
      us += measure<std::chrono::microseconds>::execution(
          [&] { execute_import_lambda(import_geometry); });

      // next child
      child_geometry_it++;
      child_geometry_count++;
    }
  } else {
    // import non-collection row just once
    us += measure<std::chrono::microseconds>::execution(
        [&] { execute_import_lambda(ogr_geometry); });
  }

  // done
  return us;
}

}  // namespace

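// Worker body for multi-threaded delimited (CSV) import: finds the first whole
// row in its slice of the input buffer, parses rows into per-column
// TypedImportBuffers, and tracks rejected rows in the returned ImportStatus.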
1804  int thread_id,
1805  Importer* importer,
1806  std::unique_ptr<char[]> scratch_buffer,
1807  size_t begin_pos,
1808  size_t end_pos,
1809  size_t total_size,
1810  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
1811  size_t first_row_index_this_buffer) {
1813  int64_t total_get_row_time_us = 0;
1814  int64_t total_str_to_val_time_us = 0;
1815  CHECK(scratch_buffer);
1816  auto buffer = scratch_buffer.get();
1817  auto load_ms = measure<>::execution([]() {});
1818  auto ms = measure<>::execution([&]() {
1819  const CopyParams& copy_params = importer->get_copy_params();
1820  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
1821  size_t begin =
1822  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
1823  const char* thread_buf = buffer + begin_pos + begin;
1824  const char* thread_buf_end = buffer + end_pos;
1825  const char* buf_end = buffer + total_size;
1826  bool try_single_thread = false;
1827  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
1828  importer->get_import_buffers(thread_id);
1829  auto us = 0ULL;
1830  int phys_cols = 0;
1831  int point_cols = 0;
1832  for (const auto cd : col_descs) {
1833  const auto& col_ti = cd->columnType;
1834  phys_cols += col_ti.get_physical_cols();
1835  if (cd->columnType.get_type() == kPOINT) {
1836  point_cols++;
1837  }
1838  }
1839  auto num_cols = col_descs.size() - phys_cols;
1840  for (const auto& p : import_buffers) {
1841  p->clear();
1842  }
1843  std::vector<std::string_view> row;
1844  size_t row_index_plus_one = 0;
1845  for (const char* p = thread_buf; p < thread_buf_end; p++) {
1846  row.clear();
1847  std::vector<std::unique_ptr<char[]>>
1848  tmp_buffers; // holds string w/ removed escape chars, etc
1849  if (DEBUG_TIMING) {
1850  us = measure<std::chrono::microseconds>::execution([&]() {
1851  p = delimited_parser::get_row(p,
1852  thread_buf_end,
1853  buf_end,
1854  copy_params,
1855  importer->get_is_array(),
1856  row,
1857  tmp_buffers,
1858  try_single_thread);
1859  });
1860  total_get_row_time_us += us;
1861  } else {
1862  p = delimited_parser::get_row(p,
1863  thread_buf_end,
1864  buf_end,
1865  copy_params,
1866  importer->get_is_array(),
1867  row,
1868  tmp_buffers,
1869  try_single_thread);
1870  }
1871  row_index_plus_one++;
1872  // Each POINT could consume two separate coords instead of a single WKT
1873  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
1874  import_status.rows_rejected++;
1875  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
1876  << row.size() << "): " << shared::printContainer(row);
1877  if (import_status.rows_rejected > copy_params.max_reject) {
1878  break;
1879  }
1880  continue;
1881  }
1882 
1883  //
1884  // lambda for importing a row (perhaps multiple times if exploding a collection)
1885  //
1886 
1887  auto execute_import_row = [&](OGRGeometry* import_geometry) {
1888  size_t import_idx = 0;
1889  size_t col_idx = 0;
1890  try {
1891  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1892  auto cd = *cd_it;
1893  const auto& col_ti = cd->columnType;
1894 
1895  bool is_null =
1896  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
1897  // Note: the default copy_params.null_str is "\N", but everyone uses "NULL".
1898  // So nullness may initially be missed here and not passed to add_value,
1899  // which then may still decide the value is actually a NULL, e.g.
1900  // if a kINT doesn't start with a digit or a '-' it's considered NULL.
1901  // So "NULL" is not recognized as NULL here, but then it's not recognized as
1902  // a valid kINT either, so it ends up a NULL after all.
1903  // We also check for "NULL" here, as a widely accepted notation for NULL.
1904 
1905  // Treating empty as NULL
1906  if (!cd->columnType.is_string() && row[import_idx].empty()) {
1907  is_null = true;
1908  }
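// Illustration of the rules above (hypothetical fields): with the default
// null_str "\N", the fields "\N", "NULL", and -- for non-string columns -- ""
// all reach add_value() with is_null == true; a lowercase "null" in a kINT
// column is not caught here, but per the note above still becomes NULL inside
// add_value(), since it does not start with a digit or '-'.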
1909 
1910  if (col_ti.get_physical_cols() == 0) {
1911  // not geo
1912 
1913  import_buffers[col_idx]->add_value(
1914  cd, row[import_idx], is_null, copy_params);
1915 
1916  // next
1917  ++import_idx;
1918  ++col_idx;
1919  } else {
1920  // geo
1921 
1922  // store null string in the base column
1923  import_buffers[col_idx]->add_value(
1924  cd, copy_params.null_str, true, copy_params);
1925 
1926  // WKT from string we're not storing
1927  auto const& wkt = row[import_idx];
1928 
1929  // next
1930  ++import_idx;
1931  ++col_idx;
1932 
1933  SQLTypes col_type = col_ti.get_type();
1934  CHECK(IS_GEO(col_type));
1935 
1936  std::vector<double> coords;
1937  std::vector<double> bounds;
1938  std::vector<int> ring_sizes;
1939  std::vector<int> poly_rings;
1940  int render_group = 0;
1941 
1942  if (!is_null && col_type == kPOINT && wkt.size() > 0 &&
1943  (wkt[0] == '.' || isdigit(wkt[0]) || wkt[0] == '-')) {
1944  // Invalid WKT, looks more like a scalar.
1945  // Try custom POINT import: from two separate scalars rather than WKT
1946  // string
1947  double lon = std::atof(std::string(wkt).c_str());
1948  double lat = NAN;
1949  auto lat_str = row[import_idx];
1950  ++import_idx;
1951  if (lat_str.size() > 0 &&
1952  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
1953  lat = std::atof(std::string(lat_str).c_str());
1954  }
1955  // Swap coordinates if this table uses a reverse order: lat/lon
1956  if (!copy_params.lonlat) {
1957  std::swap(lat, lon);
1958  }
1959  // TODO: should check if POINT column should have been declared with
1960  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
1961  // throw std::runtime_error("POINT column " + cd->columnName + " is
1962  // not WGS84, cannot insert lon/lat");
1963  // }
1964  if (!importGeoFromLonLat(lon, lat, coords)) {
1965  throw std::runtime_error(
1966  "Cannot read lon/lat to insert into POINT column " +
1967  cd->columnName);
1968  }
1969  } else {
1970  // import it
1971  SQLTypeInfo import_ti{col_ti};
1972  if (is_null) {
1973  if (col_ti.get_notnull()) {
1974  throw std::runtime_error("NULL geo for column " + cd->columnName);
1975  }
1976  Geo_namespace::GeoTypesFactory::getNullGeoColumns(
1977  import_ti,
1978  coords,
1979  bounds,
1980  ring_sizes,
1981  poly_rings,
1982  PROMOTE_POLYGON_TO_MULTIPOLYGON);
1983  } else {
1984  if (import_geometry) {
1985  // geometry already exploded
1986  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
1987  import_geometry,
1988  import_ti,
1989  coords,
1990  bounds,
1991  ring_sizes,
1992  poly_rings,
1993  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
1994  std::string msg =
1995  "Failed to extract valid geometry from exploded row " +
1996  std::to_string(first_row_index_this_buffer +
1997  row_index_plus_one) +
1998  " for column " + cd->columnName;
1999  throw std::runtime_error(msg);
2000  }
2001  } else {
2002  // extract geometry directly from WKT
2003  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
2004  std::string(wkt),
2005  import_ti,
2006  coords,
2007  bounds,
2008  ring_sizes,
2009  poly_rings,
2010  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2011  std::string msg = "Failed to extract valid geometry from row " +
2012  std::to_string(first_row_index_this_buffer +
2013  row_index_plus_one) +
2014  " for column " + cd->columnName;
2015  throw std::runtime_error(msg);
2016  }
2017  }
2018 
2019  // validate types
2020  if (col_type != import_ti.get_type()) {
2021  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2022  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2023  col_type == SQLTypes::kMULTIPOLYGON)) {
2024  throw std::runtime_error(
2025  "Imported geometry doesn't match the type of column " +
2026  cd->columnName);
2027  }
2028  }
2029  }
2030 
2031  // assign render group?
2032  if (columnIdToRenderGroupAnalyzerMap.size()) {
2033  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2034  if (ring_sizes.size()) {
2035  // get a suitable render group for these poly coords
2036  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2037  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2038  render_group =
2039  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2040  } else {
2041  // empty poly
2042  render_group = -1;
2043  }
2044  }
2045  }
2046  }
2047 
2048  // import extracted geo
2049  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2050  cd,
2051  import_buffers,
2052  col_idx,
2053  coords,
2054  bounds,
2055  ring_sizes,
2056  poly_rings,
2057  render_group);
2058 
2059  // skip remaining physical columns
2060  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2061  ++cd_it;
2062  }
2063  }
2064  }
2065  import_status.rows_completed++;
2066  } catch (const std::exception& e) {
2067  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2068  import_buffers[col_idx_to_pop]->pop_value();
2069  }
2070  import_status.rows_rejected++;
2071  LOG(ERROR) << "Input exception thrown: " << e.what()
2072  << ". Row discarded. Data: " << shared::printContainer(row);
2073  }
2074  };
2075 
2076  if (copy_params.geo_explode_collections) {
2077  // explode and import
2078  // @TODO(se) convert to structured bindings when we can use C++17 here
2079  auto collection_idx_type_name = explode_collections_step1(col_descs);
2080  int collection_col_idx = std::get<0>(collection_idx_type_name);
2081  SQLTypes collection_child_type = std::get<1>(collection_idx_type_name);
2082  std::string collection_col_name = std::get<2>(collection_idx_type_name);
2083  // pull out the collection WKT
2084  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2085  auto const& collection_wkt = row[collection_col_idx];
2086  // convert to OGR
2087  OGRGeometry* ogr_geometry = nullptr;
2088  ScopeGuard destroy_ogr_geometry = [&] {
2089  if (ogr_geometry) {
2090  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2091  }
2092  };
2093  OGRErr ogr_status = OGRGeometryFactory::createFromWkt(
2094  collection_wkt.data(), nullptr, &ogr_geometry);
2095  if (ogr_status != OGRERR_NONE) {
2096  throw std::runtime_error("Failed to convert WKT to geometry");
2097  }
2098  // do the explode and import
2099  us = explode_collections_step2(ogr_geometry,
2100  collection_child_type,
2101  collection_col_name,
2102  first_row_index_this_buffer + row_index_plus_one,
2103  execute_import_row);
2104  } else {
2105  // import non-collection row just once
2106  us = measure<std::chrono::microseconds>::execution(
2107  [&] { execute_import_row(nullptr); });
2108  }
2109  total_str_to_val_time_us += us;
2110  } // end thread
2111  if (import_status.rows_completed > 0) {
2112  load_ms = measure<>::execution(
2113  [&]() { importer->load(import_buffers, import_status.rows_completed); });
2114  }
2115  });
2116  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2117  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2118  << import_status.rows_completed << " rows inserted in "
2119  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2120  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2121  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2122  << "sec" << std::endl;
2123  }
2124 
2125  import_status.thread_id = thread_id;
2126  // LOG(INFO) << " return " << import_status.thread_id << std::endl;
2127 
2128  return import_status;
2129 }
2130 
2131 static ImportStatus import_thread_shapefile(
2132  int thread_id,
2133  Importer* importer,
2134  OGRSpatialReference* poGeographicSR,
2135  const FeaturePtrVector& features,
2136  size_t firstFeature,
2137  size_t numFeatures,
2138  const FieldNameToIndexMapType& fieldNameToIndexMap,
2139  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2140  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap) {
2141  ImportStatus import_status;
2142  const CopyParams& copy_params = importer->get_copy_params();
2143  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2144  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2145  importer->get_import_buffers(thread_id);
2146 
2147  for (const auto& p : import_buffers) {
2148  p->clear();
2149  }
2150 
2151  auto convert_timer = timer_start();
2152 
2153  // we create this on the fly based on the first feature's SR
2154  std::unique_ptr<OGRCoordinateTransformation> coordinate_transformation;
2155 
2156  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2157  if (!features[iFeature]) {
2158  continue;
2159  }
2160 
2161  // get this feature's geometry
2162  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2163  if (pGeometry) {
2164  // for geodatabases, we need to handle features with no geometry,
2165  // as we still want to create a table even if it has no geo column
2166 
2167  // transform it
2168  // avoid GDAL error if not transformable
2169  auto geometry_sr = pGeometry->getSpatialReference();
2170  if (geometry_sr) {
2171  // create an OGRCoordinateTransformation (CT) on the fly.
2172  // we must assume that all geo in this file will have
2173  // the same source SR, so the CT will be valid for all of it;
2174  // transforming with a reusable CT is faster than transforming to an SR
2175  if (coordinate_transformation == nullptr) {
2176  coordinate_transformation.reset(
2177  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR));
2178  if (coordinate_transformation == nullptr) {
2179  throw std::runtime_error(
2180  "Failed to create a GDAL CoordinateTransformation for incoming geo");
2181  }
2182  }
2183  pGeometry->transform(coordinate_transformation.get());
2184  }
2185  }
2186 
2187  //
2188  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2189  //
2190 
2191  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2192  size_t col_idx = 0;
2193  try {
2194  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2195  auto cd = *cd_it;
2196 
2197  // is this a geo column?
2198  const auto& col_ti = cd->columnType;
2199  if (col_ti.is_geometry()) {
2200  // Note that this assumes there is one and only one geo column in the table.
2201  // Currently, the importer only supports reading a single geospatial feature
2202  // from an input shapefile / geojson file, but this code will need to be
2203  // modified if that changes
2204  SQLTypes col_type = col_ti.get_type();
2205 
2206  // store null string in the base column
2207  import_buffers[col_idx]->add_value(
2208  cd, copy_params.null_str, true, copy_params);
2209  ++col_idx;
2210 
2211  // the data we now need to extract for the other columns
2212  std::vector<double> coords;
2213  std::vector<double> bounds;
2214  std::vector<int> ring_sizes;
2215  std::vector<int> poly_rings;
2216  int render_group = 0;
2217 
2218  // extract it
2219  SQLTypeInfo import_ti{col_ti};
2220  bool is_null_geo = !import_geometry;
2221  if (is_null_geo) {
2222  if (col_ti.get_notnull()) {
2223  throw std::runtime_error("NULL geo for column " + cd->columnName);
2224  }
2225  Geo_namespace::GeoTypesFactory::getNullGeoColumns(
2226  import_ti,
2227  coords,
2228  bounds,
2229  ring_sizes,
2230  poly_rings,
2231  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2232  } else {
2233  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
2234  import_geometry,
2235  import_ti,
2236  coords,
2237  bounds,
2238  ring_sizes,
2239  poly_rings,
2240  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2241  std::string msg = "Failed to extract valid geometry from feature " +
2242  std::to_string(firstFeature + iFeature + 1) +
2243  " for column " + cd->columnName;
2244  throw std::runtime_error(msg);
2245  }
2246 
2247  // validate types
2248  if (col_type != import_ti.get_type()) {
2249  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2250  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2251  col_type == SQLTypes::kMULTIPOLYGON)) {
2252  throw std::runtime_error(
2253  "Imported geometry doesn't match the type of column " +
2254  cd->columnName);
2255  }
2256  }
2257  }
2258 
2259  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2260  if (ring_sizes.size()) {
2261  // get a suitable render group for these poly coords
2262  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2263  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2264  render_group = (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2265  } else {
2266  // empty poly
2267  render_group = -1;
2268  }
2269  }
2270 
2271  // create coords array value and add it to the physical column
2272  ++cd_it;
2273  auto cd_coords = *cd_it;
2274  std::vector<TDatum> td_coord_data;
2275  if (!is_null_geo) {
2276  std::vector<uint8_t> compressed_coords =
2277  geospatial::compress_coords(coords, col_ti);
2278  for (auto cc : compressed_coords) {
2279  TDatum td_byte;
2280  td_byte.val.int_val = cc;
2281  td_coord_data.push_back(td_byte);
2282  }
2283  }
2284  TDatum tdd_coords;
2285  tdd_coords.val.arr_val = td_coord_data;
2286  tdd_coords.is_null = is_null_geo;
2287  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2288  ++col_idx;
2289 
2290  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2291  // Create ring_sizes array value and add it to the physical column
2292  ++cd_it;
2293  auto cd_ring_sizes = *cd_it;
2294  std::vector<TDatum> td_ring_sizes;
2295  if (!is_null_geo) {
2296  for (auto ring_size : ring_sizes) {
2297  TDatum td_ring_size;
2298  td_ring_size.val.int_val = ring_size;
2299  td_ring_sizes.push_back(td_ring_size);
2300  }
2301  }
2302  TDatum tdd_ring_sizes;
2303  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2304  tdd_ring_sizes.is_null = is_null_geo;
2305  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2306  ++col_idx;
2307  }
2308 
2309  if (col_type == kMULTIPOLYGON) {
2310  // Create poly_rings array value and add it to the physical column
2311  ++cd_it;
2312  auto cd_poly_rings = *cd_it;
2313  std::vector<TDatum> td_poly_rings;
2314  if (!is_null_geo) {
2315  for (auto num_rings : poly_rings) {
2316  TDatum td_num_rings;
2317  td_num_rings.val.int_val = num_rings;
2318  td_poly_rings.push_back(td_num_rings);
2319  }
2320  }
2321  TDatum tdd_poly_rings;
2322  tdd_poly_rings.val.arr_val = td_poly_rings;
2323  tdd_poly_rings.is_null = is_null_geo;
2324  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2325  ++col_idx;
2326  }
2327 
2328  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2329  col_type == kMULTIPOLYGON) {
2330  // Create bounds array value and add it to the physical column
2331  ++cd_it;
2332  auto cd_bounds = *cd_it;
2333  std::vector<TDatum> td_bounds_data;
2334  if (!is_null_geo) {
2335  for (auto b : bounds) {
2336  TDatum td_double;
2337  td_double.val.real_val = b;
2338  td_bounds_data.push_back(td_double);
2339  }
2340  }
2341  TDatum tdd_bounds;
2342  tdd_bounds.val.arr_val = td_bounds_data;
2343  tdd_bounds.is_null = is_null_geo;
2344  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2345  ++col_idx;
2346  }
2347 
2348  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2349  // Create render_group value and add it to the physical column
2350  ++cd_it;
2351  auto cd_render_group = *cd_it;
2352  TDatum td_render_group;
2353  td_render_group.val.int_val = render_group;
2354  td_render_group.is_null = is_null_geo;
2355  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2356  ++col_idx;
2357  }
2358  } else {
2359  // regular column
2360  // pull from GDAL metadata
2361  const auto cit = columnNameToSourceNameMap.find(cd->columnName);
2362  CHECK(cit != columnNameToSourceNameMap.end());
2363  const std::string& fieldName = cit->second;
2364  const auto fit = fieldNameToIndexMap.find(fieldName);
2365  CHECK(fit != fieldNameToIndexMap.end());
2366  size_t iField = fit->second;
2367  CHECK(iField < fieldNameToIndexMap.size());
2368  std::string fieldContents = features[iFeature]->GetFieldAsString(iField);
2369  import_buffers[col_idx]->add_value(cd, fieldContents, false, copy_params);
2370  ++col_idx;
2371  }
2372  }
2373  import_status.rows_completed++;
2374  } catch (const std::exception& e) {
2375  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2376  import_buffers[col_idx_to_pop]->pop_value();
2377  }
2378  import_status.rows_rejected++;
2379  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2380  }
2381  };
2382 
2383  if (pGeometry && copy_params.geo_explode_collections) {
2384  // explode and import
2385  // @TODO(se) convert to structured bindings when we can use C++17 here
2386  auto collection_idx_type_name = explode_collections_step1(col_descs);
2387  SQLTypes collection_child_type = std::get<1>(collection_idx_type_name);
2388  std::string collection_col_name = std::get<2>(collection_idx_type_name);
2389  explode_collections_step2(pGeometry,
2390  collection_child_type,
2391  collection_col_name,
2392  firstFeature + iFeature + 1,
2393  execute_import_feature);
2394  } else {
2395  // import non-collection or null feature just once
2396  execute_import_feature(pGeometry);
2397  }
2398  } // end features
2399 
2400  float convert_ms =
2401  float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2402  convert_timer)) /
2403  1000.0f;
2404 
2405  float load_ms = 0.0f;
2406  if (import_status.rows_completed > 0) {
2407  auto load_timer = timer_start();
2408  importer->load(import_buffers, import_status.rows_completed);
2409  load_ms =
2410  float(
2411  timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2412  load_timer)) /
2413  1000.0f;
2414  }
2415 
2416  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2417  LOG(INFO) << "DEBUG: Process " << convert_ms << "ms";
2418  LOG(INFO) << "DEBUG: Load " << load_ms << "ms";
2419  }
2420 
2421  import_status.thread_id = thread_id;
2422 
2423  if (DEBUG_TIMING) {
2424  LOG(INFO) << "DEBUG: Total "
2425  << float(timer_stop<std::chrono::steady_clock::time_point,
2426  std::chrono::microseconds>(convert_timer)) /
2427  1000.0f
2428  << "ms";
2429  }
2430 
2431  return import_status;
2432 }
2433 
2434 bool Loader::loadNoCheckpoint(
2435  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2436  size_t row_count) {
2437  return loadImpl(import_buffers, row_count, false);
2438 }
2439 
2440 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2441  size_t row_count) {
2442  return loadImpl(import_buffers, row_count, true);
2443 }
2444 
2445 namespace {
2446 
2447 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2448  const auto& ti = import_buffer.getTypeInfo();
2449  const int8_t* values_buffer{nullptr};
2450  if (ti.is_string()) {
2451  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2452  values_buffer = import_buffer.getStringDictBuffer();
2453  } else {
2454  values_buffer = import_buffer.getAsBytes();
2455  }
2456  CHECK(values_buffer);
2457  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2458  switch (logical_size) {
2459  case 1: {
2460  return values_buffer[index];
2461  }
2462  case 2: {
2463  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2464  }
2465  case 4: {
2466  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2467  }
2468  case 8: {
2469  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2470  }
2471  default:
2472  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2473  }
2474  UNREACHABLE();
2475  return 0;
2476 }
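// For example, a kSMALLINT shard key has logical size 2, so the buffer is read
// as int16_t; for a dictionary-encoded string key the switch uses ti.get_size()
// instead, i.e. the width of the encoded integer ids (1, 2 or 4 bytes).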
2477 
2478 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2479  const auto& ti = import_buffer.getTypeInfo();
2480  CHECK_EQ(kFLOAT, ti.get_type());
2481  const auto values_buffer = import_buffer.getAsBytes();
2482  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2483 }
2484 
2485 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2486  const auto& ti = import_buffer.getTypeInfo();
2487  CHECK_EQ(kDOUBLE, ti.get_type());
2488  const auto values_buffer = import_buffer.getAsBytes();
2489  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2490 }
2491 
2492 } // namespace
2493 
2494 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2495  std::vector<size_t>& all_shard_row_counts,
2496  const OneShardBuffers& import_buffers,
2497  const size_t row_count,
2498  const size_t shard_count) {
2499  all_shard_row_counts.resize(shard_count);
2500  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2501  all_shard_import_buffers.emplace_back();
2502  for (const auto& typed_import_buffer : import_buffers) {
2503  all_shard_import_buffers.back().emplace_back(
2504  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2505  typed_import_buffer->getStringDictionary()));
2506  }
2507  }
2508  CHECK_GT(table_desc_->shardedColumnId, 0);
2509  int col_idx{0};
2510  const ColumnDescriptor* shard_col_desc{nullptr};
2511  for (const auto col_desc : column_descs_) {
2512  ++col_idx;
2513  if (col_idx == table_desc_->shardedColumnId) {
2514  shard_col_desc = col_desc;
2515  break;
2516  }
2517  }
2518  CHECK(shard_col_desc);
2519  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2520  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2521  const auto& shard_col_ti = shard_col_desc->columnType;
2522  CHECK(shard_col_ti.is_integer() ||
2523  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2524  shard_col_ti.is_time());
2525  if (shard_col_ti.is_string()) {
2526  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2527  CHECK(payloads_ptr);
2528  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2529  }
2530 
2531  // for each replicated (ALTER-added) column, the number of rows in a shard is
2532  // inferred from that of the sharding column, not simply evenly distributed.
2533  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2534  // Here the loop count is overloaded. For normal imports, we loop through all
2535  // input values (rows), so the loop count is the number of input rows.
2536  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2537  // all shards, so the loop count is the number of shards.
2538  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2539  for (size_t i = 0; i < loop_count; ++i) {
2540  const size_t shard =
2541  getReplicating()
2542  ? i
2543  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2544  auto& shard_output_buffers = all_shard_import_buffers[shard];
2545 
2546  // when replicating a column, populate 'rows' in all shards only once;
2547  // the value is fetched from the first (and only) row
2548  const auto row_index = getReplicating() ? 0 : i;
2549 
2550  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2551  const auto& input_buffer = import_buffers[col_idx];
2552  const auto& col_ti = input_buffer->getTypeInfo();
2553  const auto type =
2554  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2555 
2556  // for a replicated (added) column, populate rows_per_shard with the per-shard
2557  // replicate count, and bypass non-replicated columns.
2558  if (getReplicating()) {
2559  if (input_buffer->get_replicate_count() > 0) {
2560  shard_output_buffers[col_idx]->set_replicate_count(
2561  shard_tds[shard]->fragmenter->getNumRows());
2562  } else {
2563  continue;
2564  }
2565  }
2566 
2567  switch (type) {
2568  case kBOOLEAN:
2569  shard_output_buffers[col_idx]->addBoolean(
2570  int_value_at(*input_buffer, row_index));
2571  break;
2572  case kTINYINT:
2573  shard_output_buffers[col_idx]->addTinyint(
2574  int_value_at(*input_buffer, row_index));
2575  break;
2576  case kSMALLINT:
2577  shard_output_buffers[col_idx]->addSmallint(
2578  int_value_at(*input_buffer, row_index));
2579  break;
2580  case kINT:
2581  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2582  break;
2583  case kBIGINT:
2584  shard_output_buffers[col_idx]->addBigint(
2585  int_value_at(*input_buffer, row_index));
2586  break;
2587  case kFLOAT:
2588  shard_output_buffers[col_idx]->addFloat(
2589  float_value_at(*input_buffer, row_index));
2590  break;
2591  case kDOUBLE:
2592  shard_output_buffers[col_idx]->addDouble(
2593  double_value_at(*input_buffer, row_index));
2594  break;
2595  case kTEXT:
2596  case kVARCHAR:
2597  case kCHAR: {
2598  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2599  shard_output_buffers[col_idx]->addString(
2600  (*input_buffer->getStringBuffer())[row_index]);
2601  break;
2602  }
2603  case kTIME:
2604  case kTIMESTAMP:
2605  case kDATE:
2606  shard_output_buffers[col_idx]->addBigint(
2607  int_value_at(*input_buffer, row_index));
2608  break;
2609  case kARRAY:
2610  if (IS_STRING(col_ti.get_subtype())) {
2611  CHECK(input_buffer->getStringArrayBuffer());
2612  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2613  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2614  shard_output_buffers[col_idx]->addStringArray(input_arr);
2615  } else {
2616  shard_output_buffers[col_idx]->addArray(
2617  (*input_buffer->getArrayBuffer())[row_index]);
2618  }
2619  break;
2620  case kPOINT:
2621  case kLINESTRING:
2622  case kPOLYGON:
2623  case kMULTIPOLYGON: {
2624  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2625  shard_output_buffers[col_idx]->addGeoString(
2626  (*input_buffer->getGeoStringBuffer())[row_index]);
2627  break;
2628  }
2629  default:
2630  CHECK(false);
2631  }
2632  }
2633  ++all_shard_row_counts[shard];
2634  // when replicating a column, the row count of a shard equals the replicate
2635  // count of the column on that shard
2636  if (getReplicating()) {
2637  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2638  }
2639  }
2640 }
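// A minimal sketch of the non-replicating path above, assuming SHARD_FOR_KEY
// reduces to key % shard_count (see Shared/shard_key.h for the real macro):
//
//   int64_t key = int_value_at(*shard_column_input_buffer, i);
//   size_t shard = key % shard_count;  // destination shard for row i
//   // every column value of row i is then appended to
//   // all_shard_import_buffers[shard], and all_shard_row_counts[shard] grows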
2641 
2642 bool Loader::loadImpl(
2643  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2644  size_t row_count,
2645  bool checkpoint) {
2646  if (load_callback_) {
2647  auto data_blocks = get_data_block_pointers(import_buffers);
2648  return load_callback_(import_buffers, data_blocks, row_count);
2649  }
2650  if (table_desc_->nShards) {
2651  std::vector<OneShardBuffers> all_shard_import_buffers;
2652  std::vector<size_t> all_shard_row_counts;
2653  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2654  distributeToShards(all_shard_import_buffers,
2655  all_shard_row_counts,
2656  import_buffers,
2657  row_count,
2658  shard_tables.size());
2659  bool success = true;
2660  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2661  if (!all_shard_row_counts[shard_idx]) {
2662  continue;
2663  }
2664  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2665  all_shard_row_counts[shard_idx],
2666  shard_tables[shard_idx],
2667  checkpoint);
2668  }
2669  return success;
2670  }
2671  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2672 }
2673 
2674 std::vector<DataBlockPtr> Loader::get_data_block_pointers(
2675  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2676  std::vector<DataBlockPtr> result(import_buffers.size());
2677  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2678  encoded_data_block_ptrs_futures;
2679  // make all async calls to string dictionary here and then continue execution
2680  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2681  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2682  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2683  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2684  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2685 
2686  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2687  buf_idx,
2688  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2689  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2690  return import_buffers[buf_idx]->getStringDictBuffer();
2691  })));
2692  }
2693  }
2694 
2695  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2696  DataBlockPtr p;
2697  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2698  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2699  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2700  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2701  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2702  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2703  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2704  p.stringsPtr = string_payload_ptr;
2705  } else {
2706  // This column is a dictionary-ENCODED string. We already made the async
2707  // request above to obtain the encoded integer values, so skip this
2708  // iteration and continue.
2709  continue;
2710  }
2711  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2712  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2713  p.stringsPtr = geo_payload_ptr;
2714  } else {
2715  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2716  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2717  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2718  import_buffers[buf_idx]->addDictEncodedStringArray(
2719  *import_buffers[buf_idx]->getStringArrayBuffer());
2720  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2721  } else {
2722  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2723  }
2724  }
2725  result[buf_idx] = p;
2726  }
2727 
2728  // wait for the async requests we made for string dictionary
2729  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2730  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2731  }
2732  return result;
2733 }
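// Design note: dictionary encoding dominates the cost here, so the string
// payloads are dispatched to std::async first, the remaining column types are
// wired up synchronously while those lookups run, and the futures are joined
// only at the end -- N dictionary-encoded columns thus encode concurrently
// rather than serially.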
2734 
2735 bool Loader::loadToShard(
2736  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2737  size_t row_count,
2738  const TableDescriptor* shard_table,
2739  bool checkpoint) {
2740  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2741  // patch insert_data with new column
2742  if (this->getReplicating()) {
2743  for (const auto& import_buff : import_buffers) {
2744  insert_data_.replicate_count = import_buff->get_replicate_count();
2745  }
2746  }
2747 
2748  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2749  ins_data.numRows = row_count;
2750  bool success = true;
2751 
2752  ins_data.data = get_data_block_pointers(import_buffers);
2753 
2754  for (const auto& import_buffer : import_buffers) {
2755  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2756  }
2757 
2758  // release loader_lock so that in InsertOrderFragmenter::insertData
2759  // we can have multiple threads sort/shuffle InsertData
2760  loader_lock.unlock();
2761 
2762  {
2763  try {
2764  if (checkpoint) {
2765  shard_table->fragmenter->insertData(ins_data);
2766  } else {
2767  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2768  }
2769  } catch (std::exception& e) {
2770  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2771  success = false;
2772  }
2773  }
2774  return success;
2775 }
2776 
2777 void Loader::dropColumns(const std::vector<int>& columnIds) {
2778  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
2779  if (table_desc_->nShards) {
2780  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
2781  }
2782  for (auto table_desc : table_descs) {
2783  table_desc->fragmenter->dropColumns(columnIds);
2784  }
2785 }
2786 
2787 void Loader::init() {
2788  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2789  insert_data_.tableId = table_desc_->tableId;
2790  for (auto cd : column_descs_) {
2791  insert_data_.columnIds.push_back(cd->columnId);
2792  if (cd->columnType.get_compression() == kENCODING_DICT) {
2793  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2794  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
2795  CHECK(dd);
2796  dict_map_[cd->columnId] = dd->stringDict.get();
2797  }
2798  }
2799  insert_data_.numRows = 0;
2800 }
2801 
2802 void Detector::init() {
2803  detect_row_delimiter();
2804  split_raw_data();
2805  find_best_sqltypes_and_headers();
2806 }
2807 
2808 ImportStatus Detector::importDelimited(const std::string& file_path,
2809  const bool decompressed) {
2810  if (!p_file) {
2811  p_file = fopen(file_path.c_str(), "rb");
2812  }
2813  if (!p_file) {
2814  throw std::runtime_error("failed to open file '" + file_path +
2815  "': " + strerror(errno));
2816  }
2817 
2818  // somehow clang does not support ext/stdio_filebuf.h, so we
2819  // need to hand-roll readline with a customized copy_params.line_delim...
2820  std::string line;
2821  line.reserve(1 * 1024 * 1024);
2822  auto end_time = std::chrono::steady_clock::now() +
2823  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
2824  try {
2825  while (!feof(p_file)) {
2826  int c;
2827  size_t n = 0;
2828  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
2829  if (n++ >= line.capacity()) {
2830  break;
2831  }
2832  line += c;
2833  }
2834  if (0 == n) {
2835  break;
2836  }
2837  // remember the first line, which is possibly a header line, to
2838  // ignore identical header line(s) in the 2nd+ files of an archive;
2839  // otherwise, a 2nd+ header may be mistaken for an all-string row
2840  // and so skew the final column types.
2841  if (line1.empty()) {
2842  line1 = line;
2843  } else if (line == line1) {
2844  line.clear();
2845  continue;
2846  }
2847 
2848  raw_data += line;
2849  raw_data += copy_params.line_delim;
2850  line.clear();
2851  ++import_status.rows_completed;
2852  if (std::chrono::steady_clock::now() > end_time) {
2853  if (import_status.rows_completed > 10000) {
2854  break;
2855  }
2856  }
2857  }
2858  } catch (std::exception& e) {
2859  }
2860 
2861  // as if load truncated
2862  import_status.load_truncated = true;
2863  load_failed = true;
2864 
2865  fclose(p_file);
2866  p_file = nullptr;
2867  return import_status;
2868 }
2869 
2870 void Detector::read_file() {
2871  // this becomes analogous to Importer::import()
2872  (void)DataStreamSink::archivePlumber();
2873 }
2874 
2875 void Detector::detect_row_delimiter() {
2876  if (copy_params.delimiter == '\0') {
2877  copy_params.delimiter = ',';
2878  if (boost::filesystem::extension(file_path) == ".tsv") {
2879  copy_params.delimiter = '\t';
2880  }
2881  }
2882 }
2883 
2884 void Detector::split_raw_data() {
2885  const char* buf = raw_data.c_str();
2886  const char* buf_end = buf + raw_data.size();
2887  bool try_single_thread = false;
2888  for (const char* p = buf; p < buf_end; p++) {
2889  std::vector<std::string> row;
2890  std::vector<std::unique_ptr<char[]>> tmp_buffers;
2891  p = delimited_parser::get_row(
2892  p, buf_end, buf_end, copy_params, nullptr, row, tmp_buffers, try_single_thread);
2893  raw_rows.push_back(row);
2894  if (try_single_thread) {
2895  break;
2896  }
2897  }
2898  if (try_single_thread) {
2899  copy_params.threads = 1;
2900  raw_rows.clear();
2901  for (const char* p = buf; p < buf_end; p++) {
2902  std::vector<std::string> row;
2903  std::vector<std::unique_ptr<char[]>> tmp_buffers;
2904  p = delimited_parser::get_row(
2905  p, buf_end, buf_end, copy_params, nullptr, row, tmp_buffers, try_single_thread);
2906  raw_rows.push_back(row);
2907  }
2908  }
2909 }
2910 
2911 template <class T>
2912 bool try_cast(const std::string& str) {
2913  try {
2914  boost::lexical_cast<T>(str);
2915  } catch (const boost::bad_lexical_cast& e) {
2916  return false;
2917  }
2918  return true;
2919 }
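// For example (hypothetical inputs): try_cast<int16_t>("123") is true,
// try_cast<int16_t>("40000") is false (the overflow throws bad_lexical_cast),
// try_cast<double>("1.5e3") is true, and try_cast<double>("abc") is false.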
2920 
2921 inline char* try_strptimes(const char* str, const std::vector<std::string>& formats) {
2922  std::tm tm_struct;
2923  char* buf;
2924  for (auto format : formats) {
2925  buf = strptime(str, format.c_str(), &tm_struct);
2926  if (buf) {
2927  return buf;
2928  }
2929  }
2930  return nullptr;
2931 }
2932 
2933 SQLTypes Detector::detect_sqltype(const std::string& str) {
2934  SQLTypes type = kTEXT;
2935  if (try_cast<double>(str)) {
2936  type = kDOUBLE;
2937  /*if (try_cast<bool>(str)) {
2938  type = kBOOLEAN;
2939  }*/
2940  if (try_cast<int16_t>(str)) {
2941  type = kSMALLINT;
2942  } else if (try_cast<int32_t>(str)) {
2943  type = kINT;
2944  } else if (try_cast<int64_t>(str)) {
2945  type = kBIGINT;
2946  } else if (try_cast<float>(str)) {
2947  type = kFLOAT;
2948  }
2949  }
2950 
2951  // check for geo types
2952  if (type == kTEXT) {
2953  // convert to upper case
2954  std::string str_upper_case = str;
2955  std::transform(
2956  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
2957 
2958  // then test for leading words
2959  if (str_upper_case.find("POINT") == 0) {
2960  type = kPOINT;
2961  } else if (str_upper_case.find("LINESTRING") == 0) {
2962  type = kLINESTRING;
2963  } else if (str_upper_case.find("POLYGON") == 0) {
2964  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
2965  type = kMULTIPOLYGON;
2966  } else {
2967  type = kPOLYGON;
2968  }
2969  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
2970  type = kMULTIPOLYGON;
2971  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
2972  std::string::npos &&
2973  (str_upper_case.size() % 2) == 0) {
2974  // could be a WKB hex blob
2975  // we can't handle these yet
2976  // leave as TEXT for now
2977  // deliberate return here, as otherwise this would get matched as TIME
2978  // @TODO
2979  // implement WKB import
2980  return type;
2981  }
2982  }
2983 
2984  // check for time types
2985  if (type == kTEXT) {
2986  // @TODO
2987  // make these tests more robust so they don't match stuff they should not
2988  char* buf;
2989  buf = try_strptimes(str.c_str(), {"%Y-%m-%d", "%m/%d/%Y", "%d-%b-%y", "%d/%b/%Y"});
2990  if (buf) {
2991  type = kDATE;
2992  if (*buf == 'T' || *buf == ' ' || *buf == ':') {
2993  buf++;
2994  }
2995  }
2996  buf = try_strptimes(buf == nullptr ? str.c_str() : buf,
2997  {"%T %z", "%T", "%H%M%S", "%R"});
2998  if (buf) {
2999  if (type == kDATE) {
3000  type = kTIMESTAMP;
3001  } else {
3002  type = kTIME;
3003  }
3004  }
3005  }
3006 
3007  return type;
3008 }
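// Examples (hypothetical field values): "123" -> kSMALLINT, "100000" -> kINT,
// "1.25" -> kFLOAT, "1e300" -> kDOUBLE (too large for float), "POINT (1 2)"
// -> kPOINT, "2019-06-01" -> kDATE, "2019-06-01 12:00:00" -> kTIMESTAMP,
// "14:30:00" -> kTIME, anything else -> kTEXT.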
3009 
3010 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3011  std::vector<SQLTypes> types(row.size());
3012  for (size_t i = 0; i < row.size(); i++) {
3013  types[i] = detect_sqltype(row[i]);
3014  }
3015  return types;
3016 }
3017 
3018 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
3019  static std::array<int, kSQLTYPE_LAST> typeorder;
3020  typeorder[kCHAR] = 0;
3021  typeorder[kBOOLEAN] = 2;
3022  typeorder[kSMALLINT] = 3;
3023  typeorder[kINT] = 4;
3024  typeorder[kBIGINT] = 5;
3025  typeorder[kFLOAT] = 6;
3026  typeorder[kDOUBLE] = 7;
3027  typeorder[kTIMESTAMP] = 8;
3028  typeorder[kTIME] = 9;
3029  typeorder[kDATE] = 10;
3030  typeorder[kPOINT] = 11;
3031  typeorder[kLINESTRING] = 11;
3032  typeorder[kPOLYGON] = 11;
3033  typeorder[kMULTIPOLYGON] = 11;
3034  typeorder[kTEXT] = 12;
3035 
3036  // note: b < a instead of a < b because typeorder ranks types from most to least restrictive
3037  return typeorder[b] < typeorder[a];
3038 }
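// e.g. more_restrictive_sqltype(kDOUBLE, kSMALLINT) is true (typeorder 3 < 7),
// so in find_best_sqltypes a column already widened to kDOUBLE stays kDOUBLE
// when a later row parses as kSMALLINT, but still widens to kTEXT if a
// non-numeric value appears.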
3039 
3040 void Detector::find_best_sqltypes_and_headers() {
3041  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
3042  best_encodings =
3043  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3044  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3045  switch (copy_params.has_header) {
3046  case ImportHeaderRow::AUTODETECT:
3047  has_headers = detect_headers(head_types, best_sqltypes);
3048  if (has_headers) {
3049  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3050  } else {
3051  copy_params.has_header = ImportHeaderRow::NO_HEADER;
3052  }
3053  break;
3054  case ImportHeaderRow::NO_HEADER:
3055  has_headers = false;
3056  break;
3057  case ImportHeaderRow::HAS_HEADER:
3058  has_headers = true;
3059  break;
3060  }
3061 }
3062 
3063 void Detector::find_best_sqltypes() {
3064  best_sqltypes = find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3065 }
3066 
3067 std::vector<SQLTypes> Detector::find_best_sqltypes(
3068  const std::vector<std::vector<std::string>>& raw_rows,
3069  const CopyParams& copy_params) {
3070  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3071 }
3072 
3073 std::vector<SQLTypes> Detector::find_best_sqltypes(
3074  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3075  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3076  const CopyParams& copy_params) {
3077  if (raw_rows.size() < 1) {
3078  throw std::runtime_error("No rows found in: " +
3079  boost::filesystem::basename(file_path));
3080  }
3081  auto end_time = std::chrono::steady_clock::now() + timeout;
3082  size_t num_cols = raw_rows.front().size();
3083  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3084  std::vector<size_t> non_null_col_counts(num_cols, 0);
3085  for (auto row = row_begin; row != row_end; row++) {
3086  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3087  best_types.push_back(kCHAR);
3088  non_null_col_counts.push_back(0);
3089  }
3090  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3091  // do not count nulls
3092  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3093  continue;
3094  }
3095  SQLTypes t = detect_sqltype(row->at(col_idx));
3096  non_null_col_counts[col_idx]++;
3097  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3098  best_types[col_idx] = t;
3099  }
3100  }
3101  if (std::chrono::steady_clock::now() > end_time) {
3102  break;
3103  }
3104  }
3105  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3106  // if we don't have any non-null values for this column, make it TEXT to be
3107  // safe, because that is the least restrictive type
3108  if (non_null_col_counts[col_idx] == 0) {
3109  best_types[col_idx] = kTEXT;
3110  }
3111  }
3112 
3113  return best_types;
3114 }
3115 
3116 std::vector<EncodingType> Detector::find_best_encodings(
3117  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3118  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3119  const std::vector<SQLTypes>& best_types) {
3120  if (raw_rows.size() < 1) {
3121  throw std::runtime_error("No rows found in: " +
3122  boost::filesystem::basename(file_path));
3123  }
3124  size_t num_cols = best_types.size();
3125  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3126  std::vector<size_t> num_rows_per_col(num_cols, 1);
3127  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3128  for (auto row = row_begin; row != row_end; row++) {
3129  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3130  if (IS_STRING(best_types[col_idx])) {
3131  count_set[col_idx].insert(row->at(col_idx));
3132  num_rows_per_col[col_idx]++;
3133  }
3134  }
3135  }
3136  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3137  if (IS_STRING(best_types[col_idx])) {
3138  float uniqueRatio =
3139  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3140  if (uniqueRatio < 0.75) {
3141  best_encodes[col_idx] = kENCODING_DICT;
3142  }
3143  }
3144  }
3145  return best_encodes;
3146 }
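// Worked example (hypothetical column): 1,000 sampled rows with 100 distinct
// strings give a unique ratio of ~0.1 < 0.75, suggesting TEXT ENCODING DICT;
// a column of mostly-unique values (ratio near 1.0) stays kENCODING_NONE.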
3147 
3148 // detect_headers returns true if:
3149 // - all elements of the first argument are kTEXT
3150 // - there is at least one instance where tail_types is more restrictive than head_types
3151 // (ie, not kTEXT)
3152 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3153  const std::vector<SQLTypes>& tail_types) {
3154  if (head_types.size() != tail_types.size()) {
3155  return false;
3156  }
3157  bool has_headers = false;
3158  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3159  if (head_types[col_idx] != kTEXT) {
3160  return false;
3161  }
3162  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3163  }
3164  return has_headers;
3165 }
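// e.g. head_types {kTEXT, kTEXT} from a first row of "id,price" with
// tail_types {kSMALLINT, kFLOAT} detects a header; if any first-row cell
// already parses as a non-text type, the first row is assumed to be data.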
3166 
3167 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3168  n = std::min(n, raw_rows.size());
3169  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3170  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3171  raw_rows.begin() + n);
3172  return sample_rows;
3173 }
3174 
3175 std::vector<std::string> Detector::get_headers() {
3176  std::vector<std::string> headers(best_sqltypes.size());
3177  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3178  if (has_headers && i < raw_rows[0].size()) {
3179  headers[i] = raw_rows[0][i];
3180  } else {
3181  headers[i] = "column_" + std::to_string(i + 1);
3182  }
3183  }
3184  return headers;
3185 }
3186 
3187 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3188  size_t row_count) {
3189  if (!loader->loadNoCheckpoint(import_buffers, row_count)) {
3190  load_failed = true;
3191  }
3192 }
3193 
3194 void Importer::checkpoint(const int32_t start_epoch) {
3195  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3196  if (load_failed) {
3197  // rollback to starting epoch - undo all the added records
3198  loader->setTableEpoch(start_epoch);
3199  } else {
3200  loader->checkpoint();
3201  }
3202  }
3203 
3204  if (loader->getTableDesc()->persistenceLevel ==
3205  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3206  auto ms = measure<>::execution([&]() {
3207  if (!load_failed) {
3208  for (auto& p : import_buffers_vec[0]) {
3209  if (!p->stringDictCheckpoint()) {
3210  LOG(ERROR) << "Checkpointing Dictionary for Column "
3211  << p->getColumnDesc()->columnName << " failed.";
3212  load_failed = true;
3213  break;
3214  }
3215  }
3216  }
3217  });
3218  if (DEBUG_TIMING) {
3219  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3220  << std::endl;
3221  }
3222  }
3223 }
3224 
3225 ImportStatus DataStreamSink::archivePlumber() {
3226  // in the generalized importing scheme, by the time we reach here file_path
3227  // may contain a file path, a URL, or a wildcard of file paths.
3228  // see CopyTableStmt::execute.
3229  auto file_paths = mapd_glob(file_path);
3230  if (file_paths.size() == 0) {
3231  file_paths.push_back(file_path);
3232  }
3233 
3234  // sum up sizes of all local files -- only for local files. if
3235  // file_path is an S3 URL, sizes will be obtained via S3Archive.
3236  for (const auto& file_path : file_paths) {
3237  total_file_size += get_filesize(file_path);
3238  }
3239 
3240 #ifdef ENABLE_IMPORT_PARQUET
3241  // s3 parquet goes a different route because the files are read not with
3242  // libarchive but with the parquet api, and they need to be landed like .7z files.
3243  //
3244  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3245  // because, for example, spark sql users may specify an output url w/o a file
3246  // extension like this:
3247  // df.write
3248  // .mode("overwrite")
3249  // .parquet("s3://bucket/folder/parquet/mydata")
3250  // without the parameter, it means plain or compressed csv files.
3251  // note: .ORC and AVRO files should follow a similar path to Parquet?
3252  if (copy_params.file_type == FileType::PARQUET) {
3253  import_parquet(file_paths);
3254  } else
3255 #endif
3256  {
3257  import_compressed(file_paths);
3258  }
3259  return import_status;
3260 }
3261 
3262 #ifdef ENABLE_IMPORT_PARQUET
3263 inline auto open_parquet_table(const std::string& file_path,
3264  std::shared_ptr<arrow::io::ReadableFile>& infile,
3265  std::unique_ptr<parquet::arrow::FileReader>& reader,
3266  std::shared_ptr<arrow::Table>& table) {
3267  using namespace parquet::arrow;
3268  auto file_result = arrow::io::ReadableFile::Open(file_path);
3269  PARQUET_THROW_NOT_OK(file_result.status());
3270  infile = file_result.ValueOrDie();
3271 
3272  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3273  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3274  const auto num_row_groups = reader->num_row_groups();
3275  const auto num_columns = table->num_columns();
3276  const auto num_rows = table->num_rows();
3277  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3278  << " columns in " << num_row_groups << " groups.";
3279  return std::make_tuple(num_row_groups, num_columns, num_rows);
3280 }
3281 
3282 void Detector::import_local_parquet(const std::string& file_path) {
3283  std::shared_ptr<arrow::io::ReadableFile> infile;
3284  std::unique_ptr<parquet::arrow::FileReader> reader;
3285  std::shared_ptr<arrow::Table> table;
3286  int num_row_groups, num_columns;
3287  int64_t num_rows;
3288  std::tie(num_row_groups, num_columns, num_rows) =
3289  open_parquet_table(file_path, infile, reader, table);
3290  // make up a header line if we don't have one yet
3291  if (0 == raw_data.size()) {
3292  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3293  copy_params.line_delim = '\n';
3294  copy_params.delimiter = ',';
3295  // must quote values to skip any embedded delimiter
3296  copy_params.quoted = true;
3297  copy_params.quote = '"';
3298  copy_params.escape = '"';
3299  for (int c = 0; c < num_columns; ++c) {
3300  if (c) {
3301  raw_data += copy_params.delimiter;
3302  }
3303  raw_data += table->ColumnNames().at(c);
3304  }
3305  raw_data += copy_params.line_delim;
3306  }
3307  // make up raw data... row-wise...
3308  const ColumnDescriptor cd;
3309  for (int g = 0; g < num_row_groups; ++g) {
3310  // data is columnwise
3311  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3312  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3313  arrays.resize(num_columns);
3314  for (int c = 0; c < num_columns; ++c) {
3315  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3316  for (auto chunk : arrays[c]->chunks()) {
3317  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3318  }
3319  }
3320  for (int r = 0; r < num_rows; ++r) {
3321  for (int c = 0; c < num_columns; ++c) {
3322  std::vector<std::string> buffer;
3323  for (auto chunk : arrays[c]->chunks()) {
3324  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3325  if (c) {
3326  raw_data += copy_params.delimiter;
3327  }
3328  if (!chunk->IsNull(r)) {
3329  raw_data += copy_params.quote;
3330  raw_data += boost::replace_all_copy(
3331  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3332  raw_data += copy_params.quote;
3333  }
3334  }
3335  }
3336  raw_data += copy_params.line_delim;
3337  if (++import_status.rows_completed >= 10000) {
3338  // as if load truncated
3339  import_status.load_truncated = true;
3340  load_failed = true;
3341  return;
3342  }
3343  }
3344  }
3345 }
3346 
3347 template <typename DATA_TYPE>
3348 auto TypedImportBuffer::del_values(std::vector<DATA_TYPE>& buffer,
3349  Importer_NS::BadRowsTracker* const bad_rows_tracker) {
3350  const auto old_size = buffer.size();
3351  // erase backward to minimize memory movement overhead
3352  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3353  ++rit) {
3354  buffer.erase(buffer.begin() + *rit);
3355  }
3356  return std::make_tuple(old_size, buffer.size());
3357 }
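// Erasing from crbegin() keeps the not-yet-erased indices valid: for bad rows
// {2, 5}, index 5 is erased first, so index 2 still names the intended
// element, and each erase shifts fewer trailing elements.
// (bad_rows_tracker->rows is an ordered container of row indices.)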
3358 
3359 auto TypedImportBuffer::del_values(const SQLTypes type,
3360  BadRowsTracker* const bad_rows_tracker) {
3361  switch (type) {
3362  case kBOOLEAN:
3363  return del_values(*bool_buffer_, bad_rows_tracker);
3364  case kTINYINT:
3365  return del_values(*tinyint_buffer_, bad_rows_tracker);
3366  case kSMALLINT:
3367  return del_values(*smallint_buffer_, bad_rows_tracker);
3368  case kINT:
3369  return del_values(*int_buffer_, bad_rows_tracker);
3370  case kBIGINT:
3371  case kNUMERIC:
3372  case kDECIMAL:
3373  case kTIME:
3374  case kTIMESTAMP:
3375  case kDATE:
3376  return del_values(*bigint_buffer_, bad_rows_tracker);
3377  case kFLOAT:
3378  return del_values(*float_buffer_, bad_rows_tracker);
3379  case kDOUBLE:
3380  return del_values(*double_buffer_, bad_rows_tracker);
3381  case kTEXT:
3382  case kVARCHAR:
3383  case kCHAR:
3384  return del_values(*string_buffer_, bad_rows_tracker);
3385  case kPOINT:
3386  case kLINESTRING:
3387  case kPOLYGON:
3388  case kMULTIPOLYGON:
3389  return del_values(*geo_string_buffer_, bad_rows_tracker);
3390  case kARRAY:
3391  return del_values(*array_buffer_, bad_rows_tracker);
3392  default:
3393  throw std::runtime_error("Invalid Type");
3394  }
3395 }
3396 
3397 void Importer::import_local_parquet(const std::string& file_path) {
3398  std::shared_ptr<arrow::io::ReadableFile> infile;
3399  std::unique_ptr<parquet::arrow::FileReader> reader;
3400  std::shared_ptr<arrow::Table> table;
3401  int num_row_groups, num_columns;
3402  int64_t nrow_in_file;
3403  std::tie(num_row_groups, num_columns, nrow_in_file) =
3404  open_parquet_table(file_path, infile, reader, table);
3405  // column_list has no $deleted
3406  const auto& column_list = get_column_descs();
3407  // for now geo columns expect a WKT string
3408  std::vector<const ColumnDescriptor*> cds;
3409  int num_physical_cols = 0;
3410  for (auto& cd : column_list) {
3411  cds.push_back(cd);
3412  num_physical_cols += cd->columnType.get_physical_cols();
3413  }
3414  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3415  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3416  std::to_string(num_columns) + " columns in file vs " +
3417  std::to_string(column_list.size() - num_physical_cols) +
3418  " columns in table.");
3419  // slice each group to import slower columns faster, eg. geo or string
3421  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3422  // init row estimate for this file
3423  const auto filesize = get_filesize(file_path);
3424  size_t nrow_completed{0};
3425  file_offsets.push_back(0);
3426  // map logical column index to physical column index
3427  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3428  int physical_col_idx = 0;
3429  for (int i = 0; i < logic_col_idx; ++i) {
3430  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3431  }
3432  return physical_col_idx;
3433  };
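// Example (hypothetical schema): for columns (a INT, geo POLYGON, b TEXT),
// POLYGON carries four physical children (coords, ring_sizes, bounds,
// render_group), so logical indices 0, 1, 2 map to physical indices 0, 1, 6.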
3434  // load a file = nested iteration of row groups, row slices and logical columns
3435  auto ms_load_a_file = measure<>::execution([&]() {
3436  for (int row_group = 0; row_group < num_row_groups && !load_failed; ++row_group) {
3437  // a sliced row group will be handled like a (logical) parquet file, with
3438  // an entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
3439  import_buffers_vec.resize(num_slices);
3440  for (int slice = 0; slice < num_slices; slice++) {
3441  import_buffers_vec[slice].clear();
3442  for (const auto cd : cds) {
3443  import_buffers_vec[slice].emplace_back(
3444  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3445  }
3446  }
3447  /*
3448  * A caveat here: Parquet files or Arrow data are imported column-wise.
3449  * Unlike importing row-wise csv files, an error on any row of any column
3450  * forces us to give up the entire row group across all columns, unless
3451  * there is a sophisticated method to trace erroneous rows in individual
3452  * columns so that we can union the bad rows and drop them from the
3453  * corresponding import_buffers_vec; otherwise, we could easily exceed the
3454  * maximum number of truncated rows even with very sparse errors in the
3455  * files.
3456  */
3456  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3457  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3458  auto& bad_rows_tracker = bad_rows_trackers[slice];
3459  bad_rows_tracker.file_name = file_path;
3460  bad_rows_tracker.row_group = slice;
3461  bad_rows_tracker.importer = this;
3462  }
3463  // process arrow arrays to import buffers
3464  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3465  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3466  const auto cd = cds[physical_col_idx];
3467  std::shared_ptr<arrow::ChunkedArray> array;
3468  PARQUET_THROW_NOT_OK(
3469  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3470  const size_t array_size = array->length();
3471  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
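// slice_size is the ceiling of array_size / num_slices; each thread below
// works on [slice * slice_size, (slice + 1) * slice_size), clamped to
// array_size. E.g. with array_size = 10 and num_slices = 4, slice_size = 3
// and the slice ranges are [0,3), [3,6), [6,9) and [9,10).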
3472  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3473  for (int slice = 0; slice < num_slices; ++slice) {
3474  thread_controller.startThread([&, slice] {
3475  const auto slice_offset = slice % num_slices;
3476  ArraySliceRange slice_range(
3477  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3478  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3479  auto& bad_rows_tracker = bad_rows_trackers[slice];
3480  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3481  import_buffer->import_buffers = &import_buffers_vec[slice];
3482  import_buffer->col_idx = physical_col_idx + 1;
3483  for (auto chunk : array->chunks()) {
3484  import_buffer->add_arrow_values(
3485  cd, *chunk, false, slice_range, &bad_rows_tracker);
3486  }
3487  });
3488  }
3489  thread_controller.finish();
3490  }
3491  std::vector<size_t> nrow_in_slice_raw(num_slices);
3492  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3493  // trim bad rows from import buffers
3494  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3495  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3496  const auto cd = cds[physical_col_idx];
3497  for (int slice = 0; slice < num_slices; ++slice) {
3498  auto& bad_rows_tracker = bad_rows_trackers[slice];
3499  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3500  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3501  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3502  }
3503  }
3504  // flush slices of this row group to chunks
3505  for (int slice = 0; slice < num_slices; ++slice) {
3506  load(import_buffers_vec[slice], nrow_in_slice_successfully_loaded[slice]);
3507  }
3508  // update import stats
3509  const auto nrow_original =
3510  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3511  const auto nrow_imported =
3512  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3513  nrow_in_slice_successfully_loaded.end(),
3514  0);
3515  const auto nrow_dropped = nrow_original - nrow_imported;
3516  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3517  << " rows, drop " << nrow_dropped << " rows.";
3518  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3519  import_status.rows_completed += nrow_imported;
3520  import_status.rows_rejected += nrow_dropped;
3521  if (import_status.rows_rejected > copy_params.max_reject) {
3522  import_status.load_truncated = true;
3523  load_failed = true;
3524  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3525  << ") rows rejected exceeded. Halting load.";
3526  }
3527  // row estimate
3528  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3529  nrow_completed += nrow_imported;
3530  file_offsets.back() =
3531  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3532  // sum up current total file offsets
3533  const auto total_file_offset =
3534  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
3535  // estimate number of rows per current total file offset
3536  if (total_file_offset) {
3537  import_status.rows_estimated =
3538  (float)total_file_size / total_file_offset * import_status.rows_completed;
3539  VLOG(3) << "rows_completed " << import_status.rows_completed
3540  << ", rows_estimated " << import_status.rows_estimated
3541  << ", total_file_size " << total_file_size << ", total_file_offset "
3542  << total_file_offset;
3543  }
3544  }
3545  });
3546  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
3547  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
3548 }
3549 
3550 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths) {
3551  auto importer = dynamic_cast<Importer*>(this);
3552  auto start_epoch = importer ? importer->getLoader()->getTableEpoch() : 0;
3553  try {
3554  std::exception_ptr teptr;
3555  // file_paths may contain one local file path, a list of local file paths
3556  // or a s3/hdfs/... url that may translate to 1 or 1+ remote object keys.
3557  for (auto const& file_path : file_paths) {
3558  std::map<int, std::string> url_parts;
3559  Archive::parse_url(file_path, url_parts);
3560 
3561  // for an s3 url we need to know the obj keys that it comprises
3562  std::vector<std::string> objkeys;
3563  std::unique_ptr<S3ParquetArchive> us3arch;
3564  if ("s3" == url_parts[2]) {
3565 #ifdef HAVE_AWS_S3
3566  us3arch.reset(new S3ParquetArchive(file_path,
3567  copy_params.s3_access_key,
3568  copy_params.s3_secret_key,
3569  copy_params.s3_region,
3570  copy_params.s3_endpoint,
3571  copy_params.plain_text));
3572  us3arch->init_for_read();
3573  total_file_size += us3arch->get_total_file_size();
3574  objkeys = us3arch->get_objkeys();
3575 #else
3576  throw std::runtime_error("AWS S3 support not available");
3577 #endif // HAVE_AWS_S3
3578  } else {
3579  objkeys.emplace_back(file_path);
3580  }
3581 
3582  // each obj key of an s3 url needs to be landed before
3583  // being imported, just as with a 'local file'.
3584  for (auto const& objkey : objkeys) {
3585  try {
3586  auto file_path =
3587  us3arch
3588  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
3589  : objkey;
3590  import_local_parquet(file_path);
3591  if (us3arch) {
3592  us3arch->vacuum(objkey);
3593  }
3594  } catch (...) {
3595  if (us3arch) {
3596  us3arch->vacuum(objkey);
3597  }
3598  throw;
3599  }
3600  if (import_status.load_truncated) {
3601  break;
3602  }
3603  }
3604  }
3605  // rethrow any exception that happened before this point
3606  if (teptr) {
3607  std::rethrow_exception(teptr);
3608  }
3609  } catch (...) {
3610  load_failed = true;
3611  if (!import_status.load_truncated) {
3612  throw;
3613  }
3614  }
3615 
3616  if (importer) {
3617  importer->checkpoint(start_epoch);
3618  }
3619 }
3620 #endif // ENABLE_IMPORT_PARQUET
3621 
3622 void DataStreamSink::import_compressed(std::vector<std::string>& file_paths) {
3623  // a new requirement is to have one single input stream into
3624  // Importer::importDelimited, so the pipe-related machinery needs
3625  // to live in the outermost block.
3626  int fd[2];
3627  if (pipe(fd) < 0) {
3628  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
3629  }
3630  signal(SIGPIPE, SIG_IGN);
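// SIGPIPE is ignored so that, if the reader thread closes its end of the pipe
// early (e.g. on abort), a write() to fd[1] below fails with errno == EPIPE
// instead of terminating the whole process with the default SIGPIPE action.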
3631 
3632  std::exception_ptr teptr;
3633  // create a thread to read the uncompressed byte stream out of the pipe and
3634  // feed it into importDelimited()
3635  ImportStatus ret;
3636  auto th_pipe_reader = std::thread([&]() {
3637  try {
3638  // importDelimited will read from FILE* p_file
3639  if (0 == (p_file = fdopen(fd[0], "r"))) {
3640  throw std::runtime_error(std::string("failed to open a pipe: ") +
3641  strerror(errno));
3642  }
3643 
3644  // in the future, depending on the data types in this uncompressed stream,
3645  // it could be fed into other functions such as importParquet, etc.
3646  ret = importDelimited(file_path, true);
3647  } catch (...) {
3648  if (!teptr) { // no replace
3649  teptr = std::current_exception();
3650  }
3651  }
3652 
3653  if (p_file) {
3654  fclose(p_file);
3655  }
3656  p_file = 0;
3657  });
3658 
3659  // create a thread to iterate over all files (in all archives) and
3660  // forward the uncompressed byte stream to fd[1], which is
3661  // then fed into importDelimited, importParquet, etc.
3662  auto th_pipe_writer = std::thread([&]() {
3663  std::unique_ptr<S3Archive> us3arch;
3664  bool stop = false;
3665  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
3666  try {
3667  auto file_path = file_paths[fi];
3668  std::unique_ptr<Archive> uarch;
3669  std::map<int, std::string> url_parts;
3670  Archive::parse_url(file_path, url_parts);
3671  const std::string S3_objkey_url_scheme = "s3ok";
3672  if ("file" == url_parts[2] || "" == url_parts[2]) {
3673  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3674  } else if ("s3" == url_parts[2]) {
3675 #ifdef HAVE_AWS_S3
3676  // construct an S3Archive with a shared s3 client.
3677  // should be safe b/c there is no wildcard in an s3 url
3678  us3arch.reset(new S3Archive(file_path,
3679  copy_params.s3_access_key,
3680  copy_params.s3_secret_key,
3681  copy_params.s3_region,
3682  copy_params.s3_endpoint,
3683  copy_params.plain_text));
3684  us3arch->init_for_read();
3685  total_file_size += us3arch->get_total_file_size();
3686  // don't land all files here; land them one by one in the following iterations
3687  for (const auto& objkey : us3arch->get_objkeys()) {
3688  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
3689  }
3690  continue;
3691 #else
3692  throw std::runtime_error("AWS S3 support not available");
3693 #endif // HAVE_AWS_S3
3694  } else if (S3_objkey_url_scheme == url_parts[2]) {
3695 #ifdef HAVE_AWS_S3
3696  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
3697  auto file_path =
3698  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
3699  if (0 == file_path.size()) {
3700  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
3701  }
3702  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3703  // file not removed until file closed
3704  us3arch->vacuum(objkey);
3705 #else
3706  throw std::runtime_error("AWS S3 support not available");
3707 #endif // HAVE_AWS_S3
3708  }
3709 #if 0 // TODO(ppan): implement and enable any other archive class
3710  else
3711  if ("hdfs" == url_parts[2])
3712  uarch.reset(new HdfsArchive(file_path));
3713 #endif
3714  else {
3715  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
3716  }
3717 
3718  // init the archive for read
3719  auto& arch = *uarch;
3720 
3721  // at this point, the archive at the url should be ready to be read, unarchived
3722  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
3723  const void* buf;
3724  size_t size;
3725  bool just_saw_archive_header;
3726  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
3727  bool first_text_header_skipped = false;
3728  // start reading uncompressed bytes of this archive from libarchive
3729  // note! this archive may contain more than one file!
3730  file_offsets.push_back(0);
3731  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
3732  bool insert_line_delim_after_this_file = false;
3733  while (!stop) {
3734  int64_t offset{-1};
3735  auto ok = arch.read_data_block(&buf, &size, &offset);
3736  // can't use (uncompressed) size, so track (max) file offset.
3737  // also we want to capture offset even on e.o.f.
3738  if (offset > 0) {
3739  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3740  file_offsets.back() = offset;
3741  }
3742  if (!ok) {
3743  break;
3744  }
3745  // one subtle point here: we now concatenate all files
3746  // into a single FILE stream with which we call importDelimited
3747  // only once. this would lead it to believe that only one
3748  // header line comes with this 'single' stream, while actually
3749  // we may have one header line for each of the files.
3750  // so we need to skip the header lines here instead of in importDelimited.
3751  const char* buf2 = (const char*)buf;
3752  int size2 = size;
3753  if (copy_params.has_header != ImportHeaderRow::NO_HEADER &&
3754  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
3755  while (size2-- > 0) {
3756  if (*buf2++ == copy_params.line_delim) {
3757  break;
3758  }
3759  }
3760  if (size2 <= 0) {
3761  LOG(WARNING) << "No line delimiter in block." << std::endl;
3762  } else {
3763  just_saw_archive_header = false;
3764  first_text_header_skipped = true;
3765  }
3766  }
3767  // On very rare occasions the write pipe somehow operates in a mode similar to
3768  // non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which means
3769  // blocking mode. On such an unreliable blocking mode, a possible fix is to
3770  // loop writing until no bytes are left; otherwise we get the annoying `failed
3771  // to write pipe: Success`...
3772  if (size2 > 0) {
3773  int nremaining = size2;
3774  while (nremaining > 0) {
3775  // try to write the entire remainder of the buffer to the pipe
3776  int nwritten = write(fd[1], buf2, nremaining);
3777  // how did we do?
3778  if (nwritten < 0) {
3779  // something bad happened
3780  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3781  // ignore these, assume nothing written, try again
3782  nwritten = 0;
3783  } else {
3784  // a real error
3785  throw std::runtime_error(
3786  std::string("failed or interrupted write to pipe: ") +
3787  strerror(errno));
3788  }
3789  } else if (nwritten == nremaining) {
3790  // we wrote everything; we're done
3791  break;
3792  }
3793  // only wrote some (or nothing), try again
3794  nremaining -= nwritten;
3795  buf2 += nwritten;
3796  // no exception when too many rejected
3797  // @simon.eves how would this get set? from the other thread? mutex
3798  // needed?
3799  if (import_status.load_truncated) {
3800  stop = true;
3801  break;
3802  }
3803  }
3804  // check that this file (buf for size) ended with a line delim
3805  if (size > 0) {
3806  const char* plast = static_cast<const char*>(buf) + (size - 1);
3807  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
3808  }
3809  }
3810  }
3811  // if that file didn't end with a line delim, we insert one here to terminate
3812  // that file's stream. use a loop for the same reason as above
3813  if (insert_line_delim_after_this_file) {
3814  while (true) {
3815  // write the delim char to the pipe
3816  int nwritten = write(fd[1], &copy_params.line_delim, 1);
3817  // how did we do?
3818  if (nwritten < 0) {
3819  // something bad happened
3820  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3821  // ignore these, assume nothing written, try again
3822  nwritten = 0;
3823  } else {
3824  // a real error
3825  throw std::runtime_error(
3826  std::string("failed or interrupted write to pipe: ") +
3827  strerror(errno));
3828  }
3829  } else if (nwritten == 1) {
3830  // we wrote it; we're done
3831  break;
3832  }
3833  }
3834  }
3835  }
3836  } catch (...) {
3837  // when import is aborted because of too many data errors or because a
3838  // detection pass has ended, any exception thrown by the s3 sdk or
3839  // libarchive is okay and should be suppressed.
3840  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
3841  if (import_status.load_truncated) {
3842  break;
3843  }
3844  if (import_status.rows_completed > 0) {
3845  if (nullptr != dynamic_cast<Detector*>(this)) {
3846  break;
3847  }
3848  }
3849  if (!teptr) { // no replace
3850  teptr = std::current_exception();
3851  }
3852  break;
3853  }
3854  }
3855  // close writer end
3856  close(fd[1]);
3857  });
3858 
3859  th_pipe_reader.join();
3860  th_pipe_writer.join();
3861 
3862  // rethrow any exception that happened before this point
3863  if (teptr) {
3864  std::rethrow_exception(teptr);
3865  }
3866 }
3867 
3870 }
3871 
3872 ImportStatus Importer::importDelimited(const std::string& file_path,
3873  const bool decompressed) {
3874  bool load_truncated = false;
3875  set_import_status(import_id, import_status);
3876 
3877  if (!p_file) {
3878  p_file = fopen(file_path.c_str(), "rb");
3879  }
3880  if (!p_file) {
3881  throw std::runtime_error("failed to open file '" + file_path +
3882  "': " + strerror(errno));
3883  }
3884 
3885  if (!decompressed) {
3886  (void)fseek(p_file, 0, SEEK_END);
3887  file_size = ftell(p_file);
3888  }
3889 
3890  if (copy_params.threads == 0) {
3891  max_threads = static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
3892  } else {
3893  max_threads = static_cast<size_t>(copy_params.threads);
3894  }
3895 
3896  // deal with small files
3897  size_t alloc_size = copy_params.buffer_size;
3898  if (!decompressed && file_size < alloc_size) {
3899  alloc_size = file_size;
3900  }
3901 
3902  for (size_t i = 0; i < max_threads; i++) {
3903  import_buffers_vec.emplace_back();
3904  for (const auto cd : loader->get_column_descs()) {
3905  import_buffers_vec[i].emplace_back(
3906  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
3907  }
3908  }
3909 
3910  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
3911  size_t current_pos = 0;
3912  size_t end_pos;
3913  size_t begin_pos = 0;
3914 
3915  (void)fseek(p_file, current_pos, SEEK_SET);
3916  size_t size =
3917  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
3918 
3919  // make render group analyzers for each poly column
3920  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
3922  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
3923  loader->getTableDesc()->tableId, false, false, false);
3924  for (auto cd : columnDescriptors) {
3925  SQLTypes ct = cd->columnType.get_type();
3926  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
3927  auto rga = std::make_shared<RenderGroupAnalyzer>();
3928  rga->seedFromExistingTableContents(loader, cd->columnName);
3929  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
3930  }
3931  }
3932  }
3933 
3934  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
3935  loader->getTableDesc()->tableId};
3936  auto start_epoch = loader->getTableEpoch();
3937  {
3938  std::list<std::future<ImportStatus>> threads;
3939 
3940  // use a stack to track thread_ids which must not overlap among threads
3941  // because thread_id is used to index import_buffers_vec[]
3942  std::stack<size_t> stack_thread_ids;
3943  for (size_t i = 0; i < max_threads; i++) {
3944  stack_thread_ids.push(i);
3945  }
3946  // added for true row index on error
3947  size_t first_row_index_this_buffer = 0;
3948 
3949  while (size > 0) {
3950  unsigned int num_rows_this_buffer = 0;
3951  CHECK(scratch_buffer);
3952  end_pos = delimited_parser::find_end(
3953  scratch_buffer.get(), size, copy_params, num_rows_this_buffer);
3954 
3955  // unput residual
3956  int nresidual = size - end_pos;
3957  std::unique_ptr<char[]> unbuf;
3958  if (nresidual > 0) {
3959  unbuf = std::make_unique<char[]>(nresidual);
3960  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
3961  }
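// The residual bytes are the tail of the buffer past the last complete row
// boundary found by find_end(). They are carried over to the front of the
// next scratch buffer (see the memcpy below) so that a row split across two
// reads is never parsed in halves.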
3962 
3963  // get a thread_id not in use
3964  auto thread_id = stack_thread_ids.top();
3965  stack_thread_ids.pop();
3966  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
3967 
3968  threads.push_back(std::async(std::launch::async,
3969  import_thread_delimited,
3970  thread_id,
3971  this,
3972  std::move(scratch_buffer),
3973  begin_pos,
3974  end_pos,
3975  end_pos,
3976  columnIdToRenderGroupAnalyzerMap,
3977  first_row_index_this_buffer));
3978 
3979  first_row_index_this_buffer += num_rows_this_buffer;
3980 
3981  current_pos += end_pos;
3982  scratch_buffer = std::make_unique<char[]>(alloc_size);
3983  CHECK(scratch_buffer);
3984  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
3985  size = nresidual + fread(scratch_buffer.get() + nresidual,
3986  1,
3987  copy_params.buffer_size - nresidual,
3988  p_file);
3989 
3990  begin_pos = 0;
3991 
3992  while (threads.size() > 0) {
3993  int nready = 0;
3994  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
3995  it != threads.end();) {
3996  auto& p = *it;
3997  std::chrono::milliseconds span(
3998  0); //(std::distance(it, threads.end()) == 1? 1: 0);
3999  if (p.wait_for(span) == std::future_status::ready) {
4000  auto ret_import_status = p.get();
4001  import_status += ret_import_status;
4002  // sum up current total file offsets
4003  size_t total_file_offset{0};
4004  if (decompressed) {
4005  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4006  for (const auto file_offset : file_offsets) {
4007  total_file_offset += file_offset;
4008  }
4009  }
4010  // estimate number of rows per current total file offset
4011  if (decompressed ? total_file_offset : current_pos) {
4012  import_status.rows_estimated =
4013  (decompressed ? (float)total_file_size / total_file_offset
4014  : (float)file_size / current_pos) *
4015  import_status.rows_completed;
4016  }
4017  VLOG(3) << "rows_completed " << import_status.rows_completed
4018  << ", rows_estimated " << import_status.rows_estimated
4019  << ", total_file_size " << total_file_size << ", total_file_offset "
4020  << total_file_offset;
4021  set_import_status(import_id, import_status);
4022  // recall thread_id for reuse
4023  stack_thread_ids.push(ret_import_status.thread_id);
4024  threads.erase(it++);
4025  ++nready;
4026  } else {
4027  ++it;
4028  }
4029  }
4030 
4031  if (nready == 0) {
4032  std::this_thread::yield();
4033  }
4034 
4035  // on eof, wait all threads to finish
4036  if (0 == size) {
4037  continue;
4038  }
4039 
4040  // keep reading if any free thread slot
4041  // this is one of the major differences from the old threading model !!
4042  if (threads.size() < max_threads) {
4043  break;
4044  }
4045  }
4046 
4047  if (import_status.rows_rejected > copy_params.max_reject) {
4048  load_truncated = true;
4049  load_failed = true;
4050  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4051  break;
4052  }
4053  if (load_failed) {
4054  load_truncated = true;
4055  LOG(ERROR) << "A call to the Loader::load failed, Please review the logs for "
4056  "more details";
4057  break;
4058  }
4059  }
4060 
4061  // join dangling threads in case of LOG(ERROR) above
4062  for (auto& p : threads) {
4063  p.wait();
4064  }
4065  }
4066 
4067  checkpoint(start_epoch);
4068 
4069  // must set import_status.load_truncated before closing this end of pipe
4070  // otherwise, the thread on the other end would throw an unwanted 'write()'
4071  // exception
4072  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
4073  import_status.load_truncated = load_truncated;
4074 
4075  fclose(p_file);
4076  p_file = nullptr;
4077  return import_status;
4078 }
4079 
4080 void Loader::checkpoint() {
4081  if (getTableDesc()->persistenceLevel ==
4082  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4083  getCatalog().checkpoint(getTableDesc()->tableId);
4084  }
4085 }
4086 
4087 int32_t Loader::getTableEpoch() {
4088  return getCatalog().getTableEpoch(getCatalog().getCurrentDB().dbId,
4089  getTableDesc()->tableId);
4090 }
4091 
4092 void Loader::setTableEpoch(const int32_t start_epoch) {
4093  getCatalog().setTableEpoch(
4094  getCatalog().getCurrentDB().dbId, getTableDesc()->tableId, start_epoch);
4095 }
4096 
4097 void GDALErrorHandler(CPLErr eErrClass, int err_no, const char* msg) {
4098  CHECK(eErrClass >= CE_None && eErrClass <= CE_Fatal);
4099  static const char* errClassStrings[5] = {
4100  "Info",
4101  "Debug",
4102  "Warning",
4103  "Failure",
4104  "Fatal",
4105  };
4106  std::string log_msg = std::string("GDAL ") + errClassStrings[eErrClass] +
4107  std::string(": ") + msg + std::string(" (") +
4108  std::to_string(err_no) + std::string(")");
4109  if (eErrClass >= CE_Failure) {
4110  throw std::runtime_error(log_msg);
4111  } else {
4112  LOG(INFO) << log_msg;
4113  }
4114 }
4115 
4116 /* static */
4117 std::mutex Importer::init_gdal_mutex;
4118 
4119 /* static */
4120 void Importer::initGDAL() {
4121  // this should not be called from multiple threads, but...
4122  std::lock_guard<std::mutex> guard(Importer::init_gdal_mutex);
4123  // init under mutex
4124  static bool gdal_initialized = false;
4125  if (!gdal_initialized) {
4126  // FIXME(andrewseidl): investigate if CPLPushFinderLocation can be public
4127  setenv("GDAL_DATA",
4128  std::string(mapd_root_abs_path() + "/ThirdParty/gdal-data").c_str(),
4129  true);
4130 
4131  // configure SSL certificate path (per S3Archive::init_for_read)
4132  // in a production build, GDAL and Curl will have been built on
4133  // CentOS, so the baked-in system path will be wrong for Ubuntu
4134  // and other Linux distros. Unless the user is deliberately
4135  // overriding it by setting SSL_CERT_FILE explicitly in the server
4136  // environment, we set it to whichever CA bundle directory exists
4137  // on the machine we're running on
4138  std::list<std::string> v_known_ca_paths({
4139  "/etc/ssl/certs/ca-certificates.crt",
4140  "/etc/pki/tls/certs/ca-bundle.crt",
4141  "/usr/share/ssl/certs/ca-bundle.crt",
4142  "/usr/local/share/certs/ca-root.crt",
4143  "/etc/ssl/cert.pem",
4144  "/etc/ssl/ca-bundle.pem",
4145  });
4146  for (const auto& known_ca_path : v_known_ca_paths) {
4147  if (boost::filesystem::exists(known_ca_path)) {
4148  LOG(INFO) << "GDAL SSL Certificate path: " << known_ca_path;
4149  setenv("SSL_CERT_FILE", known_ca_path.c_str(), false); // no overwrite
4150  break;
4151  }
4152  }
4153 
4154  GDALAllRegister();
4155  OGRRegisterAll();
4156  CPLSetErrorHandler(*GDALErrorHandler);
4157  LOG(INFO) << "GDAL Initialized: " << GDALVersionInfo("--version");
4158  gdal_initialized = true;
4159  }
4160 }
4161 
4162 bool Importer::hasGDALLibKML() {
4163  return GetGDALDriverManager()->GetDriverByName("libkml") != nullptr;
4164 }
4165 
4166 /* static */
4167 void Importer::setGDALAuthorizationTokens(const CopyParams& copy_params) {
4168  // for now we only support S3
4169  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4170  // only set if non-empty to allow GDAL defaults to persist
4171  // explicitly clear if empty to revert to default and not reuse a previous session's
4172  // keys
4173  if (copy_params.s3_region.size()) {
4174 #if DEBUG_AWS_AUTHENTICATION
4175  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4176 #endif
4177  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4178  } else {
4179 #if DEBUG_AWS_AUTHENTICATION
4180  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4181 #endif
4182  CPLSetConfigOption("AWS_REGION", nullptr);
4183  }
4184  if (copy_params.s3_endpoint.size()) {
4185 #if DEBUG_AWS_AUTHENTICATION
4186  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4187 #endif
4188  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4189  } else {
4190 #if DEBUG_AWS_AUTHENTICATION
4191  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4192 #endif
4193  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4194  }
4195  if (copy_params.s3_access_key.size()) {
4196 #if DEBUG_AWS_AUTHENTICATION
4197  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4198  << "'";
4199 #endif
4200  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4201  } else {
4202 #if DEBUG_AWS_AUTHENTICATION
4203  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4204 #endif
4205  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4206  }
4207  if (copy_params.s3_secret_key.size()) {
4208 #if DEBUG_AWS_AUTHENTICATION
4209  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4210  << "'";
4211 #endif
4212  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4213  } else {
4214 #if DEBUG_AWS_AUTHENTICATION
4215  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4216 #endif
4217  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4218  }
4219 
4220 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4221  // if we haven't set keys, we need to disable signed access
4222  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4223 #if DEBUG_AWS_AUTHENTICATION
4224  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4225 #endif
4226  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4227  } else {
4228 #if DEBUG_AWS_AUTHENTICATION
4229  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4230 #endif
4231  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4232  }
4233 #endif
4234 }
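// A minimal usage sketch (hypothetical values; field names as in CopyParams):
//   CopyParams params;
//   params.s3_region = "us-west-1";       // bucket region
//   params.s3_access_key = "AKIA...";     // omit both keys for public buckets
//   params.s3_secret_key = "...";         // (enables AWS_NO_SIGN_REQUEST)
//   Importer::setGDALAuthorizationTokens(params);
//   // subsequent GDAL opens of "/vsis3/bucket/key" paths use these tokens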
4235 
4236 /* static */
4237 OGRDataSource* Importer::openGDALDataset(const std::string& file_name,
4238  const CopyParams& copy_params) {
4239  // lazy init GDAL
4240  initGDAL();
4241 
4242  // set authorization tokens
4243  setGDALAuthorizationTokens(copy_params);
4244 
4245  // open the file
4246  OGRDataSource* poDS;
4247 #if GDAL_VERSION_MAJOR == 1
4248  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4249 #else
4250  poDS = (OGRDataSource*)GDALOpenEx(
4251  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4252  if (poDS == nullptr) {
4253  poDS = (OGRDataSource*)GDALOpenEx(
4254  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4255  if (poDS) {
4256  LOG(INFO) << "openGDALDataset had to open as read-only";
4257  }
4258  }
4259 #endif
4260  if (poDS == nullptr) {
4261  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4262  }
4263  // NOTE(adb): If extending this function, refactor to ensure any errors will not result
4264  // in a memory leak if GDAL successfully opened the input dataset.
4265  return poDS;
4266 }
4267 
4268 namespace {
4269 
4270 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4271  const OGRDataSourceUqPtr& poDS,
4272  const std::string& file_name) {
4273  // get layer with specified name, or default to first layer
4274  OGRLayer* poLayer = nullptr;
4275  if (geo_layer_name.size()) {
4276  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4277  if (poLayer == nullptr) {
4278  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4279  file_name);
4280  }
4281  } else {
4282  poLayer = poDS->GetLayer(0);
4283  if (poLayer == nullptr) {
4284  throw std::runtime_error("No layers found in " + file_name);
4285  }
4286  }
4287  return *poLayer;
4288 }
4289 
4290 } // namespace
4291 
4292 /* static */
4293 void Importer::readMetadataSampleGDAL(
4294  const std::string& file_name,
4295  const std::string& geo_column_name,
4296  std::map<std::string, std::vector<std::string>>& metadata,
4297  int rowLimit,
4298  const CopyParams& copy_params) {
4299  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4300  if (poDS == nullptr) {
4301  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4302  file_name);
4303  }
4304 
4305  OGRLayer& layer =
4306  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4307 
4308  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4309  CHECK(poFDefn);
4310 
4311  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4312  auto nFeats = layer.GetFeatureCount();
4313  size_t numFeatures =
4314  std::max(static_cast<decltype(nFeats)>(0),
4315  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4316  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4317  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4318  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4319  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4320  }
4321  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4322  layer.ResetReading();
4323  size_t iFeature = 0;
4324  while (iFeature < numFeatures) {
4325  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4326  if (!poFeature) {
4327  break;
4328  }
4329 
4330  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4331  if (poGeometry != nullptr) {
4332  // validate geom type (again?)
4333  switch (wkbFlatten(poGeometry->getGeometryType())) {
4334  case wkbPoint:
4335  case wkbLineString:
4336  case wkbPolygon:
4337  case wkbMultiPolygon:
4338  break;
4339  default:
4340  throw std::runtime_error("Unsupported geometry type: " +
4341  std::string(poGeometry->getGeometryName()));
4342  }
4343 
4344  // populate metadata for regular fields
4345  for (auto i : metadata) {
4346  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4347  if (iField >= 0) { // geom is -1
4348  metadata[i.first].at(iFeature) =
4349  std::string(poFeature->GetFieldAsString(iField));
4350  }
4351  }
4352 
4353  // populate metadata for geo column with WKT string
4354  char* wkts = nullptr;
4355  poGeometry->exportToWkt(&wkts);
4356  CHECK(wkts);
4357  metadata[geo_column_name].at(iFeature) = wkts;
4358  CPLFree(wkts);
4359  }
4360  iFeature++;
4361  }
4362 }
4363 
4364 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4365  switch (ogr_type) {
4366  case OFTInteger:
4367  return std::make_pair(kINT, false);
4368  case OFTIntegerList:
4369  return std::make_pair(kINT, true);
4370 #if GDAL_VERSION_MAJOR > 1
4371  case OFTInteger64:
4372  return std::make_pair(kBIGINT, false);
4373  case OFTInteger64List:
4374  return std::make_pair(kBIGINT, true);
4375 #endif
4376  case OFTReal:
4377  return std::make_pair(kDOUBLE, false);
4378  case OFTRealList:
4379  return std::make_pair(kDOUBLE, true);
4380  case OFTString:
4381  return std::make_pair(kTEXT, false);
4382  case OFTStringList:
4383  return std::make_pair(kTEXT, true);
4384  case OFTDate:
4385  return std::make_pair(kDATE, false);
4386  case OFTTime:
4387  return std::make_pair(kTIME, false);
4388  case OFTDateTime:
4389  return std::make_pair(kTIMESTAMP, false);
4390  case OFTBinary:
4391  // Interpret binary blobs as byte arrays here
4392  // but actual import will store NULL as GDAL will not
4393  // extract the blob (OGRFeature::GetFieldAsString will
4394  // result in the import buffers having an empty string)
4395  return std::make_pair(kTINYINT, true);
4396  default:
4397  break;
4398  }
4399  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4400 }
4401 
4402 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4403  switch (ogr_type) {
4404  case wkbPoint:
4405  return kPOINT;
4406  case wkbLineString:
4407  return kLINESTRING;
4408  case wkbPolygon:
4409  return kPOLYGON;
4410  case wkbMultiPolygon:
4411  return kMULTIPOLYGON;
4412  default:
4413  break;
4414  }
4415  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4416 }
4417 
4418 /* static */
4419 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4420  const std::string& file_name,
4421  const std::string& geo_column_name,
4422  const CopyParams& copy_params) {
4423  std::list<ColumnDescriptor> cds;
4424 
4425  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4426  if (poDS == nullptr) {
4427  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4428  file_name);
4429  }
4430 
4431  OGRLayer& layer =
4432  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4433 
4434  layer.ResetReading();
4435  // TODO(andrewseidl): support multiple features
4436  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4437  if (poFeature == nullptr) {
4438  throw std::runtime_error("No features found in " + file_name);
4439  }
4440  // get fields as regular columns
4441  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4442  CHECK(poFDefn);
4443  int iField;
4444  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4445  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4446  auto typePair = ogr_to_type(poFieldDefn->GetType());
4447  ColumnDescriptor cd;
4448  cd.columnName = poFieldDefn->GetNameRef();
4449  cd.sourceName = poFieldDefn->GetNameRef();
4450  SQLTypeInfo ti;
4451  if (typePair.second) {
4452  ti.set_type(kARRAY);
4453  ti.set_subtype(typePair.first);
4454  } else {
4455  ti.set_type(typePair.first);
4456  }
4457  if (typePair.first == kTEXT) {
4458  ti.set_compression(kENCODING_DICT);
4459  ti.set_comp_param(32);
4460  }
4461  ti.set_fixed_size();
4462  cd.columnType = ti;
4463  cds.push_back(cd);
4464  }
4465  // get geo column, if any
4466  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4467  if (poGeometry) {
4468  ColumnDescriptor cd;
4469  cd.columnName = geo_column_name;
4470  cd.sourceName = geo_column_name;
4471 
4472  // get GDAL type
4473  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4474 
4475  // if exploding, override any collection type to child type
4476  if (copy_params.geo_explode_collections) {
4477  if (ogr_type == wkbMultiPolygon) {
4478  ogr_type = wkbPolygon;
4479  } else if (ogr_type == wkbMultiLineString) {
4480  ogr_type = wkbLineString;
4481  } else if (ogr_type == wkbMultiPoint) {
4482  ogr_type = wkbPoint;
4483  }
4484  }
4485 
4486  // convert to internal type
4487  SQLTypes geoType = ogr_to_type(ogr_type);
4488 
4489  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4490  if (PROMOTE_POLYGON_TO_MULTIPOLYGON && !copy_params.geo_explode_collections) {
4491  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4492  }
4493 
4494  // build full internal type
4495  SQLTypeInfo ti;
4496  ti.set_type(geoType);
4497  ti.set_subtype(copy_params.geo_coords_type);
4498  ti.set_input_srid(copy_params.geo_coords_srid);
4499  ti.set_output_srid(copy_params.geo_coords_srid);
4500  ti.set_compression(copy_params.geo_coords_encoding);
4501  ti.set_comp_param(copy_params.geo_coords_comp_param);
4502  cd.columnType = ti;
4503 
4504  cds.push_back(cd);
4505  }
4506  return cds;
4507 }
4508 
4509 bool Importer::gdalStatInternal(const std::string& path,
4510  const CopyParams& copy_params,
4511  bool also_dir) {
4512  // lazy init GDAL
4513  initGDAL();
4514 
4515  // set authorization tokens
4516  setGDALAuthorizationTokens(copy_params);
4517 
4518 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4519  // clear GDAL stat cache
4520  // without this, file existence will be cached, even if authentication changes
4521  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4522  VSICurlClearCache();
4523 #endif
4524 
4525  // stat path
4526  VSIStatBufL sb;
4527  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4528  if (result < 0) {
4529  return false;
4530  }
4531 
4532  // exists?
4533  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4534  return true;
4535  } else if (VSI_ISREG(sb.st_mode)) {
4536  return true;
4537  }
4538  return false;
4539 }
4540 
4541 /* static */
4542 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
4543  return gdalStatInternal(path, copy_params, false);
4544 }
4545 
4546 /* static */
4547 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
4548  const CopyParams& copy_params) {
4549  return gdalStatInternal(path, copy_params, true);
4550 }
4551 
4552 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
4553  std::vector<std::string>& files) {
4554  // prepare to gather subdirectories
4555  std::vector<std::string> subdirectories;
4556 
4557  // get entries
4558  char** entries = VSIReadDir(archive_path.c_str());
4559  if (!entries) {
4560  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
4561  return;
4562  }
4563 
4564  // force scope
4565  {
4566  // request clean-up
4567  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
4568 
4569  // check all the entries
4570  int index = 0;
4571  while (true) {
4572  // get next entry, or drop out if there isn't one
4573  char* entry_c = entries[index++];
4574  if (!entry_c) {
4575  break;
4576  }
4577  std::string entry(entry_c);
4578 
4579  // ignore '.' and '..'
4580  if (entry == "." || entry == "..") {
4581  continue;
4582  }
4583 
4584  // build the full path
4585  std::string entry_path = archive_path + std::string("/") + entry;
4586 
4587  // is it a file or a sub-folder
4588  VSIStatBufL sb;
4589  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
4590  if (result < 0) {
4591  break;
4592  }
4593 
4594  if (VSI_ISDIR(sb.st_mode)) {
4595  // a directory that ends with .gdb could be a Geodatabase bundle
4596  // arguably dangerous to decide this purely by name, but any further
4597  // validation would be very complex especially at this scope
4598  if (boost::iends_with(entry_path, ".gdb")) {
4599  // add the directory as if it was a file and don't recurse into it
4600  files.push_back(entry_path);
4601  } else {
4602  // add subdirectory to be recursed into
4603  subdirectories.push_back(entry_path);
4604  }
4605  } else {
4606  // add this file
4607  files.push_back(entry_path);
4608  }
4609  }
4610  }
4611 
4612  // recurse into each subdirectory we found
4613  for (const auto& subdirectory : subdirectories) {
4614  gdalGatherFilesInArchiveRecursive(subdirectory, files);
4615  }
4616 }
4617 
4618 /* static */
4619 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
4620  const std::string& archive_path,
4621  const CopyParams& copy_params) {
4622  // lazy init GDAL
4623  initGDAL();
4624 
4625  // set authorization tokens
4626  setGDALAuthorizationTokens(copy_params);
4627 
4628  // prepare to gather files
4629  std::vector<std::string> files;
4630 
4631  // gather the files recursively
4632  gdalGatherFilesInArchiveRecursive(archive_path, files);
4633 
4634  // convert to relative paths inside archive
4635  for (auto& file : files) {
4636  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4637  }
4638 
4639  // done
4640  return files;
4641 }
4642 
4643 /* static */
4644 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
4645  const std::string& file_name,
4646  const CopyParams& copy_params) {
4647  // lazy init GDAL
4648  initGDAL();
4649 
4650  // set authorization tokens
4651  setGDALAuthorizationTokens(copy_params);
4652 
4653  // prepare to gather layer info
4654  std::vector<GeoFileLayerInfo> layer_info;
4655 
4656  // open the data set
4657  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4658  if (poDS == nullptr) {
4659  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4660  file_name);
4661  }
4662 
4663  // enumerate the layers
4664  for (auto&& poLayer : poDS->GetLayers()) {
4665  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
4666  // prepare to read this layer
4667  poLayer->ResetReading();
4668  // skip layer if empty
4669  if (poLayer->GetFeatureCount() > 0) {
4670  // get first feature
4671  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4672  CHECK(first_feature);
4673  // check feature for geometry
4674  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4675  if (!geometry) {
4676  // layer has no geometry
4677  contents = GeoFileLayerContents::NON_GEO;
4678  } else {
4679  // check the geometry type
4680  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4681  switch (wkbFlatten(geometry_type)) {
4682  case wkbPoint:
4683  case wkbLineString:
4684  case wkbPolygon:
4685  case wkbMultiPolygon:
4686  // layer has supported geo
4687  contents = GeoFileLayerContents::GEO;
4688  break;
4689  default:
4690  // layer has unsupported geometry
4691  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
4692  break;
4693  }
4694  }
4695  }
4696  // store info for this layer
4697  layer_info.emplace_back(poLayer->GetName(), contents);
4698  }
4699 
4700  // done
4701  return layer_info;
4702 }
4703 
4704 /* static */
4705 bool Importer::gdalSupportsNetworkFileAccess() {
4706 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 2)
4707  return true;
4708 #else
4709  return false;
4710 #endif
4711 }
4712 
4713 ImportStatus Importer::importGDAL(
4714  ColumnNameToSourceNameMapType columnNameToSourceNameMap) {
4715  // initial status
4716  bool load_truncated = false;
4717  set_import_status(import_id, import_status);
4718 
4719  OGRDataSourceUqPtr poDS(openGDALDataset(file_path, copy_params));
4720  if (poDS == nullptr) {
4721  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4722  file_path);
4723  }
4724 
4725  OGRLayer& layer =
4726  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
4727 
4728  // get the number of features in this layer
4729  size_t numFeatures = layer.GetFeatureCount();
4730 
4731  // build map of metadata field (additional columns) name to index
4732  // use shared_ptr since we need to pass it to the worker
4733  FieldNameToIndexMapType fieldNameToIndexMap;
4734  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4735  CHECK(poFDefn);
4736  size_t numFields = poFDefn->GetFieldCount();
4737  for (size_t iField = 0; iField < numFields; iField++) {
4738  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4739  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4740  }
4741 
4742  // the geographic spatial reference we want to put everything in
4743  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4744  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4745 
4746 #if GDAL_VERSION_MAJOR >= 3
4747  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
4748  // this results in X and Y being transposed for angle-based
4749  // coordinate systems. This restores the previous behavior.
4750  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4751 #endif
4752 
4753 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4754  // just one "thread"
4755  size_t max_threads = 1;
4756 #else
4757  // how many threads to use
4758  size_t max_threads = 0;
4759  if (copy_params.threads == 0) {
4760  max_threads = sysconf(_SC_NPROCESSORS_CONF);
4761  } else {
4762  max_threads = copy_params.threads;
4763  }
4764 #endif
4765 
4766  // make an import buffer for each thread
4767  CHECK_EQ(import_buffers_vec.size(), 0u);
4768  import_buffers_vec.resize(max_threads);
4769  for (size_t i = 0; i < max_threads; i++) {
4770  for (const auto cd : loader->get_column_descs()) {
4771  import_buffers_vec[i].emplace_back(
4772  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4773  }
4774  }
4775 
4776  // make render group analyzers for each poly column
4777  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4779  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4780  loader->getTableDesc()->tableId, false, false, false);
4781  for (auto cd : columnDescriptors) {
4782  SQLTypes ct = cd->columnType.get_type();
4783  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4784  auto rga = std::make_shared<RenderGroupAnalyzer>();
4785  rga->seedFromExistingTableContents(loader, cd->columnName);
4786  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4787  }
4788  }
4789  }
4790 
4791 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4792  // threads
4793  std::list<std::future<ImportStatus>> threads;
4794 
4795  // use a stack to track thread_ids which must not overlap among threads
4796  // because thread_id is used to index import_buffers_vec[]
4797  std::stack<size_t> stack_thread_ids;
4798  for (size_t i = 0; i < max_threads; i++) {
4799  stack_thread_ids.push(i);
4800  }
4801 #endif
4802 
4803  // checkpoint the table
4804  auto start_epoch = loader->getTableEpoch();
4805 
4806  // reset the layer
4807  layer.ResetReading();
4808 
4809  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
4810 
4811  // make a features buffer for each thread
4812  std::vector<FeaturePtrVector> features(max_threads);
4813 
4814  // for each feature...
4815  size_t firstFeatureThisChunk = 0;
4816  while (firstFeatureThisChunk < numFeatures) {
4817  // how many features this chunk
4818  size_t numFeaturesThisChunk =
4819  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
4820 
4821 // get a thread_id not in use
4822 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4823  size_t thread_id = 0;
4824 #else
4825  auto thread_id = stack_thread_ids.top();
4826  stack_thread_ids.pop();
4827  CHECK(thread_id < max_threads);
4828 #endif
4829 
4830  // fill features buffer for new thread
4831  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
4832  features[thread_id].emplace_back(layer.GetNextFeature());
4833  }
4834 
4835 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4836  // call worker function directly
4837  auto ret_import_status = import_thread_shapefile(0,
4838  this,
4839  poGeographicSR.get(),
4840  std::move(features[thread_id]),
4841  firstFeatureThisChunk,
4842  numFeaturesThisChunk,
4843  fieldNameToIndexMap,
4844  columnNameToSourceNameMap,
4845  columnIdToRenderGroupAnalyzerMap);
4846  import_status += ret_import_status;
4847  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
4848  import_status.rows_completed;
4849  set_import_status(import_id, import_status);
4850 #else
4851  // fire up that thread to import this geometry
4852  threads.push_back(std::async(std::launch::async,
4854  thread_id,
4855  this,
4856  poGeographicSR.get(),
4857  std::move(features[thread_id]),
4858  firstFeatureThisChunk,
4859  numFeaturesThisChunk,
4860  fieldNameToIndexMap,
4861  columnNameToSourceNameMap,
4862  columnIdToRenderGroupAnalyzerMap));
4863 
4864  // let the threads run
4865  while (threads.size() > 0) {
4866  int nready = 0;
4867  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4868  it != threads.end();) {
4869  auto& p = *it;
4870  std::chrono::milliseconds span(
4871  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4872  if (p.wait_for(span) == std::future_status::ready) {
4873  auto ret_import_status = p.get();
4874  import_status += ret_import_status;
4875  import_status.rows_estimated =
4876  ((float)firstFeatureThisChunk / (float)numFeatures) *
4877  import_status.rows_completed;
4878  set_import_status(import_id, import_status);
4879 
4880  // recall thread_id for reuse
4881  stack_thread_ids.push(ret_import_status.thread_id);
4882 
4883  threads.erase(it++);
4884  ++nready;
4885  } else {
4886  ++it;
4887  }
4888  }
4889 
4890  if (nready == 0) {
4891  std::this_thread::yield();
4892  }
4893 
4894  // keep reading if any free thread slot
4895  // this is one of the major differences from the old threading model !!
4896  if (threads.size() < max_threads) {
4897  break;
4898  }
4899  }
4900 #endif
4901 
4902  // out of rows?
4903  if (import_status.rows_rejected > copy_params.max_reject) {
4904  load_truncated = true;
4905  load_failed = true;
4906  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4907  break;
4908  }
4909 
4910  // failed?
4911  if (load_failed) {
4912  load_truncated = true;
4913  LOG(ERROR)
4914  << "A call to Loader::load failed. Please review the logs for more details";
4915  break;
4916  }
4917 
4918  firstFeatureThisChunk += numFeaturesThisChunk;
4919  }
4920 
4921 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4922  // wait for any remaining threads
4923  if (threads.size()) {
4924  for (auto& p : threads) {
4925  // wait for the thread
4926  p.wait();
4927  // get the result and update the final import status
4928  auto ret_import_status = p.get();
4929  import_status += ret_import_status;
4930  import_status.rows_estimated = import_status.rows_completed;
4931  set_import_status(import_id, import_status);
4932  }
4933  }
4934 #endif
4935 
4936  checkpoint(start_epoch);
4937 
4938  // must set import_status.load_truncated before closing this end of pipe
4939  // otherwise, the thread on the other end would throw an unwanted 'write()'
4940  // exception
4941  import_status.load_truncated = load_truncated;
4942  return import_status;
4943 }
4944 
4945 //
4946 // class RenderGroupAnalyzer
4947 //
4948 
4949 void RenderGroupAnalyzer::seedFromExistingTableContents(
4950  const std::unique_ptr<Loader>& loader,
4951  const std::string& geoColumnBaseName) {
4952  // start timer
4953  auto seedTimer = timer_start();
4954 
4955  // start with a fresh tree
4956  _rtree = nullptr;
4957  _numRenderGroups = 0;
4958 
4959  // get the table descriptor
4960  const auto& cat = loader->getCatalog();
4961  if (loader->getTableDesc()->storageType == StorageType::FOREIGN_TABLE) {
4963  LOG(INFO) << "DEBUG: Table is a foreign table";
4964  }
4965  _rtree = std::make_unique<RTree>();
4966  CHECK(_rtree);
4967  return;
4968  }
4969 
4970  const std::string& tableName = loader->getTableDesc()->tableName;
4971  const auto td = cat.getMetadataForTable(tableName);
4972  CHECK(td);
4973  CHECK(td->fragmenter);
4974 
4975  // if the table is empty, just make an empty tree
4976  if (td->fragmenter->getFragmentsForQuery().getPhysicalNumTuples() == 0) {
4978  LOG(INFO) << "DEBUG: Table is empty!";
4979  }
4980  _rtree = std::make_unique<RTree>();
4981  CHECK(_rtree);
4982  return;
4983  }
4984 
4985  // no seeding possible without these two columns
4986  const auto cd_bounds =
4987  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_bounds");
4988  const auto cd_render_group =
4989  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_render_group");
4990  if (!cd_bounds || !cd_render_group) {
4991  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4992  " doesn't have bounds or render_group columns!");
4993  }
4994 
4995  // and validate their types
4996  if (cd_bounds->columnType.get_type() != kARRAY ||
4997  cd_bounds->columnType.get_subtype() != kDOUBLE) {
4998  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4999  " bounds column is wrong type!");
5000  }
5001  if (cd_render_group->columnType.get_type() != kINT) {
5002  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5003  " render_group column is wrong type!");
5004  }
5005 
5006  // get chunk accessor table
5007  auto chunkAccessorTable = getChunkAccessorTable(
5008  cat, td, {geoColumnBaseName + "_bounds", geoColumnBaseName + "_render_group"});
5009  const auto table_count = std::get<0>(chunkAccessorTable.back());
5010 
5012  LOG(INFO) << "DEBUG: Scanning existing table geo column set '" << geoColumnBaseName
5013  << "'";
5014  }
5015 
5016  std::vector<Node> nodes;
5017  try {
5018  nodes.resize(table_count);
5019  } catch (const std::exception& e) {
5020  throw std::runtime_error("RenderGroupAnalyzer failed to reserve memory for " +
5021  std::to_string(table_count) + " rows");
5022  }
5023 
5024  for (size_t row = 0; row < table_count; row++) {
5025  ArrayDatum ad;
5026  VarlenDatum vd;
5027  bool is_end;
5028 
5029  // get ChunkIters and fragment row offset
5030  size_t rowOffset = 0;
5031  auto& chunkIters = getChunkItersAndRowOffset(chunkAccessorTable, row, rowOffset);
5032  auto& boundsChunkIter = chunkIters[0];
5033  auto& renderGroupChunkIter = chunkIters[1];
5034 
5035  // get bounds values
5036  ChunkIter_get_nth(&boundsChunkIter, row - rowOffset, &ad, &is_end);
5037  CHECK(!is_end);
5038  CHECK(ad.pointer);
5039  int numBounds = (int)(ad.length / sizeof(double));
5040  CHECK(numBounds == 4);
5041 
5042  // convert to bounding box
5043  double* bounds = reinterpret_cast<double*>(ad.pointer);
5044  BoundingBox bounding_box;
5045  boost::geometry::assign_inverse(bounding_box);
5046  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
5047  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5048 
5049  // get render group
5050  ChunkIter_get_nth(&renderGroupChunkIter, row - rowOffset, false, &vd, &is_end);
5051  CHECK(!is_end);
5052  CHECK(vd.pointer);
5053  int renderGroup = *reinterpret_cast<int32_t*>(vd.pointer);
5054  CHECK_GE(renderGroup, 0);
5055 
5056  // store
5057  nodes[row] = std::make_pair(bounding_box, renderGroup);
5058 
5059  // how many render groups do we have now?
5060  if (renderGroup >= _numRenderGroups) {
5061  _numRenderGroups = renderGroup + 1;
5062  }
5063 
5065  LOG(INFO) << "DEBUG: Existing row " << row << " has Render Group " << renderGroup;
5066  }
5067  }
5068 
5069  // bulk-load the tree
5070  auto bulk_load_timer = timer_start();
5071  _rtree = std::make_unique<RTree>(nodes);
5072  CHECK(_rtree);
5073  LOG(INFO) << "Scanning render groups of poly column '" << geoColumnBaseName
5074  << "' of table '" << tableName << "' took " << timer_stop(seedTimer) << "ms ("
5075  << timer_stop(bulk_load_timer) << " ms for tree)";
5076 
5078  LOG(INFO) << "DEBUG: Done! Now have " << _numRenderGroups << " Render Groups";
5079  }
5080 }
5081 
5082 int RenderGroupAnalyzer::insertBoundsAndReturnRenderGroup(
5083  const std::vector<double>& bounds) {
5084  // validate
5085  CHECK(bounds.size() == 4);
5086 
5087  // get bounds
5088  BoundingBox bounding_box;
5089  boost::geometry::assign_inverse(bounding_box);
5090  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
5091  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5092 
5093  // remainder under mutex to allow this to be multi-threaded
5094  std::lock_guard<std::mutex> guard(_rtreeMutex);
5095 
5096  // get the intersecting nodes
5097  std::vector<Node> intersects;
5098  _rtree->query(boost::geometry::index::intersects(bounding_box),
5099  std::back_inserter(intersects));
5100 
5101  // build a bitset of the render groups of the intersecting rectangles;
5102  // a set bit means the group is available, which allows use of find_first()
5103  boost::dynamic_bitset<> bits(_numRenderGroups);
5104  bits.set();
5105  for (const auto& intersection : intersects) {
5106  CHECK(intersection.second < _numRenderGroups);
5107  bits.reset(intersection.second);
5108  }
5109 
5110  // find first available group
5111  int firstAvailableRenderGroup = 0;
5112  size_t firstSetBit = bits.find_first();
5113  if (firstSetBit == boost::dynamic_bitset<>::npos) {
5114  // all known groups represented, add a new one
5115  firstAvailableRenderGroup = _numRenderGroups;
5116  _numRenderGroups++;
5117  } else {
5118  firstAvailableRenderGroup = (int)firstSetBit;
5119  }
5120 
5121  // insert new node
5122  _rtree->insert(std::make_pair(bounding_box, firstAvailableRenderGroup));
5123 
5124  // return it
5125  return firstAvailableRenderGroup;
5126 }
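// [editor's note, illustrative caller; 'analyzer' is hypothetical] Because the
// method locks _rtreeMutex internally, several import threads can share one
// RenderGroupAnalyzer per geo column:
//
//   std::vector<double> bounds{-122.5, 37.7, -122.3, 37.9};  // two bbox corners
//   const int rg = analyzer->insertBoundsAndReturnRenderGroup(bounds);
//
// Any two polygons whose bounding boxes intersect are assigned different
// render groups, so each group can be drawn as a batch with no overlap.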
5127 
5128 std::vector<std::unique_ptr<TypedImportBuffer>> setup_column_loaders(
5129  const TableDescriptor* td,
5130  Loader* loader) {
5131  CHECK(td);
5132  auto col_descs = loader->get_column_descs();
5133 
5134  std::vector<std::unique_ptr<TypedImportBuffer>> import_buffers;
5135  for (auto cd : col_descs) {
5136  import_buffers.emplace_back(
5137  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
5138  }
5139 
5140  return import_buffers;
5141 }
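// [editor's sketch, hypothetical usage] These buffers are typically filled
// with one value per column per row and then handed to Loader::load():
//
//   auto import_buffers = setup_column_loaders(td, loader.get());
//   // ... append parsed values to each TypedImportBuffer ...
//   if (!loader->load(import_buffers, row_count)) {
//     // handle failed load
//   }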
5142 
5143 } // namespace Importer_NS