/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * @file Importer.cpp
 * @author Wei Hong <wei@mapd.com>
 * @brief Functions for Importer class
 */

#include "ImportExport/Importer.h"

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <gdal.h>
#include <ogrsf_frmts.h>
#include <unistd.h>
#include <boost/algorithm/string.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/filesystem.hpp>
#include <boost/variant.hpp>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <future>
#include <iomanip>
#include <list>
#include <memory>
#include <mutex>
#include <numeric>
#include <stack>
#include <stdexcept>
#include <thread>
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "Archive/S3Archive.h"
#include "ArrowImporter.h"
#include "Geospatial/Compression.h"
#include "Geospatial/Transforms.h"
#include "Geospatial/Types.h"
#include "ImportExport/GDAL.h"
#include "Logger/Logger.h"
#include "Shared/SqlTypesLayout.h"
#include "Shared/import_helpers.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/scope.h"
#include "Shared/shard_key.h"
#include "Shared/thread_count.h"

#include "gen-cpp/OmniSci.h"

size_t g_archive_read_buf_size = 1 << 20;

inline auto get_filesize(const std::string& file_path) {
  boost::filesystem::path boost_file_path{file_path};
  boost::system::error_code ec;
  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
  return ec ? 0 : filesize;
}

namespace {

struct OGRDataSourceDeleter {
  void operator()(OGRDataSource* datasource) {
    if (datasource) {
      GDALClose(datasource);
    }
  }
};
using OGRDataSourceUqPtr = std::unique_ptr<OGRDataSource, OGRDataSourceDeleter>;

struct OGRFeatureDeleter {
  void operator()(OGRFeature* feature) {
    if (feature) {
      OGRFeature::DestroyFeature(feature);
    }
  }
};
using OGRFeatureUqPtr = std::unique_ptr<OGRFeature, OGRFeatureDeleter>;

struct OGRSpatialReferenceDeleter {
  void operator()(OGRSpatialReference* ref) {
    if (ref) {
      OGRSpatialReference::DestroySpatialReference(ref);
    }
  }
};
using OGRSpatialReferenceUqPtr =
    std::unique_ptr<OGRSpatialReference, OGRSpatialReferenceDeleter>;

}  // namespace

// For logging std::vector<std::string> row.
namespace boost {
namespace log {
formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
  out << '[';
  for (size_t i = 0; i < row.size(); ++i) {
    out << (i ? ", " : "") << row[i];
  }
  out << ']';
  return out;
}
}  // namespace log
}  // namespace boost

namespace import_export {

using FieldNameToIndexMapType = std::map<std::string, size_t>;
using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
using ColumnIdToRenderGroupAnalyzerMapType =
    std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
using FeaturePtrVector = std::vector<OGRFeatureUqPtr>;

#define DEBUG_TIMING false
#define DEBUG_RENDER_GROUP_ANALYZER 0
#define DEBUG_AWS_AUTHENTICATION 0

#define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0

static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;

static mapd_shared_mutex status_mutex;
static std::map<std::string, ImportStatus> import_status_map;

Importer::Importer(Catalog_Namespace::Catalog& c,
                   const TableDescriptor* t,
                   const std::string& f,
                   const CopyParams& p)
    : Importer(new Loader(c, t), f, p) {}

Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
    : DataStreamSink(p, f), loader(providedLoader) {
  import_id = boost::filesystem::path(file_path).filename().string();
  file_size = 0;
  max_threads = 0;
  p_file = nullptr;
  buffer[0] = nullptr;
  buffer[1] = nullptr;
  // we may be overallocating a little more memory here due to dropping phy cols.
  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
  int i = 0;
  bool has_array = false;
  // TODO: replace this ugly way of skipping phy cols once isPhyGeo is defined
  int skip_physical_cols = 0;
  for (auto& p : loader->get_column_descs()) {
    // phy geo columns can't be in input file
    if (skip_physical_cols-- > 0) {
      continue;
    }
    // neither are rowid or $deleted$
    // note: columns can be added after rowid/$deleted$
    if (p->isVirtualCol || p->isDeletedCol) {
      continue;
    }
    skip_physical_cols = p->columnType.get_physical_cols();
    if (p->columnType.get_type() == kARRAY) {
      is_array.get()[i] = true;
      has_array = true;
    } else {
      is_array.get()[i] = false;
    }
    ++i;
  }
  if (has_array) {
    is_array_a = std::unique_ptr<bool[]>(is_array.release());
  } else {
    is_array_a = std::unique_ptr<bool[]>(nullptr);
  }
}

Importer::~Importer() {
  if (p_file != nullptr) {
    fclose(p_file);
  }
  if (buffer[0] != nullptr) {
    free(buffer[0]);
  }
  if (buffer[1] != nullptr) {
    free(buffer[1]);
  }
}

ImportStatus Importer::get_import_status(const std::string& import_id) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
  return import_status_map.at(import_id);
}

void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
  is.end = std::chrono::steady_clock::now();
  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
  import_status_map[import_id] = is;
}

static const std::string trim_space(const char* field, const size_t len) {
  size_t i = 0;
  size_t j = len;
  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
    i++;
  }
  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
    j--;
  }
  return std::string(field + i, j - i);
}

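// NullDatum / NullArrayDatum build the in-band sentinel values this importer
// uses to represent a NULL scalar and the NULL-array marker for each SQL type;
// geo and unknown types are rejected here because they have no scalar sentinel.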
Datum NullDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullDatum.");
  }
  return d;
}

Datum NullArrayDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_ARRAY_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_ARRAY_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
  }
  return d;
}

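// Parse a delimited array literal into a packed ArrayDatum of fixed-size
// elements; string arrays must go through the string-array path instead.
// A minimal usage sketch (hypothetical values, assuming an INT element type
// and the default CopyParams markers '{', '}' and ','):
//
//   CopyParams copy_params;
//   ArrayDatum ad = StringToArray("{1,2,3}", int_array_ti, copy_params);
//   // ad.length == 3 * sizeof(int32_t), ad.is_null == false
//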
ArrayDatum StringToArray(const std::string& s,
                         const SQLTypeInfo& ti,
                         const CopyParams& copy_params) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
    return ArrayDatum(0, NULL, true);
  }
  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
    LOG(WARNING) << "Malformed array: " << s;
    return ArrayDatum(0, NULL, true);
  }
  std::vector<std::string> elem_strs;
  size_t last = 1;
  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
       i = s.find(copy_params.array_delim, last)) {
    elem_strs.push_back(s.substr(last, i - last));
    last = i + 1;
  }
  if (last + 1 <= s.size()) {
    elem_strs.push_back(s.substr(last, s.size() - 1 - last));
  }
  if (elem_strs.size() == 1) {
    auto str = elem_strs.front();
    auto str_trimmed = trim_space(str.c_str(), str.length());
    if (str_trimmed == "") {
      elem_strs.clear();  // Empty array
    }
  }
  if (!elem_ti.is_string()) {
    size_t len = elem_strs.size() * elem_ti.get_size();
    int8_t* buf = (int8_t*)checked_malloc(len);
    int8_t* p = buf;
    for (auto& es : elem_strs) {
      auto e = trim_space(es.c_str(), es.length());
      bool is_null = (e == copy_params.null_str) || e == "NULL";
      if (!elem_ti.is_string() && e == "") {
        is_null = true;
      }
      if (elem_ti.is_number() || elem_ti.is_time()) {
        if (!isdigit(e[0]) && e[0] != '-') {
          is_null = true;
        }
      }
      Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
      p = appendDatum(p, d, elem_ti);
    }
    return ArrayDatum(len, buf, false);
  }
  // must not be called for array of strings
  CHECK(false);
  return ArrayDatum(0, NULL, true);
}

ArrayDatum NullArray(const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  auto len = ti.get_size();

  if (elem_ti.is_string()) {
    // must not be called for array of strings
    CHECK(false);
    return ArrayDatum(0, NULL, true);
  }

  if (len > 0) {
    // Compose a NULL fixlen array
    int8_t* buf = (int8_t*)checked_malloc(len);
    // First scalar is a NULL_ARRAY sentinel
    Datum d = NullArrayDatum(elem_ti);
    int8_t* p = appendDatum(buf, d, elem_ti);
    // Rest is filled with normal NULL sentinels
    Datum d0 = NullDatum(elem_ti);
    while ((p - buf) < len) {
      p = appendDatum(p, d0, elem_ti);
    }
    CHECK((p - buf) == len);
    return ArrayDatum(len, buf, true);
  }
  // NULL varlen array
  return ArrayDatum(0, NULL, true);
}

ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
  return NullArray(ti);
}

void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
  const auto& arr = datum.val.arr_val;
  for (const auto& elem_datum : arr) {
    string_vec.push_back(elem_datum.val.str_val);
  }
}

Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kBIGINT:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kINT:
      d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kSMALLINT:
      d.smallintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kTINYINT:
      d.tinyintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kFLOAT:
      d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
      break;
    case kDOUBLE:
      d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
  }
  return d;
}

ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();

  CHECK(!elem_ti.is_string());

  if (datum.is_null) {
    return NullArray(ti);
  }

  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
  int8_t* buf = (int8_t*)checked_malloc(len);
  int8_t* p = buf;
  for (auto& e : datum.val.arr_val) {
    p = appendDatum(p, TDatumToDatum(e, elem_ti), elem_ti);
  }

  return ArrayDatum(len, buf, false);
}

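// Dictionary-encode a batch of strings: each string is added to (or looked up
// in) the column's StringDictionary in bulk, and the resulting ids are written
// into the 1-, 2-, or 4-byte id buffer matching the column's encoded size.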
void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
  CHECK(string_dict_);
  std::vector<std::string_view> string_view_vec;
  string_view_vec.reserve(string_vec.size());
  for (const auto& str : string_vec) {
    if (str.size() > StringDictionary::MAX_STRLEN) {
      throw std::runtime_error("String too long for dictionary encoding.");
    }
    string_view_vec.push_back(str);
  }
  switch (column_desc_->columnType.get_size()) {
    case 1:
      string_dict_i8_buffer_->resize(string_view_vec.size());
      string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
      break;
    case 2:
      string_dict_i16_buffer_->resize(string_view_vec.size());
      string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
      break;
    case 4:
      string_dict_i32_buffer_->resize(string_view_vec.size());
      string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
      break;
    default:
      CHECK(false);
  }
}

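// Parse a single delimited field into the typed buffer for one column.
// NULL detection is two-layered: the caller may have already matched
// copy_params.null_str (default "\N") or the literal "NULL", and for
// non-string types an empty or non-numeric field also falls through to the
// NULL branch of each case below.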
void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const std::string_view val,
                                  const bool is_null,
                                  const CopyParams& copy_params,
                                  const int64_t replicate_count) {
  set_replicate_count(replicate_count);
  const auto type = cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBoolean(static_cast<int8_t>(d.boolval));
      }
      break;
    }
    case kTINYINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addTinyint(d.tinyintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addSmallint(d.smallintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addInt(d.intval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        SQLTypeInfo ti(kNUMERIC, 0, 0, false);
        Datum d = StringToDatum(val, ti);
        const auto converted_decimal_value =
            convert_decimal_value_to_scale(d.bigintval, ti, cd->columnType);
        addBigint(converted_decimal_value);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addDouble(std::atof(std::string(val).c_str()));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        if (val.length() > StringDictionary::MAX_STRLEN) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(StringDictionary::MAX_STRLEN));
        }
        addString(val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      SQLTypeInfo ti = cd->columnType;
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImportHelpers::parseStringArray(std::string(val), copy_params, string_vec);
        if (!is_null) {
          // TODO: add support for NULL string arrays
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          addStringArray(string_vec);
        } else {
          if (ti.get_size() > 0) {
            // TODO: remove once NULL fixlen arrays are allowed
            throw std::runtime_error("Fixed length array column " + cd->columnName +
                                     " currently cannot accept NULL arrays");
          }
          // TODO: add support for NULL string arrays, replace with addStringArray(),
          //       for now add whatever parseStringArray() outputs for NULLs ("NULL")
          addStringArray(string_vec);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
          if (d.is_null) {  // val could be "NULL"
            addArray(NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + std::string(val));
            }
            addArray(d);
          }
        } else {
          addArray(NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      addGeoString(val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

void TypedImportBuffer::pop_value() {
  const auto type = column_desc_->columnType.is_decimal()
                        ? decimal_to_int_type(column_desc_->columnType)
                        : column_desc_->columnType.get_type();
  switch (type) {
    case kBOOLEAN:
      bool_buffer_->pop_back();
      break;
    case kTINYINT:
      tinyint_buffer_->pop_back();
      break;
    case kSMALLINT:
      smallint_buffer_->pop_back();
      break;
    case kINT:
      int_buffer_->pop_back();
      break;
    case kBIGINT:
      bigint_buffer_->pop_back();
      break;
    case kFLOAT:
      float_buffer_->pop_back();
      break;
    case kDOUBLE:
      double_buffer_->pop_back();
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      string_buffer_->pop_back();
      break;
    case kDATE:
    case kTIME:
    case kTIMESTAMP:
      bigint_buffer_->pop_back();
      break;
    case kARRAY:
      if (IS_STRING(column_desc_->columnType.get_subtype())) {
        string_array_buffer_->pop_back();
      } else {
        array_buffer_->pop_back();
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->pop_back();
      break;
    default:
      CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
  }
}

struct GeoImportException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
template <typename DATA_TYPE>
size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
    const ColumnDescriptor* cd,
    const Array& array,
    std::vector<DATA_TYPE>& buffer,
    const ArraySliceRange& slice_range,
    import_export::BadRowsTracker* const bad_rows_tracker) {
  auto data =
      std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
  if (bad_rows_tracker && cd->columnType.is_geometry()) {
    f_add_geo_phy_cols = [&](const int64_t row) {
      // Populate physical columns (ref. DBHandler::load_table)
      std::vector<double> coords, bounds;
      std::vector<int> ring_sizes, poly_rings;
      int render_group = 0;
      SQLTypeInfo ti;
      // replace any unexpected exception from getGeoColumns or other
      // on this path with a GeoImportException so that we won't over-push
      // a null to the logical column...
      try {
        SQLTypeInfo import_ti{ti};
        if (array.IsNull(row)) {
          Geospatial::GeoTypesFactory::getNullGeoColumns(
              import_ti, coords, bounds, ring_sizes, poly_rings, false);
        } else {
          arrow_throw_if<GeoImportException>(
              !Geospatial::GeoTypesFactory::getGeoColumns(geo_string_buffer_->back(),
                                                          ti,
                                                          coords,
                                                          bounds,
                                                          ring_sizes,
                                                          poly_rings,
                                                          false),
              error_context(cd, bad_rows_tracker) + "Invalid geometry");
          arrow_throw_if<GeoImportException>(
              cd->columnType.get_type() != ti.get_type(),
              error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
        }
        auto col_idx_workpad = col_idx;  // what a pitfall!!
        Importer::set_geo_physical_import_buffer(
            bad_rows_tracker->importer->getCatalog(),
            cd,
            *import_buffers,
            col_idx_workpad,
            coords,
            bounds,
            ring_sizes,
            poly_rings,
            render_group);
      } catch (GeoImportException&) {
        throw;
      } catch (std::runtime_error& e) {
        throw GeoImportException(e.what());
      } catch (const std::exception& e) {
        throw GeoImportException(e.what());
      } catch (...) {
        throw GeoImportException("unknown exception");
      }
    };
  }
  auto f_mark_a_bad_row = [&](const auto row) {
    std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
    bad_rows_tracker->rows.insert(row - slice_range.first);
  };
  buffer.reserve(slice_range.second - slice_range.first);
  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
    try {
      *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
      f_add_geo_phy_cols(row);
    } catch (GeoImportException&) {
      f_mark_a_bad_row(row);
    } catch (ArrowImporterException&) {
      // trace bad rows of each column; otherwise rethrow.
      if (bad_rows_tracker) {
        *data << nullptr;
        f_mark_a_bad_row(row);
      } else {
        throw;
      }
    }
  }
  return buffer.size();
}

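// Append one Arrow column slice to this TypedImportBuffer, dispatching on the
// target SQL type. With exact_type_match set, the Arrow type id must match the
// column type exactly; otherwise the value getters coerce where possible.
// Rows that fail to convert are recorded in bad_rows_tracker rather than
// aborting the whole batch.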
size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
                                           const Array& col,
                                           const bool exact_type_match,
                                           const ArraySliceRange& slice_range,
                                           BadRowsTracker* const bad_rows_tracker) {
  const auto type = cd->columnType.get_type();
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
  }

  switch (type) {
    case kBOOLEAN:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
    case kTINYINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
    case kSMALLINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
    case kINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *int_buffer_, slice_range, bad_rows_tracker);
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kFLOAT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *float_buffer_, slice_range, bad_rows_tracker);
    case kDOUBLE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *double_buffer_, slice_range, bad_rows_tracker);
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                       "Expected string type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *string_buffer_, slice_range, bad_rows_tracker);
    case kTIME:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
                       "Expected time32 or time64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kTIMESTAMP:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kDATE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
                       "Expected date32 or date64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                     "Expected string type");
      return convert_arrow_val_to_import_buffer(
          cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
    case kARRAY:
      throw std::runtime_error("Arrow array appends not yet supported");
    default:
      throw std::runtime_error("Invalid Type");
  }
}

// this is exclusively used by load_table_binary_columnar
size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
  size_t dataSize = 0;
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
      throw std::runtime_error("NULL for column " + cd->columnName);
    }
  }

  switch (cd->columnType.get_type()) {
    case kBOOLEAN: {
      dataSize = col.data.int_col.size();
      bool_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bool_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kTINYINT: {
      dataSize = col.data.int_col.size();
      tinyint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kSMALLINT: {
      dataSize = col.data.int_col.size();
      smallint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kINT: {
      dataSize = col.data.int_col.size();
      int_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          int_buffer_->push_back((int32_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kFLOAT: {
      dataSize = col.data.real_col.size();
      float_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          float_buffer_->push_back(NULL_FLOAT);
        } else {
          float_buffer_->push_back((float)col.data.real_col[i]);
        }
      }
      break;
    }
    case kDOUBLE: {
      dataSize = col.data.real_col.size();
      double_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          double_buffer_->push_back(NULL_DOUBLE);
        } else {
          double_buffer_->push_back((double)col.data.real_col[i]);
        }
      }
      break;
    }
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // TODO: for now, use empty string for nulls
      dataSize = col.data.str_col.size();
      string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          string_buffer_->push_back(std::string());
        } else {
          string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON: {
      dataSize = col.data.str_col.size();
      geo_string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          // TODO: add support for NULL geo
          geo_string_buffer_->push_back(std::string());
        } else {
          geo_string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kARRAY: {
      dataSize = col.data.arr_col.size();
      if (IS_STRING(cd->columnType.get_subtype())) {
        for (size_t i = 0; i < dataSize; i++) {
          std::vector<std::string>& string_vec = addStringArray();
          if (!col.nulls[i]) {
            size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
            for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
              string_vec.push_back(col.data.arr_col[i].data.str_col[str_idx]);
            }
          }
        }
      } else {
        auto elem_ti = cd->columnType.get_subtype();
        switch (elem_ti) {
          case kBOOLEAN: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(bool);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTINYINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int8_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kSMALLINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int16_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int16_t*)p =
                      static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int16_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int32_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int32_t*)p =
                      static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int32_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kBIGINT:
          case kNUMERIC:
          case kDECIMAL: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int64_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int64_t*)p =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kFLOAT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(float);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(float);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kDOUBLE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(double);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(double);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTIME:
          case kTIMESTAMP:
          case kDATE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteWidth = sizeof(int64_t);
                size_t byteSize = len * byteWidth;
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *reinterpret_cast<int64_t*>(p) =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          default:
            throw std::runtime_error("Invalid Array Type");
        }
      }
      break;
    }
    default:
      throw std::runtime_error("Invalid Type");
  }
  return dataSize;
}

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const TDatum& datum,
                                  const bool is_null,
                                  const int64_t replicate_count) {
  set_replicate_count(replicate_count);
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        addBoolean((int8_t)datum.val.int_val);
      }
      break;
    }
    case kTINYINT:
      if (!is_null) {
        addTinyint((int8_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kSMALLINT:
      if (!is_null) {
        addSmallint((int16_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kINT:
      if (!is_null) {
        addInt((int32_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kBIGINT:
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kFLOAT:
      if (!is_null) {
        addFloat((float)datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        addDouble(datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        addString(datum.val.str_val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kARRAY:
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      if (IS_STRING(cd->columnType.get_subtype())) {
        std::vector<std::string>& string_vec = addStringArray();
        addBinaryStringArray(datum, string_vec);
      } else {
        if (!is_null) {
          addArray(TDatumToArrayDatum(datum, cd->columnType));
        } else {
          addArray(NullArray(cd->columnType));
        }
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addGeoString(std::string());
      } else {
        addGeoString(datum.val.str_val);
      }
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

bool importGeoFromLonLat(double lon, double lat, std::vector<double>& coords) {
  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
    return false;
  }
  // we don't need to do any coordinate-system transformation
  // here (yet) so we don't need to use any OGR API or types
  // just use the values directly (assumed to be in 4326)
  coords.push_back(lon);
  coords.push_back(lat);
  return true;
}

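// Fan a parsed geo value out into its physical columns. Each geo column is
// backed by hidden physical columns that directly follow it in the catalog:
// coords always; ring_sizes and render_group for (MULTI)POLYGON; poly_rings
// for MULTIPOLYGON; and bounds for everything but POINT. col_idx is advanced
// past each physical column as it is populated.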
void Importer::set_geo_physical_import_buffer(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<double>& coords,
    std::vector<double>& bounds,
    std::vector<int>& ring_sizes,
    std::vector<int>& poly_rings,
    int render_group,
    const int64_t replicate_count) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  bool is_null_geo = false;
  bool is_null_point = false;
  if (!col_ti.get_notnull()) {
    // Check for NULL geo
    if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
      is_null_point = true;
      coords.clear();
    }
    is_null_geo = coords.empty();
    if (is_null_point) {
      coords.push_back(NULL_ARRAY_DOUBLE);
      coords.push_back(NULL_DOUBLE);
      // Treating POINT coords as notnull, need to store actual encoding
      // [un]compressed+[not]null
      is_null_geo = false;
    }
  }
  TDatum tdd_coords;
  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
  // One exception - NULL POINT geo: coords need to be processed to encode nullness
  // in a fixlen array, compressed and uncompressed.
  if (!is_null_geo) {
    std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
    tdd_coords.val.arr_val.reserve(compressed_coords.size());
    for (auto cc : compressed_coords) {
      tdd_coords.val.arr_val.emplace_back();
      tdd_coords.val.arr_val.back().val.int_val = cc;
    }
  }
  tdd_coords.is_null = is_null_geo;
  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false, replicate_count);

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_ring_sizes;
    tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
    if (!is_null_geo) {
      for (auto ring_size : ring_sizes) {
        tdd_ring_sizes.val.arr_val.emplace_back();
        tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
      }
    }
    tdd_ring_sizes.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(
        cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
  }

  if (col_type == kMULTIPOLYGON) {
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_poly_rings;
    tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
    if (!is_null_geo) {
      for (auto num_rings : poly_rings) {
        tdd_poly_rings.val.arr_val.emplace_back();
        tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
      }
    }
    tdd_poly_rings.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(
        cd_poly_rings, tdd_poly_rings, false, replicate_count);
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_bounds;
    tdd_bounds.val.arr_val.reserve(bounds.size());
    if (!is_null_geo) {
      for (auto b : bounds) {
        tdd_bounds.val.arr_val.emplace_back();
        tdd_bounds.val.arr_val.back().val.real_val = b;
      }
    }
    tdd_bounds.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(
        cd_render_group, td_render_group, false, replicate_count);
  }
}

void Importer::set_geo_physical_import_buffer_columnar(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<std::vector<double>>& coords_column,
    std::vector<std::vector<double>>& bounds_column,
    std::vector<std::vector<int>>& ring_sizes_column,
    std::vector<std::vector<int>>& poly_rings_column,
    int render_group,
    const int64_t replicate_count) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;

  auto coords_row_count = coords_column.size();
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  for (auto coords : coords_column) {
    bool is_null_geo = false;
    bool is_null_point = false;
    if (!col_ti.get_notnull()) {
      // Check for NULL geo
      if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
        is_null_point = true;
        coords.clear();
      }
      is_null_geo = coords.empty();
      if (is_null_point) {
        coords.push_back(NULL_ARRAY_DOUBLE);
        coords.push_back(NULL_DOUBLE);
        // Treating POINT coords as notnull, need to store actual encoding
        // [un]compressed+[not]null
        is_null_geo = false;
      }
    }
    std::vector<TDatum> td_coords_data;
    if (!is_null_geo) {
      std::vector<uint8_t> compressed_coords =
          Geospatial::compress_coords(coords, col_ti);
      for (auto cc : compressed_coords) {
        TDatum td_byte;
        td_byte.val.int_val = cc;
        td_coords_data.push_back(td_byte);
      }
    }
    TDatum tdd_coords;
    tdd_coords.val.arr_val = td_coords_data;
    tdd_coords.is_null = is_null_geo;
    import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false, replicate_count);
  }
  col_idx++;

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (ring_sizes_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
    }
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto ring_sizes : ring_sizes_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = ring_sizes.empty();
      }
      std::vector<TDatum> td_ring_sizes;
      for (auto ring_size : ring_sizes) {
        TDatum td_ring_size;
        td_ring_size.val.int_val = ring_size;
        td_ring_sizes.push_back(td_ring_size);
      }
      TDatum tdd_ring_sizes;
      tdd_ring_sizes.val.arr_val = td_ring_sizes;
      tdd_ring_sizes.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(
          cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kMULTIPOLYGON) {
    if (poly_rings_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
    }
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto poly_rings : poly_rings_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = poly_rings.empty();
      }
      std::vector<TDatum> td_poly_rings;
      for (auto num_rings : poly_rings) {
        TDatum td_num_rings;
        td_num_rings.val.int_val = num_rings;
        td_poly_rings.push_back(td_num_rings);
      }
      TDatum tdd_poly_rings;
      tdd_poly_rings.val.arr_val = td_poly_rings;
      tdd_poly_rings.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(
          cd_poly_rings, tdd_poly_rings, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (bounds_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: bounds column size mismatch";
    }
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto bounds : bounds_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
      }
      std::vector<TDatum> td_bounds_data;
      for (auto b : bounds) {
        TDatum td_double;
        td_double.val.real_val = b;
        td_bounds_data.push_back(td_double);
      }
      TDatum tdd_bounds;
      tdd_bounds.val.arr_val = td_bounds_data;
      tdd_bounds.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = false;
    for (decltype(coords_row_count) i = 0; i < coords_row_count; i++) {
      import_buffers[col_idx]->add_value(
          cd_render_group, td_render_group, false, replicate_count);
    }
    col_idx++;
  }
}

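// "Explode collections" support: when importing MULTI* geo into a column of
// the corresponding child type (POINT, LINESTRING, POLYGON), each child
// geometry becomes its own row. Step 1 locates and validates the single
// destination column; step 2 runs the per-row import lambda once per child
// (or once for a non-collection geometry) and returns the time spent.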
namespace {

std::tuple<int, SQLTypes, std::string> explode_collections_step1(
    const std::list<const ColumnDescriptor*>& col_descs) {
  // validate the columns
  // for now we can only explode into a single destination column
  // which must be of the child type (POLYGON, LINESTRING, POINT)
  int collection_col_idx = -1;
  int col_idx = 0;
  std::string collection_col_name;
  SQLTypes collection_child_type = kNULLT;
  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
    auto const& cd = *cd_it;
    auto const col_type = cd->columnType.get_type();
    if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
      if (collection_col_idx >= 0) {
        throw std::runtime_error(
            "Explode Collections: Found more than one destination column");
      }
      collection_col_idx = col_idx;
      collection_child_type = col_type;
      collection_col_name = cd->columnName;
    }
    for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
      ++cd_it;
    }
    col_idx++;
  }
  if (collection_col_idx < 0) {
    throw std::runtime_error(
        "Explode Collections: Failed to find a supported column type to explode "
        "into");
  }
  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
}

int64_t explode_collections_step2(
    OGRGeometry* ogr_geometry,
    SQLTypes collection_child_type,
    const std::string& collection_col_name,
    size_t row_or_feature_idx,
    std::function<void(OGRGeometry*)> execute_import_lambda) {
  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
  bool is_collection = false;
  switch (collection_child_type) {
    case kPOINT:
      switch (ogr_geometry_type) {
        case wkbMultiPoint:
          is_collection = true;
          break;
        case wkbPoint:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTIPOINT or POINT");
      }
      break;
    case kLINESTRING:
      switch (ogr_geometry_type) {
        case wkbMultiLineString:
          is_collection = true;
          break;
        case wkbLineString:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTILINESTRING or "
              "LINESTRING");
      }
      break;
    case kPOLYGON:
      switch (ogr_geometry_type) {
        case wkbMultiPolygon:
          is_collection = true;
          break;
        case wkbPolygon:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
      }
      break;
    default:
      CHECK(false) << "Unsupported geo child type " << collection_child_type;
  }

  int64_t us = 0LL;

  // explode or just import
  if (is_collection) {
    // cast to collection
    OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
    CHECK(collection_geometry);

#if LOG_EXPLODE_COLLECTIONS
    // log number of children
    LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
              << collection_col_name << "' into "
              << collection_geometry->getNumGeometries() << " child rows";
#endif

    // loop over children
    uint32_t child_geometry_count = 0;
    auto child_geometry_it = collection_geometry->begin();
    while (child_geometry_it != collection_geometry->end()) {
      // get and import this child
      OGRGeometry* import_geometry = *child_geometry_it;
      us += measure<std::chrono::microseconds>::execution(
          [&] { execute_import_lambda(import_geometry); });

      // next child
      child_geometry_it++;
      child_geometry_count++;
    }
  } else {
    // import non-collection row just once
    us = measure<std::chrono::microseconds>::execution(
        [&] { execute_import_lambda(ogr_geometry); });
  }

  // done
  return us;
}

}  // namespace

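// Per-thread worker for delimited import: it scans its slice of the shared
// input buffer for the first complete row boundary, then parses rows and
// appends fields to this thread's TypedImportBuffers, tracking rejected rows
// in a thread-local ImportStatus that is returned to the caller.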
1797  int thread_id,
1798  Importer* importer,
1799  std::unique_ptr<char[]> scratch_buffer,
1800  size_t begin_pos,
1801  size_t end_pos,
1802  size_t total_size,
1803  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
1804  size_t first_row_index_this_buffer) {
1806  int64_t total_get_row_time_us = 0;
1807  int64_t total_str_to_val_time_us = 0;
1808  CHECK(scratch_buffer);
1809  auto buffer = scratch_buffer.get();
1810  auto load_ms = measure<>::execution([]() {});
1811  auto ms = measure<>::execution([&]() {
1812  const CopyParams& copy_params = importer->get_copy_params();
1813  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
1814  size_t begin =
1815  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
1816  const char* thread_buf = buffer + begin_pos + begin;
1817  const char* thread_buf_end = buffer + end_pos;
1818  const char* buf_end = buffer + total_size;
1819  bool try_single_thread = false;
1820  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
1821  importer->get_import_buffers(thread_id);
1822  auto us = 0ULL;
1823  int phys_cols = 0;
1824  int point_cols = 0;
1825  for (const auto cd : col_descs) {
1826  const auto& col_ti = cd->columnType;
1827  phys_cols += col_ti.get_physical_cols();
1828  if (cd->columnType.get_type() == kPOINT) {
1829  point_cols++;
1830  }
1831  }
1832  auto num_cols = col_descs.size() - phys_cols;
1833  for (const auto& p : import_buffers) {
1834  p->clear();
1835  }
1836  std::vector<std::string_view> row;
1837  size_t row_index_plus_one = 0;
1838  for (const char* p = thread_buf; p < thread_buf_end; p++) {
1839  row.clear();
1840  std::vector<std::unique_ptr<char[]>>
1841  tmp_buffers; // holds string w/ removed escape chars, etc
1842  if (DEBUG_TIMING) {
1843  us = measure<std::chrono::microseconds>::execution([&]() {
1844  p = import_export::delimited_parser::get_row(p,
1845  thread_buf_end,
1846  buf_end,
1847  copy_params,
1848  importer->get_is_array(),
1849  row,
1850  tmp_buffers,
1851  try_single_thread);
1852  });
1853  total_get_row_time_us += us;
1854  } else {
1855  p = import_export::delimited_parser::get_row(p,
1856  thread_buf_end,
1857  buf_end,
1858  copy_params,
1859  importer->get_is_array(),
1860  row,
1861  tmp_buffers,
1862  try_single_thread);
1863  }
1864  row_index_plus_one++;
1865  // Each POINT could consume two separate coords instead of a single WKT
1866  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
1867  import_status.rows_rejected++;
1868  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
1869  << row.size() << "): " << shared::printContainer(row);
1870  if (import_status.rows_rejected > copy_params.max_reject) {
1871  break;
1872  }
1873  continue;
1874  }
1875 
1876  //
1877  // lambda for importing a row (perhaps multiple times if exploding a collection)
1878  //
1879 
1880  auto execute_import_row = [&](OGRGeometry* import_geometry) {
1881  size_t import_idx = 0;
1882  size_t col_idx = 0;
1883  try {
1884  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1885  auto cd = *cd_it;
1886  const auto& col_ti = cd->columnType;
1887 
1888  bool is_null =
1889  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
1890  // Note: default copy_params.null_str is "\N", but everyone uses "NULL".
1891  // So initially nullness may be missed and not passed to add_value,
1892  // which then might also check and still decide it's actually a NULL, e.g.
1893  // if kINT doesn't start with a digit or a '-' then it's considered NULL.
1894  // So "NULL" is not recognized as NULL but then it's not recognized as
1895  // a valid kINT, so it's a NULL after all.
1896  // Checking for "NULL" here too, as a widely accepted notation for NULL.
1897 
1898  // Treating empty as NULL
1899  if (!cd->columnType.is_string() && row[import_idx].empty()) {
1900  is_null = true;
1901  }
1902 
1903  if (col_ti.get_physical_cols() == 0) {
1904  // not geo
1905 
1906  import_buffers[col_idx]->add_value(
1907  cd, row[import_idx], is_null, copy_params);
1908 
1909  // next
1910  ++import_idx;
1911  ++col_idx;
1912  } else {
1913  // geo
1914 
1915  // store null string in the base column
1916  import_buffers[col_idx]->add_value(
1917  cd, copy_params.null_str, true, copy_params);
1918 
1919  // WKT from string we're not storing
1920  auto const& geo_string = row[import_idx];
1921 
1922  // next
1923  ++import_idx;
1924  ++col_idx;
1925 
1926  SQLTypes col_type = col_ti.get_type();
1927  CHECK(IS_GEO(col_type));
1928 
1929  std::vector<double> coords;
1930  std::vector<double> bounds;
1931  std::vector<int> ring_sizes;
1932  std::vector<int> poly_rings;
1933  int render_group = 0;
1934 
1935  // if this is a POINT column, and the field is not null, and
1936  // looks like a scalar numeric value (and not a hex blob)
1937  // attempt to import two columns as lon/lat (or lat/lon)
1938  if (col_type == kPOINT && !is_null && geo_string.size() > 0 &&
1939  (geo_string[0] == '.' || isdigit(geo_string[0]) ||
1940  geo_string[0] == '-') &&
1941  geo_string.find_first_of("ABCDEFabcdef") == std::string::npos) {
1942  double lon = std::atof(std::string(geo_string).c_str());
1943  double lat = NAN;
1944  auto lat_str = row[import_idx];
1945  ++import_idx;
1946  if (lat_str.size() > 0 &&
1947  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
1948  lat = std::atof(std::string(lat_str).c_str());
1949  }
1950  // Swap coordinates if this table uses a reverse order: lat/lon
1951  if (!copy_params.lonlat) {
1952  std::swap(lat, lon);
1953  }
1954  // TODO: should check if POINT column should have been declared with
1955  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
1956  // throw std::runtime_error("POINT column " + cd->columnName + " is
1957  // not WGS84, cannot insert lon/lat");
1958  // }
1959  if (!importGeoFromLonLat(lon, lat, coords)) {
1960  throw std::runtime_error(
1961  "Cannot read lon/lat to insert into POINT column " +
1962  cd->columnName);
1963  }
1964  } else {
1965  // import it
1966  SQLTypeInfo import_ti{col_ti};
1967  if (is_null) {
1968  if (col_ti.get_notnull()) {
1969  throw std::runtime_error("NULL geo for column " + cd->columnName);
1970  }
1971  Geospatial::GeoTypesFactory::getNullGeoColumns(
1972  import_ti,
1973  coords,
1974  bounds,
1975  ring_sizes,
1976  poly_rings,
1977  PROMOTE_POLYGON_TO_MULTIPOLYGON);
1978  } else {
1979  if (import_geometry) {
1980  // geometry already exploded
1981  if (!Geospatial::GeoTypesFactory::getGeoColumns(
1982  import_geometry,
1983  import_ti,
1984  coords,
1985  bounds,
1986  ring_sizes,
1987  poly_rings,
1988  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
1989  std::string msg =
1990  "Failed to extract valid geometry from exploded row " +
1991  std::to_string(first_row_index_this_buffer +
1992  row_index_plus_one) +
1993  " for column " + cd->columnName;
1994  throw std::runtime_error(msg);
1995  }
1996  } else {
1997  // extract geometry directly from WKT
1998  if (!Geospatial::GeoTypesFactory::getGeoColumns(
1999  std::string(geo_string),
2000  import_ti,
2001  coords,
2002  bounds,
2003  ring_sizes,
2004  poly_rings,
2005  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2006  std::string msg = "Failed to extract valid geometry from row " +
2007  std::to_string(first_row_index_this_buffer +
2008  row_index_plus_one) +
2009  " for column " + cd->columnName;
2010  throw std::runtime_error(msg);
2011  }
2012  }
2013 
2014  // validate types
2015  if (col_type != import_ti.get_type()) {
2016  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2017  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2018  col_type == SQLTypes::kMULTIPOLYGON)) {
2019  throw std::runtime_error(
2020  "Imported geometry doesn't match the type of column " +
2021  cd->columnName);
2022  }
2023  }
2024  }
2025 
2026  // assign render group?
2027  if (columnIdToRenderGroupAnalyzerMap.size()) {
2028  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2029  if (ring_sizes.size()) {
2030  // get a suitable render group for these poly coords
2031  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2032  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2033  render_group =
2034  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2035  } else {
2036  // empty poly
2037  render_group = -1;
2038  }
2039  }
2040  }
2041  }
2042 
2043  // import extracted geo
2044  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2045  cd,
2046  import_buffers,
2047  col_idx,
2048  coords,
2049  bounds,
2050  ring_sizes,
2051  poly_rings,
2052  render_group);
2053 
2054  // skip remaining physical columns
2055  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2056  ++cd_it;
2057  }
2058  }
2059  }
2060  import_status.rows_completed++;
2061  } catch (const std::exception& e) {
2062  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2063  import_buffers[col_idx_to_pop]->pop_value();
2064  }
2065  import_status.rows_rejected++;
2066  LOG(ERROR) << "Input exception thrown: " << e.what()
2067  << ". Row discarded. Data: " << shared::printContainer(row);
2068  }
2069  };
2070 
2071  if (copy_params.geo_explode_collections) {
2072  // explode and import
2073  auto const [collection_col_idx, collection_child_type, collection_col_name] =
2074  explode_collections_step1(col_descs);
2075  // pull out the collection WKT or WKB hex
2076  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2077  auto const& collection_geo_string = row[collection_col_idx];
2078  // convert to OGR
2079  OGRGeometry* ogr_geometry = nullptr;
2080  ScopeGuard destroy_ogr_geometry = [&] {
2081  if (ogr_geometry) {
2082  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2083  }
2084  };
2085  ogr_geometry = Geospatial::GeoTypesFactory::createOGRGeometry(
2086  std::string(collection_geo_string));
2087  // do the explode and import
2088  us = explode_collections_step2(ogr_geometry,
2089  collection_child_type,
2090  collection_col_name,
2091  first_row_index_this_buffer + row_index_plus_one,
2092  execute_import_row);
2093  } else {
2094  // import non-collection row just once
2095  us = measure<std::chrono::microseconds>::execution(
2096  [&] { execute_import_row(nullptr); });
2097  }
2098  total_str_to_val_time_us += us;
2099  } // end thread
2100  if (import_status.rows_completed > 0) {
2101  load_ms = measure<>::execution(
2102  [&]() { importer->load(import_buffers, import_status.rows_completed); });
2103  }
2104  });
2105  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2106  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2107  << import_status.rows_completed << " rows inserted in "
2108  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2109  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2110  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2111  << "sec" << std::endl;
2112  }
2113 
2114  import_status.thread_id = thread_id;
2115  // LOG(INFO) << " return " << import_status.thread_id << std::endl;
2116 
2117  return import_status;
2118 }
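
// Illustrative sketch of the scalar-coordinate test used above: when a POINT
// column's field starts like a plain number and contains no hex letters, the
// importer consumes *two* CSV fields as lon/lat, which is why a row may
// legitimately hold up to num_cols + point_cols fields. The helper name and
// examples are invented; the condition mirrors execute_import_row.
static bool looks_like_scalar_coordinate_sketch(const std::string_view field) {
  return !field.empty() &&
         (field[0] == '.' || isdigit(field[0]) || field[0] == '-') &&
         field.find_first_of("ABCDEFabcdef") == std::string_view::npos;
}
// looks_like_scalar_coordinate_sketch("-122.41")     -> true  (lon/lat pair)
// looks_like_scalar_coordinate_sketch("POINT (1 2)") -> false (WKT geometry)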
2119 
2120 static ImportStatus import_thread_shapefile(
2121  int thread_id,
2122  Importer* importer,
2123  OGRSpatialReference* poGeographicSR,
2124  const FeaturePtrVector& features,
2125  size_t firstFeature,
2126  size_t numFeatures,
2127  const FieldNameToIndexMapType& fieldNameToIndexMap,
2128  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2129  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap) {
2130  ImportStatus import_status;
2131  const CopyParams& copy_params = importer->get_copy_params();
2132  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2133  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2134  importer->get_import_buffers(thread_id);
2135 
2136  for (const auto& p : import_buffers) {
2137  p->clear();
2138  }
2139 
2140  auto convert_timer = timer_start();
2141 
2142  // we create this on the fly based on the first feature's SR
2143  std::unique_ptr<OGRCoordinateTransformation> coordinate_transformation;
2144 
2145  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2146  if (!features[iFeature]) {
2147  continue;
2148  }
2149 
2150  // get this feature's geometry
2151  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2152  if (pGeometry) {
2153  // for geodatabase, we need to consider features with no geometry
2154  // as we still want to create a table, even if it has no geo column
2155 
2156  // transform it
2157  // avoid GDAL error if not transformable
2158  auto geometry_sr = pGeometry->getSpatialReference();
2159  if (geometry_sr) {
2160  // create an OGRCoordinateTransformation (CT) on the fly
2161  // we must assume that all geo in this file will have the same
2162  // source SR, so the CT will be valid for all of it; transforming
2163  // with a reusable CT is faster than transforming to an SR each time
2164  if (coordinate_transformation == nullptr) {
2165  coordinate_transformation.reset(
2166  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR));
2167  if (coordinate_transformation == nullptr) {
2168  throw std::runtime_error(
2169  "Failed to create a GDAL CoordinateTransformation for incoming geo");
2170  }
2171  }
2172  pGeometry->transform(coordinate_transformation.get());
2173  }
2174  }
2175 
2176  //
2177  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2178  //
2179 
2180  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2181  size_t col_idx = 0;
2182  try {
2183  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2184  auto cd = *cd_it;
2185 
2186  // is this a geo column?
2187  const auto& col_ti = cd->columnType;
2188  if (col_ti.is_geometry()) {
2189  // Note that this assumes there is one and only one geo column in the table.
2190  // Currently, the importer only supports reading a single geospatial feature
2191  // from an input shapefile / geojson file, but this code will need to be
2192  // modified if that changes
2193  SQLTypes col_type = col_ti.get_type();
2194 
2195  // store null string in the base column
2196  import_buffers[col_idx]->add_value(
2197  cd, copy_params.null_str, true, copy_params);
2198  ++col_idx;
2199 
2200  // the data we now need to extract for the other columns
2201  std::vector<double> coords;
2202  std::vector<double> bounds;
2203  std::vector<int> ring_sizes;
2204  std::vector<int> poly_rings;
2205  int render_group = 0;
2206 
2207  // extract it
2208  SQLTypeInfo import_ti{col_ti};
2209  bool is_null_geo = !import_geometry;
2210  if (is_null_geo) {
2211  if (col_ti.get_notnull()) {
2212  throw std::runtime_error("NULL geo for column " + cd->columnName);
2213  }
2214  Geospatial::GeoTypesFactory::getNullGeoColumns(
2215  import_ti,
2216  coords,
2217  bounds,
2218  ring_sizes,
2219  poly_rings,
2220  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2221  } else {
2222  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2223  import_geometry,
2224  import_ti,
2225  coords,
2226  bounds,
2227  ring_sizes,
2228  poly_rings,
2229  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2230  std::string msg = "Failed to extract valid geometry from feature " +
2231  std::to_string(firstFeature + iFeature + 1) +
2232  " for column " + cd->columnName;
2233  throw std::runtime_error(msg);
2234  }
2235 
2236  // validate types
2237  if (col_type != import_ti.get_type()) {
2238  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2239  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2240  col_type == SQLTypes::kMULTIPOLYGON)) {
2241  throw std::runtime_error(
2242  "Imported geometry doesn't match the type of column " +
2243  cd->columnName);
2244  }
2245  }
2246  }
2247 
2248  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2249  if (ring_sizes.size()) {
2250  // get a suitable render group for these poly coords
2251  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2252  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2253  render_group = (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2254  } else {
2255  // empty poly
2256  render_group = -1;
2257  }
2258  }
2259 
2260  // create coords array value and add it to the physical column
2261  ++cd_it;
2262  auto cd_coords = *cd_it;
2263  std::vector<TDatum> td_coord_data;
2264  if (!is_null_geo) {
2265  std::vector<uint8_t> compressed_coords =
2266  Geospatial::compress_coords(coords, col_ti);
2267  for (auto cc : compressed_coords) {
2268  TDatum td_byte;
2269  td_byte.val.int_val = cc;
2270  td_coord_data.push_back(td_byte);
2271  }
2272  }
2273  TDatum tdd_coords;
2274  tdd_coords.val.arr_val = td_coord_data;
2275  tdd_coords.is_null = is_null_geo;
2276  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2277  ++col_idx;
2278 
2279  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2280  // Create ring_sizes array value and add it to the physical column
2281  ++cd_it;
2282  auto cd_ring_sizes = *cd_it;
2283  std::vector<TDatum> td_ring_sizes;
2284  if (!is_null_geo) {
2285  for (auto ring_size : ring_sizes) {
2286  TDatum td_ring_size;
2287  td_ring_size.val.int_val = ring_size;
2288  td_ring_sizes.push_back(td_ring_size);
2289  }
2290  }
2291  TDatum tdd_ring_sizes;
2292  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2293  tdd_ring_sizes.is_null = is_null_geo;
2294  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2295  ++col_idx;
2296  }
2297 
2298  if (col_type == kMULTIPOLYGON) {
2299  // Create poly_rings array value and add it to the physical column
2300  ++cd_it;
2301  auto cd_poly_rings = *cd_it;
2302  std::vector<TDatum> td_poly_rings;
2303  if (!is_null_geo) {
2304  for (auto num_rings : poly_rings) {
2305  TDatum td_num_rings;
2306  td_num_rings.val.int_val = num_rings;
2307  td_poly_rings.push_back(td_num_rings);
2308  }
2309  }
2310  TDatum tdd_poly_rings;
2311  tdd_poly_rings.val.arr_val = td_poly_rings;
2312  tdd_poly_rings.is_null = is_null_geo;
2313  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2314  ++col_idx;
2315  }
2316 
2317  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2318  col_type == kMULTIPOLYGON) {
2319  // Create bounds array value and add it to the physical column
2320  ++cd_it;
2321  auto cd_bounds = *cd_it;
2322  std::vector<TDatum> td_bounds_data;
2323  if (!is_null_geo) {
2324  for (auto b : bounds) {
2325  TDatum td_double;
2326  td_double.val.real_val = b;
2327  td_bounds_data.push_back(td_double);
2328  }
2329  }
2330  TDatum tdd_bounds;
2331  tdd_bounds.val.arr_val = td_bounds_data;
2332  tdd_bounds.is_null = is_null_geo;
2333  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2334  ++col_idx;
2335  }
2336 
2337  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2338  // Create render_group value and add it to the physical column
2339  ++cd_it;
2340  auto cd_render_group = *cd_it;
2341  TDatum td_render_group;
2342  td_render_group.val.int_val = render_group;
2343  td_render_group.is_null = is_null_geo;
2344  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2345  ++col_idx;
2346  }
2347  } else {
2348  // regular column
2349  // pull from GDAL metadata
2350  auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2351  CHECK(cit != columnNameToSourceNameMap.end());
2352  auto const& field_name = cit->second;
2353 
2354  auto const fit = fieldNameToIndexMap.find(field_name);
2355  CHECK(fit != fieldNameToIndexMap.end());
2356  auto const& field_index = fit->second;
2357  CHECK(field_index < fieldNameToIndexMap.size());
2358 
2359  auto const& feature = features[iFeature];
2360 
2361  auto field_defn = feature->GetFieldDefnRef(field_index);
2362  CHECK(field_defn);
2363 
2364  // OGRFeature::GetFieldAsString() can only return 80 characters
2365  // so for array columns, we are obliged to fetch the actual values
2366  // and construct the concatenated string ourselves
2367 
2368  std::string value_string;
2369  int array_index = 0, array_size = 0;
2370 
2371  auto stringify_numeric_list = [&](auto* values) {
2372  value_string = "{";
2373  while (array_index < array_size) {
2374  auto separator = (array_index > 0) ? "," : "";
2375  value_string += separator + std::to_string(values[array_index]);
2376  array_index++;
2377  }
2378  value_string += "}";
2379  };
2380 
2381  auto field_type = field_defn->GetType();
2382  switch (field_type) {
2383  case OFTInteger:
2384  case OFTInteger64:
2385  case OFTReal:
2386  case OFTString:
2387  case OFTBinary:
2388  case OFTDate:
2389  case OFTTime:
2390  case OFTDateTime: {
2391  value_string = feature->GetFieldAsString(field_index);
2392  } break;
2393  case OFTIntegerList: {
2394  auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2395  stringify_numeric_list(values);
2396  } break;
2397  case OFTInteger64List: {
2398  auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2399  stringify_numeric_list(values);
2400  } break;
2401  case OFTRealList: {
2402  auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2403  stringify_numeric_list(values);
2404  } break;
2405  case OFTStringList: {
2406  auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2407  value_string = "{";
2408  if (array_of_strings) {
2409  while (auto* this_string = array_of_strings[array_index]) {
2410  auto separator = (array_index > 0) ? "," : "";
2411  value_string += separator + std::string(this_string);
2412  array_index++;
2413  }
2414  }
2415  value_string += "}";
2416  } break;
2417  default:
2418  throw std::runtime_error("Unsupported geo file field type (" +
2419  std::to_string(static_cast<int>(field_type)) +
2420  ")");
2421  }
2422 
2423  static CopyParams default_copy_params;
2424  import_buffers[col_idx]->add_value(
2425  cd, value_string, false, default_copy_params);
2426  ++col_idx;
2427  }
2428  }
2429  import_status.rows_completed++;
2430  } catch (const std::exception& e) {
2431  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2432  import_buffers[col_idx_to_pop]->pop_value();
2433  }
2434  import_status.rows_rejected++;
2435  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2436  }
2437  };
2438 
2439  if (pGeometry && copy_params.geo_explode_collections) {
2440  // explode and import
2441  auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2442  explode_collections_step1(col_descs);
2443  explode_collections_step2(pGeometry,
2444  collection_child_type,
2445  collection_col_name,
2446  firstFeature + iFeature + 1,
2447  execute_import_feature);
2448  } else {
2449  // import non-collection or null feature just once
2450  execute_import_feature(pGeometry);
2451  }
2452  } // end features
2453 
2454  float convert_ms =
2455  float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2456  convert_timer)) /
2457  1000.0f;
2458 
2459  float load_ms = 0.0f;
2460  if (import_status.rows_completed > 0) {
2461  auto load_timer = timer_start();
2462  importer->load(import_buffers, import_status.rows_completed);
2463  load_ms =
2464  float(
2465  timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2466  load_timer)) /
2467  1000.0f;
2468  }
2469 
2470  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2471  LOG(INFO) << "DEBUG: Process " << convert_ms << "ms";
2472  LOG(INFO) << "DEBUG: Load " << load_ms << "ms";
2473  }
2474 
2475  import_status.thread_id = thread_id;
2476 
2477  if (DEBUG_TIMING) {
2478  LOG(INFO) << "DEBUG: Total "
2479  << float(timer_stop<std::chrono::steady_clock::time_point,
2480  std::chrono::microseconds>(convert_timer)) /
2481  1000.0f
2482  << "ms";
2483  }
2484 
2485  return import_status;
2486 }
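
// Sketch of the brace-delimited array literal built by stringify_numeric_list()
// above for OGR list fields. The helper and its name are invented; it mirrors
// the same loop for a plain vector.
static std::string stringify_numeric_list_sketch(const std::vector<int>& values) {
  std::string out = "{";
  for (size_t i = 0; i < values.size(); ++i) {
    out += (i ? "," : "") + std::to_string(values[i]);
  }
  return out + "}";
}
// stringify_numeric_list_sketch({1, 2, 3}) == "{1,2,3}", the format that
// TypedImportBuffer::add_value() parses for array columns.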
2487 
2488 bool Loader::loadNoCheckpoint(
2489  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2490  size_t row_count) {
2491  return loadImpl(import_buffers, row_count, false);
2492 }
2493 
2494 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2495  size_t row_count) {
2496  return loadImpl(import_buffers, row_count, true);
2497 }
2498 
2499 namespace {
2500 
2501 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2502  const auto& ti = import_buffer.getTypeInfo();
2503  const int8_t* values_buffer{nullptr};
2504  if (ti.is_string()) {
2505  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2506  values_buffer = import_buffer.getStringDictBuffer();
2507  } else {
2508  values_buffer = import_buffer.getAsBytes();
2509  }
2510  CHECK(values_buffer);
2511  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2512  switch (logical_size) {
2513  case 1: {
2514  return values_buffer[index];
2515  }
2516  case 2: {
2517  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2518  }
2519  case 4: {
2520  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2521  }
2522  case 8: {
2523  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2524  }
2525  default:
2526  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2527  }
2528  UNREACHABLE();
2529  return 0;
2530 }
2531 
2532 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2533  const auto& ti = import_buffer.getTypeInfo();
2534  CHECK_EQ(kFLOAT, ti.get_type());
2535  const auto values_buffer = import_buffer.getAsBytes();
2536  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2537 }
2538 
2539 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2540  const auto& ti = import_buffer.getTypeInfo();
2541  CHECK_EQ(kDOUBLE, ti.get_type());
2542  const auto values_buffer = import_buffer.getAsBytes();
2543  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2544 }
2545 
2546 } // namespace
2547 
2548 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2549  std::vector<size_t>& all_shard_row_counts,
2550  const OneShardBuffers& import_buffers,
2551  const size_t row_count,
2552  const size_t shard_count) {
2553  all_shard_row_counts.resize(shard_count);
2554  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2555  all_shard_import_buffers.emplace_back();
2556  for (const auto& typed_import_buffer : import_buffers) {
2557  all_shard_import_buffers.back().emplace_back(
2558  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2559  typed_import_buffer->getStringDictionary()));
2560  }
2561  }
2562  CHECK_GT(table_desc_->shardedColumnId, 0);
2563  int col_idx{0};
2564  const ColumnDescriptor* shard_col_desc{nullptr};
2565  for (const auto col_desc : column_descs_) {
2566  ++col_idx;
2567  if (col_idx == table_desc_->shardedColumnId) {
2568  shard_col_desc = col_desc;
2569  break;
2570  }
2571  }
2572  CHECK(shard_col_desc);
2573  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2574  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2575  const auto& shard_col_ti = shard_col_desc->columnType;
2576  CHECK(shard_col_ti.is_integer() ||
2577  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2578  shard_col_ti.is_time());
2579  if (shard_col_ti.is_string()) {
2580  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2581  CHECK(payloads_ptr);
2582  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2583  }
2584 
2585  // for each replicated (alter-added) column, the number of rows in a shard is
2586  // inferred from that of the sharding column, not simply evenly distributed.
2587  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2588  // Here the loop count is overloaded. For normal imports, we loop through all
2589  // input values (rows), so the loop count is the number of input rows.
2590  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2591  // all shards, so the loop count is the number of shards.
2592  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2593  for (size_t i = 0; i < loop_count; ++i) {
2594  const size_t shard =
2595  getReplicating()
2596  ? i
2597  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2598  auto& shard_output_buffers = all_shard_import_buffers[shard];
2599 
2600  // when replicating a column, populate 'rows' to all shards only once;
2601  // the value is fetched from the first (and only) row
2602  const auto row_index = getReplicating() ? 0 : i;
2603 
2604  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2605  const auto& input_buffer = import_buffers[col_idx];
2606  const auto& col_ti = input_buffer->getTypeInfo();
2607  const auto type =
2608  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2609 
2610  // for a replicated (added) column, set rows_per_shard to the per-shard
2611  // replicate count, and bypass non-replicated columns.
2612  if (getReplicating()) {
2613  if (input_buffer->get_replicate_count() > 0) {
2614  shard_output_buffers[col_idx]->set_replicate_count(
2615  shard_tds[shard]->fragmenter->getNumRows());
2616  } else {
2617  continue;
2618  }
2619  }
2620 
2621  switch (type) {
2622  case kBOOLEAN:
2623  shard_output_buffers[col_idx]->addBoolean(
2624  int_value_at(*input_buffer, row_index));
2625  break;
2626  case kTINYINT:
2627  shard_output_buffers[col_idx]->addTinyint(
2628  int_value_at(*input_buffer, row_index));
2629  break;
2630  case kSMALLINT:
2631  shard_output_buffers[col_idx]->addSmallint(
2632  int_value_at(*input_buffer, row_index));
2633  break;
2634  case kINT:
2635  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2636  break;
2637  case kBIGINT:
2638  shard_output_buffers[col_idx]->addBigint(
2639  int_value_at(*input_buffer, row_index));
2640  break;
2641  case kFLOAT:
2642  shard_output_buffers[col_idx]->addFloat(
2643  float_value_at(*input_buffer, row_index));
2644  break;
2645  case kDOUBLE:
2646  shard_output_buffers[col_idx]->addDouble(
2647  double_value_at(*input_buffer, row_index));
2648  break;
2649  case kTEXT:
2650  case kVARCHAR:
2651  case kCHAR: {
2652  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2653  shard_output_buffers[col_idx]->addString(
2654  (*input_buffer->getStringBuffer())[row_index]);
2655  break;
2656  }
2657  case kTIME:
2658  case kTIMESTAMP:
2659  case kDATE:
2660  shard_output_buffers[col_idx]->addBigint(
2661  int_value_at(*input_buffer, row_index));
2662  break;
2663  case kARRAY:
2664  if (IS_STRING(col_ti.get_subtype())) {
2665  CHECK(input_buffer->getStringArrayBuffer());
2666  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2667  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2668  shard_output_buffers[col_idx]->addStringArray(input_arr);
2669  } else {
2670  shard_output_buffers[col_idx]->addArray(
2671  (*input_buffer->getArrayBuffer())[row_index]);
2672  }
2673  break;
2674  case kPOINT:
2675  case kLINESTRING:
2676  case kPOLYGON:
2677  case kMULTIPOLYGON: {
2678  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2679  shard_output_buffers[col_idx]->addGeoString(
2680  (*input_buffer->getGeoStringBuffer())[row_index]);
2681  break;
2682  }
2683  default:
2684  CHECK(false);
2685  }
2686  }
2687  ++all_shard_row_counts[shard];
2688  // when replicating a column, row count of a shard == replicate count of the column on
2689  // the shard
2690  if (getReplicating()) {
2691  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2692  }
2693  }
2694 }
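
// Sketch of the fan-out performed above in miniature. SHARD_FOR_KEY
// (Shared/shard_key.h) maps a shard-key value to a shard index; the import
// buffers are replaced here by simple per-shard row tallies, and the input
// keys are invented for illustration.
static std::vector<size_t> shard_fan_out_sketch(
    const std::vector<int64_t>& shard_keys,
    const size_t shard_count) {
  std::vector<size_t> row_counts(shard_count, 0);
  for (const auto key : shard_keys) {
    ++row_counts[SHARD_FOR_KEY(key, shard_count)];
  }
  // for non-negative keys this behaves like key % shard_count, so keys
  // {0,1,2,3,4} across 4 shards tally {2,1,1,1}
  return row_counts;
}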
2695 
2696 bool Loader::loadImpl(
2697  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2698  size_t row_count,
2699  bool checkpoint) {
2700  if (load_callback_) {
2701  auto data_blocks = get_data_block_pointers(import_buffers);
2702  return load_callback_(import_buffers, data_blocks, row_count);
2703  }
2704  if (table_desc_->nShards) {
2705  std::vector<OneShardBuffers> all_shard_import_buffers;
2706  std::vector<size_t> all_shard_row_counts;
2707  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2708  distributeToShards(all_shard_import_buffers,
2709  all_shard_row_counts,
2710  import_buffers,
2711  row_count,
2712  shard_tables.size());
2713  bool success = true;
2714  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2715  if (!all_shard_row_counts[shard_idx]) {
2716  continue;
2717  }
2718  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2719  all_shard_row_counts[shard_idx],
2720  shard_tables[shard_idx],
2721  checkpoint);
2722  }
2723  return success;
2724  }
2725  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2726 }
2727 
2728 std::vector<DataBlockPtr> Loader::get_data_block_pointers(
2729  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2730  std::vector<DataBlockPtr> result(import_buffers.size());
2731  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2732  encoded_data_block_ptrs_futures;
2733  // make all async calls to string dictionary here and then continue execution
2734  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2735  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2736  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2737  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2738  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2739 
2740  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2741  buf_idx,
2742  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2743  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2744  return import_buffers[buf_idx]->getStringDictBuffer();
2745  })));
2746  }
2747  }
2748 
2749  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2750  DataBlockPtr p;
2751  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2752  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2753  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2754  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2755  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2756  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2757  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2758  p.stringsPtr = string_payload_ptr;
2759  } else {
2760  // This column is a dictionary-encoded string. We already made an async
2761  // request above to obtain the encoded integer values, so we skip this
2762  // iteration and continue.
2763  continue;
2764  }
2765  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2766  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2767  p.stringsPtr = geo_payload_ptr;
2768  } else {
2769  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2770  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2771  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2772  import_buffers[buf_idx]->addDictEncodedStringArray(
2773  *import_buffers[buf_idx]->getStringArrayBuffer());
2774  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2775  } else {
2776  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2777  }
2778  }
2779  result[buf_idx] = p;
2780  }
2781 
2782  // wait for the async requests we made for string dictionary
2783  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2784  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2785  }
2786  return result;
2787 }
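
// Sketch of the launch-then-join pattern used above: expensive per-column work
// (a stand-in for dictionary encoding) is started with std::async, cheap
// columns are handled inline, and the futures are drained at the end so slow
// columns overlap instead of serializing. Names and values are invented.
static std::vector<int> launch_then_join_sketch(const std::vector<int>& columns) {
  std::vector<std::pair<size_t, std::future<int>>> pending;
  std::vector<int> results(columns.size(), 0);
  for (size_t i = 0; i < columns.size(); ++i) {
    if (columns[i] % 2) {  // pretend odd values mark expensive columns
      pending.emplace_back(
          i, std::async(std::launch::async, [v = columns[i]] { return v * 10; }));
    } else {
      results[i] = columns[i];  // cheap columns are filled synchronously
    }
  }
  for (auto& [idx, fut] : pending) {
    results[idx] = fut.get();  // join, mirroring the futures loop above
  }
  return results;
}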
2788 
2789 bool Loader::loadToShard(
2790  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2791  size_t row_count,
2792  const TableDescriptor* shard_table,
2793  bool checkpoint) {
2794  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2795  // patch insert_data with new column
2796  if (this->getReplicating()) {
2797  for (const auto& import_buff : import_buffers) {
2798  insert_data_.replicate_count = import_buff->get_replicate_count();
2799  }
2800  }
2801 
2802  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2803  ins_data.numRows = row_count;
2804  bool success = true;
2805 
2806  ins_data.data = get_data_block_pointers(import_buffers);
2807 
2808  for (const auto& import_buffer : import_buffers) {
2809  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2810  }
2811 
2812  // release loader_lock so that in InsertOrderFragmenter::insertData
2813  // we can have multiple threads sort/shuffle InsertData
2814  loader_lock.unlock();
2815 
2816  {
2817  try {
2818  if (checkpoint) {
2819  shard_table->fragmenter->insertData(ins_data);
2820  } else {
2821  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2822  }
2823  } catch (std::exception& e) {
2824  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2825  success = false;
2826  }
2827  }
2828  return success;
2829 }
2830 
2831 void Loader::dropColumns(const std::vector<int>& columnIds) {
2832  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
2833  if (table_desc_->nShards) {
2834  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
2835  }
2836  for (auto table_desc : table_descs) {
2837  table_desc->fragmenter->dropColumns(columnIds);
2838  }
2839 }
2840 
2841 void Loader::init(const bool use_catalog_locks) {
2842  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2843  insert_data_.tableId = table_desc_->tableId;
2844  for (auto cd : column_descs_) {
2845  insert_data_.columnIds.push_back(cd->columnId);
2846  if (cd->columnType.get_compression() == kENCODING_DICT) {
2847  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2848  const auto dd = use_catalog_locks
2849  ? catalog_.getMetadataForDict(cd->columnType.get_comp_param())
2850  : catalog_.getMetadataForDictUnlocked(
2851  cd->columnType.get_comp_param(), true);
2852  CHECK(dd);
2853  dict_map_[cd->columnId] = dd->stringDict.get();
2854  }
2855  }
2856  insert_data_.numRows = 0;
2857 }
2858 
2859 void Detector::init() {
2860  detect_row_delimiter();
2861  split_raw_data();
2862  find_best_sqltypes_and_headers();
2863 }
2864 
2865 ImportStatus Detector::importDelimited(const std::string& file_path,
2866  const bool decompressed) {
2867  if (!p_file) {
2868  p_file = fopen(file_path.c_str(), "rb");
2869  }
2870  if (!p_file) {
2871  throw std::runtime_error("failed to open file '" + file_path +
2872  "': " + strerror(errno));
2873  }
2874 
2875  // somehow clang does not support ext/stdio_filebuf.h, so we need to
2876  // roll our own readline with a customized copy_params.line_delim...
2877  std::string line;
2878  line.reserve(1 * 1024 * 1024);
2879  auto end_time = std::chrono::steady_clock::now() +
2880  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
2881  try {
2882  while (!feof(p_file)) {
2883  int c;
2884  size_t n = 0;
2885  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
2886  if (n++ >= line.capacity()) {
2887  break;
2888  }
2889  line += c;
2890  }
2891  if (0 == n) {
2892  break;
2893  }
2894  // remember the first line, which is possibly a header line, to
2895  // ignore identical header line(s) in the 2nd+ files of an archive;
2896  // otherwise, a 2nd+ header may be mistaken for an all-string row
2897  // and skew the final column types.
2898  if (line1.empty()) {
2899  line1 = line;
2900  } else if (line == line1) {
2901  line.clear();
2902  continue;
2903  }
2904 
2905  raw_data += line;
2906  raw_data += copy_params.line_delim;
2907  line.clear();
2908  ++import_status.rows_completed;
2909  if (std::chrono::steady_clock::now() > end_time) {
2910  if (import_status.rows_completed > 10000) {
2911  break;
2912  }
2913  }
2914  }
2915  } catch (std::exception& e) {
2916  }
2917 
2918  // as if load truncated
2919  import_status.load_truncated = true;
2920  load_failed = true;
2921 
2922  fclose(p_file);
2923  p_file = nullptr;
2924  return import_status;
2925 }
2926 
2927 void Detector::read_file() {
2928  // this becomes analogous to Importer::import()
2929  (void)DataStreamSink::archivePlumber();
2930 }
2931 
2932 void Detector::detect_row_delimiter() {
2933  if (copy_params.delimiter == '\0') {
2934  copy_params.delimiter = ',';
2935  if (boost::filesystem::extension(file_path) == ".tsv") {
2936  copy_params.delimiter = '\t';
2937  }
2938  }
2939 }
2940 
2941 void Detector::split_raw_data() {
2942  const char* buf = raw_data.c_str();
2943  const char* buf_end = buf + raw_data.size();
2944  bool try_single_thread = false;
2945  for (const char* p = buf; p < buf_end; p++) {
2946  std::vector<std::string> row;
2947  std::vector<std::unique_ptr<char[]>> tmp_buffers;
2948  p = import_export::delimited_parser::get_row(
2949  p, buf_end, buf_end, copy_params, nullptr, row, tmp_buffers, try_single_thread);
2950  raw_rows.push_back(row);
2951  if (try_single_thread) {
2952  break;
2953  }
2954  }
2955  if (try_single_thread) {
2956  copy_params.threads = 1;
2957  raw_rows.clear();
2958  for (const char* p = buf; p < buf_end; p++) {
2959  std::vector<std::string> row;
2960  std::vector<std::unique_ptr<char[]>> tmp_buffers;
2961  p = import_export::delimited_parser::get_row(
2962  p, buf_end, buf_end, copy_params, nullptr, row, tmp_buffers, try_single_thread);
2963  raw_rows.push_back(row);
2964  }
2965  }
2966 }
2967 
2968 template <class T>
2969 bool try_cast(const std::string& str) {
2970  try {
2971  boost::lexical_cast<T>(str);
2972  } catch (const boost::bad_lexical_cast& e) {
2973  return false;
2974  }
2975  return true;
2976 }
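
// For example (illustrative inputs; behavior follows boost::lexical_cast):
//   try_cast<int16_t>("123")   -> true
//   try_cast<int16_t>("70000") -> false (overflows int16_t, the cast throws)
//   try_cast<double>("1.5e3")  -> true
//   try_cast<double>("1.5kg")  -> false (trailing characters are rejected)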
2977 
2978 inline char* try_strptimes(const char* str, const std::vector<std::string>& formats) {
2979  std::tm tm_struct;
2980  char* buf;
2981  for (auto format : formats) {
2982  buf = strptime(str, format.c_str(), &tm_struct);
2983  if (buf) {
2984  return buf;
2985  }
2986  }
2987  return nullptr;
2988 }
2989 
2990 SQLTypes Detector::detect_sqltype(const std::string& str) {
2991  SQLTypes type = kTEXT;
2992  if (try_cast<double>(str)) {
2993  type = kDOUBLE;
2994  /*if (try_cast<bool>(str)) {
2995  type = kBOOLEAN;
2996  }*/
2997  if (try_cast<int16_t>(str)) {
2998  type = kSMALLINT;
2999  } else if (try_cast<int32_t>(str)) {
3000  type = kINT;
3001  } else if (try_cast<int64_t>(str)) {
3002  type = kBIGINT;
3003  } else if (try_cast<float>(str)) {
3004  type = kFLOAT;
3005  }
3006  }
3007 
3008  // check for geo types
3009  if (type == kTEXT) {
3010  // convert to upper case
3011  std::string str_upper_case = str;
3012  std::transform(
3013  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3014 
3015  // then test for leading words
3016  if (str_upper_case.find("POINT") == 0) {
3017  type = kPOINT;
3018  } else if (str_upper_case.find("LINESTRING") == 0) {
3019  type = kLINESTRING;
3020  } else if (str_upper_case.find("POLYGON") == 0) {
3021  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
3022  type = kMULTIPOLYGON;
3023  } else {
3024  type = kPOLYGON;
3025  }
3026  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
3027  type = kMULTIPOLYGON;
3028  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
3029  std::string::npos &&
3030  (str_upper_case.size() % 2) == 0) {
3031  // simple hex blob (two characters per byte, not uu-encode or base64)
3032  if (str_upper_case.size() >= 10) {
3033  // match WKB blobs for supported geometry types
3034  // the first byte specifies if the data is big-endian or little-endian
3035  // the next four bytes are the geometry type (1 = POINT etc.)
3036  // @TODO support eWKB, which has extra bits set in the geometry type
3037  auto first_five_bytes = str_upper_case.substr(0, 10);
3038  if (first_five_bytes == "0000000001" || first_five_bytes == "0101000000") {
3039  type = kPOINT;
3040  } else if (first_five_bytes == "0000000002" || first_five_bytes == "0102000000") {
3041  type = kLINESTRING;
3042  } else if (first_five_bytes == "0000000003" || first_five_bytes == "0103000000") {
3043  type = kPOLYGON;
3044  } else if (first_five_bytes == "0000000006" || first_five_bytes == "0106000000") {
3045  type = kMULTIPOLYGON;
3046  } else {
3047  // unsupported WKB type
3048  return type;
3049  }
3050  } else {
3051  // too short to be WKB
3052  return type;
3053  }
3054  }
3055  }
3056 
3057  // check for time types
3058  if (type == kTEXT) {
3059  // @TODO
3060  // make these tests more robust so they don't match stuff they should not
3061  char* buf;
3062  buf = try_strptimes(str.c_str(),
3063  {"%Y-%m-%d", "%m/%d/%Y", "%Y/%m/%d", "%d-%b-%y", "%d/%b/%Y"});
3064  if (buf) {
3065  type = kDATE;
3066  if (*buf == 'T' || *buf == ' ' || *buf == ':') {
3067  buf++;
3068  }
3069  }
3070  buf = try_strptimes(buf == nullptr ? str.c_str() : buf,
3071  {"%T %z", "%T", "%H%M%S", "%R"});
3072  if (buf) {
3073  if (type == kDATE) {
3074  type = kTIMESTAMP;
3075  } else {
3076  type = kTIME;
3077  }
3078  }
3079  }
3080 
3081  return type;
3082 }
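
// Worked examples for the detection above (illustrative values):
//   "42"                  -> kSMALLINT  (narrowest integer cast that succeeds)
//   "100000"              -> kINT       (overflows int16_t, fits int32_t)
//   "3.14"                -> kFLOAT     (float cast succeeds after integer casts fail)
//   "POINT (1 2)"         -> kPOINT     (leading geo keyword)
//   "2020-01-01"          -> kDATE
//   "2020-01-01 12:34:56" -> kTIMESTAMP (date, then time parsed after it)
//   "15:30:00"            -> kTIME
//   "hello"               -> kTEXT      (the fallback)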
3083 
3084 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3085  std::vector<SQLTypes> types(row.size());
3086  for (size_t i = 0; i < row.size(); i++) {
3087  types[i] = detect_sqltype(row[i]);
3088  }
3089  return types;
3090 }
3091 
3092 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
3093  static std::array<int, kSQLTYPE_LAST> typeorder;
3094  typeorder[kCHAR] = 0;
3095  typeorder[kBOOLEAN] = 2;
3096  typeorder[kSMALLINT] = 3;
3097  typeorder[kINT] = 4;
3098  typeorder[kBIGINT] = 5;
3099  typeorder[kFLOAT] = 6;
3100  typeorder[kDOUBLE] = 7;
3101  typeorder[kTIMESTAMP] = 8;
3102  typeorder[kTIME] = 9;
3103  typeorder[kDATE] = 10;
3104  typeorder[kPOINT] = 11;
3105  typeorder[kLINESTRING] = 11;
3106  typeorder[kPOLYGON] = 11;
3107  typeorder[kMULTIPOLYGON] = 11;
3108  typeorder[kTEXT] = 12;
3109 
3110  // note: b < a instead of a < b because the map is ordered most to least restrictive
3111  return typeorder[b] < typeorder[a];
3112 }
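
// Worked example (illustrative): find_best_sqltypes() keeps the *least*
// restrictive type seen in a column. Scanning the values "7" (kSMALLINT),
// "3.14" (kFLOAT), "n/a" (kTEXT) widens the column's best type
// kCHAR -> kSMALLINT -> kFLOAT -> kTEXT, so every sampled value imports.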
3113 
3114 void Detector::find_best_sqltypes_and_headers() {
3115  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
3116  best_encodings =
3117  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3118  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3119  switch (copy_params.has_header) {
3120  case ImportHeaderRow::AUTODETECT:
3121  has_headers = detect_headers(head_types, best_sqltypes);
3122  if (has_headers) {
3123  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3124  } else {
3125  copy_params.has_header = ImportHeaderRow::NO_HEADER;
3126  }
3127  break;
3128  case ImportHeaderRow::NO_HEADER:
3129  has_headers = false;
3130  break;
3131  case ImportHeaderRow::HAS_HEADER:
3132  has_headers = true;
3133  break;
3134  }
3135 }
3136 
3137 void Detector::find_best_sqltypes() {
3138  best_sqltypes = find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3139 }
3140 
3141 std::vector<SQLTypes> Detector::find_best_sqltypes(
3142  const std::vector<std::vector<std::string>>& raw_rows,
3143  const CopyParams& copy_params) {
3144  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3145 }
3146 
3147 std::vector<SQLTypes> Detector::find_best_sqltypes(
3148  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3149  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3150  const CopyParams& copy_params) {
3151  if (raw_rows.size() < 1) {
3152  throw std::runtime_error("No rows found in: " +
3153  boost::filesystem::basename(file_path));
3154  }
3155  auto end_time = std::chrono::steady_clock::now() + timeout;
3156  size_t num_cols = raw_rows.front().size();
3157  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3158  std::vector<size_t> non_null_col_counts(num_cols, 0);
3159  for (auto row = row_begin; row != row_end; row++) {
3160  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3161  best_types.push_back(kCHAR);
3162  non_null_col_counts.push_back(0);
3163  }
3164  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3165  // do not count nulls
3166  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3167  continue;
3168  }
3169  SQLTypes t = detect_sqltype(row->at(col_idx));
3170  non_null_col_counts[col_idx]++;
3171  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3172  best_types[col_idx] = t;
3173  }
3174  }
3175  if (std::chrono::steady_clock::now() > end_time) {
3176  break;
3177  }
3178  }
3179  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3180  // if we don't have any non-null values for this column, make it TEXT to
3181  // be safe, because that is the least restrictive type
3182  if (non_null_col_counts[col_idx] == 0) {
3183  best_types[col_idx] = kTEXT;
3184  }
3185  }
3186 
3187  return best_types;
3188 }
3189 
3190 std::vector<EncodingType> Detector::find_best_encodings(
3191  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3192  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3193  const std::vector<SQLTypes>& best_types) {
3194  if (raw_rows.size() < 1) {
3195  throw std::runtime_error("No rows found in: " +
3196  boost::filesystem::basename(file_path));
3197  }
3198  size_t num_cols = best_types.size();
3199  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3200  std::vector<size_t> num_rows_per_col(num_cols, 1);
3201  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3202  for (auto row = row_begin; row != row_end; row++) {
3203  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3204  if (IS_STRING(best_types[col_idx])) {
3205  count_set[col_idx].insert(row->at(col_idx));
3206  num_rows_per_col[col_idx]++;
3207  }
3208  }
3209  }
3210  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3211  if (IS_STRING(best_types[col_idx])) {
3212  float uniqueRatio =
3213  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3214  if (uniqueRatio < 0.75) {
3215  best_encodes[col_idx] = kENCODING_DICT;
3216  }
3217  }
3218  }
3219  return best_encodes;
3220 }
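
// Worked example (illustrative counts): a string column with 10,000 sampled
// rows and 500 distinct values has uniqueRatio = 500 / 10000 = 0.05, well
// under the 0.75 threshold, so kENCODING_DICT is suggested; a near-unique
// column (9,900 distinct of 10,000 -> 0.99) stays kENCODING_NONE.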
3221 
3222 // detect_headers returns true if:
3223 // - all elements of the first argument are kTEXT
3224 // - there is at least one instance where tail_types is more restrictive than head_types
3225 // (ie, not kTEXT)
3226 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3227  const std::vector<SQLTypes>& tail_types) {
3228  if (head_types.size() != tail_types.size()) {
3229  return false;
3230  }
3231  bool has_headers = false;
3232  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3233  if (head_types[col_idx] != kTEXT) {
3234  return false;
3235  }
3236  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3237  }
3238  return has_headers;
3239 }
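
// Example (illustrative): head_types {kTEXT, kTEXT} from row 0 ("id", "price")
// with tail_types {kINT, kFLOAT} returns true, so row 0 is treated as a
// header. With tail_types {kTEXT, kTEXT} it returns false: an all-TEXT body
// gives no evidence either way, and AUTODETECT then assumes no header row.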
3240 
3241 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3242  n = std::min(n, raw_rows.size());
3243  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3244  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3245  raw_rows.begin() + n);
3246  return sample_rows;
3247 }
3248 
3249 std::vector<std::string> Detector::get_headers() {
3250  std::vector<std::string> headers(best_sqltypes.size());
3251  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3252  if (has_headers && i < raw_rows[0].size()) {
3253  headers[i] = raw_rows[0][i];
3254  } else {
3255  headers[i] = "column_" + std::to_string(i + 1);
3256  }
3257  }
3258  return headers;
3259 }
3260 
3261 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3262  size_t row_count) {
3263  if (!loader->loadNoCheckpoint(import_buffers, row_count)) {
3264  load_failed = true;
3265  }
3266 }
3267 
3268 void Importer::checkpoint(
3269  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3270  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3271  if (load_failed) {
3272  // rollback to starting epoch - undo all the added records
3273  loader->setTableEpochs(table_epochs);
3274  } else {
3275  loader->checkpoint();
3276  }
3277  }
3278 
3279  if (loader->getTableDesc()->persistenceLevel ==
3280  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3281  auto ms = measure<>::execution([&]() {
3282  if (!load_failed) {
3283  for (auto& p : import_buffers_vec[0]) {
3284  if (!p->stringDictCheckpoint()) {
3285  LOG(ERROR) << "Checkpointing Dictionary for Column "
3286  << p->getColumnDesc()->columnName << " failed.";
3287  load_failed = true;
3288  break;
3289  }
3290  }
3291  }
3292  });
3293  if (DEBUG_TIMING) {
3294  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3295  << std::endl;
3296  }
3297  }
3298 }
3299 
3300 ImportStatus DataStreamSink::archivePlumber() {
3301  // in generalized importing scheme, reaching here file_path may
3302  // contain a file path, a url or a wildcard of file paths.
3303  // see CopyTableStmt::execute.
3304  auto file_paths = omnisci::glob(file_path);
3305  if (file_paths.size() == 0) {
3306  file_paths.push_back(file_path);
3307  }
3308 
3309  // sum up sizes of all local files -- only for local files. if
3310  // file_path is an S3 URL, sizes will be obtained via S3Archive.
3311  for (const auto& file_path : file_paths) {
3312  total_file_size += get_filesize(file_path);
3313  }
3314 
3315 #ifdef ENABLE_IMPORT_PARQUET
3316  // s3 parquet goes a different route because the files do not use libarchive
3317  // but the parquet api, and they need to be landed like .7z files.
3318  //
3319  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3320  // because, for example, spark sql users may specify an output url without a file
3321  // extension, like this:
3322  // df.write
3323  // .mode("overwrite")
3324  // .parquet("s3://bucket/folder/parquet/mydata")
3325  // without the parameter, it means plain or compressed csv files.
3326  // note: .ORC and AVRO files should follow a similar path to Parquet?
3327  if (copy_params.file_type == FileType::PARQUET) {
3328  import_parquet(file_paths);
3329  } else
3330 #endif
3331  {
3332  import_compressed(file_paths);
3333  }
3334  return import_status;
3335 }
3336 
3337 #ifdef ENABLE_IMPORT_PARQUET
3338 inline auto open_parquet_table(const std::string& file_path,
3339  std::shared_ptr<arrow::io::ReadableFile>& infile,
3340  std::unique_ptr<parquet::arrow::FileReader>& reader,
3341  std::shared_ptr<arrow::Table>& table) {
3342  using namespace parquet::arrow;
3343  auto file_result = arrow::io::ReadableFile::Open(file_path);
3344  PARQUET_THROW_NOT_OK(file_result.status());
3345  infile = file_result.ValueOrDie();
3346 
3347  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3348  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3349  const auto num_row_groups = reader->num_row_groups();
3350  const auto num_columns = table->num_columns();
3351  const auto num_rows = table->num_rows();
3352  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3353  << " columns in " << num_row_groups << " groups.";
3354  return std::make_tuple(num_row_groups, num_columns, num_rows);
3355 }
3356 
3357 void Detector::import_local_parquet(const std::string& file_path) {
3358  std::shared_ptr<arrow::io::ReadableFile> infile;
3359  std::unique_ptr<parquet::arrow::FileReader> reader;
3360  std::shared_ptr<arrow::Table> table;
3361  int num_row_groups, num_columns;
3362  int64_t num_rows;
3363  std::tie(num_row_groups, num_columns, num_rows) =
3364  open_parquet_table(file_path, infile, reader, table);
3365  // make up a header line if we don't have one yet
3366  if (0 == raw_data.size()) {
3367  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3368  copy_params.line_delim = '\n';
3369  copy_params.delimiter = ',';
3370  // must quote values to skip any embedded delimiter
3371  copy_params.quoted = true;
3372  copy_params.quote = '"';
3373  copy_params.escape = '"';
3374  for (int c = 0; c < num_columns; ++c) {
3375  if (c) {
3376  raw_data += copy_params.delimiter;
3377  }
3378  raw_data += table->ColumnNames().at(c);
3379  }
3380  raw_data += copy_params.line_delim;
3381  }
3382  // make up raw data... row-wise...
3383  const ColumnDescriptor cd;
3384  for (int g = 0; g < num_row_groups; ++g) {
3385  // data is columnwise
3386  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3387  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3388  arrays.resize(num_columns);
3389  for (int c = 0; c < num_columns; ++c) {
3390  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3391  for (auto chunk : arrays[c]->chunks()) {
3392  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3393  }
3394  }
3395  for (int r = 0; r < num_rows; ++r) {
3396  for (int c = 0; c < num_columns; ++c) {
3397  std::vector<std::string> buffer;
3398  for (auto chunk : arrays[c]->chunks()) {
3399  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3400  if (c) {
3401  raw_data += copy_params.delimiter;
3402  }
3403  if (!chunk->IsNull(r)) {
3404  raw_data += copy_params.quote;
3405  raw_data += boost::replace_all_copy(
3406  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3407  raw_data += copy_params.quote;
3408  }
3409  }
3410  }
3411  raw_data += copy_params.line_delim;
3412  if (++import_status.rows_completed >= 10000) {
3413  // as if load truncated
3414  import_status.load_truncated = true;
3415  load_failed = true;
3416  return;
3417  }
3418  }
3419  }
3420 }
3421 
3422 template <typename DATA_TYPE>
3423 auto TypedImportBuffer::del_values(
3424  std::vector<DATA_TYPE>& buffer,
3425  import_export::BadRowsTracker* const bad_rows_tracker) {
3426  const auto old_size = buffer.size();
3427  // erase backward to minimize memory movement overhead
3428  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3429  ++rit) {
3430  buffer.erase(buffer.begin() + *rit);
3431  }
3432  return std::make_tuple(old_size, buffer.size());
3433 }
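
// Sketch of why the erase loop above walks the bad-row indices in reverse:
// removing from the back first keeps the earlier indices valid. The container
// names and values are invented for illustration.
static void reverse_erase_sketch() {
  std::vector<int> buffer = {10, 11, 12, 13, 14};
  const std::vector<int64_t> bad_rows = {1, 3};  // ascending, as in BadRowsTracker
  for (auto rit = bad_rows.crbegin(); rit != bad_rows.crend(); ++rit) {
    buffer.erase(buffer.begin() + *rit);  // erases index 3 first, then index 1
  }
  // buffer == {10, 12, 14}: earlier indices are unaffected by later erases
}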
3434 
3435 auto TypedImportBuffer::del_values(const SQLTypes type,
3436  BadRowsTracker* const bad_rows_tracker) {
3437  switch (type) {
3438  case kBOOLEAN:
3439  return del_values(*bool_buffer_, bad_rows_tracker);
3440  case kTINYINT:
3441  return del_values(*tinyint_buffer_, bad_rows_tracker);
3442  case kSMALLINT:
3443  return del_values(*smallint_buffer_, bad_rows_tracker);
3444  case kINT:
3445  return del_values(*int_buffer_, bad_rows_tracker);
3446  case kBIGINT:
3447  case kNUMERIC:
3448  case kDECIMAL:
3449  case kTIME:
3450  case kTIMESTAMP:
3451  case kDATE:
3452  return del_values(*bigint_buffer_, bad_rows_tracker);
3453  case kFLOAT:
3454  return del_values(*float_buffer_, bad_rows_tracker);
3455  case kDOUBLE:
3456  return del_values(*double_buffer_, bad_rows_tracker);
3457  case kTEXT:
3458  case kVARCHAR:
3459  case kCHAR:
3460  return del_values(*string_buffer_, bad_rows_tracker);
3461  case kPOINT:
3462  case kLINESTRING:
3463  case kPOLYGON:
3464  case kMULTIPOLYGON:
3465  return del_values(*geo_string_buffer_, bad_rows_tracker);
3466  case kARRAY:
3467  return del_values(*array_buffer_, bad_rows_tracker);
3468  default:
3469  throw std::runtime_error("Invalid Type");
3470  }
3471 }
3472 
3473 void Importer::import_local_parquet(const std::string& file_path) {
3474  std::shared_ptr<arrow::io::ReadableFile> infile;
3475  std::unique_ptr<parquet::arrow::FileReader> reader;
3476  std::shared_ptr<arrow::Table> table;
3477  int num_row_groups, num_columns;
3478  int64_t nrow_in_file;
3479  std::tie(num_row_groups, num_columns, nrow_in_file) =
3480  open_parquet_table(file_path, infile, reader, table);
3481  // column_list has no $deleted
3482  const auto& column_list = get_column_descs();
3483  // for now geo columns expect a wkt or wkb hex string
3484  std::vector<const ColumnDescriptor*> cds;
3485  int num_physical_cols = 0;
3486  for (auto& cd : column_list) {
3487  cds.push_back(cd);
3488  num_physical_cols += cd->columnType.get_physical_cols();
3489  }
3490  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3491  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3492  std::to_string(num_columns) + " columns in file vs " +
3493  std::to_string(column_list.size() - num_physical_cols) +
3494  " columns in table.");
3495  // slice each group to import slower columns faster, e.g. geo or string
3497  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3498  // init row estimate for this file
3499  const auto filesize = get_filesize(file_path);
3500  size_t nrow_completed{0};
3501  file_offsets.push_back(0);
3502  // map logical column index to physical column index
3503  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3504  int physical_col_idx = 0;
3505  for (int i = 0; i < logic_col_idx; ++i) {
3506  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3507  }
3508  return physical_col_idx;
3509  };
3510  // load a file = nested iteration of row groups, row slices and logical columns
3511  auto ms_load_a_file = measure<>::execution([&]() {
3512  for (int row_group = 0; row_group < num_row_groups && !load_failed; ++row_group) {
3513  // a sliced row group will be handled like a (logical) parquet file, with
3514  // an entirely clean set of bad_rows_tracker, import_buffers_vec, etc.
3515  import_buffers_vec.resize(num_slices);
3516  for (int slice = 0; slice < num_slices; slice++) {
3517  import_buffers_vec[slice].clear();
3518  for (const auto cd : cds) {
3519  import_buffers_vec[slice].emplace_back(
3520  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3521  }
3522  }
3523  /*
3524  * A caveat here is that Parquet files (and Arrow data) are imported
3525  * column-wise. Unlike row-wise CSV import, an error on any row of any
3526  * column forces us to give up the entire row group across all columns,
3527  * unless there is a sophisticated method to trace erroneous rows in
3528  * individual columns so that we can union the bad rows and drop them from
3529  * the corresponding import_buffers_vec; otherwise, we could easily exceed
3530  * the maximum number of truncated rows even with very sparse errors.
3531  */
3532  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3533  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3534  auto& bad_rows_tracker = bad_rows_trackers[slice];
3535  bad_rows_tracker.file_name = file_path;
3536  bad_rows_tracker.row_group = slice;
3537  bad_rows_tracker.importer = this;
3538  }
3539  // process arrow arrays to import buffers
3540  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3541  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3542  const auto cd = cds[physical_col_idx];
3543  std::shared_ptr<arrow::ChunkedArray> array;
3544  PARQUET_THROW_NOT_OK(
3545  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3546  const size_t array_size = array->length();
3547  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3548  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3549  for (int slice = 0; slice < num_slices; ++slice) {
3550  thread_controller.startThread([&, slice] {
3551  const auto slice_offset = slice % num_slices;
3552  ArraySliceRange slice_range(
3553  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3554  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3555  auto& bad_rows_tracker = bad_rows_trackers[slice];
3556  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3557  import_buffer->import_buffers = &import_buffers_vec[slice];
3558  import_buffer->col_idx = physical_col_idx + 1;
3559  for (auto chunk : array->chunks()) {
3560  import_buffer->add_arrow_values(
3561  cd, *chunk, false, slice_range, &bad_rows_tracker);
3562  }
3563  });
3564  }
3565  thread_controller.finish();
3566  }
3567  std::vector<size_t> nrow_in_slice_raw(num_slices);
3568  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3569  // trim bad rows from import buffers
3570  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3571  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3572  const auto cd = cds[physical_col_idx];
3573  for (int slice = 0; slice < num_slices; ++slice) {
3574  auto& bad_rows_tracker = bad_rows_trackers[slice];
3575  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3576  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3577  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3578  }
3579  }
3580  // flush slices of this row group to chunks
3581  for (int slice = 0; slice < num_slices; ++slice) {
3582  load(import_buffers_vec[slice], nrow_in_slice_successfully_loaded[slice]);
3583  }
3584  // update import stats
3585  const auto nrow_original =
3586  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3587  const auto nrow_imported =
3588  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3589  nrow_in_slice_successfully_loaded.end(),
3590  0);
3591  const auto nrow_dropped = nrow_original - nrow_imported;
3592  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3593  << " rows, drop " << nrow_dropped << " rows.";
3594  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3595  import_status.rows_completed += nrow_imported;
3596  import_status.rows_rejected += nrow_dropped;
3597  if (import_status.rows_rejected > copy_params.max_reject) {
3598  import_status.load_truncated = true;
3599  load_failed = true;
3600  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3601  << ") rows rejected exceeded. Halting load.";
3602  }
3603  // row estimate
3604  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3605  nrow_completed += nrow_imported;
3606  file_offsets.back() =
3607  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3608  // sum up current total file offsets
3609  const auto total_file_offset =
3610  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
3611  // estimate number of rows per current total file offset
3612  if (total_file_offset) {
3613  import_status.rows_estimated =
3614  (float)total_file_size / total_file_offset * import_status.rows_completed;
3615  VLOG(3) << "rows_completed " << import_status.rows_completed
3616  << ", rows_estimated " << import_status.rows_estimated
3617  << ", total_file_size " << total_file_size << ", total_file_offset "
3618  << total_file_offset;
3619  }
3620  }
3621  });
3622  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
3623  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
3624 }
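
// A sketch (not from the original source) of the slice-range arithmetic used
// above: an array of array_size elements is cut into num_slices half-open
// ranges of ceil(array_size / num_slices) elements, the last one clamped to
// the array end, so each import thread gets a contiguous, non-overlapping run.
#if 0
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

std::vector<std::pair<size_t, size_t>> make_slice_ranges(const size_t array_size,
                                                         const size_t num_slices) {
  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
  std::vector<std::pair<size_t, size_t>> ranges;
  for (size_t slice = 0; slice < num_slices; ++slice) {
    ranges.emplace_back(std::min(slice * slice_size, array_size),
                        std::min((slice + 1) * slice_size, array_size));
  }
  return ranges;  // e.g. 10 elements, 4 slices -> [0,3) [3,6) [6,9) [9,10)
}
#endif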
3625 
3626 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths) {
3627  auto importer = dynamic_cast<Importer*>(this);
3628  auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
3629  : std::vector<Catalog_Namespace::TableEpochInfo>{};
3630  try {
3631  std::exception_ptr teptr;
3632  // file_paths may contain one local file path, a list of local file paths,
3633  // or an s3/hdfs/... url that may translate to one or more remote object keys.
3634  for (auto const& file_path : file_paths) {
3635  std::map<int, std::string> url_parts;
3636  Archive::parse_url(file_path, url_parts);
3637 
3638  // for an s3 url we need to know the object keys that it comprises
3639  std::vector<std::string> objkeys;
3640  std::unique_ptr<S3ParquetArchive> us3arch;
3641  if ("s3" == url_parts[2]) {
3642 #ifdef HAVE_AWS_S3
3643  us3arch.reset(new S3ParquetArchive(file_path,
3644  copy_params.s3_access_key,
3645  copy_params.s3_secret_key,
3646  copy_params.s3_region,
3647  copy_params.s3_endpoint,
3648  copy_params.plain_text));
3649  us3arch->init_for_read();
3650  total_file_size += us3arch->get_total_file_size();
3651  objkeys = us3arch->get_objkeys();
3652 #else
3653  throw std::runtime_error("AWS S3 support not available");
3654 #endif // HAVE_AWS_S3
3655  } else {
3656  objkeys.emplace_back(file_path);
3657  }
3658 
3659  // each object key of an s3 url must be landed (downloaded) before
3660  // being imported, as we would do with a 'local file'.
3661  for (auto const& objkey : objkeys) {
3662  try {
3663  auto file_path =
3664  us3arch
3665  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
3666  : objkey;
3667  import_local_parquet(file_path);
3668  if (us3arch) {
3669  us3arch->vacuum(objkey);
3670  }
3671  } catch (...) {
3672  if (us3arch) {
3673  us3arch->vacuum(objkey);
3674  }
3675  throw;
3676  }
3677  if (import_status.load_truncated) {
3678  break;
3679  }
3680  }
3681  }
3682  // rethrow any exception that happened earlier
3683  if (teptr) {
3684  std::rethrow_exception(teptr);
3685  }
3686  } catch (...) {
3687  load_failed = true;
3688  if (!import_status.load_truncated) {
3689  throw;
3690  }
3691  }
3692 
3693  if (importer) {
3694  importer->checkpoint(table_epochs);
3695  }
3696 }
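
// A sketch (not from the original source) of the land/vacuum pattern used in
// import_parquet() above: each S3 object is downloaded ("landed") to a local
// file, imported, then removed ("vacuumed") on both the success and the
// failure path. import_fn stands in for import_local_parquet().
#if 0
#include <exception>
#include <functional>
#include <string>

void import_landed_object(S3ParquetArchive& arch,
                          const std::string& objkey,
                          const std::function<void(const std::string&)>& import_fn) {
  std::exception_ptr teptr;
  const auto local_path = arch.land(objkey, teptr, /*capture_detecting=*/false);
  try {
    import_fn(local_path);
  } catch (...) {
    arch.vacuum(objkey);  // clean up the landed copy even on error
    throw;
  }
  arch.vacuum(objkey);
}
#endif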
3697 #endif // ENABLE_IMPORT_PARQUET
3698 
3699 void DataStreamSink::import_compressed(std::vector<std::string>& file_paths) {
3700  // a new requirement is to have one single input stream into
3701  // Importer::importDelimited, so we need to move the pipe-related
3702  // machinery to the outermost block.
3703  int fd[2];
3704  if (pipe(fd) < 0) {
3705  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
3706  }
3707  signal(SIGPIPE, SIG_IGN);
3708 
3709  std::exception_ptr teptr;
3710  // create a thread to read uncompressed byte stream out of pipe and
3711  // feed into importDelimited()
3712  ImportStatus ret;
3713  auto th_pipe_reader = std::thread([&]() {
3714  try {
3715  // importDelimited will read from FILE* p_file
3716  if (0 == (p_file = fdopen(fd[0], "r"))) {
3717  throw std::runtime_error(std::string("failed to open a pipe: ") +
3718  strerror(errno));
3719  }
3720 
3721  // in the future, depending on the data types of this uncompressed stream,
3722  // it could be fed into other functions such as importParquet, etc.
3723  ret = importDelimited(file_path, true);
3724  } catch (...) {
3725  if (!teptr) { // no replace
3726  teptr = std::current_exception();
3727  }
3728  }
3729 
3730  if (p_file) {
3731  fclose(p_file);
3732  }
3733  p_file = 0;
3734  });
3735 
3736  // create a thread to iterate all files (in all archives) and
3737  // forward the uncompressed byte stream to fd[1] which is
3738  // then fed into importDelimited, importParquet, etc.
3739  auto th_pipe_writer = std::thread([&]() {
3740  std::unique_ptr<S3Archive> us3arch;
3741  bool stop = false;
3742  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
3743  try {
3744  auto file_path = file_paths[fi];
3745  std::unique_ptr<Archive> uarch;
3746  std::map<int, std::string> url_parts;
3747  Archive::parse_url(file_path, url_parts);
3748  const std::string S3_objkey_url_scheme = "s3ok";
3749  if ("file" == url_parts[2] || "" == url_parts[2]) {
3750  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3751  } else if ("s3" == url_parts[2]) {
3752 #ifdef HAVE_AWS_S3
3753  // construct an S3Archive with a shared s3 client.
3754  // should be safe b/c there is no wildcard in an s3 url
3755  us3arch.reset(new S3Archive(file_path,
3756  copy_params.s3_access_key,
3757  copy_params.s3_secret_key,
3758  copy_params.s3_region,
3759  copy_params.s3_endpoint,
3760  copy_params.plain_text));
3761  us3arch->init_for_read();
3762  total_file_size += us3arch->get_total_file_size();
3763  // don't land all files here; land them one by one in the following iterations
3764  for (const auto& objkey : us3arch->get_objkeys()) {
3765  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
3766  }
3767  continue;
3768 #else
3769  throw std::runtime_error("AWS S3 support not available");
3770 #endif // HAVE_AWS_S3
3771  } else if (S3_objkey_url_scheme == url_parts[2]) {
3772 #ifdef HAVE_AWS_S3
3773  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
3774  auto file_path =
3775  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
3776  if (0 == file_path.size()) {
3777  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
3778  }
3779  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3780  // file not removed until file closed
3781  us3arch->vacuum(objkey);
3782 #else
3783  throw std::runtime_error("AWS S3 support not available");
3784 #endif // HAVE_AWS_S3
3785  }
3786 #if 0 // TODO(ppan): implement and enable any other archive class
3787  else
3788  if ("hdfs" == url_parts[2])
3789  uarch.reset(new HdfsArchive(file_path));
3790 #endif
3791  else {
3792  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
3793  }
3794 
3795  // init the archive for read
3796  auto& arch = *uarch;
3797 
3798  // at this point, the archive at the url should be ready to be read, unarchived
3799  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
3800  const void* buf;
3801  size_t size;
3802  bool just_saw_archive_header;
3803  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
3804  bool first_text_header_skipped = false;
3805  // start reading uncompressed bytes of this archive from libarchive
3806  // note! this archive may contain more than one file!
3807  file_offsets.push_back(0);
3808  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
3809  bool insert_line_delim_after_this_file = false;
3810  while (!stop) {
3811  int64_t offset{-1};
3812  auto ok = arch.read_data_block(&buf, &size, &offset);
3813  // can't use (uncompressed) size, so track (max) file offset.
3814  // also we want to capture offset even on e.o.f.
3815  if (offset > 0) {
3816  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3817  file_offsets.back() = offset;
3818  }
3819  if (!ok) {
3820  break;
3821  }
3822  // one subtle point here is that we now concatenate all files
3823  // into a single FILE stream with which we call importDelimited
3824  // only once. this would make it assume that only one
3825  // header line comes with this 'single' stream, while actually
3826  // we may have one header line for each of the files.
3827  // so we need to skip header lines here instead of in importDelimited.
3828  const char* buf2 = (const char*)buf;
3829  int size2 = size;
3830  if (copy_params.has_header != ImportHeaderRow::NO_HEADER &&
3831  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
3832  while (size2-- > 0) {
3833  if (*buf2++ == copy_params.line_delim) {
3834  break;
3835  }
3836  }
3837  if (size2 <= 0) {
3838  LOG(WARNING) << "No line delimiter in block." << std::endl;
3839  } else {
3840  just_saw_archive_header = false;
3841  first_text_header_skipped = true;
3842  }
3843  }
3844  // On very rare occasions the write pipe somehow operates in a mode similar to
3845  // non-blocking, even though pipe(fds) should behave like pipe2(fds, 0), i.e.
3846  // blocking mode. With such an unreliable blocking mode, a possible fix is to
3847  // loop writing until no bytes are left, otherwise we get the annoying
3848  // `failed to write pipe: Success`...
3849  if (size2 > 0) {
3850  int nremaining = size2;
3851  while (nremaining > 0) {
3852  // try to write the entire remainder of the buffer to the pipe
3853  int nwritten = write(fd[1], buf2, nremaining);
3854  // how did we do?
3855  if (nwritten < 0) {
3856  // something bad happened
3857  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3858  // ignore these, assume nothing written, try again
3859  nwritten = 0;
3860  } else {
3861  // a real error
3862  throw std::runtime_error(
3863  std::string("failed or interrupted write to pipe: ") +
3864  strerror(errno));
3865  }
3866  } else if (nwritten == nremaining) {
3867  // we wrote everything; we're done
3868  break;
3869  }
3870  // only wrote some (or nothing), try again
3871  nremaining -= nwritten;
3872  buf2 += nwritten;
3873  // no exception when too many rejected
3874  // @simon.eves how would this get set? from the other thread? mutex
3875  // needed?
3876  if (import_status.load_truncated) {
3877  stop = true;
3878  break;
3879  }
3880  }
3881  // check that this file (buf for size) ended with a line delim
3882  if (size > 0) {
3883  const char* plast = static_cast<const char*>(buf) + (size - 1);
3884  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
3885  }
3886  }
3887  }
3888  // if that file didn't end with a line delim, we insert one here to terminate
3889  // that file's stream. use a loop for the same reason as above
3890  if (insert_line_delim_after_this_file) {
3891  while (true) {
3892  // write the delim char to the pipe
3893  int nwritten = write(fd[1], &copy_params.line_delim, 1);
3894  // how did we do?
3895  if (nwritten < 0) {
3896  // something bad happened
3897  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3898  // ignore these, assume nothing written, try again
3899  nwritten = 0;
3900  } else {
3901  // a real error
3902  throw std::runtime_error(
3903  std::string("failed or interrupted write to pipe: ") +
3904  strerror(errno));
3905  }
3906  } else if (nwritten == 1) {
3907  // we wrote it; we're done
3908  break;
3909  }
3910  }
3911  }
3912  }
3913  } catch (...) {
3914  // when an import is aborted because of too many data errors or because a
3915  // detection has ended, any exception thrown by the s3 sdk or libarchive is
3916  // okay and should be suppressed.
3917  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
3918  if (import_status.load_truncated) {
3919  break;
3920  }
3921  if (import_status.rows_completed > 0) {
3922  if (nullptr != dynamic_cast<Detector*>(this)) {
3923  break;
3924  }
3925  }
3926  if (!teptr) { // no replace
3927  teptr = std::current_exception();
3928  }
3929  break;
3930  }
3931  }
3932  // close writer end
3933  close(fd[1]);
3934  });
3935 
3936  th_pipe_reader.join();
3937  th_pipe_writer.join();
3938 
3939  // rethrow any exception that happened earlier
3940  if (teptr) {
3941  std::rethrow_exception(teptr);
3942  }
3943 }
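
// A sketch (not from the original source) of the retrying write loop that
// appears twice above, factored into one helper: write(2) may write fewer
// bytes than requested or fail with EINTR/EAGAIN/EWOULDBLOCK, and both cases
// are retried until the whole buffer has been pushed into the pipe.
#if 0
#include <cerrno>
#include <cstring>
#include <stdexcept>
#include <string>
#include <unistd.h>

void write_all(const int fd, const char* buf, size_t count) {
  while (count > 0) {
    const ssize_t nwritten = ::write(fd, buf, count);
    if (nwritten < 0) {
      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
        continue;  // transient: assume nothing was written and try again
      }
      throw std::runtime_error(std::string("failed or interrupted write to pipe: ") +
                               strerror(errno));
    }
    buf += nwritten;  // partial write: advance and keep going
    count -= nwritten;
  }
}
#endif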
3944 
3947 }
3948 
3949 ImportStatus Importer::importDelimited(const std::string& file_path,
3950  const bool decompressed) {
3951  bool load_truncated = false;
3952  set_import_status(import_id, import_status);
3953 
3954  if (!p_file) {
3955  p_file = fopen(file_path.c_str(), "rb");
3956  }
3957  if (!p_file) {
3958  throw std::runtime_error("failed to open file '" + file_path +
3959  "': " + strerror(errno));
3960  }
3961 
3962  if (!decompressed) {
3963  (void)fseek(p_file, 0, SEEK_END);
3964  file_size = ftell(p_file);
3965  }
3966 
3967  if (copy_params.threads == 0) {
3968  max_threads = static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
3969  } else {
3970  max_threads = static_cast<size_t>(copy_params.threads);
3971  }
3972 
3973  // deal with small files
3974  size_t alloc_size = copy_params.buffer_size;
3975  if (!decompressed && file_size < alloc_size) {
3976  alloc_size = file_size;
3977  }
3978 
3979  for (size_t i = 0; i < max_threads; i++) {
3980  import_buffers_vec.emplace_back();
3981  for (const auto cd : loader->get_column_descs()) {
3982  import_buffers_vec[i].emplace_back(
3983  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
3984  }
3985  }
3986 
3987  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
3988  size_t current_pos = 0;
3989  size_t end_pos;
3990  size_t begin_pos = 0;
3991 
3992  (void)fseek(p_file, current_pos, SEEK_SET);
3993  size_t size =
3994  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
3995 
3996  // make render group analyzers for each poly column
3997  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
3998  if (copy_params.geo_assign_render_groups) {
3999  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4000  loader->getTableDesc()->tableId, false, false, false);
4001  for (auto cd : columnDescriptors) {
4002  SQLTypes ct = cd->columnType.get_type();
4003  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4004  auto rga = std::make_shared<RenderGroupAnalyzer>();
4005  rga->seedFromExistingTableContents(loader, cd->columnName);
4006  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4007  }
4008  }
4009  }
4010 
4011  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4012  loader->getTableDesc()->tableId};
4013  auto table_epochs = loader->getTableEpochs();
4014  {
4015  std::list<std::future<ImportStatus>> threads;
4016 
4017  // use a stack to track thread_ids which must not overlap among threads
4018  // because thread_id is used to index import_buffers_vec[]
4019  std::stack<size_t> stack_thread_ids;
4020  for (size_t i = 0; i < max_threads; i++) {
4021  stack_thread_ids.push(i);
4022  }
4023  // added for true row index on error
4024  size_t first_row_index_this_buffer = 0;
4025 
4026  while (size > 0) {
4027  unsigned int num_rows_this_buffer = 0;
4028  CHECK(scratch_buffer);
4029  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4030  scratch_buffer,
4031  size,
4032  copy_params,
4033  first_row_index_this_buffer,
4034  num_rows_this_buffer,
4035  p_file);
4036 
4037  // unput residual
4038  int nresidual = size - end_pos;
4039  std::unique_ptr<char[]> unbuf;
4040  if (nresidual > 0) {
4041  unbuf = std::make_unique<char[]>(nresidual);
4042  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4043  }
4044 
4045  // get a thread_id not in use
4046  auto thread_id = stack_thread_ids.top();
4047  stack_thread_ids.pop();
4048  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4049 
4050  threads.push_back(std::async(std::launch::async,
4051  import_thread_delimited,
4052  thread_id,
4053  this,
4054  std::move(scratch_buffer),
4055  begin_pos,
4056  end_pos,
4057  end_pos,
4058  columnIdToRenderGroupAnalyzerMap,
4059  first_row_index_this_buffer));
4060 
4061  first_row_index_this_buffer += num_rows_this_buffer;
4062 
4063  current_pos += end_pos;
4064  scratch_buffer = std::make_unique<char[]>(alloc_size);
4065  CHECK(scratch_buffer);
4066  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4067  size = nresidual +
4068  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4069 
4070  begin_pos = 0;
4071 
4072  while (threads.size() > 0) {
4073  int nready = 0;
4074  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4075  it != threads.end();) {
4076  auto& p = *it;
4077  std::chrono::milliseconds span(
4078  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4079  if (p.wait_for(span) == std::future_status::ready) {
4080  auto ret_import_status = p.get();
4081  import_status += ret_import_status;
4082  // sum up current total file offsets
4083  size_t total_file_offset{0};
4084  if (decompressed) {
4085  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4086  for (const auto file_offset : file_offsets) {
4087  total_file_offset += file_offset;
4088  }
4089  }
4090  // estimate number of rows per current total file offset
4091  if (decompressed ? total_file_offset : current_pos) {
4092  import_status.rows_estimated =
4093  (decompressed ? (float)total_file_size / total_file_offset
4094  : (float)file_size / current_pos) *
4095  import_status.rows_completed;
4096  }
4097  VLOG(3) << "rows_completed " << import_status.rows_completed
4098  << ", rows_estimated " << import_status.rows_estimated
4099  << ", total_file_size " << total_file_size << ", total_file_offset "
4100  << total_file_offset;
4101  set_import_status(import_id, import_status);
4102  // recall thread_id for reuse
4103  stack_thread_ids.push(ret_import_status.thread_id);
4104  threads.erase(it++);
4105  ++nready;
4106  } else {
4107  ++it;
4108  }
4109  }
4110 
4111  if (nready == 0) {
4112  std::this_thread::yield();
4113  }
4114 
4115  // on eof, wait all threads to finish
4116  if (0 == size) {
4117  continue;
4118  }
4119 
4120  // keep reading if there is any free thread slot
4121  // this is one of the major differences from the old threading model!!
4122  if (threads.size() < max_threads) {
4123  break;
4124  }
4125  }
4126 
4127  if (import_status.rows_rejected > copy_params.max_reject) {
4128  load_truncated = true;
4129  load_failed = true;
4130  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4131  break;
4132  }
4133  if (load_failed) {
4134  load_truncated = true;
4135  LOG(ERROR) << "A call to Loader::load failed. Please review the logs for "
4136  "more details";
4137  break;
4138  }
4139  }
4140 
4141  // join dangling threads in case of LOG(ERROR) above
4142  for (auto& p : threads) {
4143  p.wait();
4144  }
4145  }
4146 
4147  checkpoint(table_epochs);
4148 
4149  // must set import_status.load_truncated before closing this end of pipe
4150  // otherwise, the thread on the other end would throw an unwanted 'write()'
4151  // exception
4152  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
4153  import_status.load_truncated = load_truncated;
4154 
4155  fclose(p_file);
4156  p_file = nullptr;
4157  return import_status;
4158 }
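
// A sketch (not from the original source) of the residual carry-over performed
// above: the bytes after the last complete row (end_pos) are copied to the
// front of a fresh buffer and the next fread() appends behind them, so rows
// split across read boundaries are reassembled before being parsed.
#if 0
#include <cstdio>
#include <cstring>
#include <memory>

size_t refill_with_residual(FILE* fp,
                            std::unique_ptr<char[]>& buffer,
                            const size_t alloc_size,
                            const size_t end_pos,
                            const size_t old_size) {
  const size_t nresidual = old_size - end_pos;
  auto fresh = std::make_unique<char[]>(alloc_size);
  memcpy(fresh.get(), buffer.get() + end_pos, nresidual);
  buffer = std::move(fresh);
  return nresidual + fread(buffer.get() + nresidual, 1, alloc_size - nresidual, fp);
}
#endif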
4159 
4160  void Loader::checkpoint() {
4161  if (getTableDesc()->persistenceLevel ==
4162  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4163  getCatalog().checkpoint(getTableDesc()->tableId);
4164  }
4165 }
4166 
4167 std::vector<Catalog_Namespace::TableEpochInfo> Loader::getTableEpochs() const {
4168  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4169  getTableDesc()->tableId);
4170 }
4171 
4172  void Loader::setTableEpochs(
4173  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4174  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4175 }
4176 
4177 /* static */
4178  void Importer::setGDALAuthorizationTokens(const CopyParams& copy_params) {
4179  // for now we only support S3
4180  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4181  // only set if non-empty to allow GDAL defaults to persist
4182  // explicitly clear if empty to revert to default and not reuse a previous session's
4183  // keys
4184  if (copy_params.s3_region.size()) {
4185 #if DEBUG_AWS_AUTHENTICATION
4186  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4187 #endif
4188  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4189  } else {
4190 #if DEBUG_AWS_AUTHENTICATION
4191  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4192 #endif
4193  CPLSetConfigOption("AWS_REGION", nullptr);
4194  }
4195  if (copy_params.s3_endpoint.size()) {
4196 #if DEBUG_AWS_AUTHENTICATION
4197  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4198 #endif
4199  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4200  } else {
4201 #if DEBUG_AWS_AUTHENTICATION
4202  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4203 #endif
4204  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4205  }
4206  if (copy_params.s3_access_key.size()) {
4207 #if DEBUG_AWS_AUTHENTICATION
4208  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4209  << "'";
4210 #endif
4211  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4212  } else {
4213 #if DEBUG_AWS_AUTHENTICATION
4214  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4215 #endif
4216  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4217  }
4218  if (copy_params.s3_secret_key.size()) {
4219 #if DEBUG_AWS_AUTHENTICATION
4220  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4221  << "'";
4222 #endif
4223  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4224  } else {
4225 #if DEBUG_AWS_AUTHENTICATION
4226  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4227 #endif
4228  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4229  }
4230 
4231 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4232  // if we haven't set keys, we need to disable signed access
4233  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4234 #if DEBUG_AWS_AUTHENTICATION
4235  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4236 #endif
4237  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4238  } else {
4239 #if DEBUG_AWS_AUTHENTICATION
4240  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4241 #endif
4242  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4243  }
4244 #endif
4245 }
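
// A sketch (not from the original source) of the set-or-clear pattern repeated
// above, factored into one helper: passing nullptr to CPLSetConfigOption()
// clears the option, so an empty CopyParams field cannot leak a previous
// session's credentials into this one.
#if 0
#include <cpl_conv.h>
#include <string>

void set_or_clear_config_option(const char* key, const std::string& value) {
  CPLSetConfigOption(key, value.empty() ? nullptr : value.c_str());
}
// usage: set_or_clear_config_option("AWS_REGION", copy_params.s3_region);
#endif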
4246 
4247 /* static */
4248 OGRDataSource* Importer::openGDALDataset(const std::string& file_name,
4249  const CopyParams& copy_params) {
4250  // lazy init GDAL
4251  GDAL::init();
4252 
4253  // set authorization tokens
4254  setGDALAuthorizationTokens(copy_params);
4255 
4256  // open the file
4257  OGRDataSource* poDS;
4258 #if GDAL_VERSION_MAJOR == 1
4259  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4260 #else
4261  poDS = (OGRDataSource*)GDALOpenEx(
4262  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4263  if (poDS == nullptr) {
4264  poDS = (OGRDataSource*)GDALOpenEx(
4265  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4266  if (poDS) {
4267  LOG(INFO) << "openGDALDataset had to open as read-only";
4268  }
4269  }
4270 #endif
4271  if (poDS == nullptr) {
4272  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4273  }
4274  // NOTE(adb): If extending this function, refactor to ensure any errors will not result
4275  // in a memory leak if GDAL successfully opened the input dataset.
4276  return poDS;
4277 }
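
// A sketch (not from the original source, and relying on this file's types)
// of typical use of openGDALDataset() with the OGRDataSourceUqPtr deleter
// defined near the top of this file, so GDALClose() runs even if an exception
// is thrown while inspecting the file.
#if 0
void count_layers(const std::string& file_name, const CopyParams& copy_params) {
  OGRDataSourceUqPtr dataset(Importer::openGDALDataset(file_name, copy_params));
  if (!dataset) {
    throw std::runtime_error("Unable to open geo file " + file_name);
  }
  LOG(INFO) << file_name << " has " << dataset->GetLayerCount() << " layer(s)";
}  // dataset closed here by OGRDataSourceDeleter
#endif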
4278 
4279 namespace {
4280 
4281 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4282  const OGRDataSourceUqPtr& poDS,
4283  const std::string& file_name) {
4284  // get layer with specified name, or default to first layer
4285  OGRLayer* poLayer = nullptr;
4286  if (geo_layer_name.size()) {
4287  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4288  if (poLayer == nullptr) {
4289  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4290  file_name);
4291  }
4292  } else {
4293  poLayer = poDS->GetLayer(0);
4294  if (poLayer == nullptr) {
4295  throw std::runtime_error("No layers found in " + file_name);
4296  }
4297  }
4298  return *poLayer;
4299 }
4300 
4301 } // namespace
4302 
4303 /* static */
4304  void Importer::readMetadataSampleGDAL(
4305  const std::string& file_name,
4306  const std::string& geo_column_name,
4307  std::map<std::string, std::vector<std::string>>& metadata,
4308  int rowLimit,
4309  const CopyParams& copy_params) {
4310  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4311  if (poDS == nullptr) {
4312  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4313  file_name);
4314  }
4315 
4316  OGRLayer& layer =
4317  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4318 
4319  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4320  CHECK(poFDefn);
4321 
4322  // the return type of GetFeatureCount() differs between GDAL 1.x (int32_t) and 2.x (int64_t)
4323  auto nFeats = layer.GetFeatureCount();
4324  size_t numFeatures =
4325  std::max(static_cast<decltype(nFeats)>(0),
4326  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4327  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4328  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4329  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4330  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4331  }
4332  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4333  layer.ResetReading();
4334  size_t iFeature = 0;
4335  while (iFeature < numFeatures) {
4336  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4337  if (!poFeature) {
4338  break;
4339  }
4340 
4341  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4342  if (poGeometry != nullptr) {
4343  // validate geom type (again?)
4344  switch (wkbFlatten(poGeometry->getGeometryType())) {
4345  case wkbPoint:
4346  case wkbLineString:
4347  case wkbPolygon:
4348  case wkbMultiPolygon:
4349  break;
4350  case wkbMultiPoint:
4351  case wkbMultiLineString:
4352  // supported if geo_explode_collections is specified
4353  if (!copy_params.geo_explode_collections) {
4354  throw std::runtime_error("Unsupported geometry type: " +
4355  std::string(poGeometry->getGeometryName()));
4356  }
4357  break;
4358  default:
4359  throw std::runtime_error("Unsupported geometry type: " +
4360  std::string(poGeometry->getGeometryName()));
4361  }
4362 
4363  // populate metadata for regular fields
4364  for (auto i : metadata) {
4365  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4366  if (iField >= 0) { // geom is -1
4367  metadata[i.first].at(iFeature) =
4368  std::string(poFeature->GetFieldAsString(iField));
4369  }
4370  }
4371 
4372  // populate metadata for geo column with WKT string
4373  char* wkts = nullptr;
4374  poGeometry->exportToWkt(&wkts);
4375  CHECK(wkts);
4376  metadata[geo_column_name].at(iFeature) = wkts;
4377  CPLFree(wkts);
4378  }
4379  iFeature++;
4380  }
4381 }
4382 
4383 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4384  switch (ogr_type) {
4385  case OFTInteger:
4386  return std::make_pair(kINT, false);
4387  case OFTIntegerList:
4388  return std::make_pair(kINT, true);
4389 #if GDAL_VERSION_MAJOR > 1
4390  case OFTInteger64:
4391  return std::make_pair(kBIGINT, false);
4392  case OFTInteger64List:
4393  return std::make_pair(kBIGINT, true);
4394 #endif
4395  case OFTReal:
4396  return std::make_pair(kDOUBLE, false);
4397  case OFTRealList:
4398  return std::make_pair(kDOUBLE, true);
4399  case OFTString:
4400  return std::make_pair(kTEXT, false);
4401  case OFTStringList:
4402  return std::make_pair(kTEXT, true);
4403  case OFTDate:
4404  return std::make_pair(kDATE, false);
4405  case OFTTime:
4406  return std::make_pair(kTIME, false);
4407  case OFTDateTime:
4408  return std::make_pair(kTIMESTAMP, false);
4409  case OFTBinary:
4410  // Interpret binary blobs as byte arrays here
4411  // but actual import will store NULL as GDAL will not
4412  // extract the blob (OGRFeature::GetFieldAsString will
4413  // result in the import buffers having an empty string)
4414  return std::make_pair(kTINYINT, true);
4415  default:
4416  break;
4417  }
4418  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4419 }
4420 
4421 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4422  switch (ogr_type) {
4423  case wkbPoint:
4424  return kPOINT;
4425  case wkbLineString:
4426  return kLINESTRING;
4427  case wkbPolygon:
4428  return kPOLYGON;
4429  case wkbMultiPolygon:
4430  return kMULTIPOLYGON;
4431  default:
4432  break;
4433  }
4434  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4435 }
4436 
4437 /* static */
4438 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4439  const std::string& file_name,
4440  const std::string& geo_column_name,
4441  const CopyParams& copy_params) {
4442  std::list<ColumnDescriptor> cds;
4443 
4444  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4445  if (poDS == nullptr) {
4446  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4447  file_name);
4448  }
4449 
4450  OGRLayer& layer =
4451  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4452 
4453  layer.ResetReading();
4454  // TODO(andrewseidl): support multiple features
4455  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4456  if (poFeature == nullptr) {
4457  throw std::runtime_error("No features found in " + file_name);
4458  }
4459  // get fields as regular columns
4460  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4461  CHECK(poFDefn);
4462  int iField;
4463  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4464  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4465  auto typePair = ogr_to_type(poFieldDefn->GetType());
4466  ColumnDescriptor cd;
4467  cd.columnName = poFieldDefn->GetNameRef();
4468  cd.sourceName = poFieldDefn->GetNameRef();
4469  SQLTypeInfo ti;
4470  if (typePair.second) {
4471  ti.set_type(kARRAY);
4472  ti.set_subtype(typePair.first);
4473  } else {
4474  ti.set_type(typePair.first);
4475  }
4476  if (typePair.first == kTEXT) {
4477  ti.set_compression(kENCODING_DICT);
4478  ti.set_comp_param(32);
4479  }
4480  ti.set_fixed_size();
4481  cd.columnType = ti;
4482  cds.push_back(cd);
4483  }
4484  // get geo column, if any
4485  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4486  if (poGeometry) {
4487  ColumnDescriptor cd;
4488  cd.columnName = geo_column_name;
4489  cd.sourceName = geo_column_name;
4490 
4491  // get GDAL type
4492  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4493 
4494  // if exploding, override any collection type to child type
4495  if (copy_params.geo_explode_collections) {
4496  if (ogr_type == wkbMultiPolygon) {
4497  ogr_type = wkbPolygon;
4498  } else if (ogr_type == wkbMultiLineString) {
4499  ogr_type = wkbLineString;
4500  } else if (ogr_type == wkbMultiPoint) {
4501  ogr_type = wkbPoint;
4502  }
4503  }
4504 
4505  // convert to internal type
4506  SQLTypes geoType = ogr_to_type(ogr_type);
4507 
4508  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4509  if (PROMOTE_POLYGON_TO_MULTIPOLYGON && !copy_params.geo_explode_collections) {
4510  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4511  }
4512 
4513  // build full internal type
4514  SQLTypeInfo ti;
4515  ti.set_type(geoType);
4516  ti.set_subtype(copy_params.geo_coords_type);
4517  ti.set_input_srid(copy_params.geo_coords_srid);
4518  ti.set_output_srid(copy_params.geo_coords_srid);
4519  ti.set_compression(copy_params.geo_coords_encoding);
4520  ti.set_comp_param(copy_params.geo_coords_comp_param);
4521  cd.columnType = ti;
4522 
4523  cds.push_back(cd);
4524  }
4525  return cds;
4526 }
4527 
4528 bool Importer::gdalStatInternal(const std::string& path,
4529  const CopyParams& copy_params,
4530  bool also_dir) {
4531  // lazy init GDAL
4532  GDAL::init();
4533 
4534  // set authorization tokens
4535  setGDALAuthorizationTokens(copy_params);
4536 
4537 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4538  // clear GDAL stat cache
4539  // without this, file existence will be cached, even if authentication changes
4540  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4541  VSICurlClearCache();
4542 #endif
4543 
4544  // stat path
4545  VSIStatBufL sb;
4546  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4547  if (result < 0) {
4548  return false;
4549  }
4550 
4551  // exists?
4552  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4553  return true;
4554  } else if (VSI_ISREG(sb.st_mode)) {
4555  return true;
4556  }
4557  return false;
4558 }
4559 
4560 /* static */
4561 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
4562  return gdalStatInternal(path, copy_params, false);
4563 }
4564 
4565 /* static */
4566 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
4567  const CopyParams& copy_params) {
4568  return gdalStatInternal(path, copy_params, true);
4569 }
4570 
4571 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
4572  std::vector<std::string>& files) {
4573  // prepare to gather subdirectories
4574  std::vector<std::string> subdirectories;
4575 
4576  // get entries
4577  char** entries = VSIReadDir(archive_path.c_str());
4578  if (!entries) {
4579  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
4580  return;
4581  }
4582 
4583  // force scope
4584  {
4585  // request clean-up
4586  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
4587 
4588  // check all the entries
4589  int index = 0;
4590  while (true) {
4591  // get next entry, or drop out if there isn't one
4592  char* entry_c = entries[index++];
4593  if (!entry_c) {
4594  break;
4595  }
4596  std::string entry(entry_c);
4597 
4598  // ignore '.' and '..'
4599  if (entry == "." || entry == "..") {
4600  continue;
4601  }
4602 
4603  // build the full path
4604  std::string entry_path = archive_path + std::string("/") + entry;
4605 
4606  // is it a file or a sub-folder
4607  VSIStatBufL sb;
4608  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
4609  if (result < 0) {
4610  break;
4611  }
4612 
4613  if (VSI_ISDIR(sb.st_mode)) {
4614  // a directory that ends with .gdb could be a Geodatabase bundle
4615  // arguably dangerous to decide this purely by name, but any further
4616  // validation would be very complex especially at this scope
4617  if (boost::iends_with(entry_path, ".gdb")) {
4618  // add the directory as if it was a file and don't recurse into it
4619  files.push_back(entry_path);
4620  } else {
4621  // add subdirectory to be recursed into
4622  subdirectories.push_back(entry_path);
4623  }
4624  } else {
4625  // add this file
4626  files.push_back(entry_path);
4627  }
4628  }
4629  }
4630 
4631  // recurse into each subdirectory we found
4632  for (const auto& subdirectory : subdirectories) {
4633  gdalGatherFilesInArchiveRecursive(subdirectory, files);
4634  }
4635 }
4636 
4637 /* static */
4638 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
4639  const std::string& archive_path,
4640  const CopyParams& copy_params) {
4641  // lazy init GDAL
4642  GDAL::init();
4643 
4644  // set authorization tokens
4645  setGDALAuthorizationTokens(copy_params);
4646 
4647  // prepare to gather files
4648  std::vector<std::string> files;
4649 
4650  // gather the files recursively
4651  gdalGatherFilesInArchiveRecursive(archive_path, files);
4652 
4653  // convert to relative paths inside archive
4654  for (auto& file : files) {
4655  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4656  }
4657 
4658  // done
4659  return files;
4660 }
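
// A sketch (not from the original source) of the path trimming above: given
// an archive root such as "/vsizip/bundle.zip" and a gathered entry
// "/vsizip/bundle.zip/dir/file.shp", erasing the root plus the separator
// leaves the archive-relative path "dir/file.shp".
#if 0
#include <string>

std::string archive_relative_path(const std::string& archive_path, std::string file) {
  file.erase(0, archive_path.size() + 1);  // drop the root and the '/'
  return file;
}
#endif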
4661 
4662 /* static */
4663 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
4664  const std::string& file_name,
4665  const CopyParams& copy_params) {
4666  // lazy init GDAL
4667  GDAL::init();
4668 
4669  // set authorization tokens
4670  setGDALAuthorizationTokens(copy_params);
4671 
4672  // prepare to gather layer info
4673  std::vector<GeoFileLayerInfo> layer_info;
4674 
4675  // open the data set
4676  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4677  if (poDS == nullptr) {
4678  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4679  file_name);
4680  }
4681 
4682  // enumerate the layers
4683  for (auto&& poLayer : poDS->GetLayers()) {
4684  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
4685  // prepare to read this layer
4686  poLayer->ResetReading();
4687  // skip layer if empty
4688  if (poLayer->GetFeatureCount() > 0) {
4689  // get first feature
4690  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4691  CHECK(first_feature);
4692  // check feature for geometry
4693  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4694  if (!geometry) {
4695  // layer has no geometry
4696  contents = GeoFileLayerContents::NON_GEO;
4697  } else {
4698  // check the geometry type
4699  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4700  switch (wkbFlatten(geometry_type)) {
4701  case wkbPoint:
4702  case wkbLineString:
4703  case wkbPolygon:
4704  case wkbMultiPolygon:
4705  // layer has supported geo
4706  contents = GeoFileLayerContents::GEO;
4707  break;
4708  case wkbMultiPoint:
4709  case wkbMultiLineString:
4710  // supported if geo_explode_collections is specified
4711  contents = copy_params.geo_explode_collections
4712  ? GeoFileLayerContents::GEO
4713  : GeoFileLayerContents::UNSUPPORTED_GEO;
4714  break;
4715  default:
4716  // layer has unsupported geometry
4717  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
4718  break;
4719  }
4720  }
4721  }
4722  // store info for this layer
4723  layer_info.emplace_back(poLayer->GetName(), contents);
4724  }
4725 
4726  // done
4727  return layer_info;
4728 }
4729 
4730  ImportStatus Importer::importGDAL(
4731  ColumnNameToSourceNameMapType columnNameToSourceNameMap) {
4732  // initial status
4733  bool load_truncated = false;
4734  set_import_status(import_id, import_status);
4735 
4736  OGRDataSourceUqPtr poDS(openGDALDataset(file_path, copy_params));
4737  if (poDS == nullptr) {
4738  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4739  file_path);
4740  }
4741 
4742  OGRLayer& layer =
4743  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
4744 
4745  // get the number of features in this layer
4746  size_t numFeatures = layer.GetFeatureCount();
4747 
4748  // build map of metadata field (additional columns) name to index
4749  // use shared_ptr since we need to pass it to the worker
4750  FieldNameToIndexMapType fieldNameToIndexMap;
4751  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4752  CHECK(poFDefn);
4753  size_t numFields = poFDefn->GetFieldCount();
4754  for (size_t iField = 0; iField < numFields; iField++) {
4755  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4756  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4757  }
4758 
4759  // the geographic spatial reference we want to put everything in
4760  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4761  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4762 
4763 #if GDAL_VERSION_MAJOR >= 3
4764  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order;
4765  // this results in X and Y being transposed for angle-based
4766  // coordinate systems. This restores the previous behavior.
4767  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4768 #endif
4769 
4770 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4771  // just one "thread"
4772  size_t max_threads = 1;
4773 #else
4774  // how many threads to use
4775  size_t max_threads = 0;
4776  if (copy_params.threads == 0) {
4777  max_threads = sysconf(_SC_NPROCESSORS_CONF);
4778  } else {
4779  max_threads = copy_params.threads;
4780  }
4781 #endif
4782 
4783  // make an import buffer for each thread
4784  CHECK_EQ(import_buffers_vec.size(), 0u);
4785  import_buffers_vec.resize(max_threads);
4786  for (size_t i = 0; i < max_threads; i++) {
4787  for (const auto cd : loader->get_column_descs()) {
4788  import_buffers_vec[i].emplace_back(
4789  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4790  }
4791  }
4792 
4793  // make render group analyzers for each poly column
4794  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4795  if (copy_params.geo_assign_render_groups) {
4796  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4797  loader->getTableDesc()->tableId, false, false, false);
4798  for (auto cd : columnDescriptors) {
4799  SQLTypes ct = cd->columnType.get_type();
4800  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4801  auto rga = std::make_shared<RenderGroupAnalyzer>();
4802  rga->seedFromExistingTableContents(loader, cd->columnName);
4803  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4804  }
4805  }
4806  }
4807 
4808 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4809  // threads
4810  std::list<std::future<ImportStatus>> threads;
4811 
4812  // use a stack to track thread_ids which must not overlap among threads
4813  // because thread_id is used to index import_buffers_vec[]
4814  std::stack<size_t> stack_thread_ids;
4815  for (size_t i = 0; i < max_threads; i++) {
4816  stack_thread_ids.push(i);
4817  }
4818 #endif
4819 
4820  // checkpoint the table
4821  auto table_epochs = loader->getTableEpochs();
4822 
4823  // reset the layer
4824  layer.ResetReading();
4825 
4826  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
4827 
4828  // make a features buffer for each thread
4829  std::vector<FeaturePtrVector> features(max_threads);
4830 
4831  // for each feature...
4832  size_t firstFeatureThisChunk = 0;
4833  while (firstFeatureThisChunk < numFeatures) {
4834  // how many features this chunk
4835  size_t numFeaturesThisChunk =
4836  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
4837 
4838 // get a thread_id not in use
4839 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4840  size_t thread_id = 0;
4841 #else
4842  auto thread_id = stack_thread_ids.top();
4843  stack_thread_ids.pop();
4844  CHECK(thread_id < max_threads);
4845 #endif
4846 
4847  // fill features buffer for new thread
4848  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
4849  features[thread_id].emplace_back(layer.GetNextFeature());
4850  }
4851 
4852 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4853  // call worker function directly
4854  auto ret_import_status = import_thread_shapefile(0,
4855  this,
4856  poGeographicSR.get(),
4857  std::move(features[thread_id]),
4858  firstFeatureThisChunk,
4859  numFeaturesThisChunk,
4860  fieldNameToIndexMap,
4861  columnNameToSourceNameMap,
4862  columnIdToRenderGroupAnalyzerMap);
4863  import_status += ret_import_status;
4864  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
4865  import_status.rows_completed;
4866  set_import_status(import_id, import_status);
4867 #else
4868  // fire up that thread to import this geometry
4869  threads.push_back(std::async(std::launch::async,
4870  import_thread_shapefile,
4871  thread_id,
4872  this,
4873  poGeographicSR.get(),
4874  std::move(features[thread_id]),
4875  firstFeatureThisChunk,
4876  numFeaturesThisChunk,
4877  fieldNameToIndexMap,
4878  columnNameToSourceNameMap,
4879  columnIdToRenderGroupAnalyzerMap));
4880 
4881  // let the threads run
4882  while (threads.size() > 0) {
4883  int nready = 0;
4884  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4885  it != threads.end();) {
4886  auto& p = *it;
4887  std::chrono::milliseconds span(
4888  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4889  if (p.wait_for(span) == std::future_status::ready) {
4890  auto ret_import_status = p.get();
4891  import_status += ret_import_status;
4892  import_status.rows_estimated =
4893  ((float)firstFeatureThisChunk / (float)numFeatures) *
4894  import_status.rows_completed;
4895  set_import_status(import_id, import_status);
4896 
4897  // recall thread_id for reuse
4898  stack_thread_ids.push(ret_import_status.thread_id);
4899 
4900  threads.erase(it++);
4901  ++nready;
4902  } else {
4903  ++it;
4904  }
4905  }
4906 
4907  if (nready == 0) {
4908  std::this_thread::yield();
4909  }
4910 
4911  // keep reading if there is any free thread slot
4912  // this is one of the major differences from the old threading model!!
4913  if (threads.size() < max_threads) {
4914  break;
4915  }
4916  }
4917 #endif
4918 
4919  // too many rejected rows?
4920  if (import_status.rows_rejected > copy_params.max_reject) {
4921  load_truncated = true;
4922  load_failed = true;
4923  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4924  break;
4925  }
4926 
4927  // failed?
4928  if (load_failed) {
4929  load_truncated = true;
4930  LOG(ERROR)
4931  << "A call to Loader::load failed. Please review the logs for more details";
4932  break;
4933  }
4934 
4935  firstFeatureThisChunk += numFeaturesThisChunk;
4936  }
4937 
4938 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4939  // wait for any remaining threads
4940  if (threads.size()) {
4941  for (auto& p : threads) {
4942  // wait for the thread
4943  p.wait();
4944  // get the result and update the final import status
4945  auto ret_import_status = p.get();
4946  import_status += ret_import_status;
4947  import_status.rows_estimated = import_status.rows_completed;
4948  set_import_status(import_id, import_status);
4949  }
4950  }
4951 #endif
4952 
4953  checkpoint(table_epochs);
4954 
4955  // must set import_status.load_truncated before closing this end of pipe
4956  // otherwise, the thread on the other end would throw an unwanted 'write()'
4957  // exception
4958  import_status.load_truncated = load_truncated;
4959  return import_status;
4960 }
4961 
4962 //
4963 // class RenderGroupAnalyzer
4964 //
4965 
4966  void RenderGroupAnalyzer::seedFromExistingTableContents(
4967  const std::unique_ptr<Loader>& loader,
4968  const std::string& geoColumnBaseName) {
4969  // start timer
4970  auto seedTimer = timer_start();
4971 
4972  // start with a fresh tree
4973  _rtree = nullptr;
4974  _numRenderGroups = 0;
4975 
4976  // get the table descriptor
4977  const auto& cat = loader->getCatalog();
4978  if (loader->getTableDesc()->storageType == StorageType::FOREIGN_TABLE) {
4979  if (DEBUG_RENDER_GROUP_ANALYZER) {
4980  LOG(INFO) << "DEBUG: Table is a foreign table";
4981  }
4982  _rtree = std::make_unique<RTree>();
4983  CHECK(_rtree);
4984  return;
4985  }
4986 
4987  const std::string& tableName = loader->getTableDesc()->tableName;
4988  const auto td = cat.getMetadataForTable(tableName);
4989  CHECK(td);
4990  CHECK(td->fragmenter);
4991 
4992  // if the table is empty, just make an empty tree
4993  if (td->fragmenter->getFragmentsForQuery().getPhysicalNumTuples() == 0) {
4994  if (DEBUG_RENDER_GROUP_ANALYZER) {
4995  LOG(INFO) << "DEBUG: Table is empty!";
4996  }
4997  _rtree = std::make_unique<RTree>();
4998  CHECK(_rtree);
4999  return;
5000  }
5001 
5002  // no seeding possible without these two columns
5003  const auto cd_bounds =
5004  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_bounds");
5005  const auto cd_render_group =
5006  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_render_group");
5007  if (!cd_bounds || !cd_render_group) {
5008  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5009  " doesn't have bounds or render_group columns!");
5010  }
5011 
5012  // and validate their types
5013  if (cd_bounds->columnType.get_type() != kARRAY ||
5014  cd_bounds->columnType.get_subtype() != kDOUBLE) {
5015  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5016  " bounds column is wrong type!");
5017  }
5018  if (cd_render_group->columnType.get_type() != kINT) {
5019  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5020  " render_group column is wrong type!");
5021  }
5022 
5023  // get chunk accessor table
5024  auto chunkAccessorTable = getChunkAccessorTable(
5025  cat, td, {geoColumnBaseName + "_bounds", geoColumnBaseName + "_render_group"});
5026  const auto table_count = std::get<0>(chunkAccessorTable.back());
5027 
5028  if (DEBUG_RENDER_GROUP_ANALYZER) {
5029  LOG(INFO) << "DEBUG: Scanning existing table geo column set '" << geoColumnBaseName
5030  << "'";
5031  }
5032 
5033  std::vector<Node> nodes;
5034  try {
5035  nodes.resize(table_count);
5036  } catch (const std::exception& e) {
5037  throw std::runtime_error("RenderGroupAnalyzer failed to reserve memory for " +
5038  std::to_string(table_count) + " rows");
5039  }
5040 
5041  for (size_t row = 0; row < table_count; row++) {
5042  ArrayDatum ad;
5043  VarlenDatum vd;
5044  bool is_end;
5045 
5046  // get ChunkIters and fragment row offset
5047  size_t rowOffset = 0;
5048  auto& chunkIters = getChunkItersAndRowOffset(chunkAccessorTable, row, rowOffset);
5049  auto& boundsChunkIter = chunkIters[0];
5050  auto& renderGroupChunkIter = chunkIters[1];
5051 
5052  // get bounds values
5053  ChunkIter_get_nth(&boundsChunkIter, row - rowOffset, &ad, &is_end);
5054  CHECK(!is_end);
5055  CHECK(ad.pointer);
5056  int numBounds = (int)(ad.length / sizeof(double));
5057  CHECK(numBounds == 4);
5058 
5059  // convert to bounding box
5060  double* bounds = reinterpret_cast<double*>(ad.pointer);
5061  BoundingBox bounding_box;
5062  boost::geometry::assign_inverse(bounding_box);
5063  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
5064  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5065 
5066  // get render group
5067  ChunkIter_get_nth(&renderGroupChunkIter, row - rowOffset, false, &vd, &is_end);
5068  CHECK(!is_end);
5069  CHECK(vd.pointer);
5070  int renderGroup = *reinterpret_cast<int32_t*>(vd.pointer);
5071 
5072  // skip rows with invalid render groups (e.g. EMPTY geometry)
5073  if (renderGroup < 0) {
5074  continue;
5075  }
5076 
5077  // store
5078  nodes[row] = std::make_pair(bounding_box, renderGroup);
5079 
5080  // how many render groups do we have now?
5081  if (renderGroup >= _numRenderGroups) {
5082  _numRenderGroups = renderGroup + 1;
5083  }
5084 
5085  if (DEBUG_RENDER_GROUP_ANALYZER) {
5086  LOG(INFO) << "DEBUG: Existing row " << row << " has Render Group " << renderGroup;
5087  }
5088  }
5089 
5090  // bulk-load the tree
5091  auto bulk_load_timer = timer_start();
5092  _rtree = std::make_unique<RTree>(nodes);
5093  CHECK(_rtree);
5094  LOG(INFO) << "Scanning render groups of poly column '" << geoColumnBaseName
5095  << "' of table '" << tableName << "' took " << timer_stop(seedTimer) << "ms ("
5096  << timer_stop(bulk_load_timer) << " ms for tree)";
5097 
5098  if (DEBUG_RENDER_GROUP_ANALYZER) {
5099  LOG(INFO) << "DEBUG: Done! Now have " << _numRenderGroups << " Render Groups";
5100  }
5101 }
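
// A sketch (not from the original source) of how the 4-double bounds array
// stored in the <geo>_bounds column becomes an R-tree entry above, assuming
// the BoundingBox/Point typedefs used by RenderGroupAnalyzer:
// assign_inverse() starts the box at an inverted (empty) extent so the two
// expand() calls set it to exactly [xmin, ymin] x [xmax, ymax].
#if 0
#include <boost/geometry.hpp>

BoundingBox bounds_to_box(const double* bounds) {
  BoundingBox bounding_box;
  boost::geometry::assign_inverse(bounding_box);
  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));  // xmin, ymin
  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));  // xmax, ymax
  return bounding_box;
}
#endif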
5102 
5103  int RenderGroupAnalyzer::insertBoundsAndReturnRenderGroup(
5104  const std::vector<double>& bounds) {
5105  // validate
5106  CHECK(bounds.size() == 4);
5107 
5108  // get bounds
5109  BoundingBox bounding_box;
5110  boost::geometry::assign_inverse(bounding_box);
5111  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
5112  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5113 
5114  // remainder under mutex to allow this to be multi-threaded
5115  std::lock_guard<std::mutex> guard(_rtreeMutex);
5116 
5117  // get the intersecting nodes
5118  std::vector<Node> intersects;
5119  _rtree->query(boost::geometry::index::intersects(bounding_box),
5120  std::back_inserter(intersects));
5121 
5122  // build bitset of render groups of the intersecting rectangles
5123  // clear bit means available, allows use of find_first()
5124  boost::dynamic_bitset<> bits(_numRenderGroups);
5125  bits.set();
5126  for (const auto& intersection : intersects) {
5127  CHECK(intersection.second < _numRenderGroups);
5128  bits.reset(intersection.second);
5129  }
5130 
5131  // find first available group
5132  int firstAvailableRenderGroup = 0;
5133  size_t firstSetBit = bits.find_first();
5134  if (firstSetBit == boost::dynamic_bitset<>::npos) {
5135  // all known groups represented, add a new one
5136  firstAvailableRenderGroup = _numRenderGroups;
5137  _numRenderGroups++;
5138  } else {
5139  firstAvailableRenderGroup = (int)firstSetBit;
5140  }
5141 
5142  // insert new node
5143  _rtree->insert(std::make_pair(bounding_box, firstAvailableRenderGroup));
5144 
5145  // return it
5146  return firstAvailableRenderGroup;
5147 }
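// ----------------------------------------------------------------------
// Illustrative sketch (editorial addition): the free-group selection above
// in miniature. Every bit starts set ("available"); each group used by an
// intersecting rectangle is reset; find_first() then yields the lowest
// still-available group, and npos means a brand-new group is needed.
#include <boost/dynamic_bitset.hpp>
#include <vector>

inline int first_available_group(const std::vector<int>& used_groups,
                                 size_t num_groups) {
  boost::dynamic_bitset<> bits(num_groups);
  bits.set();  // all groups start out available
  for (int group : used_groups) {
    bits.reset(static_cast<size_t>(group));  // taken by an intersector
  }
  const size_t first = bits.find_first();
  return first == boost::dynamic_bitset<>::npos
             ? static_cast<int>(num_groups)  // all in use: allocate a new one
             : static_cast<int>(first);
}
// e.g. first_available_group({0, 1, 3}, 4) == 2
//      first_available_group({0, 1, 2, 3}, 4) == 4 (new group)
// ----------------------------------------------------------------------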
5148 
5149 std::vector<std::unique_ptr<TypedImportBuffer>> setup_column_loaders(
5150  const TableDescriptor* td,
5151  Loader* loader) {
5152  CHECK(td);
5153  auto col_descs = loader->get_column_descs();
5154 
5155  std::vector<std::unique_ptr<TypedImportBuffer>> import_buffers;
5156  for (auto cd : col_descs) {
5157  import_buffers.emplace_back(
5158  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
5159  }
5160 
5161  return import_buffers;
5162 }
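// Illustrative usage sketch (editorial addition; the call sequence is an
// assumption, not code from this file): a typical caller builds one
// TypedImportBuffer per column with setup_column_loaders(), fills the
// buffers with parsed values, then hands them to the loader's load()
// method declared earlier in this file.
//
//   auto import_buffers = setup_column_loaders(td, loader);
//   // ... add_value()/add_values() each parsed field into its buffer ...
//   loader->load(import_buffers, row_count);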
5163 
5164 } // namespace import_export