Importer.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file Importer.cpp
19  * @author Wei Hong <wei@mapd.com>
20  * @brief Functions for Importer class
21  */
22 
23 #include "ImportExport/Importer.h"
24 
25 #include <arrow/api.h>
26 #include <arrow/io/api.h>
27 #include <gdal.h>
28 #include <ogrsf_frmts.h>
29 #include <unistd.h>
30 #include <boost/algorithm/string.hpp>
31 #include <boost/dynamic_bitset.hpp>
32 #include <boost/filesystem.hpp>
33 #include <boost/variant.hpp>
34 #include <csignal>
35 #include <cstdio>
36 #include <cstdlib>
37 #include <fstream>
38 #include <future>
39 #include <iomanip>
40 #include <list>
41 #include <memory>
42 #include <mutex>
43 #include <numeric>
44 #include <stack>
45 #include <stdexcept>
46 #include <thread>
47 #include <typeinfo>
48 #include <unordered_map>
49 #include <unordered_set>
50 #include <utility>
51 #include <vector>
52 
54 #include "Archive/S3Archive.h"
55 #include "ArrowImporter.h"
57 #include "ImportExport/GDAL.h"
59 #include "Shared/Logger.h"
60 #include "Shared/SqlTypesLayout.h"
61 #include "Shared/geo_compression.h"
62 #include "Shared/geo_types.h"
63 #include "Shared/geosupport.h"
64 #include "Shared/import_helpers.h"
65 #include "Shared/mapd_glob.h"
66 #include "Shared/mapdpath.h"
67 #include "Shared/measure.h"
68 #include "Shared/misc.h"
69 #include "Shared/scope.h"
70 #include "Shared/shard_key.h"
71 #include "Shared/thread_count.h"
73 
74 #include "gen-cpp/OmniSci.h"
75 
76 size_t g_archive_read_buf_size = 1 << 20;
77 
78 inline auto get_filesize(const std::string& file_path) {
79  boost::filesystem::path boost_file_path{file_path};
80  boost::system::error_code ec;
81  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
82  return ec ? 0 : filesize;
83 }
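// Usage sketch (hypothetical caller): get_filesize() deliberately swallows
// filesystem errors, so a missing or unreadable file reports as size 0:
//   const auto bytes = get_filesize("/path/to/data.csv");  // 0 on any error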
84 
85 namespace {
86 
87 struct OGRDataSourceDeleter {
88  void operator()(OGRDataSource* datasource) {
89  if (datasource) {
90  GDALClose(datasource);
91  }
92  }
93 };
94 using OGRDataSourceUqPtr = std::unique_ptr<OGRDataSource, OGRDataSourceDeleter>;
95 
96 struct OGRFeatureDeleter {
97  void operator()(OGRFeature* feature) {
98  if (feature) {
99  OGRFeature::DestroyFeature(feature);
100  }
101  }
102 };
103 using OGRFeatureUqPtr = std::unique_ptr<OGRFeature, OGRFeatureDeleter>;
104 
106  void operator()(OGRSpatialReference* ref) {
107  if (ref) {
108  OGRSpatialReference::DestroySpatialReference(ref);
109  }
110  }
111 };
112 using OGRSpatialReferenceUqPtr =
113  std::unique_ptr<OGRSpatialReference, OGRSpatialReferenceDeleter>;
114 
115 } // namespace
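// Hypothetical usage of the RAII handles above -- GDALClose() and the OGR
// Destroy*() functions run automatically when the unique_ptrs leave scope:
//   OGRDataSourceUqPtr datasource(static_cast<OGRDataSource*>(GDALOpenEx(
//       file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr)));
//   OGRFeatureUqPtr feature(layer->GetNextFeature());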
116 
117 // For logging std::vector<std::string> row.
118 namespace boost {
119 namespace log {
120 formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
121  out << '[';
122  for (size_t i = 0; i < row.size(); ++i) {
123  out << (i ? ", " : "") << row[i];
124  }
125  out << ']';
126  return out;
127 }
128 } // namespace log
129 } // namespace boost
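// With the formatter above, a row such as {"1997", "Ford", "E350"} logs as
//   [1997, Ford, E350]
// e.g. LOG(ERROR) << "Incorrect Row: " << row;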
130 
131 namespace import_export {
132 
133 using FieldNameToIndexMapType = std::map<std::string, size_t>;
134 using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
135 using ColumnIdToRenderGroupAnalyzerMapType =
136  std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
137 using FeaturePtrVector = std::vector<OGRFeatureUqPtr>;
138 
139 #define DEBUG_TIMING false
140 #define DEBUG_RENDER_GROUP_ANALYZER 0
141 #define DEBUG_AWS_AUTHENTICATION 0
142 
143 #define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0
144 
145 static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;
146 
148 static std::map<std::string, ImportStatus> import_status_map;
149 
150 Importer::Importer(Catalog_Namespace::Catalog& c,
151  const TableDescriptor* t,
152  const std::string& f,
153  const CopyParams& p)
154  : Importer(new Loader(c, t), f, p) {}
155 
156 Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
157  : DataStreamSink(p, f), loader(providedLoader) {
158  import_id = boost::filesystem::path(file_path).filename().string();
159  file_size = 0;
160  max_threads = 0;
161  p_file = nullptr;
162  buffer[0] = nullptr;
163  buffer[1] = nullptr;
164  // we may be overallocating a little more memory here due to dropping phy cols.
165  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
166  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
167  int i = 0;
168  bool has_array = false;
169  // TODO: replace this ugly way of skipping phy cols once isPhyGeo is defined
170  int skip_physical_cols = 0;
171  for (auto& p : loader->get_column_descs()) {
172  // phy geo columns can't be in input file
173  if (skip_physical_cols-- > 0) {
174  continue;
175  }
176  // neither are rowid or $deleted$
177  // note: columns can be added after rowid/$deleted$
178  if (p->isVirtualCol || p->isDeletedCol) {
179  continue;
180  }
181  skip_physical_cols = p->columnType.get_physical_cols();
182  if (p->columnType.get_type() == kARRAY) {
183  is_array.get()[i] = true;
184  has_array = true;
185  } else {
186  is_array.get()[i] = false;
187  }
188  ++i;
189  }
190  if (has_array) {
191  is_array_a = std::unique_ptr<bool[]>(is_array.release());
192  } else {
193  is_array_a = std::unique_ptr<bool[]>(nullptr);
194  }
195 }
196 
197 Importer::~Importer() {
198  if (p_file != nullptr) {
199  fclose(p_file);
200  }
201  if (buffer[0] != nullptr) {
202  free(buffer[0]);
203  }
204  if (buffer[1] != nullptr) {
205  free(buffer[1]);
206  }
207 }
208 
209 ImportStatus Importer::get_import_status(const std::string& import_id) {
210  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
211  return import_status_map.at(import_id);
212 }
213 
214 void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
215  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
216  is.end = std::chrono::steady_clock::now();
217  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
218  import_status_map[import_id] = is;
219 }
220 
221 static const std::string trim_space(const char* field, const size_t len) {
222  size_t i = 0;
223  size_t j = len;
224  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
225  i++;
226  }
227  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
228  j--;
229  }
230  return std::string(field + i, j - i);
231 }
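// e.g. trim_space("  42 \r", 6) yields "42"; only spaces and carriage
// returns are trimmed, so tabs and other whitespace are preserved.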
232 
233 Datum NullDatum(SQLTypeInfo& ti) {
234  Datum d;
235  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
236  switch (type) {
237  case kBOOLEAN:
238  d.boolval = inline_fixed_encoding_null_val(ti);
239  break;
240  case kBIGINT:
241  d.bigintval = inline_fixed_encoding_null_val(ti);
242  break;
243  case kINT:
244  d.intval = inline_fixed_encoding_null_val(ti);
245  break;
246  case kSMALLINT:
247  d.smallintval = inline_fixed_encoding_null_val(ti);
248  break;
249  case kTINYINT:
250  d.tinyintval = inline_fixed_encoding_null_val(ti);
251  break;
252  case kFLOAT:
253  d.floatval = NULL_FLOAT;
254  break;
255  case kDOUBLE:
256  d.doubleval = NULL_DOUBLE;
257  break;
258  case kTIME:
259  case kTIMESTAMP:
260  case kDATE:
261  d.bigintval = inline_fixed_encoding_null_val(ti);
262  break;
263  case kPOINT:
264  case kLINESTRING:
265  case kPOLYGON:
266  case kMULTIPOLYGON:
267  throw std::runtime_error("Internal error: geometry type in NullDatum.");
268  default:
269  throw std::runtime_error("Internal error: invalid type in NullDatum.");
270  }
271  return d;
272 }
273 
274 Datum NullArrayDatum(SQLTypeInfo& ti) {
275  Datum d;
276  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
277  switch (type) {
278  case kBOOLEAN:
279  d.boolval = inline_fixed_encoding_null_array_val(ti);
280  break;
281  case kBIGINT:
282  d.bigintval = inline_fixed_encoding_null_array_val(ti);
283  break;
284  case kINT:
285  d.intval = inline_fixed_encoding_null_array_val(ti);
286  break;
287  case kSMALLINT:
288  d.smallintval = inline_fixed_encoding_null_array_val(ti);
289  break;
290  case kTINYINT:
291  d.tinyintval = inline_fixed_encoding_null_array_val(ti);
292  break;
293  case kFLOAT:
294  d.floatval = NULL_ARRAY_FLOAT;
295  break;
296  case kDOUBLE:
297  d.doubleval = NULL_ARRAY_DOUBLE;
298  break;
299  case kTIME:
300  case kTIMESTAMP:
301  case kDATE:
302  d.bigintval = inline_fixed_encoding_null_array_val(ti);
303  break;
304  case kPOINT:
305  case kLINESTRING:
306  case kPOLYGON:
307  case kMULTIPOLYGON:
308  throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
309  default:
310  throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
311  }
312  return d;
313 }
314 
315 ArrayDatum StringToArray(const std::string& s,
316  const SQLTypeInfo& ti,
317  const CopyParams& copy_params) {
318  SQLTypeInfo elem_ti = ti.get_elem_type();
319  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
320  return ArrayDatum(0, NULL, true);
321  }
322  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
323  LOG(WARNING) << "Malformed array: " << s;
324  return ArrayDatum(0, NULL, true);
325  }
326  std::vector<std::string> elem_strs;
327  size_t last = 1;
328  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
329  i = s.find(copy_params.array_delim, last)) {
330  elem_strs.push_back(s.substr(last, i - last));
331  last = i + 1;
332  }
333  if (last + 1 <= s.size()) {
334  elem_strs.push_back(s.substr(last, s.size() - 1 - last));
335  }
336  if (elem_strs.size() == 1) {
337  auto str = elem_strs.front();
338  auto str_trimmed = trim_space(str.c_str(), str.length());
339  if (str_trimmed == "") {
340  elem_strs.clear(); // Empty array
341  }
342  }
343  if (!elem_ti.is_string()) {
344  size_t len = elem_strs.size() * elem_ti.get_size();
345  int8_t* buf = (int8_t*)checked_malloc(len);
346  int8_t* p = buf;
347  for (auto& es : elem_strs) {
348  auto e = trim_space(es.c_str(), es.length());
349  bool is_null = (e == copy_params.null_str) || e == "NULL";
350  if (!elem_ti.is_string() && e == "") {
351  is_null = true;
352  }
353  if (elem_ti.is_number() || elem_ti.is_time()) {
354  if (!isdigit(e[0]) && e[0] != '-') {
355  is_null = true;
356  }
357  }
358  Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
359  p = appendDatum(p, d, elem_ti);
360  }
361  return ArrayDatum(len, buf, false);
362  }
363  // must not be called for array of strings
364  CHECK(false);
365  return ArrayDatum(0, NULL, true);
366 }
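// Sketch with default CopyParams (array_begin='{', array_end='}',
// array_delim=','): for an INT[] column, "{1,2,3}" becomes a 12-byte buffer
// of three int32 values, "{}" becomes an empty (zero-length) array, and the
// null string (or "NULL") becomes a NULL ArrayDatum.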
367 
368 ArrayDatum NullArray(const SQLTypeInfo& ti) {
369  SQLTypeInfo elem_ti = ti.get_elem_type();
370  auto len = ti.get_size();
371 
372  if (elem_ti.is_string()) {
373  // must not be called for array of strings
374  CHECK(false);
375  return ArrayDatum(0, NULL, true);
376  }
377 
378  if (len > 0) {
379  // Compose a NULL fixlen array
380  int8_t* buf = (int8_t*)checked_malloc(len);
381  // First scalar is a NULL_ARRAY sentinel
382  Datum d = NullArrayDatum(elem_ti);
383  int8_t* p = appendDatum(buf, d, elem_ti);
384  // Rest is filled with normal NULL sentinels
385  Datum d0 = NullDatum(elem_ti);
386  while ((p - buf) < len) {
387  p = appendDatum(p, d0, elem_ti);
388  }
389  CHECK((p - buf) == len);
390  return ArrayDatum(len, buf, true);
391  }
392  // NULL varlen array
393  return ArrayDatum(0, NULL, true);
394 }
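// Layout sketch for a fixed-length NULL array, e.g. a 3-element INT array:
//   [ NULL_ARRAY sentinel | NULL sentinel | NULL sentinel ]
// Slot 0 alone distinguishes a NULL array from an array of NULL values.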
395 
396 ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
397  return NullArray(ti);
398 }
399 
400 void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
401  const auto& arr = datum.val.arr_val;
402  for (const auto& elem_datum : arr) {
403  string_vec.push_back(elem_datum.val.str_val);
404  }
405 }
406 
407 Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
408  Datum d;
409  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
410  switch (type) {
411  case kBOOLEAN:
412  d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
413  break;
414  case kBIGINT:
415  d.bigintval =
416  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
417  break;
418  case kINT:
419  d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
420  break;
421  case kSMALLINT:
422  d.smallintval =
423  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
424  break;
425  case kTINYINT:
426  d.tinyintval =
427  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
428  break;
429  case kFLOAT:
430  d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
431  break;
432  case kDOUBLE:
433  d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
434  break;
435  case kTIME:
436  case kTIMESTAMP:
437  case kDATE:
438  d.bigintval =
439  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
440  break;
441  case kPOINT:
442  case kLINESTRING:
443  case kPOLYGON:
444  case kMULTIPOLYGON:
445  throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
446  default:
447  throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
448  }
449  return d;
450 }
451 
452 ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
453  SQLTypeInfo elem_ti = ti.get_elem_type();
454 
455  CHECK(!elem_ti.is_string());
456 
457  if (datum.is_null) {
458  return NullArray(ti);
459  }
460 
461  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
462  int8_t* buf = (int8_t*)checked_malloc(len);
463  int8_t* p = buf;
464  for (auto& e : datum.val.arr_val) {
465  p = appendDatum(p, TDatumToDatum(e, elem_ti), elem_ti);
466  }
467 
468  return ArrayDatum(len, buf, false);
469 }
470 
471 void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
472  CHECK(string_dict_);
473  std::vector<std::string_view> string_view_vec;
474  string_view_vec.reserve(string_vec.size());
475  for (const auto& str : string_vec) {
476  if (str.size() > StringDictionary::MAX_STRLEN) {
477  throw std::runtime_error("String too long for dictionary encoding.");
478  }
479  string_view_vec.push_back(str);
480  }
481  switch (column_desc_->columnType.get_size()) {
482  case 1:
483  string_dict_i8_buffer_->resize(string_view_vec.size());
484  string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
485  break;
486  case 2:
487  string_dict_i16_buffer_->resize(string_view_vec.size());
488  string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
489  break;
490  case 4:
491  string_dict_i32_buffer_->resize(string_view_vec.size());
492  string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
493  break;
494  default:
495  CHECK(false);
496  }
497 }
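// The branch taken above follows the declared dictionary width, e.g.
// (hypothetical DDL) TEXT ENCODING DICT(8) stores 1-byte string ids via
// case 1, DICT(16) takes case 2, and the default DICT(32) takes case 4.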
498 
499 void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
500  const std::string_view val,
501  const bool is_null,
502  const CopyParams& copy_params,
503  const int64_t replicate_count) {
504  set_replicate_count(replicate_count);
505  const auto type = cd->columnType.get_type();
506  switch (type) {
507  case kBOOLEAN: {
508  if (is_null) {
509  if (cd->columnType.get_notnull()) {
510  throw std::runtime_error("NULL for column " + cd->columnName);
511  }
512  addBoolean(inline_fixed_encoding_null_val(cd->columnType));
513  } else {
514  auto ti = cd->columnType;
515  Datum d = StringToDatum(val, ti);
516  addBoolean(static_cast<int8_t>(d.boolval));
517  }
518  break;
519  }
520  case kTINYINT: {
521  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
522  auto ti = cd->columnType;
523  Datum d = StringToDatum(val, ti);
524  addTinyint(d.tinyintval);
525  } else {
526  if (cd->columnType.get_notnull()) {
527  throw std::runtime_error("NULL for column " + cd->columnName);
528  }
529  addTinyint(inline_fixed_encoding_null_val(cd->columnType));
530  }
531  break;
532  }
533  case kSMALLINT: {
534  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
535  auto ti = cd->columnType;
536  Datum d = StringToDatum(val, ti);
537  addSmallint(d.smallintval);
538  } else {
539  if (cd->columnType.get_notnull()) {
540  throw std::runtime_error("NULL for column " + cd->columnName);
541  }
542  addSmallint(inline_fixed_encoding_null_val(cd->columnType));
543  }
544  break;
545  }
546  case kINT: {
547  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
548  auto ti = cd->columnType;
549  Datum d = StringToDatum(val, ti);
550  addInt(d.intval);
551  } else {
552  if (cd->columnType.get_notnull()) {
553  throw std::runtime_error("NULL for column " + cd->columnName);
554  }
555  addInt(inline_fixed_encoding_null_val(cd->columnType));
556  }
557  break;
558  }
559  case kBIGINT: {
560  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
561  auto ti = cd->columnType;
562  Datum d = StringToDatum(val, ti);
563  addBigint(d.bigintval);
564  } else {
565  if (cd->columnType.get_notnull()) {
566  throw std::runtime_error("NULL for column " + cd->columnName);
567  }
568  addBigint(inline_fixed_encoding_null_val(cd->columnType));
569  }
570  break;
571  }
572  case kDECIMAL:
573  case kNUMERIC: {
574  if (!is_null) {
575  SQLTypeInfo ti(kNUMERIC, 0, 0, false);
576  Datum d = StringToDatum(val, ti);
577  const auto converted_decimal_value =
578  convert_decimal_value_to_scale(d.bigintval, ti, cd->columnType);
579  addBigint(converted_decimal_value);
580  } else {
581  if (cd->columnType.get_notnull()) {
582  throw std::runtime_error("NULL for column " + cd->columnName);
583  }
584  addBigint(inline_fixed_encoding_null_val(cd->columnType));
585  }
586  break;
587  }
588  case kFLOAT:
589  if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
590  addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
591  } else {
592  if (cd->columnType.get_notnull()) {
593  throw std::runtime_error("NULL for column " + cd->columnName);
594  }
595  addFloat(NULL_FLOAT);
596  }
597  break;
598  case kDOUBLE:
599  if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
600  addDouble(std::atof(std::string(val).c_str()));
601  } else {
602  if (cd->columnType.get_notnull()) {
603  throw std::runtime_error("NULL for column " + cd->columnName);
604  }
605  addDouble(NULL_DOUBLE);
606  }
607  break;
608  case kTEXT:
609  case kVARCHAR:
610  case kCHAR: {
611  // @TODO(wei) for now, use empty string for nulls
612  if (is_null) {
613  if (cd->columnType.get_notnull()) {
614  throw std::runtime_error("NULL for column " + cd->columnName);
615  }
616  addString(std::string());
617  } else {
618  if (val.length() > StringDictionary::MAX_STRLEN) {
619  throw std::runtime_error("String too long for column " + cd->columnName +
620  " was " + std::to_string(val.length()) + " max is " +
621  std::to_string(StringDictionary::MAX_STRLEN));
622  }
623  addString(val);
624  }
625  break;
626  }
627  case kTIME:
628  case kTIMESTAMP:
629  case kDATE:
630  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
631  SQLTypeInfo ti = cd->columnType;
632  Datum d = StringToDatum(val, ti);
633  addBigint(d.bigintval);
634  } else {
635  if (cd->columnType.get_notnull()) {
636  throw std::runtime_error("NULL for column " + cd->columnName);
637  }
638  addBigint(inline_fixed_encoding_null_val(cd->columnType));
639  }
640  break;
641  case kARRAY: {
642  if (is_null && cd->columnType.get_notnull()) {
643  throw std::runtime_error("NULL for column " + cd->columnName);
644  }
645  SQLTypeInfo ti = cd->columnType;
646  if (IS_STRING(ti.get_subtype())) {
647  std::vector<std::string> string_vec;
648  // Just parse string array, don't push it to buffer yet as we might throw
649  ImportHelpers::parseStringArray(
650  std::string(val), copy_params, string_vec);
651  if (!is_null) {
652  // TODO: add support for NULL string arrays
653  if (ti.get_size() > 0) {
654  auto sti = ti.get_elem_type();
655  size_t expected_size = ti.get_size() / sti.get_size();
656  size_t actual_size = string_vec.size();
657  if (actual_size != expected_size) {
658  throw std::runtime_error("Fixed length array column " + cd->columnName +
659  " expects " + std::to_string(expected_size) +
660  " values, received " +
661  std::to_string(actual_size));
662  }
663  }
664  addStringArray(string_vec);
665  } else {
666  if (ti.get_size() > 0) {
667  // TODO: remove once NULL fixlen arrays are allowed
668  throw std::runtime_error("Fixed length array column " + cd->columnName +
669  " currently cannot accept NULL arrays");
670  }
671  // TODO: add support for NULL string arrays, replace with addStringArray(),
672  // for now add whatever parseStringArray() outputs for NULLs ("NULL")
673  addStringArray(string_vec);
674  }
675  } else {
676  if (!is_null) {
677  ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
678  if (d.is_null) { // val could be "NULL"
679  addArray(NullArray(ti));
680  } else {
681  if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
682  throw std::runtime_error("Fixed length array for column " + cd->columnName +
683  " has incorrect length: " + std::string(val));
684  }
685  addArray(d);
686  }
687  } else {
688  addArray(NullArray(ti));
689  }
690  }
691  break;
692  }
693  case kPOINT:
694  case kLINESTRING:
695  case kPOLYGON:
696  case kMULTIPOLYGON:
697  addGeoString(val);
698  break;
699  default:
700  CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
701  }
702 }
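// Dispatch sketch for a hypothetical delimited row "1,foo,{1,2,3},POINT (1 2)":
// kINT goes through StringToDatum/addInt, kTEXT through addString, kARRAY
// through StringToArray/addArray, and geo columns are buffered whole via
// addGeoString for later parsing.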
703 
704 void TypedImportBuffer::pop_value() {
705  const auto type = column_desc_->columnType.is_decimal()
706  ? decimal_to_int_type(column_desc_->columnType)
707  : column_desc_->columnType.get_type();
708  switch (type) {
709  case kBOOLEAN:
710  bool_buffer_->pop_back();
711  break;
712  case kTINYINT:
713  tinyint_buffer_->pop_back();
714  break;
715  case kSMALLINT:
716  smallint_buffer_->pop_back();
717  break;
718  case kINT:
719  int_buffer_->pop_back();
720  break;
721  case kBIGINT:
722  bigint_buffer_->pop_back();
723  break;
724  case kFLOAT:
725  float_buffer_->pop_back();
726  break;
727  case kDOUBLE:
728  double_buffer_->pop_back();
729  break;
730  case kTEXT:
731  case kVARCHAR:
732  case kCHAR:
733  string_buffer_->pop_back();
734  break;
735  case kDATE:
736  case kTIME:
737  case kTIMESTAMP:
738  bigint_buffer_->pop_back();
739  break;
740  case kARRAY:
741  if (IS_STRING(column_desc_->columnType.get_subtype())) {
742  string_array_buffer_->pop_back();
743  } else {
744  array_buffer_->pop_back();
745  }
746  break;
747  case kPOINT:
748  case kLINESTRING:
749  case kPOLYGON:
750  case kMULTIPOLYGON:
751  geo_string_buffer_->pop_back();
752  break;
753  default:
754  CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
755  }
756 }
757 
758 struct GeoImportException : std::runtime_error {
759  using std::runtime_error::runtime_error;
760 };
761 
762 // appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
763 template <typename DATA_TYPE>
764 size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
765  const ColumnDescriptor* cd,
766  const Array& array,
767  std::vector<DATA_TYPE>& buffer,
768  const ArraySliceRange& slice_range,
769  import_export::BadRowsTracker* const bad_rows_tracker) {
770  auto data =
771  std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
772  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
773  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
774  if (bad_rows_tracker && cd->columnType.is_geometry()) {
775  f_add_geo_phy_cols = [&](const int64_t row) {
776  // Populate physical columns (ref. DBHandler::load_table)
777  std::vector<double> coords, bounds;
778  std::vector<int> ring_sizes, poly_rings;
779  int render_group = 0;
780  SQLTypeInfo ti;
781  // replace any unexpected exception from getGeoColumns or other
782  // on this path with a GeoImportException so that we won't
783  // over-push a null to the logical column...
784  try {
785  SQLTypeInfo import_ti{ti};
786  if (array.IsNull(row)) {
788  import_ti, coords, bounds, ring_sizes, poly_rings, false);
789  } else {
790  arrow_throw_if<GeoImportException>(
792  ti,
793  coords,
794  bounds,
795  ring_sizes,
796  poly_rings,
797  false),
798  error_context(cd, bad_rows_tracker) + "Invalid geometry");
799  arrow_throw_if<GeoImportException>(
800  cd->columnType.get_type() != ti.get_type(),
801  error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
802  }
803  auto col_idx_workpad = col_idx; // what a pitfall!!
804  Importer::set_geo_physical_import_buffer(
805  bad_rows_tracker->importer->getCatalog(),
806  cd,
808  col_idx_workpad,
809  coords,
810  bounds,
811  ring_sizes,
812  poly_rings,
813  render_group);
814  } catch (GeoImportException&) {
815  throw;
816  } catch (std::runtime_error& e) {
817  throw GeoImportException(e.what());
818  } catch (const std::exception& e) {
819  throw GeoImportException(e.what());
820  } catch (...) {
821  throw GeoImportException("unknown exception");
822  }
823  };
824  }
825  auto f_mark_a_bad_row = [&](const auto row) {
826  std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
827  bad_rows_tracker->rows.insert(row - slice_range.first);
828  };
829  buffer.reserve(slice_range.second - slice_range.first);
830  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
831  try {
832  *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
833  f_add_geo_phy_cols(row);
834  } catch (GeoImportException&) {
835  f_mark_a_bad_row(row);
836  } catch (ArrowImporterException&) {
837  // trace bad rows of each column; otherwise rethrow.
838  if (bad_rows_tracker) {
839  *data << nullptr;
840  f_mark_a_bad_row(row);
841  } else {
842  throw;
843  }
844  }
845  }
846  return buffer.size();
847 }
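// Behavior sketch of the loop above: each Arrow slot streams into the typed
// buffer, nulls become null sentinels, and when a BadRowsTracker is supplied
// a row that throws GeoImportException or ArrowImporterException is recorded
// as bad (with a null placeholder) instead of aborting the whole slice.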
848 
849 size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
850  const Array& col,
851  const bool exact_type_match,
852  const ArraySliceRange& slice_range,
853  BadRowsTracker* const bad_rows_tracker) {
854  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
855  : cd->columnType.get_type();
856  if (cd->columnType.get_notnull()) {
857  // We can't have any null values for this column; to have them is an error
858  arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
859  }
860 
861  switch (type) {
862  case kBOOLEAN:
863  if (exact_type_match) {
864  arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
865  }
866  return convert_arrow_val_to_import_buffer<int8_t>(
867  cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
868  case kTINYINT:
869  if (exact_type_match) {
870  arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
871  }
872  return convert_arrow_val_to_import_buffer<int8_t>(
873  cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
874  case kSMALLINT:
875  if (exact_type_match) {
876  arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
877  }
878  return convert_arrow_val_to_import_buffer<int16_t>(
879  cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
880  case kINT:
881  if (exact_type_match) {
882  arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
883  }
884  return convert_arrow_val_to_import_buffer<int32_t>(
885  cd, col, *int_buffer_, slice_range, bad_rows_tracker);
886  case kBIGINT:
887  if (exact_type_match) {
888  arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
889  }
890  return convert_arrow_val_to_import_buffer<int64_t>(
891  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
892  case kFLOAT:
893  if (exact_type_match) {
894  arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
895  }
896  return convert_arrow_val_to_import_buffer<float>(
897  cd, col, *float_buffer_, slice_range, bad_rows_tracker);
898  case kDOUBLE:
899  if (exact_type_match) {
900  arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
901  }
902  return convert_arrow_val_to_import_buffer<double>(
903  cd, col, *double_buffer_, slice_range, bad_rows_tracker);
904  case kTEXT:
905  case kVARCHAR:
906  case kCHAR:
907  if (exact_type_match) {
908  arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
909  "Expected string type");
910  }
911  return convert_arrow_val_to_import_buffer<std::string>(
912  cd, col, *string_buffer_, slice_range, bad_rows_tracker);
913  case kTIME:
914  if (exact_type_match) {
915  arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
916  "Expected time32 or time64 type");
917  }
918  return convert_arrow_val_to_import_buffer<int64_t>(
919  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
920  case kTIMESTAMP:
921  if (exact_type_match) {
922  arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
923  }
924  return convert_arrow_val_to_import_buffer<int64_t>(
925  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
926  case kDATE:
927  if (exact_type_match) {
928  arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
929  "Expected date32 or date64 type");
930  }
931  return convert_arrow_val_to_import_buffer<int64_t>(
932  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
933  case kPOINT:
934  case kLINESTRING:
935  case kPOLYGON:
936  case kMULTIPOLYGON:
937  arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
938  "Expected string type");
939  return convert_arrow_val_to_import_buffer<std::string>(
940  cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
941  case kARRAY:
942  throw std::runtime_error("Arrow array appends not yet supported");
943  default:
944  throw std::runtime_error("Invalid Type");
945  }
946 }
947 
948 // this is exclusively used by load_table_binary_columnar
949 size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
950  size_t dataSize = 0;
951  if (cd->columnType.get_notnull()) {
952  // We can't have any null values for this column; to have them is an error
953  if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
954  throw std::runtime_error("NULL for column " + cd->columnName);
955  }
956  }
957 
958  switch (cd->columnType.get_type()) {
959  case kBOOLEAN: {
960  dataSize = col.data.int_col.size();
961  bool_buffer_->reserve(dataSize);
962  for (size_t i = 0; i < dataSize; i++) {
963  if (col.nulls[i]) {
964  bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
965  } else {
966  bool_buffer_->push_back((int8_t)col.data.int_col[i]);
967  }
968  }
969  break;
970  }
971  case kTINYINT: {
972  dataSize = col.data.int_col.size();
973  tinyint_buffer_->reserve(dataSize);
974  for (size_t i = 0; i < dataSize; i++) {
975  if (col.nulls[i]) {
976  tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
977  } else {
978  tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
979  }
980  }
981  break;
982  }
983  case kSMALLINT: {
984  dataSize = col.data.int_col.size();
985  smallint_buffer_->reserve(dataSize);
986  for (size_t i = 0; i < dataSize; i++) {
987  if (col.nulls[i]) {
988  smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
989  } else {
990  smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
991  }
992  }
993  break;
994  }
995  case kINT: {
996  dataSize = col.data.int_col.size();
997  int_buffer_->reserve(dataSize);
998  for (size_t i = 0; i < dataSize; i++) {
999  if (col.nulls[i]) {
1000  int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1001  } else {
1002  int_buffer_->push_back((int32_t)col.data.int_col[i]);
1003  }
1004  }
1005  break;
1006  }
1007  case kBIGINT:
1008  case kNUMERIC:
1009  case kDECIMAL: {
1010  dataSize = col.data.int_col.size();
1011  bigint_buffer_->reserve(dataSize);
1012  for (size_t i = 0; i < dataSize; i++) {
1013  if (col.nulls[i]) {
1014  bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1015  } else {
1016  bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
1017  }
1018  }
1019  break;
1020  }
1021  case kFLOAT: {
1022  dataSize = col.data.real_col.size();
1023  float_buffer_->reserve(dataSize);
1024  for (size_t i = 0; i < dataSize; i++) {
1025  if (col.nulls[i]) {
1026  float_buffer_->push_back(NULL_FLOAT);
1027  } else {
1028  float_buffer_->push_back((float)col.data.real_col[i]);
1029  }
1030  }
1031  break;
1032  }
1033  case kDOUBLE: {
1034  dataSize = col.data.real_col.size();
1035  double_buffer_->reserve(dataSize);
1036  for (size_t i = 0; i < dataSize; i++) {
1037  if (col.nulls[i]) {
1038  double_buffer_->push_back(NULL_DOUBLE);
1039  } else {
1040  double_buffer_->push_back((double)col.data.real_col[i]);
1041  }
1042  }
1043  break;
1044  }
1045  case kTEXT:
1046  case kVARCHAR:
1047  case kCHAR: {
1048  // TODO: for now, use empty string for nulls
1049  dataSize = col.data.str_col.size();
1050  string_buffer_->reserve(dataSize);
1051  for (size_t i = 0; i < dataSize; i++) {
1052  if (col.nulls[i]) {
1053  string_buffer_->push_back(std::string());
1054  } else {
1055  string_buffer_->push_back(col.data.str_col[i]);
1056  }
1057  }
1058  break;
1059  }
1060  case kTIME:
1061  case kTIMESTAMP:
1062  case kDATE: {
1063  dataSize = col.data.int_col.size();
1064  bigint_buffer_->reserve(dataSize);
1065  for (size_t i = 0; i < dataSize; i++) {
1066  if (col.nulls[i]) {
1067  bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1068  } else {
1069  bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
1070  }
1071  }
1072  break;
1073  }
1074  case kPOINT:
1075  case kLINESTRING:
1076  case kPOLYGON:
1077  case kMULTIPOLYGON: {
1078  dataSize = col.data.str_col.size();
1079  geo_string_buffer_->reserve(dataSize);
1080  for (size_t i = 0; i < dataSize; i++) {
1081  if (col.nulls[i]) {
1082  // TODO: add support for NULL geo
1083  geo_string_buffer_->push_back(std::string());
1084  } else {
1085  geo_string_buffer_->push_back(col.data.str_col[i]);
1086  }
1087  }
1088  break;
1089  }
1090  case kARRAY: {
1091  dataSize = col.data.arr_col.size();
1092  if (IS_STRING(cd->columnType.get_subtype())) {
1093  for (size_t i = 0; i < dataSize; i++) {
1094  std::vector<std::string>& string_vec = addStringArray();
1095  if (!col.nulls[i]) {
1096  size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
1097  for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
1098  string_vec.push_back(col.data.arr_col[i].data.str_col[str_idx]);
1099  }
1100  }
1101  }
1102  } else {
1103  auto elem_ti = cd->columnType.get_subtype();
1104  switch (elem_ti) {
1105  case kBOOLEAN: {
1106  for (size_t i = 0; i < dataSize; i++) {
1107  if (col.nulls[i]) {
1108  addArray(NullArray(cd->columnType));
1109  } else {
1110  size_t len = col.data.arr_col[i].data.int_col.size();
1111  size_t byteSize = len * sizeof(int8_t);
1112  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1113  int8_t* p = buf;
1114  for (size_t j = 0; j < len; ++j) {
1115  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
1116  p += sizeof(bool);
1117  }
1118  addArray(ArrayDatum(byteSize, buf, false));
1119  }
1120  }
1121  break;
1122  }
1123  case kTINYINT: {
1124  for (size_t i = 0; i < dataSize; i++) {
1125  if (col.nulls[i]) {
1126  addArray(NullArray(cd->columnType));
1127  } else {
1128  size_t len = col.data.arr_col[i].data.int_col.size();
1129  size_t byteSize = len * sizeof(int8_t);
1130  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1131  int8_t* p = buf;
1132  for (size_t j = 0; j < len; ++j) {
1133  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
1134  p += sizeof(int8_t);
1135  }
1136  addArray(ArrayDatum(byteSize, buf, false));
1137  }
1138  }
1139  break;
1140  }
1141  case kSMALLINT: {
1142  for (size_t i = 0; i < dataSize; i++) {
1143  if (col.nulls[i]) {
1144  addArray(NullArray(cd->columnType));
1145  } else {
1146  size_t len = col.data.arr_col[i].data.int_col.size();
1147  size_t byteSize = len * sizeof(int16_t);
1148  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1149  int8_t* p = buf;
1150  for (size_t j = 0; j < len; ++j) {
1151  *(int16_t*)p =
1152  static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
1153  p += sizeof(int16_t);
1154  }
1155  addArray(ArrayDatum(byteSize, buf, false));
1156  }
1157  }
1158  break;
1159  }
1160  case kINT: {
1161  for (size_t i = 0; i < dataSize; i++) {
1162  if (col.nulls[i]) {
1163  addArray(NullArray(cd->columnType));
1164  } else {
1165  size_t len = col.data.arr_col[i].data.int_col.size();
1166  size_t byteSize = len * sizeof(int32_t);
1167  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1168  int8_t* p = buf;
1169  for (size_t j = 0; j < len; ++j) {
1170  *(int32_t*)p =
1171  static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
1172  p += sizeof(int32_t);
1173  }
1174  addArray(ArrayDatum(byteSize, buf, false));
1175  }
1176  }
1177  break;
1178  }
1179  case kBIGINT:
1180  case kNUMERIC:
1181  case kDECIMAL: {
1182  for (size_t i = 0; i < dataSize; i++) {
1183  if (col.nulls[i]) {
1184  addArray(NullArray(cd->columnType));
1185  } else {
1186  size_t len = col.data.arr_col[i].data.int_col.size();
1187  size_t byteSize = len * sizeof(int64_t);
1188  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1189  int8_t* p = buf;
1190  for (size_t j = 0; j < len; ++j) {
1191  *(int64_t*)p =
1192  static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1193  p += sizeof(int64_t);
1194  }
1195  addArray(ArrayDatum(byteSize, buf, false));
1196  }
1197  }
1198  break;
1199  }
1200  case kFLOAT: {
1201  for (size_t i = 0; i < dataSize; i++) {
1202  if (col.nulls[i]) {
1203  addArray(NullArray(cd->columnType));
1204  } else {
1205  size_t len = col.data.arr_col[i].data.real_col.size();
1206  size_t byteSize = len * sizeof(float);
1207  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1208  int8_t* p = buf;
1209  for (size_t j = 0; j < len; ++j) {
1210  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
1211  p += sizeof(float);
1212  }
1213  addArray(ArrayDatum(byteSize, buf, false));
1214  }
1215  }
1216  break;
1217  }
1218  case kDOUBLE: {
1219  for (size_t i = 0; i < dataSize; i++) {
1220  if (col.nulls[i]) {
1221  addArray(NullArray(cd->columnType));
1222  } else {
1223  size_t len = col.data.arr_col[i].data.real_col.size();
1224  size_t byteSize = len * sizeof(double);
1225  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1226  int8_t* p = buf;
1227  for (size_t j = 0; j < len; ++j) {
1228  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
1229  p += sizeof(double);
1230  }
1231  addArray(ArrayDatum(byteSize, buf, false));
1232  }
1233  }
1234  break;
1235  }
1236  case kTIME:
1237  case kTIMESTAMP:
1238  case kDATE: {
1239  for (size_t i = 0; i < dataSize; i++) {
1240  if (col.nulls[i]) {
1241  addArray(NullArray(cd->columnType));
1242  } else {
1243  size_t len = col.data.arr_col[i].data.int_col.size();
1244  size_t byteWidth = sizeof(int64_t);
1245  size_t byteSize = len * byteWidth;
1246  int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
1247  int8_t* p = buf;
1248  for (size_t j = 0; j < len; ++j) {
1249  *reinterpret_cast<int64_t*>(p) =
1250  static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1251  p += sizeof(int64_t);
1252  }
1253  addArray(ArrayDatum(byteSize, buf, false));
1254  }
1255  }
1256  break;
1257  }
1258  default:
1259  throw std::runtime_error("Invalid Array Type");
1260  }
1261  }
1262  break;
1263  }
1264  default:
1265  throw std::runtime_error("Invalid Type");
1266  }
1267  return dataSize;
1268 }
1269 
1270 void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
1271  const TDatum& datum,
1272  const bool is_null,
1273  const int64_t replicate_count) {
1274  set_replicate_count(replicate_count);
1275  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
1276  : cd->columnType.get_type();
1277  switch (type) {
1278  case kBOOLEAN: {
1279  if (is_null) {
1280  if (cd->columnType.get_notnull()) {
1281  throw std::runtime_error("NULL for column " + cd->columnName);
1282  }
1283  addBoolean(inline_fixed_encoding_null_val(cd->columnType));
1284  } else {
1285  addBoolean((int8_t)datum.val.int_val);
1286  }
1287  break;
1288  }
1289  case kTINYINT:
1290  if (!is_null) {
1291  addTinyint((int8_t)datum.val.int_val);
1292  } else {
1293  if (cd->columnType.get_notnull()) {
1294  throw std::runtime_error("NULL for column " + cd->columnName);
1295  }
1296  addTinyint(inline_fixed_encoding_null_val(cd->columnType));
1297  }
1298  break;
1299  case kSMALLINT:
1300  if (!is_null) {
1301  addSmallint((int16_t)datum.val.int_val);
1302  } else {
1303  if (cd->columnType.get_notnull()) {
1304  throw std::runtime_error("NULL for column " + cd->columnName);
1305  }
1306  addSmallint(inline_fixed_encoding_null_val(cd->columnType));
1307  }
1308  break;
1309  case kINT:
1310  if (!is_null) {
1311  addInt((int32_t)datum.val.int_val);
1312  } else {
1313  if (cd->columnType.get_notnull()) {
1314  throw std::runtime_error("NULL for column " + cd->columnName);
1315  }
1316  addInt(inline_fixed_encoding_null_val(cd->columnType));
1317  }
1318  break;
1319  case kBIGINT:
1320  if (!is_null) {
1321  addBigint(datum.val.int_val);
1322  } else {
1323  if (cd->columnType.get_notnull()) {
1324  throw std::runtime_error("NULL for column " + cd->columnName);
1325  }
1326  addBigint(inline_fixed_encoding_null_val(cd->columnType));
1327  }
1328  break;
1329  case kFLOAT:
1330  if (!is_null) {
1331  addFloat((float)datum.val.real_val);
1332  } else {
1333  if (cd->columnType.get_notnull()) {
1334  throw std::runtime_error("NULL for column " + cd->columnName);
1335  }
1336  addFloat(NULL_FLOAT);
1337  }
1338  break;
1339  case kDOUBLE:
1340  if (!is_null) {
1341  addDouble(datum.val.real_val);
1342  } else {
1343  if (cd->columnType.get_notnull()) {
1344  throw std::runtime_error("NULL for column " + cd->columnName);
1345  }
1346  addDouble(NULL_DOUBLE);
1347  }
1348  break;
1349  case kTEXT:
1350  case kVARCHAR:
1351  case kCHAR: {
1352  // @TODO(wei) for now, use empty string for nulls
1353  if (is_null) {
1354  if (cd->columnType.get_notnull()) {
1355  throw std::runtime_error("NULL for column " + cd->columnName);
1356  }
1357  addString(std::string());
1358  } else {
1359  addString(datum.val.str_val);
1360  }
1361  break;
1362  }
1363  case kTIME:
1364  case kTIMESTAMP:
1365  case kDATE: {
1366  if (!is_null) {
1367  addBigint(datum.val.int_val);
1368  } else {
1369  if (cd->columnType.get_notnull()) {
1370  throw std::runtime_error("NULL for column " + cd->columnName);
1371  }
1372  addBigint(inline_fixed_encoding_null_val(cd->columnType));
1373  }
1374  break;
1375  }
1376  case kARRAY:
1377  if (is_null && cd->columnType.get_notnull()) {
1378  throw std::runtime_error("NULL for column " + cd->columnName);
1379  }
1380  if (IS_STRING(cd->columnType.get_subtype())) {
1381  std::vector<std::string>& string_vec = addStringArray();
1382  addBinaryStringArray(datum, string_vec);
1383  } else {
1384  if (!is_null) {
1385  addArray(TDatumToArrayDatum(datum, cd->columnType));
1386  } else {
1387  addArray(NullArray(cd->columnType));
1388  }
1389  }
1390  break;
1391  case kPOINT:
1392  case kLINESTRING:
1393  case kPOLYGON:
1394  case kMULTIPOLYGON:
1395  if (is_null) {
1396  if (cd->columnType.get_notnull()) {
1397  throw std::runtime_error("NULL for column " + cd->columnName);
1398  }
1399  addGeoString(std::string());
1400  } else {
1401  addGeoString(datum.val.str_val);
1402  }
1403  break;
1404  default:
1405  CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
1406  }
1407 }
1408 
1409 bool importGeoFromLonLat(double lon, double lat, std::vector<double>& coords) {
1410  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
1411  return false;
1412  }
1413  // we don't need to do any coordinate-system transformation
1414  // here (yet) so we don't need to use any OGR API or types
1415  // just use the values directly (assumed to be in 4326)
1416  coords.push_back(lon);
1417  coords.push_back(lat);
1418  return true;
1419 }
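// e.g. importGeoFromLonLat(-122.4, 37.8, coords) appends {-122.4, 37.8} and
// returns true; a NaN or infinite coordinate returns false and appends nothing.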
1420 
1421 void Importer::set_geo_physical_import_buffer(
1422  const Catalog_Namespace::Catalog& catalog,
1423  const ColumnDescriptor* cd,
1424  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1425  size_t& col_idx,
1426  std::vector<double>& coords,
1427  std::vector<double>& bounds,
1428  std::vector<int>& ring_sizes,
1429  std::vector<int>& poly_rings,
1430  int render_group,
1431  const int64_t replicate_count) {
1432  const auto col_ti = cd->columnType;
1433  const auto col_type = col_ti.get_type();
1434  auto columnId = cd->columnId;
1435  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1436  bool is_null_geo = false;
1437  bool is_null_point = false;
1438  if (!col_ti.get_notnull()) {
1439  // Check for NULL geo
1440  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1441  is_null_point = true;
1442  coords.clear();
1443  }
1444  is_null_geo = coords.empty();
1445  if (is_null_point) {
1446  coords.push_back(NULL_ARRAY_DOUBLE);
1447  coords.push_back(NULL_DOUBLE);
1448  // Treating POINT coords as notnull, need to store actual encoding
1449  // [un]compressed+[not]null
1450  is_null_geo = false;
1451  }
1452  }
1453  std::vector<TDatum> td_coords_data;
1454  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
1455  // One exception - NULL POINT geo: coords need to be processed to encode nullness
1456  // in a fixlen array, compressed and uncompressed.
1457  if (!is_null_geo) {
1458  std::vector<uint8_t> compressed_coords = geospatial::compress_coords(coords, col_ti);
1459  for (auto cc : compressed_coords) {
1460  TDatum td_byte;
1461  td_byte.val.int_val = cc;
1462  td_coords_data.push_back(td_byte);
1463  }
1464  }
1465  TDatum tdd_coords;
1466  tdd_coords.val.arr_val = td_coords_data;
1467  tdd_coords.is_null = is_null_geo;
1468  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false, replicate_count);
1469 
1470  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1471  // Create ring_sizes array value and add it to the physical column
1472  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1473  std::vector<TDatum> td_ring_sizes;
1474  if (!is_null_geo) {
1475  for (auto ring_size : ring_sizes) {
1476  TDatum td_ring_size;
1477  td_ring_size.val.int_val = ring_size;
1478  td_ring_sizes.push_back(td_ring_size);
1479  }
1480  }
1481  TDatum tdd_ring_sizes;
1482  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1483  tdd_ring_sizes.is_null = is_null_geo;
1484  import_buffers[col_idx++]->add_value(
1485  cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
1486  }
1487 
1488  if (col_type == kMULTIPOLYGON) {
1489  // Create poly_rings array value and add it to the physical column
1490  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1491  std::vector<TDatum> td_poly_rings;
1492  if (!is_null_geo) {
1493  for (auto num_rings : poly_rings) {
1494  TDatum td_num_rings;
1495  td_num_rings.val.int_val = num_rings;
1496  td_poly_rings.push_back(td_num_rings);
1497  }
1498  }
1499  TDatum tdd_poly_rings;
1500  tdd_poly_rings.val.arr_val = td_poly_rings;
1501  tdd_poly_rings.is_null = is_null_geo;
1502  import_buffers[col_idx++]->add_value(
1503  cd_poly_rings, tdd_poly_rings, false, replicate_count);
1504  }
1505 
1506  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1507  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1508  std::vector<TDatum> td_bounds_data;
1509  if (!is_null_geo) {
1510  for (auto b : bounds) {
1511  TDatum td_double;
1512  td_double.val.real_val = b;
1513  td_bounds_data.push_back(td_double);
1514  }
1515  }
1516  TDatum tdd_bounds;
1517  tdd_bounds.val.arr_val = td_bounds_data;
1518  tdd_bounds.is_null = is_null_geo;
1519  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
1520  }
1521 
1522  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1523  // Create render_group value and add it to the physical column
1524  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1525  TDatum td_render_group;
1526  td_render_group.val.int_val = render_group;
1527  td_render_group.is_null = is_null_geo;
1528  import_buffers[col_idx++]->add_value(
1529  cd_render_group, td_render_group, false, replicate_count);
1530  }
1531 }
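// Physical column order filled above, directly after the logical geo column:
// coords, then ring_sizes (POLYGON/MULTIPOLYGON), poly_rings (MULTIPOLYGON),
// bounds (LINESTRING/POLYGON/MULTIPOLYGON), and render_group
// (POLYGON/MULTIPOLYGON).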
1532 
1533 void Importer::set_geo_physical_import_buffer_columnar(
1534  const Catalog_Namespace::Catalog& catalog,
1535  const ColumnDescriptor* cd,
1536  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1537  size_t& col_idx,
1538  std::vector<std::vector<double>>& coords_column,
1539  std::vector<std::vector<double>>& bounds_column,
1540  std::vector<std::vector<int>>& ring_sizes_column,
1541  std::vector<std::vector<int>>& poly_rings_column,
1542  int render_group,
1543  const int64_t replicate_count) {
1544  const auto col_ti = cd->columnType;
1545  const auto col_type = col_ti.get_type();
1546  auto columnId = cd->columnId;
1547 
1548  auto coords_row_count = coords_column.size();
1549  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1550  for (auto coords : coords_column) {
1551  bool is_null_geo = false;
1552  bool is_null_point = false;
1553  if (!col_ti.get_notnull()) {
1554  // Check for NULL geo
1555  if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
1556  is_null_point = true;
1557  coords.clear();
1558  }
1559  is_null_geo = coords.empty();
1560  if (is_null_point) {
1561  coords.push_back(NULL_ARRAY_DOUBLE);
1562  coords.push_back(NULL_DOUBLE);
1563  // Treating POINT coords as notnull, need to store actual encoding
1564  // [un]compressed+[not]null
1565  is_null_geo = false;
1566  }
1567  }
1568  std::vector<TDatum> td_coords_data;
1569  if (!is_null_geo) {
1570  std::vector<uint8_t> compressed_coords =
1571  geospatial::compress_coords(coords, col_ti);
1572  for (auto cc : compressed_coords) {
1573  TDatum td_byte;
1574  td_byte.val.int_val = cc;
1575  td_coords_data.push_back(td_byte);
1576  }
1577  }
1578  TDatum tdd_coords;
1579  tdd_coords.val.arr_val = td_coords_data;
1580  tdd_coords.is_null = is_null_geo;
1581  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false, replicate_count);
1582  }
1583  col_idx++;
1584 
1585  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1586  if (ring_sizes_column.size() != coords_row_count) {
1587  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1588  }
1589  // Create ring_sizes array value and add it to the physical column
1590  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1591  for (auto ring_sizes : ring_sizes_column) {
1592  bool is_null_geo = false;
1593  if (!col_ti.get_notnull()) {
1594  // Check for NULL geo
1595  is_null_geo = ring_sizes.empty();
1596  }
1597  std::vector<TDatum> td_ring_sizes;
1598  for (auto ring_size : ring_sizes) {
1599  TDatum td_ring_size;
1600  td_ring_size.val.int_val = ring_size;
1601  td_ring_sizes.push_back(td_ring_size);
1602  }
1603  TDatum tdd_ring_sizes;
1604  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1605  tdd_ring_sizes.is_null = is_null_geo;
1606  import_buffers[col_idx]->add_value(
1607  cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
1608  }
1609  col_idx++;
1610  }
1611 
1612  if (col_type == kMULTIPOLYGON) {
1613  if (poly_rings_column.size() != coords_row_count) {
1614  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1615  }
1616  // Create poly_rings array value and add it to the physical column
1617  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1618  for (auto poly_rings : poly_rings_column) {
1619  bool is_null_geo = false;
1620  if (!col_ti.get_notnull()) {
1621  // Check for NULL geo
1622  is_null_geo = poly_rings.empty();
1623  }
1624  std::vector<TDatum> td_poly_rings;
1625  for (auto num_rings : poly_rings) {
1626  TDatum td_num_rings;
1627  td_num_rings.val.int_val = num_rings;
1628  td_poly_rings.push_back(td_num_rings);
1629  }
1630  TDatum tdd_poly_rings;
1631  tdd_poly_rings.val.arr_val = td_poly_rings;
1632  tdd_poly_rings.is_null = is_null_geo;
1633  import_buffers[col_idx]->add_value(
1634  cd_poly_rings, tdd_poly_rings, false, replicate_count);
1635  }
1636  col_idx++;
1637  }
1638 
1639  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1640  if (bounds_column.size() != coords_row_count) {
1641  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1642  }
1643  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1644  for (auto bounds : bounds_column) {
1645  bool is_null_geo = false;
1646  if (!col_ti.get_notnull()) {
1647  // Check for NULL geo
1648  is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
1649  }
1650  std::vector<TDatum> td_bounds_data;
1651  for (auto b : bounds) {
1652  TDatum td_double;
1653  td_double.val.real_val = b;
1654  td_bounds_data.push_back(td_double);
1655  }
1656  TDatum tdd_bounds;
1657  tdd_bounds.val.arr_val = td_bounds_data;
1658  tdd_bounds.is_null = is_null_geo;
1659  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
1660  }
1661  col_idx++;
1662  }
1663 
1664  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1665  // Create render_group value and add it to the physical column
1666  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1667  TDatum td_render_group;
1668  td_render_group.val.int_val = render_group;
1669  td_render_group.is_null = false;
1670  for (decltype(coords_row_count) i = 0; i < coords_row_count; i++) {
1671  import_buffers[col_idx]->add_value(
1672  cd_render_group, td_render_group, false, replicate_count);
1673  }
1674  col_idx++;
1675  }
1676 }
1677 
1678 namespace {
1679 
1680 std::tuple<int, SQLTypes, std::string> explode_collections_step1(
1681  const std::list<const ColumnDescriptor*>& col_descs) {
1682  // validate the columns
1683  // for now we can only explode into a single destination column
1684  // which must be of the child type (POLYGON, LINESTRING, POINT)
1685  int collection_col_idx = -1;
1686  int col_idx = 0;
1687  std::string collection_col_name;
1688  SQLTypes collection_child_type = kNULLT;
1689  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1690  auto const& cd = *cd_it;
1691  auto const col_type = cd->columnType.get_type();
1692  if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
1693  if (collection_col_idx >= 0) {
1694  throw std::runtime_error(
1695  "Explode Collections: Found more than one destination column");
1696  }
1697  collection_col_idx = col_idx;
1698  collection_child_type = col_type;
1699  collection_col_name = cd->columnName;
1700  }
1701  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1702  ++cd_it;
1703  }
1704  col_idx++;
1705  }
1706  if (collection_col_idx < 0) {
1707  throw std::runtime_error(
1708  "Explode Collections: Failed to find a supported column type to explode "
1709  "into");
1710  }
1711  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
1712 }
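// e.g. for columns (id INT, poly POLYGON) this returns {1, kPOLYGON, "poly"};
// a second POINT/LINESTRING/POLYGON column would throw, as would a table with
// no such column at all.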
1713 
1714 int64_t explode_collections_step2(
1715  OGRGeometry* ogr_geometry,
1716  SQLTypes collection_child_type,
1717  const std::string& collection_col_name,
1718  size_t row_or_feature_idx,
1719  std::function<void(OGRGeometry*)> execute_import_lambda) {
1720  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1721  bool is_collection = false;
1722  switch (collection_child_type) {
1723  case kPOINT:
1724  switch (ogr_geometry_type) {
1725  case wkbMultiPoint:
1726  is_collection = true;
1727  break;
1728  case wkbPoint:
1729  break;
1730  default:
1731  throw std::runtime_error(
1732  "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1733  }
1734  break;
1735  case kLINESTRING:
1736  switch (ogr_geometry_type) {
1737  case wkbMultiLineString:
1738  is_collection = true;
1739  break;
1740  case wkbLineString:
1741  break;
1742  default:
1743  throw std::runtime_error(
1744  "Explode Collections: Source geo type must be MULTILINESTRING or "
1745  "LINESTRING");
1746  }
1747  break;
1748  case kPOLYGON:
1749  switch (ogr_geometry_type) {
1750  case wkbMultiPolygon:
1751  is_collection = true;
1752  break;
1753  case wkbPolygon:
1754  break;
1755  default:
1756  throw std::runtime_error(
1757  "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1758  }
1759  break;
1760  default:
1761  CHECK(false) << "Unsupported geo child type " << collection_child_type;
1762  }
1763 
1764  int64_t us = 0LL;
1765 
1766  // explode or just import
1767  if (is_collection) {
1768  // cast to collection
1769  OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1770  CHECK(collection_geometry);
1771 
1772 #if LOG_EXPLODE_COLLECTIONS
1773  // log number of children
1774  LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
1775  << explode_col_name << "' into " << collection_geometry->getNumGeometries()
1776  << " child rows";
1777 #endif
1778 
1779  // loop over children
1780  uint32_t child_geometry_count = 0;
1781  auto child_geometry_it = collection_geometry->begin();
1782  while (child_geometry_it != collection_geometry->end()) {
1783  // get and import this child
1784  OGRGeometry* import_geometry = *child_geometry_it;
1785  us += measure<std::chrono::microseconds>::execution(
1786  [&] { execute_import_lambda(import_geometry); });
1787 
1788  // next child
1789  child_geometry_it++;
1790  child_geometry_count++;
1791  }
1792  } else {
1793  // import non-collection row just once
1794  us += measure<std::chrono::microseconds>::execution(
1795  [&] { execute_import_lambda(ogr_geometry); });
1796  }
1797 
1798  // done
1799  return us;
1800 }
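// e.g. a wkbMultiPolygon with N children invokes execute_import_lambda N
// times (one output row per child), while a plain wkbPolygon invokes it
// exactly once; the returned value is the accumulated import time in
// microseconds.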
1801 
1802 } // namespace
1803 
1804 static ImportStatus import_thread_delimited(
1805  int thread_id,
1806  Importer* importer,
1807  std::unique_ptr<char[]> scratch_buffer,
1808  size_t begin_pos,
1809  size_t end_pos,
1810  size_t total_size,
1811  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
1812  size_t first_row_index_this_buffer) {
1813  ImportStatus import_status;
1814  int64_t total_get_row_time_us = 0;
1815  int64_t total_str_to_val_time_us = 0;
1816  CHECK(scratch_buffer);
1817  auto buffer = scratch_buffer.get();
1818  auto load_ms = measure<>::execution([]() {});
1819  auto ms = measure<>::execution([&]() {
1820  const CopyParams& copy_params = importer->get_copy_params();
1821  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
1822  size_t begin =
1823  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
1824  const char* thread_buf = buffer + begin_pos + begin;
1825  const char* thread_buf_end = buffer + end_pos;
1826  const char* buf_end = buffer + total_size;
1827  bool try_single_thread = false;
1828  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
1829  importer->get_import_buffers(thread_id);
1830  auto us = measure<std::chrono::microseconds>::execution([&]() {});
1831  int phys_cols = 0;
1832  int point_cols = 0;
1833  for (const auto cd : col_descs) {
1834  const auto& col_ti = cd->columnType;
1835  phys_cols += col_ti.get_physical_cols();
1836  if (cd->columnType.get_type() == kPOINT) {
1837  point_cols++;
1838  }
1839  }
1840  auto num_cols = col_descs.size() - phys_cols;
1841  for (const auto& p : import_buffers) {
1842  p->clear();
1843  }
1844  std::vector<std::string_view> row;
1845  size_t row_index_plus_one = 0;
1846  for (const char* p = thread_buf; p < thread_buf_end; p++) {
1847  row.clear();
1848  std::vector<std::unique_ptr<char[]>>
1849  tmp_buffers; // holds string w/ removed escape chars, etc
1850  if (DEBUG_TIMING) {
1851  us = measure<std::chrono::microseconds>::execution([&]() {
1852  p = delimited_parser::get_row(p,
1853  thread_buf_end,
1854  buf_end,
1855  copy_params,
1856  importer->get_is_array(),
1857  row,
1858  tmp_buffers,
1859  try_single_thread);
1860  });
1861  total_get_row_time_us += us;
1862  } else {
1863  p = delimited_parser::get_row(p,
1864  thread_buf_end,
1865  buf_end,
1866  copy_params,
1867  importer->get_is_array(),
1868  row,
1869  tmp_buffers,
1870  try_single_thread);
1871  }
1872  row_index_plus_one++;
1873  // Each POINT could consume two separate coords instead of a single WKT
1874  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
1875  import_status.rows_rejected++;
1876  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
1877  << row.size() << "): " << shared::printContainer(row);
1878  if (import_status.rows_rejected > copy_params.max_reject) {
1879  break;
1880  }
1881  continue;
1882  }
1883 
1884  //
1885  // lambda for importing a row (perhaps multiple times if exploding a collection)
1886  //
1887 
1888  auto execute_import_row = [&](OGRGeometry* import_geometry) {
1889  size_t import_idx = 0;
1890  size_t col_idx = 0;
1891  try {
1892  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1893  auto cd = *cd_it;
1894  const auto& col_ti = cd->columnType;
1895 
1896  bool is_null =
1897  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
1898  // Note: default copy_params.null_str is "\N", but everyone uses "NULL".
1899  // So initially nullness may be missed here and not passed to add_value,
1900  // which may then check again and still decide the value is a NULL, e.g.
1901  // if a kINT value doesn't start with a digit or a '-' it's considered NULL.
1902  // So "NULL" is not recognized as NULL here, but it's also not recognized
1903  // as a valid kINT, so it ends up being a NULL after all.
1904  // We also check for "NULL" here, as a widely accepted notation for NULL.
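// For example, with the default null_str "\N", an input cell "NULL" bound
// for a kINT column first fails the null_str comparison, then fails integer
// parsing (no leading digit or '-'), and is stored as NULL anyway; the
// explicit "NULL" check above just makes that outcome direct.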
1905 
1906  // Treating empty as NULL
1907  if (!cd->columnType.is_string() && row[import_idx].empty()) {
1908  is_null = true;
1909  }
1910 
1911  if (col_ti.get_physical_cols() == 0) {
1912  // not geo
1913 
1914  import_buffers[col_idx]->add_value(
1915  cd, row[import_idx], is_null, copy_params);
1916 
1917  // next
1918  ++import_idx;
1919  ++col_idx;
1920  } else {
1921  // geo
1922 
1923  // store null string in the base column
1924  import_buffers[col_idx]->add_value(
1925  cd, copy_params.null_str, true, copy_params);
1926 
1927  // WKT from string we're not storing
1928  auto const& wkt = row[import_idx];
1929 
1930  // next
1931  ++import_idx;
1932  ++col_idx;
1933 
1934  SQLTypes col_type = col_ti.get_type();
1935  CHECK(IS_GEO(col_type));
1936 
1937  std::vector<double> coords;
1938  std::vector<double> bounds;
1939  std::vector<int> ring_sizes;
1940  std::vector<int> poly_rings;
1941  int render_group = 0;
1942 
1943  if (!is_null && col_type == kPOINT && wkt.size() > 0 &&
1944  (wkt[0] == '.' || isdigit(wkt[0]) || wkt[0] == '-')) {
1945  // Invalid WKT, looks more like a scalar.
1946  // Try custom POINT import: from two separate scalars rather than WKT
1947  // string
1948  double lon = std::atof(std::string(wkt).c_str());
1949  double lat = NAN;
1950  auto lat_str = row[import_idx];
1951  ++import_idx;
1952  if (lat_str.size() > 0 &&
1953  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
1954  lat = std::atof(std::string(lat_str).c_str());
1955  }
1956  // Swap coordinates if this table uses a reverse order: lat/lon
1957  if (!copy_params.lonlat) {
1958  std::swap(lat, lon);
1959  }
1960  // TODO: should check if POINT column should have been declared with
1961  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
1962  // throw std::runtime_error("POINT column " + cd->columnName + " is
1963  // not WGS84, cannot insert lon/lat");
1964  // }
1965  if (!importGeoFromLonLat(lon, lat, coords)) {
1966  throw std::runtime_error(
1967  "Cannot read lon/lat to insert into POINT column " +
1968  cd->columnName);
1969  }
1970  } else {
1971  // import it
1972  SQLTypeInfo import_ti{col_ti};
1973  if (is_null) {
1974  if (col_ti.get_notnull()) {
1975  throw std::runtime_error("NULL geo for column " + cd->columnName);
1976  }
1977  Geo_namespace::GeoTypesFactory::getNullGeoColumns(
1978  import_ti,
1979  coords,
1980  bounds,
1981  ring_sizes,
1982  poly_rings,
1983  PROMOTE_POLYGON_TO_MULTIPOLYGON);
1984  } else {
1985  if (import_geometry) {
1986  // geometry already exploded
1987  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
1988  import_geometry,
1989  import_ti,
1990  coords,
1991  bounds,
1992  ring_sizes,
1993  poly_rings,
1994  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
1995  std::string msg =
1996  "Failed to extract valid geometry from exploded row " +
1997  std::to_string(first_row_index_this_buffer +
1998  row_index_plus_one) +
1999  " for column " + cd->columnName;
2000  throw std::runtime_error(msg);
2001  }
2002  } else {
2003  // extract geometry directly from WKT
2004  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
2005  std::string(wkt),
2006  import_ti,
2007  coords,
2008  bounds,
2009  ring_sizes,
2010  poly_rings,
2011  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2012  std::string msg = "Failed to extract valid geometry from row " +
2013  std::to_string(first_row_index_this_buffer +
2014  row_index_plus_one) +
2015  " for column " + cd->columnName;
2016  throw std::runtime_error(msg);
2017  }
2018  }
2019 
2020  // validate types
2021  if (col_type != import_ti.get_type()) {
2022  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2023  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2024  col_type == SQLTypes::kMULTIPOLYGON)) {
2025  throw std::runtime_error(
2026  "Imported geometry doesn't match the type of column " +
2027  cd->columnName);
2028  }
2029  }
2030  }
2031 
2032  // assign render group?
2033  if (columnIdToRenderGroupAnalyzerMap.size()) {
2034  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2035  if (ring_sizes.size()) {
2036  // get a suitable render group for these poly coords
2037  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2038  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2039  render_group =
2040  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2041  } else {
2042  // empty poly
2043  render_group = -1;
2044  }
2045  }
2046  }
2047  }
2048 
2049  // import extracted geo
2050  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2051  cd,
2052  import_buffers,
2053  col_idx,
2054  coords,
2055  bounds,
2056  ring_sizes,
2057  poly_rings,
2058  render_group);
2059 
2060  // skip remaining physical columns
2061  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2062  ++cd_it;
2063  }
2064  }
2065  }
2066  import_status.rows_completed++;
2067  } catch (const std::exception& e) {
2068  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2069  import_buffers[col_idx_to_pop]->pop_value();
2070  }
2071  import_status.rows_rejected++;
2072  LOG(ERROR) << "Input exception thrown: " << e.what()
2073  << ". Row discarded. Data: " << shared::printContainer(row);
2074  }
2075  };
2076 
2077  if (copy_params.geo_explode_collections) {
2078  // explode and import
2079  // @TODO(se) convert to structure-bindings when we can use C++17 here
2080  auto collection_idx_type_name = explode_collections_step1(col_descs);
2081  int collection_col_idx = std::get<0>(collection_idx_type_name);
2082  SQLTypes collection_child_type = std::get<1>(collection_idx_type_name);
2083  std::string collection_col_name = std::get<2>(collection_idx_type_name);
2084  // pull out the collection WKT
2085  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2086  auto const& collection_wkt = row[collection_col_idx];
2087  // convert to OGR
2088  OGRGeometry* ogr_geometry = nullptr;
2089  ScopeGuard destroy_ogr_geometry = [&] {
2090  if (ogr_geometry) {
2091  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2092  }
2093  };
2094  OGRErr ogr_status = OGRGeometryFactory::createFromWkt(
2095  collection_wkt.data(), nullptr, &ogr_geometry);
2096  if (ogr_status != OGRERR_NONE) {
2097  throw std::runtime_error("Failed to convert WKT to geometry");
2098  }
2099  // do the explode and import
2100  us = explode_collections_step2(ogr_geometry,
2101  collection_child_type,
2102  collection_col_name,
2103  first_row_index_this_buffer + row_index_plus_one,
2104  execute_import_row);
2105  } else {
2106  // import non-collection row just once
2107  us = measure<std::chrono::microseconds>::execution(
2108  [&] { execute_import_row(nullptr); });
2109  }
2110  total_str_to_val_time_us += us;
2111  } // end thread
2112  if (import_status.rows_completed > 0) {
2113  load_ms = measure<>::execution(
2114  [&]() { importer->load(import_buffers, import_status.rows_completed); });
2115  }
2116  });
2117  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2118  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2119  << import_status.rows_completed << " rows inserted in "
2120  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2121  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2122  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2123  << "sec" << std::endl;
2124  }
2125 
2126  import_status.thread_id = thread_id;
2127  // LOG(INFO) << " return " << import_status.thread_id << std::endl;
2128 
2129  return import_status;
2130 }
2131 
2132 static ImportStatus import_thread_shapefile(
2133  int thread_id,
2134  Importer* importer,
2135  OGRSpatialReference* poGeographicSR,
2136  const FeaturePtrVector& features,
2137  size_t firstFeature,
2138  size_t numFeatures,
2139  const FieldNameToIndexMapType& fieldNameToIndexMap,
2140  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2141  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap) {
2142  ImportStatus import_status;
2143  const CopyParams& copy_params = importer->get_copy_params();
2144  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2145  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2146  importer->get_import_buffers(thread_id);
2147 
2148  for (const auto& p : import_buffers) {
2149  p->clear();
2150  }
2151 
2152  auto convert_timer = timer_start();
2153 
2154  // we create this on the fly based on the first feature's SR
2155  std::unique_ptr<OGRCoordinateTransformation> coordinate_transformation;
2156 
2157  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2158  if (!features[iFeature]) {
2159  continue;
2160  }
2161 
2162  // get this feature's geometry
2163  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2164  if (pGeometry) {
2165  // for geodatabase, we need to consider features with no geometry
2166  // as we still want to create a table, even if it has no geo column
2167 
2168  // transform it
2169  // avoid GDAL error if not transformable
2170  auto geometry_sr = pGeometry->getSpatialReference();
2171  if (geometry_sr) {
2172  // create an OGRCoordinateTransformation (CT) on the fly
2173  // we must assume that all geo in this file will have
2174  // the same source SR, so the CT will be valid for all
2175  // transforming to a reusable CT is faster than to an SR
2176  if (coordinate_transformation == nullptr) {
2177  coordinate_transformation.reset(
2178  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR));
2179  if (coordinate_transformation == nullptr) {
2180  throw std::runtime_error(
2181  "Failed to create a GDAL CoordinateTransformation for incoming geo");
2182  }
2183  }
2184  pGeometry->transform(coordinate_transformation.get());
2185  }
2186  }
2187 
2188  //
2189  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2190  //
2191 
2192  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2193  size_t col_idx = 0;
2194  try {
2195  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2196  auto cd = *cd_it;
2197 
2198  // is this a geo column?
2199  const auto& col_ti = cd->columnType;
2200  if (col_ti.is_geometry()) {
2201  // Note that this assumes there is one and only one geo column in the table.
2202  // Currently, the importer only supports reading a single geospatial feature
2203  // from an input shapefile / geojson file, but this code will need to be
2204  // modified if that changes
2205  SQLTypes col_type = col_ti.get_type();
2206 
2207  // store null string in the base column
2208  import_buffers[col_idx]->add_value(
2209  cd, copy_params.null_str, true, copy_params);
2210  ++col_idx;
2211 
2212  // the data we now need to extract for the other columns
2213  std::vector<double> coords;
2214  std::vector<double> bounds;
2215  std::vector<int> ring_sizes;
2216  std::vector<int> poly_rings;
2217  int render_group = 0;
2218 
2219  // extract it
2220  SQLTypeInfo import_ti{col_ti};
2221  bool is_null_geo = !import_geometry;
2222  if (is_null_geo) {
2223  if (col_ti.get_notnull()) {
2224  throw std::runtime_error("NULL geo for column " + cd->columnName);
2225  }
2226  Geo_namespace::GeoTypesFactory::getNullGeoColumns(
2227  import_ti,
2228  coords,
2229  bounds,
2230  ring_sizes,
2231  poly_rings,
2232  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2233  } else {
2234  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
2235  import_geometry,
2236  import_ti,
2237  coords,
2238  bounds,
2239  ring_sizes,
2240  poly_rings,
2241  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2242  std::string msg = "Failed to extract valid geometry from feature " +
2243  std::to_string(firstFeature + iFeature + 1) +
2244  " for column " + cd->columnName;
2245  throw std::runtime_error(msg);
2246  }
2247 
2248  // validate types
2249  if (col_type != import_ti.get_type()) {
2250  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2251  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2252  col_type == SQLTypes::kMULTIPOLYGON)) {
2253  throw std::runtime_error(
2254  "Imported geometry doesn't match the type of column " +
2255  cd->columnName);
2256  }
2257  }
2258  }
2259 
2260  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2261  if (ring_sizes.size()) {
2262  // get a suitable render group for these poly coords
2263  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2264  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2265  render_group = (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2266  } else {
2267  // empty poly
2268  render_group = -1;
2269  }
2270  }
2271 
2272  // create coords array value and add it to the physical column
2273  ++cd_it;
2274  auto cd_coords = *cd_it;
2275  std::vector<TDatum> td_coord_data;
2276  if (!is_null_geo) {
2277  std::vector<uint8_t> compressed_coords =
2278  geospatial::compress_coords(coords, col_ti);
2279  for (auto cc : compressed_coords) {
2280  TDatum td_byte;
2281  td_byte.val.int_val = cc;
2282  td_coord_data.push_back(td_byte);
2283  }
2284  }
2285  TDatum tdd_coords;
2286  tdd_coords.val.arr_val = td_coord_data;
2287  tdd_coords.is_null = is_null_geo;
2288  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2289  ++col_idx;
2290 
2291  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2292  // Create ring_sizes array value and add it to the physical column
2293  ++cd_it;
2294  auto cd_ring_sizes = *cd_it;
2295  std::vector<TDatum> td_ring_sizes;
2296  if (!is_null_geo) {
2297  for (auto ring_size : ring_sizes) {
2298  TDatum td_ring_size;
2299  td_ring_size.val.int_val = ring_size;
2300  td_ring_sizes.push_back(td_ring_size);
2301  }
2302  }
2303  TDatum tdd_ring_sizes;
2304  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2305  tdd_ring_sizes.is_null = is_null_geo;
2306  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2307  ++col_idx;
2308  }
2309 
2310  if (col_type == kMULTIPOLYGON) {
2311  // Create poly_rings array value and add it to the physical column
2312  ++cd_it;
2313  auto cd_poly_rings = *cd_it;
2314  std::vector<TDatum> td_poly_rings;
2315  if (!is_null_geo) {
2316  for (auto num_rings : poly_rings) {
2317  TDatum td_num_rings;
2318  td_num_rings.val.int_val = num_rings;
2319  td_poly_rings.push_back(td_num_rings);
2320  }
2321  }
2322  TDatum tdd_poly_rings;
2323  tdd_poly_rings.val.arr_val = td_poly_rings;
2324  tdd_poly_rings.is_null = is_null_geo;
2325  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2326  ++col_idx;
2327  }
2328 
2329  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2330  col_type == kMULTIPOLYGON) {
2331  // Create bounds array value and add it to the physical column
2332  ++cd_it;
2333  auto cd_bounds = *cd_it;
2334  std::vector<TDatum> td_bounds_data;
2335  if (!is_null_geo) {
2336  for (auto b : bounds) {
2337  TDatum td_double;
2338  td_double.val.real_val = b;
2339  td_bounds_data.push_back(td_double);
2340  }
2341  }
2342  TDatum tdd_bounds;
2343  tdd_bounds.val.arr_val = td_bounds_data;
2344  tdd_bounds.is_null = is_null_geo;
2345  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2346  ++col_idx;
2347  }
2348 
2349  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2350  // Create render_group value and add it to the physical column
2351  ++cd_it;
2352  auto cd_render_group = *cd_it;
2353  TDatum td_render_group;
2354  td_render_group.val.int_val = render_group;
2355  td_render_group.is_null = is_null_geo;
2356  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2357  ++col_idx;
2358  }
2359  } else {
2360  // regular column
2361  // pull from GDAL metadata
2362  auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2363  CHECK(cit != columnNameToSourceNameMap.end());
2364  auto const& field_name = cit->second;
2365 
2366  auto const fit = fieldNameToIndexMap.find(field_name);
2367  CHECK(fit != fieldNameToIndexMap.end());
2368  auto const& field_index = fit->second;
2369  CHECK(field_index < fieldNameToIndexMap.size());
2370 
2371  auto const& feature = features[iFeature];
2372 
2373  auto field_defn = feature->GetFieldDefnRef(field_index);
2374  CHECK(field_defn);
2375 
2376  // OGRFeature::GetFieldAsString() can only return 80 characters
2377  // so for array columns, we are obliged to fetch the actual values
2378  // and construct the concatenated string ourselves
2379 
2380  std::string value_string;
2381  int array_index = 0, array_size = 0;
2382 
2383  auto stringify_numeric_list = [&](auto* values) {
2384  value_string = "{";
2385  while (array_index < array_size) {
2386  auto separator = (array_index > 0) ? "," : "";
2387  value_string += separator + std::to_string(values[array_index]);
2388  array_index++;
2389  }
2390  value_string += "}";
2391  };
2392 
2393  auto field_type = field_defn->GetType();
2394  switch (field_type) {
2395  case OFTInteger:
2396  case OFTInteger64:
2397  case OFTReal:
2398  case OFTString:
2399  case OFTBinary:
2400  case OFTDate:
2401  case OFTTime:
2402  case OFTDateTime: {
2403  value_string = feature->GetFieldAsString(field_index);
2404  } break;
2405  case OFTIntegerList: {
2406  auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2407  stringify_numeric_list(values);
2408  } break;
2409  case OFTInteger64List: {
2410  auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2411  stringify_numeric_list(values);
2412  } break;
2413  case OFTRealList: {
2414  auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2415  stringify_numeric_list(values);
2416  } break;
2417  case OFTStringList: {
2418  auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2419  value_string = "{";
2420  if (array_of_strings) {
2421  while (auto* this_string = array_of_strings[array_index]) {
2422  auto separator = (array_index > 0) ? "," : "";
2423  value_string += separator + std::string(this_string);
2424  array_index++;
2425  }
2426  }
2427  value_string += "}";
2428  } break;
2429  default:
2430  throw std::runtime_error("Unsupported geo file field type (" +
2431  std::to_string(static_cast<int>(field_type)) +
2432  ")");
2433  }
2434 
2435  static CopyParams default_copy_params;
2436  import_buffers[col_idx]->add_value(
2437  cd, value_string, false, default_copy_params);
2438  ++col_idx;
2439  }
2440  }
2441  import_status.rows_completed++;
2442  } catch (const std::exception& e) {
2443  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2444  import_buffers[col_idx_to_pop]->pop_value();
2445  }
2446  import_status.rows_rejected++;
2447  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2448  }
2449  };
2450 
2451  if (pGeometry && copy_params.geo_explode_collections) {
2452  // explode and import
2453  // @TODO(se) convert to structure-bindings when we can use C++17 here
2454  auto collection_idx_type_name = explode_collections_step1(col_descs);
2455  SQLTypes collection_child_type = std::get<1>(collection_idx_type_name);
2456  std::string collection_col_name = std::get<2>(collection_idx_type_name);
2457  explode_collections_step2(pGeometry,
2458  collection_child_type,
2459  collection_col_name,
2460  firstFeature + iFeature + 1,
2461  execute_import_feature);
2462  } else {
2463  // import non-collection or null feature just once
2464  execute_import_feature(pGeometry);
2465  }
2466  } // end features
2467 
2468  float convert_ms =
2469  float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2470  convert_timer)) /
2471  1000.0f;
2472 
2473  float load_ms = 0.0f;
2474  if (import_status.rows_completed > 0) {
2475  auto load_timer = timer_start();
2476  importer->load(import_buffers, import_status.rows_completed);
2477  load_ms =
2478  float(
2479  timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2480  load_timer)) /
2481  1000.0f;
2482  }
2483 
2484  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2485  LOG(INFO) << "DEBUG: Process " << convert_ms << "ms";
2486  LOG(INFO) << "DEBUG: Load " << load_ms << "ms";
2487  }
2488 
2489  import_status.thread_id = thread_id;
2490 
2491  if (DEBUG_TIMING) {
2492  LOG(INFO) << "DEBUG: Total "
2493  << float(timer_stop<std::chrono::steady_clock::time_point,
2494  std::chrono::microseconds>(convert_timer)) /
2495  1000.0f
2496  << "ms";
2497  }
2498 
2499  return import_status;
2500 }
2501 
2502 bool Loader::loadNoCheckpoint(
2503  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2504  size_t row_count) {
2505  return loadImpl(import_buffers, row_count, false);
2506 }
2507 
2508 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2509  size_t row_count) {
2510  return loadImpl(import_buffers, row_count, true);
2511 }
2512 
2513 namespace {
2514 
2515 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2516  const auto& ti = import_buffer.getTypeInfo();
2517  const int8_t* values_buffer{nullptr};
2518  if (ti.is_string()) {
2519  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2520  values_buffer = import_buffer.getStringDictBuffer();
2521  } else {
2522  values_buffer = import_buffer.getAsBytes();
2523  }
2524  CHECK(values_buffer);
2525  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2526  switch (logical_size) {
2527  case 1: {
2528  return values_buffer[index];
2529  }
2530  case 2: {
2531  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2532  }
2533  case 4: {
2534  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2535  }
2536  case 8: {
2537  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2538  }
2539  default:
2540  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2541  }
2542  UNREACHABLE();
2543  return 0;
2544 }
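// Illustrative sketch (not part of the original source): int_value_at()
// widens the raw column bytes to int64_t based on the logical size, so for
// a kSMALLINT buffer holding {7, -2, 3}, int_value_at(buf, 1) returns -2.
// Dictionary-encoded string columns are read from the string-id buffer
// rather than the raw payload.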
2545 
2546 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2547  const auto& ti = import_buffer.getTypeInfo();
2548  CHECK_EQ(kFLOAT, ti.get_type());
2549  const auto values_buffer = import_buffer.getAsBytes();
2550  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2551 }
2552 
2553 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2554  const auto& ti = import_buffer.getTypeInfo();
2555  CHECK_EQ(kDOUBLE, ti.get_type());
2556  const auto values_buffer = import_buffer.getAsBytes();
2557  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2558 }
2559 
2560 } // namespace
2561 
2562 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2563  std::vector<size_t>& all_shard_row_counts,
2564  const OneShardBuffers& import_buffers,
2565  const size_t row_count,
2566  const size_t shard_count) {
2567  all_shard_row_counts.resize(shard_count);
2568  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2569  all_shard_import_buffers.emplace_back();
2570  for (const auto& typed_import_buffer : import_buffers) {
2571  all_shard_import_buffers.back().emplace_back(
2572  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2573  typed_import_buffer->getStringDictionary()));
2574  }
2575  }
2576  CHECK_GT(table_desc_->shardedColumnId, 0);
2577  int col_idx{0};
2578  const ColumnDescriptor* shard_col_desc{nullptr};
2579  for (const auto col_desc : column_descs_) {
2580  ++col_idx;
2581  if (col_idx == table_desc_->shardedColumnId) {
2582  shard_col_desc = col_desc;
2583  break;
2584  }
2585  }
2586  CHECK(shard_col_desc);
2587  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2588  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2589  const auto& shard_col_ti = shard_col_desc->columnType;
2590  CHECK(shard_col_ti.is_integer() ||
2591  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2592  shard_col_ti.is_time());
2593  if (shard_col_ti.is_string()) {
2594  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2595  CHECK(payloads_ptr);
2596  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2597  }
2598 
2599  // for each replicated (alter-added) column, the number of rows in a shard
2600  // is inferred from that of the sharding column, not simply evenly distributed.
2601  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2602  // Here the loop count is overloaded. For normal imports, we loop thru all
2603  // input values (rows), so the loop count is the number of input rows.
2604  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2605  // all shards, so the loop count is the number of shards.
2606  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2607  for (size_t i = 0; i < loop_count; ++i) {
2608  const size_t shard =
2609  getReplicating()
2610  ? i
2611  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2612  auto& shard_output_buffers = all_shard_import_buffers[shard];
2613 
2614  // when replicating a column, populate 'rows' to all shards only once,
2615  // fetching the value from the first and only row
2616  const auto row_index = getReplicating() ? 0 : i;
2617 
2618  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2619  const auto& input_buffer = import_buffers[col_idx];
2620  const auto& col_ti = input_buffer->getTypeInfo();
2621  const auto type =
2622  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2623 
2624  // for a replicated (added) column, set rows_per_shard to the per-shard
2625  // replicate count, and bypass non-replicated columns
2626  if (getReplicating()) {
2627  if (input_buffer->get_replicate_count() > 0) {
2628  shard_output_buffers[col_idx]->set_replicate_count(
2629  shard_tds[shard]->fragmenter->getNumRows());
2630  } else {
2631  continue;
2632  }
2633  }
2634 
2635  switch (type) {
2636  case kBOOLEAN:
2637  shard_output_buffers[col_idx]->addBoolean(
2638  int_value_at(*input_buffer, row_index));
2639  break;
2640  case kTINYINT:
2641  shard_output_buffers[col_idx]->addTinyint(
2642  int_value_at(*input_buffer, row_index));
2643  break;
2644  case kSMALLINT:
2645  shard_output_buffers[col_idx]->addSmallint(
2646  int_value_at(*input_buffer, row_index));
2647  break;
2648  case kINT:
2649  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2650  break;
2651  case kBIGINT:
2652  shard_output_buffers[col_idx]->addBigint(
2653  int_value_at(*input_buffer, row_index));
2654  break;
2655  case kFLOAT:
2656  shard_output_buffers[col_idx]->addFloat(
2657  float_value_at(*input_buffer, row_index));
2658  break;
2659  case kDOUBLE:
2660  shard_output_buffers[col_idx]->addDouble(
2661  double_value_at(*input_buffer, row_index));
2662  break;
2663  case kTEXT:
2664  case kVARCHAR:
2665  case kCHAR: {
2666  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2667  shard_output_buffers[col_idx]->addString(
2668  (*input_buffer->getStringBuffer())[row_index]);
2669  break;
2670  }
2671  case kTIME:
2672  case kTIMESTAMP:
2673  case kDATE:
2674  shard_output_buffers[col_idx]->addBigint(
2675  int_value_at(*input_buffer, row_index));
2676  break;
2677  case kARRAY:
2678  if (IS_STRING(col_ti.get_subtype())) {
2679  CHECK(input_buffer->getStringArrayBuffer());
2680  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2681  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2682  shard_output_buffers[col_idx]->addStringArray(input_arr);
2683  } else {
2684  shard_output_buffers[col_idx]->addArray(
2685  (*input_buffer->getArrayBuffer())[row_index]);
2686  }
2687  break;
2688  case kPOINT:
2689  case kLINESTRING:
2690  case kPOLYGON:
2691  case kMULTIPOLYGON: {
2692  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2693  shard_output_buffers[col_idx]->addGeoString(
2694  (*input_buffer->getGeoStringBuffer())[row_index]);
2695  break;
2696  }
2697  default:
2698  CHECK(false);
2699  }
2700  }
2701  ++all_shard_row_counts[shard];
2702  // when replicating a column, the row count of a shard equals the replicate
2703  // count of the column on that shard
2704  if (getReplicating()) {
2705  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2706  }
2707  }
2708 }
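// Illustrative sketch (not part of the original source): for a table with
// shard_count = 4 sharded on a kINT column, a row whose shard-key cell holds
// 10 lands in shard SHARD_FOR_KEY(int_value_at(key_buffer, row), 4); every
// column value of that row is then appended to that shard's
// TypedImportBuffers and all_shard_row_counts[shard] is incremented.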
2709 
2710 bool Loader::loadImpl(
2711  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2712  size_t row_count,
2713  bool checkpoint) {
2714  if (load_callback_) {
2715  auto data_blocks = get_data_block_pointers(import_buffers);
2716  return load_callback_(import_buffers, data_blocks, row_count);
2717  }
2718  if (table_desc_->nShards) {
2719  std::vector<OneShardBuffers> all_shard_import_buffers;
2720  std::vector<size_t> all_shard_row_counts;
2721  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2722  distributeToShards(all_shard_import_buffers,
2723  all_shard_row_counts,
2724  import_buffers,
2725  row_count,
2726  shard_tables.size());
2727  bool success = true;
2728  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2729  if (!all_shard_row_counts[shard_idx]) {
2730  continue;
2731  }
2732  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2733  all_shard_row_counts[shard_idx],
2734  shard_tables[shard_idx],
2735  checkpoint);
2736  }
2737  return success;
2738  }
2739  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2740 }
2741 
2742 std::vector<DataBlockPtr> Loader::get_data_block_pointers(
2743  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2744  std::vector<DataBlockPtr> result(import_buffers.size());
2745  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2746  encoded_data_block_ptrs_futures;
2747  // make all async calls to string dictionary here and then continue execution
2748  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2749  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2750  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2751  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2752  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2753 
2754  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2755  buf_idx,
2756  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2757  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2758  return import_buffers[buf_idx]->getStringDictBuffer();
2759  })));
2760  }
2761  }
2762 
2763  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2764  DataBlockPtr p;
2765  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2766  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2767  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2768  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2769  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2770  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2771  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2772  p.stringsPtr = string_payload_ptr;
2773  } else {
2774  // This condition means we have a dictionary-encoded string column. We
2775  // already made an async request above to obtain the encoded integer
2776  // values, so we skip this iteration and continue.
2777  continue;
2778  }
2779  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2780  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2781  p.stringsPtr = geo_payload_ptr;
2782  } else {
2783  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2784  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2785  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2786  import_buffers[buf_idx]->addDictEncodedStringArray(
2787  *import_buffers[buf_idx]->getStringArrayBuffer());
2788  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2789  } else {
2790  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2791  }
2792  }
2793  result[buf_idx] = p;
2794  }
2795 
2796  // wait for the async requests we made for string dictionary
2797  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2798  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2799  }
2800  return result;
2801 }
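// The futures above let every dictionary-encoded string column be translated
// to its encoded string ids concurrently: e.g. a table with three
// TEXT ENCODING DICT columns launches three std::async jobs, the second loop
// fills in all other column pointers, and only then do we block on .get().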
2802 
2803 bool Loader::loadToShard(
2804  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2805  size_t row_count,
2806  const TableDescriptor* shard_table,
2807  bool checkpoint) {
2808  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2809  // patch insert_data with new column
2810  if (this->getReplicating()) {
2811  for (const auto& import_buff : import_buffers) {
2812  insert_data_.replicate_count = import_buff->get_replicate_count();
2813  }
2814  }
2815 
2816  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2817  ins_data.numRows = row_count;
2818  bool success = true;
2819 
2820  ins_data.data = get_data_block_pointers(import_buffers);
2821 
2822  for (const auto& import_buffer : import_buffers) {
2823  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2824  }
2825 
2826  // release loader_lock so that InsertOrderFragmenter::insertData
2827  // can have multiple threads sort/shuffle InsertData
2828  loader_lock.unlock();
2829 
2830  {
2831  try {
2832  if (checkpoint) {
2833  shard_table->fragmenter->insertData(ins_data);
2834  } else {
2835  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2836  }
2837  } catch (std::exception& e) {
2838  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2839  success = false;
2840  }
2841  }
2842  return success;
2843 }
2844 
2845 void Loader::dropColumns(const std::vector<int>& columnIds) {
2846  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
2847  if (table_desc_->nShards) {
2848  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
2849  }
2850  for (auto table_desc : table_descs) {
2851  table_desc->fragmenter->dropColumns(columnIds);
2852  }
2853 }
2854 
2855 void Loader::init(const bool use_catalog_locks) {
2856  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2857  insert_data_.tableId = table_desc_->tableId;
2858  for (auto cd : column_descs_) {
2859  insert_data_.columnIds.push_back(cd->columnId);
2860  if (cd->columnType.get_compression() == kENCODING_DICT) {
2861  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2862  const auto dd = use_catalog_locks
2863  ? catalog_.getMetadataForDict(cd->columnType.get_comp_param())
2864  : catalog_.getMetadataForDictUnlocked(
2865  cd->columnType.get_comp_param(), true);
2866  CHECK(dd);
2867  dict_map_[cd->columnId] = dd->stringDict.get();
2868  }
2869  }
2870  insert_data_.numRows = 0;
2871 }
2872 
2873 void Detector::init() {
2874  detect_row_delimiter();
2875  split_raw_data();
2876  find_best_sqltypes_and_headers();
2877 }
2878 
2879 ImportStatus Detector::importDelimited(const std::string& file_path,
2880  const bool decompressed) {
2881  if (!p_file) {
2882  p_file = fopen(file_path.c_str(), "rb");
2883  }
2884  if (!p_file) {
2885  throw std::runtime_error("failed to open file '" + file_path +
2886  "': " + strerror(errno));
2887  }
2888 
2889  // somehow clang does not support ext/stdio_filebuf.h, so we
2890  // roll our own readline with a customized copy_params.line_delim...
2891  std::string line;
2892  line.reserve(1 * 1024 * 1024);
2893  auto end_time = std::chrono::steady_clock::now() +
2894  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
2895  try {
2896  while (!feof(p_file)) {
2897  int c;
2898  size_t n = 0;
2899  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
2900  if (n++ >= line.capacity()) {
2901  break;
2902  }
2903  line += c;
2904  }
2905  if (0 == n) {
2906  break;
2907  }
2908  // remember the first line, which is possibly a header line, to
2909  // ignore identical header line(s) in 2nd+ files of an archive;
2910  // otherwise, a 2nd+ header may be mistaken for an all-string row
2911  // and so skew the final column types.
2912  if (line1.empty()) {
2913  line1 = line;
2914  } else if (line == line1) {
2915  line.clear();
2916  continue;
2917  }
2918 
2919  raw_data += line;
2920  raw_data += copy_params.line_delim;
2921  line.clear();
2922  ++import_status.rows_completed;
2923  if (std::chrono::steady_clock::now() > end_time) {
2924  if (import_status.rows_completed > 10000) {
2925  break;
2926  }
2927  }
2928  }
2929  } catch (std::exception& e) {
2930  }
2931 
2932  // as if load truncated
2933  import_status.load_truncated = true;
2934  load_failed = true;
2935 
2936  fclose(p_file);
2937  p_file = nullptr;
2938  return import_status;
2939 }
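// Note: detection reads a bounded sample, not the whole file -- the loop
// above stops at EOF, or once the timeout has expired (tripled for s3://
// paths) and at least 10,000 rows have been read, so the suggested column
// types reflect that sample only.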
2940 
2941 void Detector::read_file() {
2942  // this becomes analogous to Importer::import()
2943  importDelimited(file_path, true);
2944 }
2945 
2946 void Detector::detect_row_delimiter() {
2947  if (copy_params.delimiter == '\0') {
2948  copy_params.delimiter = ',';
2949  if (boost::filesystem::extension(file_path) == ".tsv") {
2950  copy_params.delimiter = '\t';
2951  }
2952  }
2953 }
2954 
2955 void Detector::split_raw_data() {
2956  const char* buf = raw_data.c_str();
2957  const char* buf_end = buf + raw_data.size();
2958  bool try_single_thread = false;
2959  for (const char* p = buf; p < buf_end; p++) {
2960  std::vector<std::string> row;
2961  std::vector<std::unique_ptr<char[]>> tmp_buffers;
2962  p = import_export::delimited_parser::get_row(
2963  p, buf_end, buf_end, copy_params, nullptr, row, tmp_buffers, try_single_thread);
2964  raw_rows.push_back(row);
2965  if (try_single_thread) {
2966  break;
2967  }
2968  }
2969  if (try_single_thread) {
2970  copy_params.threads = 1;
2971  raw_rows.clear();
2972  for (const char* p = buf; p < buf_end; p++) {
2973  std::vector<std::string> row;
2974  std::vector<std::unique_ptr<char[]>> tmp_buffers;
2975  p = import_export::delimited_parser::get_row(
2976  p, buf_end, buf_end, copy_params, nullptr, row, tmp_buffers, try_single_thread);
2977  raw_rows.push_back(row);
2978  }
2979  }
2980 }
2981 
2982 template <class T>
2983 bool try_cast(const std::string& str) {
2984  try {
2985  boost::lexical_cast<T>(str);
2986  } catch (const boost::bad_lexical_cast& e) {
2987  return false;
2988  }
2989  return true;
2990 }
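// Illustrative behavior (not part of the original source):
//   try_cast<int32_t>("42")   -> true
//   try_cast<int32_t>("4.2")  -> false (bad_lexical_cast caught internally)
//   try_cast<double>("4.2")   -> true
//   try_cast<double>("WKT")   -> false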
2991 
2992 inline char* try_strptimes(const char* str, const std::vector<std::string>& formats) {
2993  std::tm tm_struct;
2994  char* buf;
2995  for (auto format : formats) {
2996  buf = strptime(str, format.c_str(), &tm_struct);
2997  if (buf) {
2998  return buf;
2999  }
3000  }
3001  return nullptr;
3002 }
3003 
3004 SQLTypes Detector::detect_sqltype(const std::string& str) {
3005  SQLTypes type = kTEXT;
3006  if (try_cast<double>(str)) {
3007  type = kDOUBLE;
3008  /*if (try_cast<bool>(str)) {
3009  type = kBOOLEAN;
3010  }*/
3011  if (try_cast<int16_t>(str)) {
3012  type = kSMALLINT;
3013  } else if (try_cast<int32_t>(str)) {
3014  type = kINT;
3015  } else if (try_cast<int64_t>(str)) {
3016  type = kBIGINT;
3017  } else if (try_cast<float>(str)) {
3018  type = kFLOAT;
3019  }
3020  }
3021 
3022  // check for geo types
3023  if (type == kTEXT) {
3024  // convert to upper case
3025  std::string str_upper_case = str;
3026  std::transform(
3027  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3028 
3029  // then test for leading words
3030  if (str_upper_case.find("POINT") == 0) {
3031  type = kPOINT;
3032  } else if (str_upper_case.find("LINESTRING") == 0) {
3033  type = kLINESTRING;
3034  } else if (str_upper_case.find("POLYGON") == 0) {
3035  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
3036  type = kMULTIPOLYGON;
3037  } else {
3038  type = kPOLYGON;
3039  }
3040  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
3041  type = kMULTIPOLYGON;
3042  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
3043  std::string::npos &&
3044  (str_upper_case.size() % 2) == 0) {
3045  // could be a WKB hex blob
3046  // we can't handle these yet
3047  // leave as TEXT for now
3048  // deliberate return here, as otherwise this would get matched as TIME
3049  // @TODO
3050  // implement WKB import
3051  return type;
3052  }
3053  }
3054 
3055  // check for time types
3056  if (type == kTEXT) {
3057  // @TODO
3058  // make these tests more robust so they don't match stuff they should not
3059  char* buf;
3060  buf = try_strptimes(str.c_str(),
3061  {"%Y-%m-%d", "%m/%d/%Y", "%Y/%m/%d", "%d-%b-%y", "%d/%b/%Y"});
3062  if (buf) {
3063  type = kDATE;
3064  if (*buf == 'T' || *buf == ' ' || *buf == ':') {
3065  buf++;
3066  }
3067  }
3068  buf = try_strptimes(buf == nullptr ? str.c_str() : buf,
3069  {"%T %z", "%T", "%H%M%S", "%R"});
3070  if (buf) {
3071  if (type == kDATE) {
3072  type = kTIMESTAMP;
3073  } else {
3074  type = kTIME;
3075  }
3076  }
3077  }
3078 
3079  return type;
3080 }
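// Examples of the detection cascade above (illustrative, assuming default
// copy_params): "12" -> kSMALLINT, "70000" -> kINT, "1.5" -> kFLOAT,
// "POINT (1 2)" -> kPOINT, "2017-01-02" -> kDATE,
// "2017-01-02 03:04:05" -> kTIMESTAMP, "DEADBEEF" -> kTEXT (possible WKB
// hex blob), and anything otherwise unparseable -> kTEXT.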
3081 
3082 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3083  std::vector<SQLTypes> types(row.size());
3084  for (size_t i = 0; i < row.size(); i++) {
3085  types[i] = detect_sqltype(row[i]);
3086  }
3087  return types;
3088 }
3089 
3090 bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
3091  static std::array<int, kSQLTYPE_LAST> typeorder;
3092  typeorder[kCHAR] = 0;
3093  typeorder[kBOOLEAN] = 2;
3094  typeorder[kSMALLINT] = 3;
3095  typeorder[kINT] = 4;
3096  typeorder[kBIGINT] = 5;
3097  typeorder[kFLOAT] = 6;
3098  typeorder[kDOUBLE] = 7;
3099  typeorder[kTIMESTAMP] = 8;
3100  typeorder[kTIME] = 9;
3101  typeorder[kDATE] = 10;
3102  typeorder[kPOINT] = 11;
3103  typeorder[kLINESTRING] = 11;
3104  typeorder[kPOLYGON] = 11;
3105  typeorder[kMULTIPOLYGON] = 11;
3106  typeorder[kTEXT] = 12;
3107 
3108  // note: b < a instead of a < b because the map is ordered most to least restrictive
3109  return typeorder[b] < typeorder[a];
3110 }
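// For example, cells detected as {kSMALLINT, kBIGINT} converge to kBIGINT,
// and {kSMALLINT, kTEXT} to kTEXT: find_best_sqltypes() below keeps the
// least restrictive (highest typeorder) type seen in a column.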
3111 
3112 void Detector::find_best_sqltypes_and_headers() {
3113  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
3114  best_encodings =
3115  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3116  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3117  switch (copy_params.has_header) {
3118  case ImportHeaderRow::AUTODETECT:
3119  has_headers = detect_headers(head_types, best_sqltypes);
3120  if (has_headers) {
3121  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3122  } else {
3123  copy_params.has_header = ImportHeaderRow::NO_HEADER;
3124  }
3125  break;
3126  case ImportHeaderRow::NO_HEADER:
3127  has_headers = false;
3128  break;
3129  case ImportHeaderRow::HAS_HEADER:
3130  has_headers = true;
3131  break;
3132  }
3133 }
3134 
3135 std::vector<SQLTypes> Detector::find_best_sqltypes() {
3136  return find_best_sqltypes(raw_rows, copy_params);
3137 }
3138 
3139 std::vector<SQLTypes> Detector::find_best_sqltypes(
3140  const std::vector<std::vector<std::string>>& raw_rows,
3141  const CopyParams& copy_params) {
3142  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3143 }
3144 
3145 std::vector<SQLTypes> Detector::find_best_sqltypes(
3146  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3147  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3148  const CopyParams& copy_params) {
3149  if (raw_rows.size() < 1) {
3150  throw std::runtime_error("No rows found in: " +
3151  boost::filesystem::basename(file_path));
3152  }
3153  auto end_time = std::chrono::steady_clock::now() + timeout;
3154  size_t num_cols = raw_rows.front().size();
3155  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3156  std::vector<size_t> non_null_col_counts(num_cols, 0);
3157  for (auto row = row_begin; row != row_end; row++) {
3158  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3159  best_types.push_back(kCHAR);
3160  non_null_col_counts.push_back(0);
3161  }
3162  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3163  // do not count nulls
3164  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3165  continue;
3166  }
3167  SQLTypes t = detect_sqltype(row->at(col_idx));
3168  non_null_col_counts[col_idx]++;
3169  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3170  best_types[col_idx] = t;
3171  }
3172  }
3173  if (std::chrono::steady_clock::now() > end_time) {
3174  break;
3175  }
3176  }
3177  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3178  // if we don't have any non-null values for this column, make it TEXT to
3179  // be safe, because that is the least restrictive type
3180  if (non_null_col_counts[col_idx] == 0) {
3181  best_types[col_idx] = kTEXT;
3182  }
3183  }
3184 
3185  return best_types;
3186 }
3187 
3188 std::vector<EncodingType> Detector::find_best_encodings(
3189  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3190  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3191  const std::vector<SQLTypes>& best_types) {
3192  if (raw_rows.size() < 1) {
3193  throw std::runtime_error("No rows found in: " +
3194  boost::filesystem::basename(file_path));
3195  }
3196  size_t num_cols = best_types.size();
3197  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3198  std::vector<size_t> num_rows_per_col(num_cols, 1);
3199  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3200  for (auto row = row_begin; row != row_end; row++) {
3201  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3202  if (IS_STRING(best_types[col_idx])) {
3203  count_set[col_idx].insert(row->at(col_idx));
3204  num_rows_per_col[col_idx]++;
3205  }
3206  }
3207  }
3208  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3209  if (IS_STRING(best_types[col_idx])) {
3210  float uniqueRatio =
3211  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3212  if (uniqueRatio < 0.75) {
3213  best_encodes[col_idx] = kENCODING_DICT;
3214  }
3215  }
3216  }
3217  return best_encodes;
3218 }
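// E.g. a TEXT column with 60 distinct values across 1,000 sampled rows has
// a unique ratio of ~0.06 < 0.75, so dictionary encoding is suggested; a
// column of mostly-unique values (ratio near 1.0) stays ENCODING NONE.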
3219 
3220 // detect_headers returns true if:
3221 // - all elements of the first argument are kTEXT
3222 // - there is at least one instance where tail_types is more restrictive than head_types
3223 // (ie, not kTEXT)
3224 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3225  const std::vector<SQLTypes>& tail_types) {
3226  if (head_types.size() != tail_types.size()) {
3227  return false;
3228  }
3229  bool has_headers = false;
3230  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3231  if (head_types[col_idx] != kTEXT) {
3232  return false;
3233  }
3234  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3235  }
3236  return has_headers;
3237 }
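// E.g. head_types {kTEXT, kTEXT} with tail_types {kSMALLINT, kTEXT} is
// detected as a header row, while head_types {kSMALLINT, kTEXT} is not,
// since a non-text cell in the first row rules out a header.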
3238 
3239 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3240  n = std::min(n, raw_rows.size());
3241  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3242  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3243  raw_rows.begin() + n);
3244  return sample_rows;
3245 }
3246 
3247 std::vector<std::string> Detector::get_headers() {
3248  std::vector<std::string> headers(best_sqltypes.size());
3249  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3250  if (has_headers && i < raw_rows[0].size()) {
3251  headers[i] = raw_rows[0][i];
3252  } else {
3253  headers[i] = "column_" + std::to_string(i + 1);
3254  }
3255  }
3256  return headers;
3257 }
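// E.g. a two-column file without a header row yields {"column_1",
// "column_2"}, while a detected header returns the first row's cells
// verbatim.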
3258 
3259 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3260  size_t row_count) {
3261  if (!loader->loadNoCheckpoint(import_buffers, row_count)) {
3262  load_failed = true;
3263  }
3264 }
3265 
3266 void Importer::checkpoint(const int32_t start_epoch) {
3267  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3268  if (load_failed) {
3269  // rollback to starting epoch - undo all the added records
3270  loader->setTableEpoch(start_epoch);
3271  } else {
3272  loader->checkpoint();
3273  }
3274  }
3275 
3276  if (loader->getTableDesc()->persistenceLevel ==
3277  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3278  auto ms = measure<>::execution([&]() {
3279  if (!load_failed) {
3280  for (auto& p : import_buffers_vec[0]) {
3281  if (!p->stringDictCheckpoint()) {
3282  LOG(ERROR) << "Checkpointing Dictionary for Column "
3283  << p->getColumnDesc()->columnName << " failed.";
3284  load_failed = true;
3285  break;
3286  }
3287  }
3288  }
3289  });
3290  if (DEBUG_TIMING) {
3291  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3292  << std::endl;
3293  }
3294  }
3295 }
3296 
3297 ImportStatus DataStreamSink::archivePlumber() {
3298  // in the generalized importing scheme, by the time we reach here file_path
3299  // may contain a file path, a URL, or a wildcard of file paths.
3300  // see CopyTableStmt::execute.
3301  auto file_paths = mapd_glob(file_path);
3302  if (file_paths.size() == 0) {
3303  file_paths.push_back(file_path);
3304  }
3305 
3306  // sum up the sizes of all local files -- only for local files. if
3307  // file_path is an s3 url, sizes will be obtained via S3Archive.
3308  for (const auto& file_path : file_paths) {
3309  total_file_size += get_filesize(file_path);
3310  }
3311 
3312 #ifdef ENABLE_IMPORT_PARQUET
3313  // s3 parquet goes a different route because the files are read not via
3314  // libarchive but via the parquet api, and they need to be landed like .7z files.
3315  //
3316  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3317  // because, for example, spark sql users may specify an output url w/o a file
3318  // extension like this:
3319  // df.write
3320  // .mode("overwrite")
3321  // .parquet("s3://bucket/folder/parquet/mydata")
3322  // without the parameter, it means plain or compressed csv files.
3323  // note: .ORC and AVRO files should follow a similar path to Parquet?
3324  if (copy_params.file_type == FileType::PARQUET) {
3325  import_parquet(file_paths);
3326  } else
3327 #endif
3328  {
3329  import_compressed(file_paths);
3330  }
3331  return import_status;
3332 }
3333 
3334 #ifdef ENABLE_IMPORT_PARQUET
3335 inline auto open_parquet_table(const std::string& file_path,
3336  std::shared_ptr<arrow::io::ReadableFile>& infile,
3337  std::unique_ptr<parquet::arrow::FileReader>& reader,
3338  std::shared_ptr<arrow::Table>& table) {
3339  using namespace parquet::arrow;
3340  auto file_result = arrow::io::ReadableFile::Open(file_path);
3341  PARQUET_THROW_NOT_OK(file_result.status());
3342  infile = file_result.ValueOrDie();
3343 
3344  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3345  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3346  const auto num_row_groups = reader->num_row_groups();
3347  const auto num_columns = table->num_columns();
3348  const auto num_rows = table->num_rows();
3349  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3350  << " columns in " << num_row_groups << " groups.";
3351  return std::make_tuple(num_row_groups, num_columns, num_rows);
3352 }
3353 
3354 void Detector::import_local_parquet(const std::string& file_path) {
3355  std::shared_ptr<arrow::io::ReadableFile> infile;
3356  std::unique_ptr<parquet::arrow::FileReader> reader;
3357  std::shared_ptr<arrow::Table> table;
3358  int num_row_groups, num_columns;
3359  int64_t num_rows;
3360  std::tie(num_row_groups, num_columns, num_rows) =
3361  open_parquet_table(file_path, infile, reader, table);
3362  // make up a header line if we don't have one yet
3363  if (0 == raw_data.size()) {
3364  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3365  copy_params.line_delim = '\n';
3366  copy_params.delimiter = ',';
3367  // must quote values to skip any embedded delimiter
3368  copy_params.quoted = true;
3369  copy_params.quote = '"';
3370  copy_params.escape = '"';
3371  for (int c = 0; c < num_columns; ++c) {
3372  if (c) {
3373  raw_data += copy_params.delimiter;
3374  }
3375  raw_data += table->ColumnNames().at(c);
3376  }
3377  raw_data += copy_params.line_delim;
3378  }
3379  // make up raw data... rowwise...
3380  const ColumnDescriptor cd;
3381  for (int g = 0; g < num_row_groups; ++g) {
3382  // data is columnwise
3383  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3384  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3385  arrays.resize(num_columns);
3386  for (int c = 0; c < num_columns; ++c) {
3387  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3388  for (auto chunk : arrays[c]->chunks()) {
3389  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3390  }
3391  }
3392  for (int r = 0; r < num_rows; ++r) {
3393  for (int c = 0; c < num_columns; ++c) {
3394  std::vector<std::string> buffer;
3395  for (auto chunk : arrays[c]->chunks()) {
3396  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3397  if (c) {
3398  raw_data += copy_params.delimiter;
3399  }
3400  if (!chunk->IsNull(r)) {
3401  raw_data += copy_params.quote;
3402  raw_data += boost::replace_all_copy(
3403  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3404  raw_data += copy_params.quote;
3405  }
3406  }
3407  }
3408  raw_data += copy_params.line_delim;
3409  if (++import_status.rows_completed >= 10000) {
3410  // as if load truncated
3411  import_status.load_truncated = true;
3412  load_failed = true;
3413  return;
3414  }
3415  }
3416  }
3417 }
3418 
3419 template <typename DATA_TYPE>
3420 auto TypedImportBuffer::del_values(
3421  std::vector<DATA_TYPE>& buffer,
3422  import_export::BadRowsTracker* const bad_rows_tracker) {
3423  const auto old_size = buffer.size();
3424  // erase backward to minimize memory movement overhead
3425  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3426  ++rit) {
3427  buffer.erase(buffer.begin() + *rit);
3428  }
3429  return std::make_tuple(old_size, buffer.size());
3430 }
3431 
3432 auto TypedImportBuffer::del_values(const SQLTypes type,
3433  BadRowsTracker* const bad_rows_tracker) {
3434  switch (type) {
3435  case kBOOLEAN:
3436  return del_values(*bool_buffer_, bad_rows_tracker);
3437  case kTINYINT:
3438  return del_values(*tinyint_buffer_, bad_rows_tracker);
3439  case kSMALLINT:
3440  return del_values(*smallint_buffer_, bad_rows_tracker);
3441  case kINT:
3442  return del_values(*int_buffer_, bad_rows_tracker);
3443  case kBIGINT:
3444  case kNUMERIC:
3445  case kDECIMAL:
3446  case kTIME:
3447  case kTIMESTAMP:
3448  case kDATE:
3449  return del_values(*bigint_buffer_, bad_rows_tracker);
3450  case kFLOAT:
3451  return del_values(*float_buffer_, bad_rows_tracker);
3452  case kDOUBLE:
3453  return del_values(*double_buffer_, bad_rows_tracker);
3454  case kTEXT:
3455  case kVARCHAR:
3456  case kCHAR:
3457  return del_values(*string_buffer_, bad_rows_tracker);
3458  case kPOINT:
3459  case kLINESTRING:
3460  case kPOLYGON:
3461  case kMULTIPOLYGON:
3462  return del_values(*geo_string_buffer_, bad_rows_tracker);
3463  case kARRAY:
3464  return del_values(*array_buffer_, bad_rows_tracker);
3465  default:
3466  throw std::runtime_error("Invalid Type");
3467  }
3468 }
3469 
3470 void Importer::import_local_parquet(const std::string& file_path) {
3471  std::shared_ptr<arrow::io::ReadableFile> infile;
3472  std::unique_ptr<parquet::arrow::FileReader> reader;
3473  std::shared_ptr<arrow::Table> table;
3474  int num_row_groups, num_columns;
3475  int64_t nrow_in_file;
3476  std::tie(num_row_groups, num_columns, nrow_in_file) =
3477  open_parquet_table(file_path, infile, reader, table);
3478  // column_list has no $deleted
3479  const auto& column_list = get_column_descs();
3480  // for now geo columns expect a wkt string
3481  std::vector<const ColumnDescriptor*> cds;
3482  int num_physical_cols = 0;
3483  for (auto& cd : column_list) {
3484  cds.push_back(cd);
3485  num_physical_cols += cd->columnType.get_physical_cols();
3486  }
3487  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3488  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3489  std::to_string(num_columns) + " columns in file vs " +
3490  std::to_string(column_list.size() - num_physical_cols) +
3491  " columns in table.");
3492  // slice each group to import slower columns faster, e.g. geo or string
3494  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3495  // init row estimate for this file
3496  const auto filesize = get_filesize(file_path);
3497  size_t nrow_completed{0};
3498  file_offsets.push_back(0);
3499  // map logical column index to physical column index
3500  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3501  int physical_col_idx = 0;
3502  for (int i = 0; i < logic_col_idx; ++i) {
3503  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3504  }
3505  return physical_col_idx;
3506  };
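// E.g. (illustrative) for a table declared as {a INT, p POINT, b INT}, the
// POINT owns one physical (coords) column, so cds is {a, p, p_coords, b}
// and logical index 2 ("b") maps to physical index 3.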
3507  // load a file = nested iteration of row groups, row slices and logical columns
3508  auto ms_load_a_file = measure<>::execution([&]() {
3509  for (int row_group = 0; row_group < num_row_groups && !load_failed; ++row_group) {
3510  // a sliced row group will be handled like a (logical) parquet file, with
3511  // an entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
3512  import_buffers_vec.resize(num_slices);
3513  for (int slice = 0; slice < num_slices; slice++) {
3514  import_buffers_vec[slice].clear();
3515  for (const auto cd : cds) {
3516  import_buffers_vec[slice].emplace_back(
3517  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3518  }
3519  }
3520  /*
3521  * A caveat here is that Parquet files or arrow data are imported column-
3522  * wise. Unlike importing row-wise csv files, an error on any row of any
3523  * column forces us to give up the entire row group of all columns, unless
3524  * there is a sophisticated method to trace erroneous rows in individual
3525  * columns so that we can union bad rows and drop them from the
3526  * corresponding import_buffers_vec; otherwise, we may easily exceed the
3527  * maximum number of truncated rows even with very sparse errors in the files.
3528  */
3529  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3530  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3531  auto& bad_rows_tracker = bad_rows_trackers[slice];
3532  bad_rows_tracker.file_name = file_path;
3533  bad_rows_tracker.row_group = slice;
3534  bad_rows_tracker.importer = this;
3535  }
3536  // process arrow arrays to import buffers
3537  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3538  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3539  const auto cd = cds[physical_col_idx];
3540  std::shared_ptr<arrow::ChunkedArray> array;
3541  PARQUET_THROW_NOT_OK(
3542  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3543  const size_t array_size = array->length();
3544  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
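 // slice_size is ceil(array_size / num_slices); e.g. 10 rows over 4
 // slices yields ranges [0,3) [3,6) [6,9) [9,10)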
3545  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3546  for (int slice = 0; slice < num_slices; ++slice) {
3547  thread_controller.startThread([&, slice] {
3548  const auto slice_offset = slice % num_slices;
3549  ArraySliceRange slice_range(
3550  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3551  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3552  auto& bad_rows_tracker = bad_rows_trackers[slice];
3553  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3554  import_buffer->import_buffers = &import_buffers_vec[slice];
3555  import_buffer->col_idx = physical_col_idx + 1;
3556  for (auto chunk : array->chunks()) {
3557  import_buffer->add_arrow_values(
3558  cd, *chunk, false, slice_range, &bad_rows_tracker);
3559  }
3560  });
3561  }
3562  thread_controller.finish();
3563  }
3564  std::vector<size_t> nrow_in_slice_raw(num_slices);
3565  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3566  // trim bad rows from import buffers
3567  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3568  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3569  const auto cd = cds[physical_col_idx];
3570  for (int slice = 0; slice < num_slices; ++slice) {
3571  auto& bad_rows_tracker = bad_rows_trackers[slice];
3572  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3573  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3574  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3575  }
3576  }
3577  // flush slices of this row group to chunks
3578  for (int slice = 0; slice < num_slices; ++slice) {
3579  load(import_buffers_vec[slice], nrow_in_slice_successfully_loaded[slice]);
3580  }
3581  // update import stats
3582  const auto nrow_original =
3583  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3584  const auto nrow_imported =
3585  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3586  nrow_in_slice_successfully_loaded.end(),
3587  0);
3588  const auto nrow_dropped = nrow_original - nrow_imported;
3589  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3590  << " rows, drop " << nrow_dropped << " rows.";
3591  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3592  import_status.rows_completed += nrow_imported;
3593  import_status.rows_rejected += nrow_dropped;
 3594 
 3595  if (import_status.rows_rejected > copy_params.max_reject) {
 3596  load_failed = true;
3597  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3598  << ") rows rejected exceeded. Halting load.";
3599  }
3600  // row estimate
3601  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3602  nrow_completed += nrow_imported;
3603  file_offsets.back() =
3604  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
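 // i.e. progress is a byte-proportional estimate: e.g. a 100MB file with
 // 20% of its rows completed reports a file offset of ~20MB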
3605  // sum up current total file offsets
3606  const auto total_file_offset =
3607  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
3608  // estimate number of rows per current total file offset
3609  if (total_file_offset) {
 3610  import_status.rows_estimated =
 3611  (float)total_file_size / total_file_offset * import_status.rows_completed;
3612  VLOG(3) << "rows_completed " << import_status.rows_completed
3613  << ", rows_estimated " << import_status.rows_estimated
3614  << ", total_file_size " << total_file_size << ", total_file_offset "
3615  << total_file_offset;
3616  }
3617  }
3618  });
3619  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
3620  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
3621 }
3622 
3623 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths) {
3624  auto importer = dynamic_cast<Importer*>(this);
3625  auto start_epoch = importer ? importer->getLoader()->getTableEpoch() : 0;
3626  try {
3627  std::exception_ptr teptr;
 3628  // file_paths may contain one local file path, a list of local file paths,
 3629  // or an s3/hdfs/... url that may translate to one or more remote object keys.
3630  for (auto const& file_path : file_paths) {
3631  std::map<int, std::string> url_parts;
3632  Archive::parse_url(file_path, url_parts);
3633 
 3634  // for an s3 url we need to know the object keys it comprises
3635  std::vector<std::string> objkeys;
3636  std::unique_ptr<S3ParquetArchive> us3arch;
3637  if ("s3" == url_parts[2]) {
3638 #ifdef HAVE_AWS_S3
3639  us3arch.reset(new S3ParquetArchive(file_path,
 3640  copy_params.s3_access_key,
 3641  copy_params.s3_secret_key,
 3642  copy_params.s3_region,
 3643  copy_params.s3_endpoint,
 3644  copy_params.plain_text));
 3645  us3arch->init_for_read();
3646  total_file_size += us3arch->get_total_file_size();
3647  objkeys = us3arch->get_objkeys();
3648 #else
3649  throw std::runtime_error("AWS S3 support not available");
3650 #endif // HAVE_AWS_S3
3651  } else {
3652  objkeys.emplace_back(file_path);
3653  }
3654 
 3655  // each object key of an s3 url must be landed (downloaded locally)
 3656  // before being imported, just as with a 'local file'.
3657  for (auto const& objkey : objkeys) {
3658  try {
3659  auto file_path =
3660  us3arch
3661  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
3662  : objkey;
3663  import_local_parquet(file_path);
3664  if (us3arch) {
3665  us3arch->vacuum(objkey);
3666  }
3667  } catch (...) {
3668  if (us3arch) {
3669  us3arch->vacuum(objkey);
3670  }
3671  throw;
3672  }
 3673  if (import_status.load_truncated) {
 3674  break;
3675  }
3676  }
3677  }
 3678  // rethrow any exception that happened above
3679  if (teptr) {
3680  std::rethrow_exception(teptr);
3681  }
3682  } catch (...) {
3683  load_failed = true;
 3684  if (!import_status.load_truncated) {
 3685  throw;
3686  }
3687  }
3688 
3689  if (importer) {
3690  importer->checkpoint(start_epoch);
3691  }
3692 }
3693 #endif // ENABLE_IMPORT_PARQUET
3694 
3695 void DataStreamSink::import_compressed(std::vector<std::string>& file_paths) {
 3696  // a new requirement is to have one single input stream into
 3697  // Importer::importDelimited, so the pipe-related setup needs
 3698  // to live in the outermost block.
3699  int fd[2];
3700  if (pipe(fd) < 0) {
3701  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
3702  }
3703  signal(SIGPIPE, SIG_IGN);
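 // note: with SIGPIPE ignored, a write() to a pipe whose read end has
 // closed fails with errno EPIPE instead of killing the process; the
 // write loops below treat that as a regular error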
3704 
3705  std::exception_ptr teptr;
3706  // create a thread to read uncompressed byte stream out of pipe and
3707  // feed into importDelimited()
3708  ImportStatus ret;
3709  auto th_pipe_reader = std::thread([&]() {
3710  try {
3711  // importDelimited will read from FILE* p_file
3712  if (0 == (p_file = fdopen(fd[0], "r"))) {
3713  throw std::runtime_error(std::string("failed to open a pipe: ") +
3714  strerror(errno));
3715  }
3716 
 3717  // in the future, depending on the data types in this uncompressed
 3718  // stream, it could be fed into other functions such as importParquet, etc.
3719  ret = importDelimited(file_path, true);
3720  } catch (...) {
3721  if (!teptr) { // no replace
3722  teptr = std::current_exception();
3723  }
3724  }
3725 
3726  if (p_file) {
3727  fclose(p_file);
3728  }
3729  p_file = 0;
3730  });
3731 
3732  // create a thread to iterate all files (in all archives) and
3733  // forward the uncompressed byte stream to fd[1] which is
 3734  // then fed into importDelimited, importParquet, etc.
3735  auto th_pipe_writer = std::thread([&]() {
3736  std::unique_ptr<S3Archive> us3arch;
3737  bool stop = false;
3738  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
3739  try {
3740  auto file_path = file_paths[fi];
3741  std::unique_ptr<Archive> uarch;
3742  std::map<int, std::string> url_parts;
3743  Archive::parse_url(file_path, url_parts);
3744  const std::string S3_objkey_url_scheme = "s3ok";
3745  if ("file" == url_parts[2] || "" == url_parts[2]) {
3746  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3747  } else if ("s3" == url_parts[2]) {
3748 #ifdef HAVE_AWS_S3
 3749  // create an S3Archive with a shared s3client.
 3750  // should be safe b/c s3 urls carry no wildcards
3751  us3arch.reset(new S3Archive(file_path,
 3752  copy_params.s3_access_key,
 3753  copy_params.s3_secret_key,
 3754  copy_params.s3_region,
 3755  copy_params.s3_endpoint,
 3756  copy_params.plain_text));
 3757  us3arch->init_for_read();
3758  total_file_size += us3arch->get_total_file_size();
 3759  // don't land all files here; land them one by one in the following iterations
3760  for (const auto& objkey : us3arch->get_objkeys()) {
3761  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
3762  }
3763  continue;
3764 #else
3765  throw std::runtime_error("AWS S3 support not available");
3766 #endif // HAVE_AWS_S3
3767  } else if (S3_objkey_url_scheme == url_parts[2]) {
3768 #ifdef HAVE_AWS_S3
3769  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
3770  auto file_path =
3771  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
3772  if (0 == file_path.size()) {
3773  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
3774  }
3775  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3776  // file not removed until file closed
3777  us3arch->vacuum(objkey);
3778 #else
3779  throw std::runtime_error("AWS S3 support not available");
3780 #endif // HAVE_AWS_S3
3781  }
3782 #if 0 // TODO(ppan): implement and enable any other archive class
3783  else
3784  if ("hdfs" == url_parts[2])
3785  uarch.reset(new HdfsArchive(file_path));
3786 #endif
3787  else {
3788  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
3789  }
3790 
3791  // init the archive for read
3792  auto& arch = *uarch;
3793 
 3794  // at this point, the archive at the url should be ready to be read,
 3795  // unarchived and uncompressed by libarchive into a byte stream (csv) for the pipe
3796  const void* buf;
3797  size_t size;
3798  bool just_saw_archive_header;
3799  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
3800  bool first_text_header_skipped = false;
3801  // start reading uncompressed bytes of this archive from libarchive
 3802  // note! this archive may contain more than one file!
3803  file_offsets.push_back(0);
3804  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
3805  bool insert_line_delim_after_this_file = false;
3806  while (!stop) {
3807  int64_t offset{-1};
3808  auto ok = arch.read_data_block(&buf, &size, &offset);
3809  // can't use (uncompressed) size, so track (max) file offset.
3810  // also we want to capture offset even on e.o.f.
3811  if (offset > 0) {
3812  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3813  file_offsets.back() = offset;
3814  }
3815  if (!ok) {
3816  break;
3817  }
 3818  // one subtle point here is that we concatenate all files
 3819  // into a single FILE stream on which importDelimited is called
 3820  // only once. that would make it assume that only one header
 3821  // line comes with this 'single' stream, while actually we may
 3822  // have one header line for each of the files. so we need to
 3823  // skip header lines here instead of in importDelimited.
3824  const char* buf2 = (const char*)buf;
3825  int size2 = size;
 3826  if (copy_params.has_header != ImportHeaderRow::NO_HEADER &&
 3827  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
3828  while (size2-- > 0) {
3829  if (*buf2++ == copy_params.line_delim) {
3830  break;
3831  }
3832  }
3833  if (size2 <= 0) {
3834  LOG(WARNING) << "No line delimiter in block." << std::endl;
3835  } else {
3836  just_saw_archive_header = false;
3837  first_text_header_skipped = true;
3838  }
3839  }
 3840  // In very rare occasions the write pipe somehow operates in a mode similar to
 3841  // non-blocking, while pipe(fds) should behave like pipe2(fds, 0), i.e.
 3842  // blocking mode. On such an unreliable blocking mode, a possible fix is to
 3843  // loop writing until no bytes are left, otherwise we get the annoying
 3844  // `failed to write pipe: Success`...
3845  if (size2 > 0) {
3846  int nremaining = size2;
3847  while (nremaining > 0) {
3848  // try to write the entire remainder of the buffer to the pipe
3849  int nwritten = write(fd[1], buf2, nremaining);
3850  // how did we do?
3851  if (nwritten < 0) {
3852  // something bad happened
3853  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3854  // ignore these, assume nothing written, try again
3855  nwritten = 0;
3856  } else {
3857  // a real error
3858  throw std::runtime_error(
3859  std::string("failed or interrupted write to pipe: ") +
3860  strerror(errno));
3861  }
3862  } else if (nwritten == nremaining) {
3863  // we wrote everything; we're done
3864  break;
3865  }
3866  // only wrote some (or nothing), try again
3867  nremaining -= nwritten;
3868  buf2 += nwritten;
3869  // no exception when too many rejected
3870  // @simon.eves how would this get set? from the other thread? mutex
3871  // needed?
 3872  if (import_status.load_truncated) {
 3873  stop = true;
3874  break;
3875  }
3876  }
 3877  // check whether this file (the current buf/size block) ended with a line delim
3878  if (size > 0) {
3879  const char* plast = static_cast<const char*>(buf) + (size - 1);
3880  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
3881  }
3882  }
3883  }
 3884  // if that file didn't end with a line delim, we insert one here to terminate
 3885  // that file's stream; use a loop for the same reason as above
3886  if (insert_line_delim_after_this_file) {
3887  while (true) {
3888  // write the delim char to the pipe
3889  int nwritten = write(fd[1], &copy_params.line_delim, 1);
3890  // how did we do?
3891  if (nwritten < 0) {
3892  // something bad happened
3893  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3894  // ignore these, assume nothing written, try again
3895  nwritten = 0;
3896  } else {
3897  // a real error
3898  throw std::runtime_error(
3899  std::string("failed or interrupted write to pipe: ") +
3900  strerror(errno));
3901  }
3902  } else if (nwritten == 1) {
3903  // we wrote it; we're done
3904  break;
3905  }
3906  }
3907  }
3908  }
3909  } catch (...) {
 3910  // when an import is aborted because of too many data errors or because a
 3911  // detection has ended, any exception thrown by the s3 sdk or libarchive is
 3912  // okay and should be suppressed.
3913  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
 3914  if (import_status.load_truncated) {
 3915  break;
3916  }
3917  if (import_status.rows_completed > 0) {
3918  if (nullptr != dynamic_cast<Detector*>(this)) {
3919  break;
3920  }
3921  }
3922  if (!teptr) { // no replace
3923  teptr = std::current_exception();
3924  }
3925  break;
3926  }
3927  }
3928  // close writer end
3929  close(fd[1]);
3930  });
3931 
3932  th_pipe_reader.join();
3933  th_pipe_writer.join();
3934 
 3935  // rethrow any exception that happened above
3936  if (teptr) {
3937  std::rethrow_exception(teptr);
3938  }
3939 }
3940 
 3941 ImportStatus Importer::import() {
 3942  return DataStreamSink::archivePlumber();
 3943 }
3944 
3945 ImportStatus Importer::importDelimited(const std::string& file_path,
3946  const bool decompressed) {
3947  bool load_truncated = false;
 3948  set_import_status(import_id, import_status);
 3949 
3950  if (!p_file) {
3951  p_file = fopen(file_path.c_str(), "rb");
3952  }
3953  if (!p_file) {
3954  throw std::runtime_error("failed to open file '" + file_path +
3955  "': " + strerror(errno));
3956  }
3957 
3958  if (!decompressed) {
3959  (void)fseek(p_file, 0, SEEK_END);
3960  file_size = ftell(p_file);
3961  }
3962 
3963  if (copy_params.threads == 0) {
3964  max_threads = static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
3965  } else {
3966  max_threads = static_cast<size_t>(copy_params.threads);
3967  }
3968 
3969  // deal with small files
3970  size_t alloc_size = copy_params.buffer_size;
3971  if (!decompressed && file_size < alloc_size) {
3972  alloc_size = file_size;
3973  }
3974 
3975  for (size_t i = 0; i < max_threads; i++) {
3976  import_buffers_vec.emplace_back();
3977  for (const auto cd : loader->get_column_descs()) {
3978  import_buffers_vec[i].emplace_back(
3979  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
3980  }
3981  }
3982 
3983  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
3984  size_t current_pos = 0;
3985  size_t end_pos;
3986  size_t begin_pos = 0;
3987 
3988  (void)fseek(p_file, current_pos, SEEK_SET);
3989  size_t size =
3990  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
3991 
3992  // make render group analyzers for each poly column
3993  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
 3994  if (copy_params.geo_assign_render_groups) {
 3995  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
3996  loader->getTableDesc()->tableId, false, false, false);
3997  for (auto cd : columnDescriptors) {
3998  SQLTypes ct = cd->columnType.get_type();
3999  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4000  auto rga = std::make_shared<RenderGroupAnalyzer>();
4001  rga->seedFromExistingTableContents(loader, cd->columnName);
4002  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4003  }
4004  }
4005  }
4006 
4007  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4008  loader->getTableDesc()->tableId};
4009  auto start_epoch = loader->getTableEpoch();
4010  {
4011  std::list<std::future<ImportStatus>> threads;
4012 
4013  // use a stack to track thread_ids which must not overlap among threads
4014  // because thread_id is used to index import_buffers_vec[]
4015  std::stack<size_t> stack_thread_ids;
4016  for (size_t i = 0; i < max_threads; i++) {
4017  stack_thread_ids.push(i);
4018  }
4019  // added for true row index on error
4020  size_t first_row_index_this_buffer = 0;
4021 
4022  while (size > 0) {
4023  unsigned int num_rows_this_buffer = 0;
4024  CHECK(scratch_buffer);
4025  end_pos = delimited_parser::find_end(
4026  scratch_buffer.get(), size, copy_params, num_rows_this_buffer);
4027 
4028  // unput residual
4029  int nresidual = size - end_pos;
4030  std::unique_ptr<char[]> unbuf;
4031  if (nresidual > 0) {
4032  unbuf = std::make_unique<char[]>(nresidual);
4033  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4034  }
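 // the residual (a partial last row) is prepended to the next scratch
 // buffer below, so no row is ever split across two worker threads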
4035 
4036  // get a thread_id not in use
4037  auto thread_id = stack_thread_ids.top();
4038  stack_thread_ids.pop();
4039  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4040 
4041  threads.push_back(std::async(std::launch::async,
 4042  import_thread_delimited,
 4043  thread_id,
4044  this,
4045  std::move(scratch_buffer),
4046  begin_pos,
4047  end_pos,
4048  end_pos,
4049  columnIdToRenderGroupAnalyzerMap,
4050  first_row_index_this_buffer));
4051 
4052  first_row_index_this_buffer += num_rows_this_buffer;
4053 
4054  current_pos += end_pos;
4055  scratch_buffer = std::make_unique<char[]>(alloc_size);
4056  CHECK(scratch_buffer);
4057  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4058  size = nresidual + fread(scratch_buffer.get() + nresidual,
4059  1,
4060  copy_params.buffer_size - nresidual,
4061  p_file);
4062 
4063  begin_pos = 0;
4064 
4065  while (threads.size() > 0) {
4066  int nready = 0;
4067  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4068  it != threads.end();) {
4069  auto& p = *it;
4070  std::chrono::milliseconds span(
4071  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4072  if (p.wait_for(span) == std::future_status::ready) {
4073  auto ret_import_status = p.get();
4074  import_status += ret_import_status;
4075  // sum up current total file offsets
4076  size_t total_file_offset{0};
4077  if (decompressed) {
4078  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4079  for (const auto file_offset : file_offsets) {
4080  total_file_offset += file_offset;
4081  }
4082  }
4083  // estimate number of rows per current total file offset
4084  if (decompressed ? total_file_offset : current_pos) {
 4085  import_status.rows_estimated =
 4086  (decompressed ? (float)total_file_size / total_file_offset
 4087  : (float)file_size / current_pos) *
 4088  import_status.rows_completed;
 4089  }
4090  VLOG(3) << "rows_completed " << import_status.rows_completed
4091  << ", rows_estimated " << import_status.rows_estimated
4092  << ", total_file_size " << total_file_size << ", total_file_offset "
4093  << total_file_offset;
 4094  set_import_status(import_id, import_status);
 4095  // recall thread_id for reuse
4096  stack_thread_ids.push(ret_import_status.thread_id);
4097  threads.erase(it++);
4098  ++nready;
4099  } else {
4100  ++it;
4101  }
4102  }
4103 
4104  if (nready == 0) {
4105  std::this_thread::yield();
4106  }
4107 
 4108  // on eof, wait for all threads to finish
4109  if (0 == size) {
4110  continue;
4111  }
4112 
 4113  // keep reading if there is any free thread slot;
 4114  // this is one of the major differences from the old threading model !!
4115  if (threads.size() < max_threads) {
4116  break;
4117  }
4118  }
4119 
4120  if (import_status.rows_rejected > copy_params.max_reject) {
4121  load_truncated = true;
4122  load_failed = true;
4123  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4124  break;
4125  }
4126  if (load_failed) {
4127  load_truncated = true;
 4128  LOG(ERROR) << "A call to Loader::load failed. Please review the logs for "
 4129  "more details";
4130  break;
4131  }
4132  }
4133 
4134  // join dangling threads in case of LOG(ERROR) above
4135  for (auto& p : threads) {
4136  p.wait();
4137  }
4138  }
4139 
4140  checkpoint(start_epoch);
4141 
4142  // must set import_status.load_truncated before closing this end of pipe
4143  // otherwise, the thread on the other end would throw an unwanted 'write()'
4144  // exception
4145  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
4146  import_status.load_truncated = load_truncated;
4147 
4148  fclose(p_file);
4149  p_file = nullptr;
4150  return import_status;
4151 }
4152 
 4153 void Loader::checkpoint() {
 4154  if (getTableDesc()->persistenceLevel ==
4155  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4156  getCatalog().checkpoint(getTableDesc()->tableId);
4157  }
4158 }
4159 
 4160 int32_t Loader::getTableEpoch() {
 4161  return getCatalog().getTableEpoch(getCatalog().getCurrentDB().dbId,
4162  getTableDesc()->tableId);
4163 }
4164 
4165 void Loader::setTableEpoch(const int32_t start_epoch) {
 4166  getCatalog().setTableEpoch(
 4167  getCatalog().getCurrentDB().dbId, getTableDesc()->tableId, start_epoch);
4168 }
4169 
4170 /* static */
 4171 void Importer::setGDALAuthorizationTokens(const CopyParams& copy_params) {
 4172  // for now we only support S3
4173  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4174  // only set if non-empty to allow GDAL defaults to persist
4175  // explicitly clear if empty to revert to default and not reuse a previous session's
4176  // keys
4177  if (copy_params.s3_region.size()) {
4178 #if DEBUG_AWS_AUTHENTICATION
4179  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4180 #endif
4181  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4182  } else {
4183 #if DEBUG_AWS_AUTHENTICATION
4184  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4185 #endif
4186  CPLSetConfigOption("AWS_REGION", nullptr);
4187  }
4188  if (copy_params.s3_endpoint.size()) {
4189 #if DEBUG_AWS_AUTHENTICATION
4190  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4191 #endif
4192  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4193  } else {
4194 #if DEBUG_AWS_AUTHENTICATION
4195  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4196 #endif
4197  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4198  }
4199  if (copy_params.s3_access_key.size()) {
4200 #if DEBUG_AWS_AUTHENTICATION
4201  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4202  << "'";
4203 #endif
4204  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4205  } else {
4206 #if DEBUG_AWS_AUTHENTICATION
4207  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4208 #endif
4209  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4210  }
4211  if (copy_params.s3_secret_key.size()) {
4212 #if DEBUG_AWS_AUTHENTICATION
4213  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4214  << "'";
4215 #endif
4216  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4217  } else {
4218 #if DEBUG_AWS_AUTHENTICATION
4219  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4220 #endif
4221  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4222  }
4223 
4224 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4225  // if we haven't set keys, we need to disable signed access
4226  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4227 #if DEBUG_AWS_AUTHENTICATION
4228  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4229 #endif
4230  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4231  } else {
4232 #if DEBUG_AWS_AUTHENTICATION
4233  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4234 #endif
4235  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4236  }
4237 #endif
4238 }
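// with these options set, GDAL can read S3-resident sources directly via
// its virtual file system, e.g. (illustrative path only)
// openGDALDataset("/vsis3/some-bucket/some-file.geojson", copy_params)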
4239 
4240 /* static */
4241 OGRDataSource* Importer::openGDALDataset(const std::string& file_name,
4242  const CopyParams& copy_params) {
4243  // lazy init GDAL
4244  GDAL::init();
4245 
4246  // set authorization tokens
4247  setGDALAuthorizationTokens(copy_params);
4248 
4249  // open the file
4250  OGRDataSource* poDS;
4251 #if GDAL_VERSION_MAJOR == 1
4252  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4253 #else
4254  poDS = (OGRDataSource*)GDALOpenEx(
4255  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4256  if (poDS == nullptr) {
4257  poDS = (OGRDataSource*)GDALOpenEx(
4258  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4259  if (poDS) {
4260  LOG(INFO) << "openGDALDataset had to open as read-only";
4261  }
4262  }
4263 #endif
4264  if (poDS == nullptr) {
4265  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4266  }
4267  // NOTE(adb): If extending this function, refactor to ensure any errors will not result
4268  // in a memory leak if GDAL successfully opened the input dataset.
4269  return poDS;
4270 }
4271 
4272 namespace {
4273 
4274 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4275  const OGRDataSourceUqPtr& poDS,
4276  const std::string& file_name) {
4277  // get layer with specified name, or default to first layer
4278  OGRLayer* poLayer = nullptr;
4279  if (geo_layer_name.size()) {
4280  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4281  if (poLayer == nullptr) {
4282  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4283  file_name);
4284  }
4285  } else {
4286  poLayer = poDS->GetLayer(0);
4287  if (poLayer == nullptr) {
4288  throw std::runtime_error("No layers found in " + file_name);
4289  }
4290  }
4291  return *poLayer;
4292 }
4293 
4294 } // namespace
4295 
4296 /* static */
 4297 void Importer::readMetadataSampleGDAL(
 4298  const std::string& file_name,
4299  const std::string& geo_column_name,
4300  std::map<std::string, std::vector<std::string>>& metadata,
4301  int rowLimit,
4302  const CopyParams& copy_params) {
4303  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4304  if (poDS == nullptr) {
4305  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4306  file_name);
4307  }
4308 
4309  OGRLayer& layer =
4310  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4311 
4312  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4313  CHECK(poFDefn);
4314 
4315  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4316  auto nFeats = layer.GetFeatureCount();
4317  size_t numFeatures =
4318  std::max(static_cast<decltype(nFeats)>(0),
4319  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4320  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4321  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4322  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4323  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4324  }
4325  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4326  layer.ResetReading();
4327  size_t iFeature = 0;
4328  while (iFeature < numFeatures) {
4329  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4330  if (!poFeature) {
4331  break;
4332  }
4333 
4334  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4335  if (poGeometry != nullptr) {
4336  // validate geom type (again?)
4337  switch (wkbFlatten(poGeometry->getGeometryType())) {
4338  case wkbPoint:
4339  case wkbLineString:
4340  case wkbPolygon:
4341  case wkbMultiPolygon:
4342  break;
4343  case wkbMultiPoint:
4344  case wkbMultiLineString:
4345  // supported if geo_explode_collections is specified
4346  if (!copy_params.geo_explode_collections) {
4347  throw std::runtime_error("Unsupported geometry type: " +
4348  std::string(poGeometry->getGeometryName()));
4349  }
4350  break;
4351  default:
4352  throw std::runtime_error("Unsupported geometry type: " +
4353  std::string(poGeometry->getGeometryName()));
4354  }
4355 
4356  // populate metadata for regular fields
4357  for (auto i : metadata) {
4358  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4359  if (iField >= 0) { // geom is -1
4360  metadata[i.first].at(iFeature) =
4361  std::string(poFeature->GetFieldAsString(iField));
4362  }
4363  }
4364 
4365  // populate metadata for geo column with WKT string
4366  char* wkts = nullptr;
4367  poGeometry->exportToWkt(&wkts);
4368  CHECK(wkts);
4369  metadata[geo_column_name].at(iFeature) = wkts;
4370  CPLFree(wkts);
4371  }
4372  iFeature++;
4373  }
4374 }
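// on return (illustrative shape): metadata maps each field name to a
// column of per-feature values rendered as strings, plus geo_column_name
// mapped to WKT strings, capped at rowLimit features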
4375 
4376 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4377  switch (ogr_type) {
4378  case OFTInteger:
4379  return std::make_pair(kINT, false);
4380  case OFTIntegerList:
4381  return std::make_pair(kINT, true);
4382 #if GDAL_VERSION_MAJOR > 1
4383  case OFTInteger64:
4384  return std::make_pair(kBIGINT, false);
4385  case OFTInteger64List:
4386  return std::make_pair(kBIGINT, true);
4387 #endif
4388  case OFTReal:
4389  return std::make_pair(kDOUBLE, false);
4390  case OFTRealList:
4391  return std::make_pair(kDOUBLE, true);
4392  case OFTString:
4393  return std::make_pair(kTEXT, false);
4394  case OFTStringList:
4395  return std::make_pair(kTEXT, true);
4396  case OFTDate:
4397  return std::make_pair(kDATE, false);
4398  case OFTTime:
4399  return std::make_pair(kTIME, false);
4400  case OFTDateTime:
4401  return std::make_pair(kTIMESTAMP, false);
4402  case OFTBinary:
4403  // Interpret binary blobs as byte arrays here
4404  // but actual import will store NULL as GDAL will not
4405  // extract the blob (OGRFeature::GetFieldAsString will
4406  // result in the import buffers having an empty string)
4407  return std::make_pair(kTINYINT, true);
4408  default:
4409  break;
4410  }
4411  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4412 }
4413 
4414 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4415  switch (ogr_type) {
4416  case wkbPoint:
4417  return kPOINT;
4418  case wkbLineString:
4419  return kLINESTRING;
4420  case wkbPolygon:
4421  return kPOLYGON;
4422  case wkbMultiPolygon:
4423  return kMULTIPOLYGON;
4424  default:
4425  break;
4426  }
4427  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4428 }
4429 
4430 /* static */
4431 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4432  const std::string& file_name,
4433  const std::string& geo_column_name,
4434  const CopyParams& copy_params) {
4435  std::list<ColumnDescriptor> cds;
4436 
4437  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4438  if (poDS == nullptr) {
4439  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4440  file_name);
4441  }
4442 
4443  OGRLayer& layer =
4444  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4445 
4446  layer.ResetReading();
4447  // TODO(andrewseidl): support multiple features
4448  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4449  if (poFeature == nullptr) {
4450  throw std::runtime_error("No features found in " + file_name);
4451  }
4452  // get fields as regular columns
4453  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4454  CHECK(poFDefn);
4455  int iField;
4456  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4457  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4458  auto typePair = ogr_to_type(poFieldDefn->GetType());
4459  ColumnDescriptor cd;
4460  cd.columnName = poFieldDefn->GetNameRef();
4461  cd.sourceName = poFieldDefn->GetNameRef();
4462  SQLTypeInfo ti;
4463  if (typePair.second) {
4464  ti.set_type(kARRAY);
4465  ti.set_subtype(typePair.first);
4466  } else {
4467  ti.set_type(typePair.first);
4468  }
4469  if (typePair.first == kTEXT) {
 4470  ti.set_compression(kENCODING_DICT);
 4471  ti.set_comp_param(32);
4472  }
4473  ti.set_fixed_size();
4474  cd.columnType = ti;
4475  cds.push_back(cd);
4476  }
4477  // get geo column, if any
4478  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4479  if (poGeometry) {
4480  ColumnDescriptor cd;
4481  cd.columnName = geo_column_name;
4482  cd.sourceName = geo_column_name;
4483 
4484  // get GDAL type
4485  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4486 
4487  // if exploding, override any collection type to child type
4488  if (copy_params.geo_explode_collections) {
4489  if (ogr_type == wkbMultiPolygon) {
4490  ogr_type = wkbPolygon;
4491  } else if (ogr_type == wkbMultiLineString) {
4492  ogr_type = wkbLineString;
4493  } else if (ogr_type == wkbMultiPoint) {
4494  ogr_type = wkbPoint;
4495  }
4496  }
4497 
4498  // convert to internal type
4499  SQLTypes geoType = ogr_to_type(ogr_type);
4500 
4501  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
 4502  if (!copy_params.geo_explode_collections) {
 4503  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4504  }
4505 
4506  // build full internal type
4507  SQLTypeInfo ti;
4508  ti.set_type(geoType);
4509  ti.set_subtype(copy_params.geo_coords_type);
4510  ti.set_input_srid(copy_params.geo_coords_srid);
4511  ti.set_output_srid(copy_params.geo_coords_srid);
4512  ti.set_compression(copy_params.geo_coords_encoding);
4513  ti.set_comp_param(copy_params.geo_coords_comp_param);
4514  cd.columnType = ti;
4515 
4516  cds.push_back(cd);
4517  }
4518  return cds;
4519 }
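// illustrative example: a source with fields (name: OFTString, pop:
// OFTInteger64) and polygon geometry yields a dict-encoded TEXT column, a
// BIGINT column, and a geo column promoted to MULTIPOLYGON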
4520 
4521 bool Importer::gdalStatInternal(const std::string& path,
4522  const CopyParams& copy_params,
4523  bool also_dir) {
4524  // lazy init GDAL
4525  GDAL::init();
4526 
4527  // set authorization tokens
4528  setGDALAuthorizationTokens(copy_params);
4529 
4530 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4531  // clear GDAL stat cache
4532  // without this, file existence will be cached, even if authentication changes
4533  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4534  VSICurlClearCache();
4535 #endif
4536 
4537  // stat path
4538  VSIStatBufL sb;
4539  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4540  if (result < 0) {
4541  return false;
4542  }
4543 
4544  // exists?
4545  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4546  return true;
4547  } else if (VSI_ISREG(sb.st_mode)) {
4548  return true;
4549  }
4550  return false;
4551 }
4552 
4553 /* static */
4554 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
4555  return gdalStatInternal(path, copy_params, false);
4556 }
4557 
4558 /* static */
4559 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
4560  const CopyParams& copy_params) {
4561  return gdalStatInternal(path, copy_params, true);
4562 }
4563 
4564 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
4565  std::vector<std::string>& files) {
4566  // prepare to gather subdirectories
4567  std::vector<std::string> subdirectories;
4568 
4569  // get entries
4570  char** entries = VSIReadDir(archive_path.c_str());
4571  if (!entries) {
4572  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
4573  return;
4574  }
4575 
4576  // force scope
4577  {
4578  // request clean-up
4579  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
4580 
4581  // check all the entries
4582  int index = 0;
4583  while (true) {
4584  // get next entry, or drop out if there isn't one
4585  char* entry_c = entries[index++];
4586  if (!entry_c) {
4587  break;
4588  }
4589  std::string entry(entry_c);
4590 
4591  // ignore '.' and '..'
4592  if (entry == "." || entry == "..") {
4593  continue;
4594  }
4595 
4596  // build the full path
4597  std::string entry_path = archive_path + std::string("/") + entry;
4598 
4599  // is it a file or a sub-folder
4600  VSIStatBufL sb;
4601  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
4602  if (result < 0) {
4603  break;
4604  }
4605 
4606  if (VSI_ISDIR(sb.st_mode)) {
4607  // a directory that ends with .gdb could be a Geodatabase bundle
4608  // arguably dangerous to decide this purely by name, but any further
4609  // validation would be very complex especially at this scope
4610  if (boost::iends_with(entry_path, ".gdb")) {
4611  // add the directory as if it was a file and don't recurse into it
4612  files.push_back(entry_path);
4613  } else {
4614  // add subdirectory to be recursed into
4615  subdirectories.push_back(entry_path);
4616  }
4617  } else {
4618  // add this file
4619  files.push_back(entry_path);
4620  }
4621  }
4622  }
4623 
 4624  // recurse into each subdirectory we found
4625  for (const auto& subdirectory : subdirectories) {
4626  gdalGatherFilesInArchiveRecursive(subdirectory, files);
4627  }
4628 }
4629 
4630 /* static */
4631 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
4632  const std::string& archive_path,
4633  const CopyParams& copy_params) {
4634  // lazy init GDAL
4635  GDAL::init();
4636 
4637  // set authorization tokens
4638  setGDALAuthorizationTokens(copy_params);
4639 
4640  // prepare to gather files
4641  std::vector<std::string> files;
4642 
4643  // gather the files recursively
4644  gdalGatherFilesInArchiveRecursive(archive_path, files);
4645 
4646  // convert to relative paths inside archive
4647  for (auto& file : files) {
4648  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
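 // e.g. (illustrative) "/vsizip/bundle.zip/dir/file.shp" -> "dir/file.shp"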
4649  }
4650 
4651  // done
4652  return files;
4653 }
4654 
4655 /* static */
4656 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
4657  const std::string& file_name,
4658  const CopyParams& copy_params) {
4659  // lazy init GDAL
4660  GDAL::init();
4661 
4662  // set authorization tokens
4663  setGDALAuthorizationTokens(copy_params);
4664 
4665  // prepare to gather layer info
4666  std::vector<GeoFileLayerInfo> layer_info;
4667 
4668  // open the data set
4669  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4670  if (poDS == nullptr) {
4671  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4672  file_name);
4673  }
4674 
4675  // enumerate the layers
4676  for (auto&& poLayer : poDS->GetLayers()) {
 4677  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
 4678  // prepare to read this layer
4679  poLayer->ResetReading();
4680  // skip layer if empty
4681  if (poLayer->GetFeatureCount() > 0) {
4682  // get first feature
4683  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4684  CHECK(first_feature);
4685  // check feature for geometry
4686  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4687  if (!geometry) {
4688  // layer has no geometry
4689  contents = GeoFileLayerContents::NON_GEO;
4690  } else {
4691  // check the geometry type
4692  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4693  switch (wkbFlatten(geometry_type)) {
4694  case wkbPoint:
4695  case wkbLineString:
4696  case wkbPolygon:
4697  case wkbMultiPolygon:
4698  // layer has supported geo
4699  contents = GeoFileLayerContents::GEO;
4700  break;
4701  case wkbMultiPoint:
4702  case wkbMultiLineString:
4703  // supported if geo_explode_collections is specified
 4704  contents = copy_params.geo_explode_collections
 4705  ? GeoFileLayerContents::GEO
 4706  : GeoFileLayerContents::UNSUPPORTED_GEO;
 4707  break;
4708  default:
4709  // layer has unsupported geometry
 4710  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
 4711  break;
4712  }
4713  }
4714  }
4715  // store info for this layer
4716  layer_info.emplace_back(poLayer->GetName(), contents);
4717  }
4718 
4719  // done
4720  return layer_info;
4721 }
4722 
 4723 ImportStatus Importer::importGDAL(
 4724  ColumnNameToSourceNameMapType columnNameToSourceNameMap) {
4725  // initial status
4726  bool load_truncated = false;
 4727  set_import_status(import_id, import_status);
 4728 
 4729  OGRDataSourceUqPtr poDS(openGDALDataset(file_path, copy_params));
 4730  if (poDS == nullptr) {
4731  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4732  file_path);
4733  }
4734 
4735  OGRLayer& layer =
 4736  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
 4737 
4738  // get the number of features in this layer
4739  size_t numFeatures = layer.GetFeatureCount();
4740 
4741  // build map of metadata field (additional columns) name to index
4742  // use shared_ptr since we need to pass it to the worker
4743  FieldNameToIndexMapType fieldNameToIndexMap;
4744  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4745  CHECK(poFDefn);
4746  size_t numFields = poFDefn->GetFieldCount();
4747  for (size_t iField = 0; iField < numFields; iField++) {
4748  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4749  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4750  }
4751 
4752  // the geographic spatial reference we want to put everything in
4753  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4754  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4755 
4756 #if GDAL_VERSION_MAJOR >= 3
4757  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
4758  // this results in X and Y being transposed for angle-based
4759  // coordinate systems. This restores the previous behavior.
4760  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4761 #endif
4762 
4763 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4764  // just one "thread"
4765  size_t max_threads = 1;
4766 #else
4767  // how many threads to use
4768  size_t max_threads = 0;
4769  if (copy_params.threads == 0) {
4770  max_threads = sysconf(_SC_NPROCESSORS_CONF);
4771  } else {
4772  max_threads = copy_params.threads;
4773  }
4774 #endif
4775 
4776  // make an import buffer for each thread
4777  CHECK_EQ(import_buffers_vec.size(), 0u);
4778  import_buffers_vec.resize(max_threads);
4779  for (size_t i = 0; i < max_threads; i++) {
4780  for (const auto cd : loader->get_column_descs()) {
4781  import_buffers_vec[i].emplace_back(
4782  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4783  }
4784  }
4785 
4786  // make render group analyzers for each poly column
4787  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
 4788  if (copy_params.geo_assign_render_groups) {
 4789  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4790  loader->getTableDesc()->tableId, false, false, false);
4791  for (auto cd : columnDescriptors) {
4792  SQLTypes ct = cd->columnType.get_type();
4793  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4794  auto rga = std::make_shared<RenderGroupAnalyzer>();
4795  rga->seedFromExistingTableContents(loader, cd->columnName);
4796  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4797  }
4798  }
4799  }
4800 
4801 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4802  // threads
4803  std::list<std::future<ImportStatus>> threads;
4804 
4805  // use a stack to track thread_ids which must not overlap among threads
4806  // because thread_id is used to index import_buffers_vec[]
4807  std::stack<size_t> stack_thread_ids;
4808  for (size_t i = 0; i < max_threads; i++) {
4809  stack_thread_ids.push(i);
4810  }
4811 #endif
4812 
 4813  // note the starting table epoch for the checkpoint at the end
4814  auto start_epoch = loader->getTableEpoch();
4815 
4816  // reset the layer
4817  layer.ResetReading();
4818 
4819  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
4820 
4821  // make a features buffer for each thread
4822  std::vector<FeaturePtrVector> features(max_threads);
4823 
4824  // for each feature...
4825  size_t firstFeatureThisChunk = 0;
4826  while (firstFeatureThisChunk < numFeatures) {
4827  // how many features this chunk
4828  size_t numFeaturesThisChunk =
4829  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
4830 
4831 // get a thread_id not in use
4832 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4833  size_t thread_id = 0;
4834 #else
4835  auto thread_id = stack_thread_ids.top();
4836  stack_thread_ids.pop();
4837  CHECK(thread_id < max_threads);
4838 #endif
4839 
4840  // fill features buffer for new thread
4841  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
4842  features[thread_id].emplace_back(layer.GetNextFeature());
4843  }
4844 
4845 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4846  // call worker function directly
4847  auto ret_import_status = import_thread_shapefile(0,
4848  this,
4849  poGeographicSR.get(),
4850  std::move(features[thread_id]),
4851  firstFeatureThisChunk,
4852  numFeaturesThisChunk,
4853  fieldNameToIndexMap,
4854  columnNameToSourceNameMap,
4855  columnIdToRenderGroupAnalyzerMap);
4856  import_status += ret_import_status;
4857  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
4858  import_status.rows_completed;
4859  set_import_status(import_id, import_status);
4860 #else
4861  // fire up that thread to import this geometry
4862  threads.push_back(std::async(std::launch::async,
 4863  import_thread_shapefile,
 4864  thread_id,
4865  this,
4866  poGeographicSR.get(),
4867  std::move(features[thread_id]),
4868  firstFeatureThisChunk,
4869  numFeaturesThisChunk,
4870  fieldNameToIndexMap,
4871  columnNameToSourceNameMap,
4872  columnIdToRenderGroupAnalyzerMap));
4873 
4874  // let the threads run
4875  while (threads.size() > 0) {
4876  int nready = 0;
4877  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4878  it != threads.end();) {
4879  auto& p = *it;
4880  std::chrono::milliseconds span(
4881  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4882  if (p.wait_for(span) == std::future_status::ready) {
4883  auto ret_import_status = p.get();
4884  import_status += ret_import_status;
4885  import_status.rows_estimated =
4886  ((float)firstFeatureThisChunk / (float)numFeatures) *
4887  import_status.rows_completed;
4888  set_import_status(import_id, import_status);
4889 
4890  // recall thread_id for reuse
4891  stack_thread_ids.push(ret_import_status.thread_id);
4892 
4893  threads.erase(it++);
4894  ++nready;
4895  } else {
4896  ++it;
4897  }
4898  }
4899 
4900  if (nready == 0) {
4901  std::this_thread::yield();
4902  }
4903 
 4904  // keep reading if there is any free thread slot;
 4905  // this is one of the major differences from the old threading model !!
4906  if (threads.size() < max_threads) {
4907  break;
4908  }
4909  }
4910 #endif
4911 
4912  // out of rows?
4913  if (import_status.rows_rejected > copy_params.max_reject) {
4914  load_truncated = true;
4915  load_failed = true;
4916  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4917  break;
4918  }
4919 
4920  // failed?
4921  if (load_failed) {
4922  load_truncated = true;
4923  LOG(ERROR)
 4924  << "A call to Loader::load failed. Please review the logs for more details";
4925  break;
4926  }
4927 
4928  firstFeatureThisChunk += numFeaturesThisChunk;
4929  }
4930 
4931 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4932  // wait for any remaining threads
4933  if (threads.size()) {
4934  for (auto& p : threads) {
4935  // wait for the thread
4936  p.wait();
4937  // get the result and update the final import status
4938  auto ret_import_status = p.get();
4939  import_status += ret_import_status;
 4940  import_status.rows_estimated = import_status.rows_completed;
 4941  set_import_status(import_id, import_status);
 4942  }
4943  }
4944 #endif
4945 
4946  checkpoint(start_epoch);
4947 
4948  // must set import_status.load_truncated before closing this end of pipe
4949  // otherwise, the thread on the other end would throw an unwanted 'write()'
4950  // exception
4951  import_status.load_truncated = load_truncated;
4952  return import_status;
4953 }
4954 
4955 //
4956 // class RenderGroupAnalyzer
4957 //
4958 
 4959 void RenderGroupAnalyzer::seedFromExistingTableContents(
 4960  const std::unique_ptr<Loader>& loader,
4961  const std::string& geoColumnBaseName) {
4962  // start timer
4963  auto seedTimer = timer_start();
4964 
4965  // start with a fresh tree
4966  _rtree = nullptr;
4967  _numRenderGroups = 0;
4968 
4969  // get the table descriptor
4970  const auto& cat = loader->getCatalog();
4971  if (loader->getTableDesc()->storageType == StorageType::FOREIGN_TABLE) {
 4972  if (DEBUG_RENDER_GROUP_ANALYZER) {
 4973  LOG(INFO) << "DEBUG: Table is a foreign table";
4974  }
4975  _rtree = std::make_unique<RTree>();
4976  CHECK(_rtree);
4977  return;
4978  }
4979 
4980  const std::string& tableName = loader->getTableDesc()->tableName;
4981  const auto td = cat.getMetadataForTable(tableName);
4982  CHECK(td);
4983  CHECK(td->fragmenter);
4984 
4985  // if the table is empty, just make an empty tree
4986  if (td->fragmenter->getFragmentsForQuery().getPhysicalNumTuples() == 0) {
 4987  if (DEBUG_RENDER_GROUP_ANALYZER) {
 4988  LOG(INFO) << "DEBUG: Table is empty!";
4989  }
4990  _rtree = std::make_unique<RTree>();
4991  CHECK(_rtree);
4992  return;
4993  }
4994 
4995  // no seeding possible without these two columns
4996  const auto cd_bounds =
4997  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_bounds");
4998  const auto cd_render_group =
4999  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_render_group");
5000  if (!cd_bounds || !cd_render_group) {
5001  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5002  " doesn't have bounds or render_group columns!");
5003  }
5004 
5005  // and validate their types
5006  if (cd_bounds->columnType.get_type() != kARRAY ||
5007  cd_bounds->columnType.get_subtype() != kDOUBLE) {
5008  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5009  " bounds column is wrong type!");
5010  }
5011  if (cd_render_group->columnType.get_type() != kINT) {
5012  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5013  " render_group column is wrong type!");
5014  }
5015 
5016  // get chunk accessor table
5017  auto chunkAccessorTable = getChunkAccessorTable(
5018  cat, td, {geoColumnBaseName + "_bounds", geoColumnBaseName + "_render_group"});
5019  const auto table_count = std::get<0>(chunkAccessorTable.back());
5020 
 5021  if (DEBUG_RENDER_GROUP_ANALYZER) {
 5022  LOG(INFO) << "DEBUG: Scanning existing table geo column set '" << geoColumnBaseName
5023  << "'";
5024  }
5025 
5026  std::vector<Node> nodes;
5027  try {
5028  nodes.resize(table_count);
5029  } catch (const std::exception& e) {
5030  throw std::runtime_error("RenderGroupAnalyzer failed to reserve memory for " +
5031  std::to_string(table_count) + " rows");
5032  }
5033 
5034  for (size_t row = 0; row < table_count; row++) {
5035  ArrayDatum ad;
5036  VarlenDatum vd;
5037  bool is_end;
5038 
5039  // get ChunkIters and fragment row offset
5040  size_t rowOffset = 0;
5041  auto& chunkIters = getChunkItersAndRowOffset(chunkAccessorTable, row, rowOffset);
5042  auto& boundsChunkIter = chunkIters[0];
5043  auto& renderGroupChunkIter = chunkIters[1];
5044 
5045  // get bounds values
5046  ChunkIter_get_nth(&boundsChunkIter, row - rowOffset, &ad, &is_end);
5047  CHECK(!is_end);
5048  CHECK(ad.pointer);
5049  int numBounds = (int)(ad.length / sizeof(double));
5050  CHECK(numBounds == 4);
5051 
5052  // convert to bounding box
5053  double* bounds = reinterpret_cast<double*>(ad.pointer);
5054  BoundingBox bounding_box;
5055  boost::geometry::assign_inverse(bounding_box);
5056  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
5057  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5058 
5059  // get render group
5060  ChunkIter_get_nth(&renderGroupChunkIter, row - rowOffset, false, &vd, &is_end);
5061  CHECK(!is_end);
5062  CHECK(vd.pointer);
5063  int renderGroup = *reinterpret_cast<int32_t*>(vd.pointer);
5064  CHECK_GE(renderGroup, 0);
5065 
5066  // store
5067  nodes[row] = std::make_pair(bounding_box, renderGroup);
5068 
5069  // how many render groups do we have now?
5070  if (renderGroup >= _numRenderGroups) {
5071  _numRenderGroups = renderGroup + 1;
5072  }
5073 
 5074  if (DEBUG_RENDER_GROUP_ANALYZER) {
 5075  LOG(INFO) << "DEBUG: Existing row " << row << " has Render Group " << renderGroup;
5076  }
5077  }
5078 
5079  // bulk-load the tree
5080  auto bulk_load_timer = timer_start();
5081  _rtree = std::make_unique<RTree>(nodes);
5082  CHECK(_rtree);
5083  LOG(INFO) << "Scanning render groups of poly column '" << geoColumnBaseName
5084  << "' of table '" << tableName << "' took " << timer_stop(seedTimer) << "ms ("
5085  << timer_stop(bulk_load_timer) << " ms for tree)";
5086 
 5087  if (DEBUG_RENDER_GROUP_ANALYZER) {
 5088  LOG(INFO) << "DEBUG: Done! Now have " << _numRenderGroups << " Render Groups";
5089  }
5090 }
5091 
 5092 int RenderGroupAnalyzer::insertBoundsAndReturnRenderGroup(
 5093  const std::vector<double>& bounds) {
5094  // validate
5095  CHECK(bounds.size() == 4);
5096 
5097  // get bounds
5098  BoundingBox bounding_box;
5099  boost::geometry::assign_inverse(bounding_box);
5100  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
5101  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5102 
5103  // remainder under mutex to allow this to be multi-threaded
5104  std::lock_guard<std::mutex> guard(_rtreeMutex);
5105 
5106  // get the intersecting nodes
5107  std::vector<Node> intersects;
5108  _rtree->query(boost::geometry::index::intersects(bounding_box),
5109  std::back_inserter(intersects));
5110 
5111  // build bitset of render groups of the intersecting rectangles
5112  // clear bit means available, allows use of find_first()
5113  boost::dynamic_bitset<> bits(_numRenderGroups);
5114  bits.set();
5115  for (const auto& intersection : intersects) {
5116  CHECK(intersection.second < _numRenderGroups);
5117  bits.reset(intersection.second);
5118  }
5119 
5120  // find first available group
5121  int firstAvailableRenderGroup = 0;
5122  size_t firstSetBit = bits.find_first();
5123  if (firstSetBit == boost::dynamic_bitset<>::npos) {
5124  // all known groups represented, add a new one
5125  firstAvailableRenderGroup = _numRenderGroups;
5126  _numRenderGroups++;
5127  } else {
5128  firstAvailableRenderGroup = (int)firstSetBit;
5129  }
5130 
5131  // insert new node
5132  _rtree->insert(std::make_pair(bounding_box, firstAvailableRenderGroup));
5133 
5134  // return it
5135  return firstAvailableRenderGroup;
5136 }
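// illustrative behavior: three mutually overlapping polygons receive
// render groups 0, 1 and 2; a fourth polygon intersecting none of them
// reuses group 0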
5137 
5138 std::vector<std::unique_ptr<TypedImportBuffer>> setup_column_loaders(
5139  const TableDescriptor* td,
5140  Loader* loader) {
5141  CHECK(td);
5142  auto col_descs = loader->get_column_descs();
5143 
5144  std::vector<std::unique_ptr<TypedImportBuffer>> import_buffers;
5145  for (auto cd : col_descs) {
5146  import_buffers.emplace_back(
5147  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
5148  }
5149 
5150  return import_buffers;
5151 }
5152 
5153 } // namespace import_export
static std::vector< GeoFileLayerInfo > gdalGetLayersInGeoFile(const std::string &file_name, const CopyParams &copy_params)
Definition: Importer.cpp:4656
std::chrono::duration< size_t, std::milli > elapsed
Definition: Importer.h:608
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:94
void load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count)
Definition: Importer.cpp:3259
std::vector< float > * float_buffer_
Definition: Importer.h:496
void checkpoint(const int32_t start_epoch)
Definition: Importer.cpp:3266
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:258
std::vector< std::unique_ptr< TypedImportBuffer > > & get_import_buffers(int i)
Definition: Importer.h:750
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:103
static bool gdalStatInternal(const std::string &path, const CopyParams &copy_params, bool also_dir)
Definition: Importer.cpp:4521
bool is_number() const
Definition: sqltypes.h:420
#define CHECK_GT(x, y)
Definition: Logger.h:209
DEVICE void ChunkIter_get_nth(ChunkIter *it, int n, bool uncompress, VarlenDatum *result, bool *is_end)
Definition: ChunkIter.cpp:181
int32_t intval
Definition: sqltypes.h:136
bool is_time() const
Definition: sqltypes.h:421
static bool gdalFileExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4554
std::string sourceName
std::vector< double > * double_buffer_
Definition: Importer.h:497
std::string to_string(char const *&&v)
void addFloat(const float v)
Definition: Importer.h:233
Fragmenter_Namespace::InsertData insert_data_
Definition: Importer.h:587
size_t add_values(const ColumnDescriptor *cd, const TColumn &data)
Definition: Importer.cpp:949
int8_t * pointer
Definition: sqltypes.h:74
bool importGeoFromLonLat(double lon, double lat, std::vector< double > &coords)
Definition: Importer.cpp:1409
#define DEBUG_TIMING
Definition: Importer.cpp:139
void init(const bool use_catalog_locks)
Definition: Importer.cpp:2855
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const int64_t replicate_count=0)
Definition: Importer.cpp:1421
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:130
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
void set_input_srid(int d)
Definition: sqltypes.h:352
float floatval
Definition: sqltypes.h:138
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed)=0
void addGeoString(const std::string_view v)
Definition: Importer.h:239
ImportHeaderRow has_header
Definition: CopyParams.h:48
static SQLTypes detect_sqltype(const std::string &str)
Definition: Importer.cpp:3004
Datum NullDatum(SQLTypeInfo &ti)
Definition: Importer.cpp:233
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::vector< EncodingType > find_best_encodings(const std::vector< std::vector< std::string >>::const_iterator &row_begin, const std::vector< std::vector< std::string >>::const_iterator &row_end, const std::vector< SQLTypes > &best_types)
Definition: Importer.cpp:3188
auto del_values(std::vector< DATA_TYPE > &buffer, BadRowsTracker *const bad_rows_tracker)
std::vector< std::unique_ptr< TypedImportBuffer >> OneShardBuffers
Definition: Importer.h:576
ChunkIterVector & getChunkItersAndRowOffset(ChunkAccessorTable &table, size_t rowid, size_t &rowOffset)
int get_physical_cols() const
Definition: sqltypes.h:279
std::vector< int32_t > * int_buffer_
Definition: Importer.h:494
ArrayDatum TDatumToArrayDatum(const TDatum &datum, const SQLTypeInfo &ti)
Definition: Importer.cpp:452
static void setGDALAuthorizationTokens(const CopyParams &copy_params)
Definition: Importer.cpp:4171
CHECK(cgen_state)
void set_fixed_size()
Definition: sqltypes.h:357
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:194
void addBoolean(const int8_t v)
Definition: Importer.h:223
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer)
Finds the closest possible row ending to the end of the given buffer.
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
std::vector< uint8_t > * string_dict_i8_buffer_
Definition: Importer.h:504
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4431
void addTinyint(const int8_t v)
Definition: Importer.h:225
std::shared_timed_mutex mapd_shared_mutex
void import_compressed(std::vector< std::string > &file_paths)
Definition: Importer.cpp:3695
std::vector< bool > bypass
count to replicate values of column(s); used only for ALTER ADD column
Definition: Fragmenter.h:68
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
static void readMetadataSampleGDAL(const std::string &fileName, const std::string &geoColumnName, std::map< std::string, std::vector< std::string >> &metadata, int rowLimit, const CopyParams &copy_params)
Definition: Importer.cpp:4297
std::vector< int64_t > * bigint_buffer_
Definition: Importer.h:495
int8_t * getAsBytes() const
Definition: Importer.h:286
int64_t bigintval
Definition: sqltypes.h:137
#define NULL_FLOAT
Definition: sqltypes.h:184
void addInt(const int32_t v)
Definition: Importer.h:229