Importer.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * @file Importer.cpp
19  * @author Wei Hong <wei@mapd.com>
20  * @brief Functions for Importer class
21  */
22 
23 #include "Import/Importer.h"
24 
25 #include <arrow/api.h>
26 #include <arrow/io/api.h>
27 #include <gdal.h>
28 #include <ogrsf_frmts.h>
29 #include <unistd.h>
30 
31 #include <boost/algorithm/string.hpp>
32 #include <boost/dynamic_bitset.hpp>
33 #include <boost/filesystem.hpp>
34 #include <boost/variant.hpp>
35 #include <csignal>
36 #include <cstdio>
37 #include <cstdlib>
38 #include <fstream>
39 #include <future>
40 #include <iomanip>
41 #include <list>
42 #include <memory>
43 #include <mutex>
44 #include <numeric>
45 #include <stack>
46 #include <stdexcept>
47 #include <thread>
48 #include <typeinfo>
49 #include <unordered_map>
50 #include <unordered_set>
51 #include <utility>
52 #include <vector>
53 
54 #include "../Archive/PosixFileArchive.h"
55 #include "../Archive/S3Archive.h"
56 #include "../QueryEngine/TypePunning.h"
57 #include "../Shared/geo_compression.h"
58 #include "../Shared/geo_types.h"
59 #include "../Shared/geosupport.h"
60 #include "../Shared/import_helpers.h"
61 #include "../Shared/mapd_glob.h"
62 #include "../Shared/mapdpath.h"
63 #include "../Shared/measure.h"
64 #include "../Shared/scope.h"
65 #include "../Shared/shard_key.h"
66 #include "../Shared/thread_count.h"
67 #include "ArrowImporter.h"
68 #include "Import/DelimitedParserUtils.h"
70 #include "Shared/Logger.h"
71 #include "Shared/SqlTypesLayout.h"
73 #include "gen-cpp/MapD.h"
74 
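// Size of the read buffer used when streaming rows out of archive input;
// 1 << 20 bytes = 1 MiB.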
75 size_t g_archive_read_buf_size = 1 << 20;
76 
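// Returns the size of file_path in bytes, or 0 if the file cannot be examined
// (the error_code overload of boost::filesystem::file_size() reports failure
// instead of throwing).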
77 inline auto get_filesize(const std::string& file_path) {
78  boost::filesystem::path boost_file_path{file_path};
79  boost::system::error_code ec;
80  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
81  return ec ? 0 : filesize;
82 }
83 
84 namespace {
85 
86 struct OGRDataSourceDeleter {
87  void operator()(OGRDataSource* datasource) {
88  if (datasource) {
89  GDALClose(datasource);
90  }
91  }
92 };
93 using OGRDataSourceUqPtr = std::unique_ptr<OGRDataSource, OGRDataSourceDeleter>;
94 
95 struct OGRFeatureDeleter {
96  void operator()(OGRFeature* feature) {
97  if (feature) {
98  OGRFeature::DestroyFeature(feature);
99  }
100  }
101 };
102 using OGRFeatureUqPtr = std::unique_ptr<OGRFeature, OGRFeatureDeleter>;
103 
104 struct OGRSpatialReferenceDeleter {
105  void operator()(OGRSpatialReference* ref) {
106  if (ref) {
107  OGRSpatialReference::DestroySpatialReference(ref);
108  }
109  }
110 };
111 using OGRSpatialReferenceUqPtr =
112  std::unique_ptr<OGRSpatialReference, OGRSpatialReferenceDeleter>;
113 
114 } // namespace
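// The deleters above let std::unique_ptr own GDAL/OGR objects, which must be
// released through GDAL's API rather than operator delete. A minimal usage
// sketch (hypothetical file name):
//
//   OGRDataSourceUqPtr datasource(static_cast<OGRDataSource*>(
//       GDALOpenEx("input.shp", GDAL_OF_VECTOR, nullptr, nullptr, nullptr)));
//   if (datasource) {
//     // ... use the datasource; GDALClose() runs automatically on scope exit.
//   }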
115 
116 // For logging std::vector<std::string> row.
117 namespace boost {
118 namespace log {
119 formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
120  out << '[';
121  for (size_t i = 0; i < row.size(); ++i) {
122  out << (i ? ", " : "") << row[i];
123  }
124  out << ']';
125  return out;
126 }
127 } // namespace log
128 } // namespace boost
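// Placing the operator in boost::log lets argument-dependent lookup on
// formatting_ostream find it when a row vector is streamed into LOG(...).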
129 
130 namespace Importer_NS {
131 
132 using FieldNameToIndexMapType = std::map<std::string, size_t>;
133 using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
134 using ColumnIdToRenderGroupAnalyzerMapType =
135  std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
136 using FeaturePtrVector = std::vector<OGRFeatureUqPtr>;
137 
138 #define DEBUG_TIMING false
139 #define DEBUG_RENDER_GROUP_ANALYZER 0
140 #define DEBUG_AWS_AUTHENTICATION 0
141 
142 #define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0
143 
144 static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;
145 
146 static mapd_shared_mutex status_mutex;
147 static std::map<std::string, ImportStatus> import_status_map;
148 
149 Importer::Importer(Catalog_Namespace::Catalog& c,
150  const TableDescriptor* t,
151  const std::string& f,
152  const CopyParams& p)
153  : Importer(new Loader(c, t), f, p) {}
154 
155 Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
156  : DataStreamSink(p, f), loader(providedLoader) {
157  import_id = boost::filesystem::path(file_path).filename().string();
158  file_size = 0;
159  max_threads = 0;
160  p_file = nullptr;
161  buffer[0] = nullptr;
162  buffer[1] = nullptr;
163  // We may over-allocate a little here because physical (geo) columns are dropped,
164  // but that's harmless: iteration over these flags never goes out of bounds.
165  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
166  int i = 0;
167  bool has_array = false;
168  // TODO: replace this ugly way of skipping physical columns once an isPhyGeo flag is defined
169  int skip_physical_cols = 0;
170  for (auto& p : loader->get_column_descs()) {
171  // phy geo columns can't be in input file
172  if (skip_physical_cols-- > 0) {
173  continue;
174  }
175  // neither are rowid or $deleted$
176  // note: columns can be added after rowid/$deleted$
177  if (p->isVirtualCol || p->isDeletedCol) {
178  continue;
179  }
180  skip_physical_cols = p->columnType.get_physical_cols();
181  if (p->columnType.get_type() == kARRAY) {
182  is_array.get()[i] = true;
183  has_array = true;
184  } else {
185  is_array.get()[i] = false;
186  }
187  ++i;
188  }
189  if (has_array) {
190  is_array_a = std::unique_ptr<bool[]>(is_array.release());
191  } else {
192  is_array_a = std::unique_ptr<bool[]>(nullptr);
193  }
194 }
195 
196 Importer::~Importer() {
197  if (p_file != nullptr) {
198  fclose(p_file);
199  }
200  if (buffer[0] != nullptr) {
201  free(buffer[0]);
202  }
203  if (buffer[1] != nullptr) {
204  free(buffer[1]);
205  }
206 }
207 
208 ImportStatus Importer::get_import_status(const std::string& import_id) {
209  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
210  return import_status_map.at(import_id);
211 }
212 
213 void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
214  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
215  is.end = std::chrono::steady_clock::now();
216  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
217  import_status_map[import_id] = is;
218 }
219 
220 static const std::string trim_space(const char* field, const size_t len) {
221  size_t i = 0;
222  size_t j = len;
223  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
224  i++;
225  }
226  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
227  j--;
228  }
229  return std::string(field + i, j - i);
230 }
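// e.g. trim_space(" 42 \r", 5) yields "42". Only spaces and carriage returns
// are stripped; tabs and other whitespace are preserved.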
231 
232 int8_t* appendDatum(int8_t* buf, Datum d, const SQLTypeInfo& ti) {
233  switch (ti.get_type()) {
234  case kBOOLEAN:
235  *(bool*)buf = d.boolval;
236  return buf + sizeof(bool);
237  case kNUMERIC:
238  case kDECIMAL:
239  case kBIGINT:
240  *(int64_t*)buf = d.bigintval;
241  return buf + sizeof(int64_t);
242  case kINT:
243  *(int32_t*)buf = d.intval;
244  return buf + sizeof(int32_t);
245  case kSMALLINT:
246  *(int16_t*)buf = d.smallintval;
247  return buf + sizeof(int16_t);
248  case kTINYINT:
249  *(int8_t*)buf = d.tinyintval;
250  return buf + sizeof(int8_t);
251  case kFLOAT:
252  *(float*)buf = d.floatval;
253  return buf + sizeof(float);
254  case kDOUBLE:
255  *(double*)buf = d.doubleval;
256  return buf + sizeof(double);
257  case kTIME:
258  case kTIMESTAMP:
259  case kDATE:
260  *reinterpret_cast<int64_t*>(buf) = d.bigintval;
261  return buf + sizeof(int64_t);
262  default:
263  return NULL;
264  }
265  return NULL;
266 }
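// A minimal usage sketch for appendDatum(), assuming an INT element type:
//
//   Datum d;
//   d.intval = 7;
//   int8_t buf[sizeof(int32_t)];
//   int8_t* next = appendDatum(buf, d, SQLTypeInfo(kINT, false));
//   // next == buf + sizeof(int32_t); NULL is returned for unsupported types.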
267 
268 Datum NullDatum(SQLTypeInfo& ti) {
269  Datum d;
270  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
271  switch (type) {
272  case kBOOLEAN:
273  d.boolval = inline_fixed_encoding_null_val(ti);
274  break;
275  case kBIGINT:
276  d.bigintval = inline_fixed_encoding_null_val(ti);
277  break;
278  case kINT:
279  d.intval = inline_fixed_encoding_null_val(ti);
280  break;
281  case kSMALLINT:
282  d.smallintval = inline_fixed_encoding_null_val(ti);
283  break;
284  case kTINYINT:
285  d.tinyintval = inline_fixed_encoding_null_val(ti);
286  break;
287  case kFLOAT:
288  d.floatval = NULL_FLOAT;
289  break;
290  case kDOUBLE:
291  d.doubleval = NULL_DOUBLE;
292  break;
293  case kTIME:
294  case kTIMESTAMP:
295  case kDATE:
296  d.bigintval = inline_fixed_encoding_null_val(ti);
297  break;
298  case kPOINT:
299  case kLINESTRING:
300  case kPOLYGON:
301  case kMULTIPOLYGON:
302  throw std::runtime_error("Internal error: geometry type in NullDatum.");
303  default:
304  throw std::runtime_error("Internal error: invalid type in NullDatum.");
305  }
306  return d;
307 }
308 
309 Datum NullArrayDatum(SQLTypeInfo& ti) {
310  Datum d;
311  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
312  switch (type) {
313  case kBOOLEAN:
314  d.boolval = inline_fixed_encoding_null_array_val(ti);
315  break;
316  case kBIGINT:
317  d.bigintval = inline_fixed_encoding_null_array_val(ti);
318  break;
319  case kINT:
320  d.intval = inline_fixed_encoding_null_array_val(ti);
321  break;
322  case kSMALLINT:
323  d.smallintval = inline_fixed_encoding_null_array_val(ti);
324  break;
325  case kTINYINT:
326  d.tinyintval = inline_fixed_encoding_null_array_val(ti);
327  break;
328  case kFLOAT:
329  d.floatval = NULL_ARRAY_FLOAT;
330  break;
331  case kDOUBLE:
332  d.doubleval = NULL_ARRAY_DOUBLE;
333  break;
334  case kTIME:
335  case kTIMESTAMP:
336  case kDATE:
337  d.bigintval = inline_fixed_encoding_null_array_val(ti);
338  break;
339  case kPOINT:
340  case kLINESTRING:
341  case kPOLYGON:
342  case kMULTIPOLYGON:
343  throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
344  default:
345  throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
346  }
347  return d;
348 }
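// NullDatum() and NullArrayDatum() produce the in-band sentinel values used to
// encode SQL NULLs in fixed-width scalar slots and at the head of fixed-length
// NULL arrays, respectively (see NullArray() below).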
349 
350 ArrayDatum StringToArray(const std::string& s,
351  const SQLTypeInfo& ti,
352  const CopyParams& copy_params) {
353  SQLTypeInfo elem_ti = ti.get_elem_type();
354  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
355  return ArrayDatum(0, NULL, true);
356  }
357  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
358  LOG(WARNING) << "Malformed array: " << s;
359  return ArrayDatum(0, NULL, true);
360  }
361  std::vector<std::string> elem_strs;
362  size_t last = 1;
363  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
364  i = s.find(copy_params.array_delim, last)) {
365  elem_strs.push_back(s.substr(last, i - last));
366  last = i + 1;
367  }
368  if (last + 1 <= s.size()) {
369  elem_strs.push_back(s.substr(last, s.size() - 1 - last));
370  }
371  if (elem_strs.size() == 1) {
372  auto str = elem_strs.front();
373  auto str_trimmed = trim_space(str.c_str(), str.length());
374  if (str_trimmed == "") {
375  elem_strs.clear(); // Empty array
376  }
377  }
378  if (!elem_ti.is_string()) {
379  size_t len = elem_strs.size() * elem_ti.get_size();
380  int8_t* buf = (int8_t*)checked_malloc(len);
381  int8_t* p = buf;
382  for (auto& es : elem_strs) {
383  auto e = trim_space(es.c_str(), es.length());
384  bool is_null = (e == copy_params.null_str) || e == "NULL";
385  if (!elem_ti.is_string() && e == "") {
386  is_null = true;
387  }
388  if (elem_ti.is_number() || elem_ti.is_time()) {
389  if (!isdigit(e[0]) && e[0] != '-') {
390  is_null = true;
391  }
392  }
393  Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
394  p = appendDatum(p, d, elem_ti);
395  }
396  return ArrayDatum(len, buf, false);
397  }
398  // must not be called for array of strings
399  CHECK(false);
400  return ArrayDatum(0, NULL, true);
401 }
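// With the default CopyParams ('{' and '}' as array braces, ',' as the array
// delimiter), an INT[] field written as "{1,2,3}" becomes a 12-byte ArrayDatum
// holding three int32 values; "{}" yields an empty array, and the null string
// (default "\N") yields a null ArrayDatum.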
402 
403 ArrayDatum NullArray(const SQLTypeInfo& ti) {
404  SQLTypeInfo elem_ti = ti.get_elem_type();
405  auto len = ti.get_size();
406 
407  if (elem_ti.is_string()) {
408  // must not be called for array of strings
409  CHECK(false);
410  return ArrayDatum(0, NULL, true);
411  }
412 
413  if (len > 0) {
414  // Compose a NULL fixlen array
415  int8_t* buf = (int8_t*)checked_malloc(len);
416  // First scalar is a NULL_ARRAY sentinel
417  Datum d = NullArrayDatum(elem_ti);
418  int8_t* p = appendDatum(buf, d, elem_ti);
419  // Rest is filled with normal NULL sentinels
420  Datum d0 = NullDatum(elem_ti);
421  while ((p - buf) < len) {
422  p = appendDatum(p, d0, elem_ti);
423  }
424  CHECK((p - buf) == len);
425  return ArrayDatum(len, buf, true);
426  }
427  // NULL varlen array
428  return ArrayDatum(0, NULL, true);
429 }
430 
431 ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
432  return NullArray(ti);
433 }
434 
435 void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
436  const auto& arr = datum.val.arr_val;
437  for (const auto& elem_datum : arr) {
438  string_vec.push_back(elem_datum.val.str_val);
439  }
440 }
441 
442 Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
443  Datum d;
444  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
445  switch (type) {
446  case kBOOLEAN:
447  d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
448  break;
449  case kBIGINT:
450  d.bigintval =
451  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
452  break;
453  case kINT:
454  d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
455  break;
456  case kSMALLINT:
457  d.smallintval =
458  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
459  break;
460  case kTINYINT:
461  d.tinyintval =
462  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
463  break;
464  case kFLOAT:
465  d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
466  break;
467  case kDOUBLE:
468  d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
469  break;
470  case kTIME:
471  case kTIMESTAMP:
472  case kDATE:
473  d.bigintval =
474  datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
475  break;
476  case kPOINT:
477  case kLINESTRING:
478  case kPOLYGON:
479  case kMULTIPOLYGON:
480  throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
481  default:
482  throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
483  }
484  return d;
485 }
486 
487 ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
488  SQLTypeInfo elem_ti = ti.get_elem_type();
489 
490  CHECK(!elem_ti.is_string());
491 
492  if (datum.is_null) {
493  return NullArray(ti);
494  }
495 
496  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
497  int8_t* buf = (int8_t*)checked_malloc(len);
498  int8_t* p = buf;
499  for (auto& e : datum.val.arr_val) {
500  p = appendDatum(p, TDatumToDatum(e, elem_ti), elem_ti);
501  }
502 
503  return ArrayDatum(len, buf, false);
504 }
505 
506 void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
507  const std::string& val,
508  const bool is_null,
509  const CopyParams& copy_params,
510  const int64_t replicate_count) {
511  set_replicate_count(replicate_count);
512  const auto type = cd->columnType.get_type();
513  switch (type) {
514  case kBOOLEAN: {
515  if (is_null) {
516  if (cd->columnType.get_notnull()) {
517  throw std::runtime_error("NULL for column " + cd->columnName);
518  }
519  addBoolean(inline_fixed_encoding_null_val(cd->columnType));
520  } else {
521  SQLTypeInfo ti = cd->columnType;
522  Datum d = StringToDatum(val, ti);
523  addBoolean((int8_t)d.boolval);
524  }
525  break;
526  }
527  case kTINYINT: {
528  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
529  addTinyint(std::stoi(val));
530  } else {
531  if (cd->columnType.get_notnull()) {
532  throw std::runtime_error("NULL for column " + cd->columnName);
533  }
534  addTinyint(inline_fixed_encoding_null_val(cd->columnType));
535  }
536  break;
537  }
538  case kSMALLINT: {
539  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
540  addSmallint(std::stoi(val));
541  } else {
542  if (cd->columnType.get_notnull()) {
543  throw std::runtime_error("NULL for column " + cd->columnName);
544  }
545  addSmallint(inline_fixed_encoding_null_val(cd->columnType));
546  }
547  break;
548  }
549  case kINT: {
550  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
551  addInt(std::stoi(val));
552  } else {
553  if (cd->columnType.get_notnull()) {
554  throw std::runtime_error("NULL for column " + cd->columnName);
555  }
556  addInt(inline_fixed_encoding_null_val(cd->columnType));
557  }
558  break;
559  }
560  case kBIGINT: {
561  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
562  addBigint(std::stoll(val));
563  } else {
564  if (cd->columnType.get_notnull()) {
565  throw std::runtime_error("NULL for column " + cd->columnName);
566  }
567  addBigint(inline_fixed_encoding_null_val(cd->columnType));
568  }
569  break;
570  }
571  case kDECIMAL:
572  case kNUMERIC: {
573  if (!is_null) {
574  SQLTypeInfo ti(kNUMERIC, 0, 0, false);
575  Datum d = StringToDatum(val, ti);
576  const auto converted_decimal_value =
577  convert_decimal_value_to_scale(d.bigintval, ti, cd->columnType);
578  addBigint(converted_decimal_value);
579  } else {
580  if (cd->columnType.get_notnull()) {
581  throw std::runtime_error("NULL for column " + cd->columnName);
582  }
583  addBigint(inline_fixed_encoding_null_val(cd->columnType));
584  }
585  break;
586  }
587  case kFLOAT:
588  if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
589  addFloat((float)std::atof(val.c_str()));
590  } else {
591  if (cd->columnType.get_notnull()) {
592  throw std::runtime_error("NULL for column " + cd->columnName);
593  }
594  addFloat(NULL_FLOAT);
595  }
596  break;
597  case kDOUBLE:
598  if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
599  addDouble(std::atof(val.c_str()));
600  } else {
601  if (cd->columnType.get_notnull()) {
602  throw std::runtime_error("NULL for column " + cd->columnName);
603  }
604  addDouble(NULL_DOUBLE);
605  }
606  break;
607  case kTEXT:
608  case kVARCHAR:
609  case kCHAR: {
610  // @TODO(wei) for now, use empty string for nulls
611  if (is_null) {
612  if (cd->columnType.get_notnull()) {
613  throw std::runtime_error("NULL for column " + cd->columnName);
614  }
615  addString(std::string());
616  } else {
617  if (val.length() > StringDictionary::MAX_STRLEN) {
618  throw std::runtime_error("String too long for column " + cd->columnName +
619  " was " + std::to_string(val.length()) + " max is " +
621  }
622  addString(val);
623  }
624  break;
625  }
626  case kTIME:
627  case kTIMESTAMP:
628  case kDATE:
629  if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
630  SQLTypeInfo ti = cd->columnType;
631  Datum d = StringToDatum(val, ti);
632  addBigint(d.bigintval);
633  } else {
634  if (cd->columnType.get_notnull()) {
635  throw std::runtime_error("NULL for column " + cd->columnName);
636  }
637  addBigint(inline_fixed_encoding_null_val(cd->columnType));
638  }
639  break;
640  case kARRAY: {
641  if (is_null && cd->columnType.get_notnull()) {
642  throw std::runtime_error("NULL for column " + cd->columnName);
643  }
644  SQLTypeInfo ti = cd->columnType;
645  if (IS_STRING(ti.get_subtype())) {
646  std::vector<std::string> string_vec;
647  // Just parse string array, don't push it to buffer yet as we might throw
648  Importer_NS::DelimitedParserUtils::parseStringArray(val, copy_params, string_vec);
649  if (!is_null) {
650  // TODO: add support for NULL string arrays
651  if (ti.get_size() > 0) {
652  auto sti = ti.get_elem_type();
653  size_t expected_size = ti.get_size() / sti.get_size();
654  size_t actual_size = string_vec.size();
655  if (actual_size != expected_size) {
656  throw std::runtime_error("Fixed length array column " + cd->columnName +
657  " expects " + std::to_string(expected_size) +
658  " values, received " +
659  std::to_string(actual_size));
660  }
661  }
662  addStringArray(string_vec);
663  } else {
664  if (ti.get_size() > 0) {
665  // TODO: remove once NULL fixlen arrays are allowed
666  throw std::runtime_error("Fixed length array column " + cd->columnName +
667  " currently cannot accept NULL arrays");
668  }
669  // TODO: add support for NULL string arrays, replace with addStringArray(),
670  // for now add whatever parseStringArray() outputs for NULLs ("NULL")
671  addStringArray(string_vec);
672  }
673  } else {
674  if (!is_null) {
675  ArrayDatum d = StringToArray(val, ti, copy_params);
676  if (d.is_null) { // val could be "NULL"
677  addArray(NullArray(ti));
678  } else {
679  if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
680  throw std::runtime_error("Fixed length array for column " + cd->columnName +
681  " has incorrect length: " + val);
682  }
683  addArray(d);
684  }
685  } else {
686  addArray(NullArray(ti));
687  }
688  }
689  break;
690  }
691  case kPOINT:
692  case kLINESTRING:
693  case kPOLYGON:
694  case kMULTIPOLYGON:
695  addGeoString(val);
696  break;
697  default:
698  CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
699  }
700 }
701 
702 void TypedImportBuffer::pop_value() {
703  const auto type = column_desc_->columnType.is_decimal()
704  ? decimal_to_int_type(column_desc_->columnType)
705  : column_desc_->columnType.get_type();
706  switch (type) {
707  case kBOOLEAN:
708  bool_buffer_->pop_back();
709  break;
710  case kTINYINT:
711  tinyint_buffer_->pop_back();
712  break;
713  case kSMALLINT:
714  smallint_buffer_->pop_back();
715  break;
716  case kINT:
717  int_buffer_->pop_back();
718  break;
719  case kBIGINT:
720  bigint_buffer_->pop_back();
721  break;
722  case kFLOAT:
723  float_buffer_->pop_back();
724  break;
725  case kDOUBLE:
726  double_buffer_->pop_back();
727  break;
728  case kTEXT:
729  case kVARCHAR:
730  case kCHAR:
731  string_buffer_->pop_back();
732  break;
733  case kDATE:
734  case kTIME:
735  case kTIMESTAMP:
736  bigint_buffer_->pop_back();
737  break;
738  case kARRAY:
739  if (IS_STRING(column_desc_->columnType.get_subtype())) {
740  string_array_buffer_->pop_back();
741  } else {
742  array_buffer_->pop_back();
743  }
744  break;
745  case kPOINT:
746  case kLINESTRING:
747  case kPOLYGON:
748  case kMULTIPOLYGON:
749  geo_string_buffer_->pop_back();
750  break;
751  default:
752  CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
753  }
754 }
755 
756 struct GeoImportException : std::runtime_error {
757  using std::runtime_error::runtime_error;
758 };
759 
760 // appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
761 template <typename DATA_TYPE>
762 size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
763  const ColumnDescriptor* cd,
764  const Array& array,
765  std::vector<DATA_TYPE>& buffer,
766  const ArraySliceRange& slice_range,
767  Importer_NS::BadRowsTracker* const bad_rows_tracker) {
768  auto data =
769  std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
770  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
771  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
772  if (bad_rows_tracker && cd->columnType.is_geometry()) {
773  f_add_geo_phy_cols = [&](const int64_t row) {
774  // Populate physical columns (ref. MapDHandler::load_table)
775  std::vector<double> coords, bounds;
776  std::vector<int> ring_sizes, poly_rings;
777  int render_group = 0;
778  SQLTypeInfo ti;
779  // replace any unexpected exception from getGeoColumns or other
780  // on this path with a GeoImportException so that we won't
781  // push an extra null to the logical column...
782  try {
783  arrow_throw_if<GeoImportException>(
784  array.IsNull(row) ||
785  !Geo_namespace::GeoTypesFactory::getGeoColumns(geo_string_buffer_->back(),
786  ti,
787  coords,
788  bounds,
789  ring_sizes,
790  poly_rings,
791  false),
792  error_context(cd, bad_rows_tracker) + "Invalid geometry");
793  arrow_throw_if<GeoImportException>(
794  cd->columnType.get_type() != ti.get_type(),
795  error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
796  auto col_idx_workpad = col_idx; // what a pitfall!!
797  Importer_NS::Importer::set_geo_physical_import_buffer(
798  bad_rows_tracker->importer->getCatalog(),
799  cd,
801  col_idx_workpad,
802  coords,
803  bounds,
804  ring_sizes,
805  poly_rings,
806  render_group);
807  } catch (GeoImportException&) {
808  throw;
809  } catch (std::runtime_error& e) {
810  throw GeoImportException(e.what());
811  } catch (const std::exception& e) {
812  throw GeoImportException(e.what());
813  } catch (...) {
814  throw GeoImportException("unknown exception");
815  }
816  };
817  }
818  auto f_mark_a_bad_row = [&](const auto row) {
819  std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
820  bad_rows_tracker->rows.insert(row - slice_range.first);
821  };
822  buffer.reserve(slice_range.second - slice_range.first);
823  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
824  try {
825  *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
826  f_add_geo_phy_cols(row);
827  } catch (GeoImportException&) {
828  f_mark_a_bad_row(row);
829  } catch (ArrowImporterException&) {
830  // trace bad rows of each column; otherwise rethrow.
831  if (bad_rows_tracker) {
832  *data << nullptr;
833  f_mark_a_bad_row(row);
834  } else {
835  throw;
836  }
837  }
838  }
839  return buffer.size();
840 }
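// Note: rejected rows are recorded per column as offsets within the current
// slice (row - slice_range.first), guarded by the tracker's mutex.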
841 
842 size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
843  const Array& col,
844  const bool exact_type_match,
845  const ArraySliceRange& slice_range,
846  BadRowsTracker* const bad_rows_tracker) {
847  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
848  : cd->columnType.get_type();
849  if (cd->columnType.get_notnull()) {
850  // We can't have any null values for this column; to have them is an error
851  arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
852  }
853 
854  switch (type) {
855  case kBOOLEAN:
856  if (exact_type_match) {
857  arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
858  }
859  return convert_arrow_val_to_import_buffer<int8_t>(
860  cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
861  case kTINYINT:
862  if (exact_type_match) {
863  arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
864  }
865  return convert_arrow_val_to_import_buffer<int8_t>(
866  cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
867  case kSMALLINT:
868  if (exact_type_match) {
869  arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
870  }
871  return convert_arrow_val_to_import_buffer<int16_t>(
872  cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
873  case kINT:
874  if (exact_type_match) {
875  arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
876  }
877  return convert_arrow_val_to_import_buffer<int32_t>(
878  cd, col, *int_buffer_, slice_range, bad_rows_tracker);
879  case kBIGINT:
880  if (exact_type_match) {
881  arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
882  }
883  return convert_arrow_val_to_import_buffer<int64_t>(
884  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
885  case kFLOAT:
886  if (exact_type_match) {
887  arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
888  }
889  return convert_arrow_val_to_import_buffer<float>(
890  cd, col, *float_buffer_, slice_range, bad_rows_tracker);
891  case kDOUBLE:
892  if (exact_type_match) {
893  arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
894  }
895  return convert_arrow_val_to_import_buffer<double>(
896  cd, col, *double_buffer_, slice_range, bad_rows_tracker);
897  case kTEXT:
898  case kVARCHAR:
899  case kCHAR:
900  if (exact_type_match) {
901  arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
902  "Expected string type");
903  }
904  return convert_arrow_val_to_import_buffer<std::string>(
905  cd, col, *string_buffer_, slice_range, bad_rows_tracker);
906  case kTIME:
907  if (exact_type_match) {
908  arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
909  "Expected time32 or time64 type");
910  }
911  return convert_arrow_val_to_import_buffer<int64_t>(
912  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
913  case kTIMESTAMP:
914  if (exact_type_match) {
915  arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
916  }
917  return convert_arrow_val_to_import_buffer<int64_t>(
918  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
919  case kDATE:
920  if (exact_type_match) {
921  arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
922  "Expected date32 or date64 type");
923  }
924  return convert_arrow_val_to_import_buffer<int64_t>(
925  cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
926  case kPOINT:
927  case kLINESTRING:
928  case kPOLYGON:
929  case kMULTIPOLYGON:
930  arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
931  "Expected string type");
933  cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
934  case kARRAY:
935  throw std::runtime_error("Arrow array appends not yet supported");
936  default:
937  throw std::runtime_error("Invalid Type");
938  }
939 }
940 
941 // this is exclusively used by load_table_binary_columnar
942 size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
943  size_t dataSize = 0;
944  if (cd->columnType.get_notnull()) {
945  // We can't have any null values for this column; to have them is an error
946  if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
947  throw std::runtime_error("NULL for column " + cd->columnName);
948  }
949  }
950 
951  switch (cd->columnType.get_type()) {
952  case kBOOLEAN: {
953  dataSize = col.data.int_col.size();
954  bool_buffer_->reserve(dataSize);
955  for (size_t i = 0; i < dataSize; i++) {
956  if (col.nulls[i]) {
957  bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
958  } else {
959  bool_buffer_->push_back((int8_t)col.data.int_col[i]);
960  }
961  }
962  break;
963  }
964  case kTINYINT: {
965  dataSize = col.data.int_col.size();
966  tinyint_buffer_->reserve(dataSize);
967  for (size_t i = 0; i < dataSize; i++) {
968  if (col.nulls[i]) {
969  tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
970  } else {
971  tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
972  }
973  }
974  break;
975  }
976  case kSMALLINT: {
977  dataSize = col.data.int_col.size();
978  smallint_buffer_->reserve(dataSize);
979  for (size_t i = 0; i < dataSize; i++) {
980  if (col.nulls[i]) {
981  smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
982  } else {
983  smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
984  }
985  }
986  break;
987  }
988  case kINT: {
989  dataSize = col.data.int_col.size();
990  int_buffer_->reserve(dataSize);
991  for (size_t i = 0; i < dataSize; i++) {
992  if (col.nulls[i]) {
993  int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
994  } else {
995  int_buffer_->push_back((int32_t)col.data.int_col[i]);
996  }
997  }
998  break;
999  }
1000  case kBIGINT:
1001  case kNUMERIC:
1002  case kDECIMAL: {
1003  dataSize = col.data.int_col.size();
1004  bigint_buffer_->reserve(dataSize);
1005  for (size_t i = 0; i < dataSize; i++) {
1006  if (col.nulls[i]) {
1007  bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1008  } else {
1009  bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
1010  }
1011  }
1012  break;
1013  }
1014  case kFLOAT: {
1015  dataSize = col.data.real_col.size();
1016  float_buffer_->reserve(dataSize);
1017  for (size_t i = 0; i < dataSize; i++) {
1018  if (col.nulls[i]) {
1019  float_buffer_->push_back(NULL_FLOAT);
1020  } else {
1021  float_buffer_->push_back((float)col.data.real_col[i]);
1022  }
1023  }
1024  break;
1025  }
1026  case kDOUBLE: {
1027  dataSize = col.data.real_col.size();
1028  double_buffer_->reserve(dataSize);
1029  for (size_t i = 0; i < dataSize; i++) {
1030  if (col.nulls[i]) {
1031  double_buffer_->push_back(NULL_DOUBLE);
1032  } else {
1033  double_buffer_->push_back((double)col.data.real_col[i]);
1034  }
1035  }
1036  break;
1037  }
1038  case kTEXT:
1039  case kVARCHAR:
1040  case kCHAR: {
1041  // TODO: for now, use empty string for nulls
1042  dataSize = col.data.str_col.size();
1043  string_buffer_->reserve(dataSize);
1044  for (size_t i = 0; i < dataSize; i++) {
1045  if (col.nulls[i]) {
1046  string_buffer_->push_back(std::string());
1047  } else {
1048  string_buffer_->push_back(col.data.str_col[i]);
1049  }
1050  }
1051  break;
1052  }
1053  case kTIME:
1054  case kTIMESTAMP:
1055  case kDATE: {
1056  dataSize = col.data.int_col.size();
1057  bigint_buffer_->reserve(dataSize);
1058  for (size_t i = 0; i < dataSize; i++) {
1059  if (col.nulls[i]) {
1060  bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
1061  } else {
1062  bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
1063  }
1064  }
1065  break;
1066  }
1067  case kPOINT:
1068  case kLINESTRING:
1069  case kPOLYGON:
1070  case kMULTIPOLYGON: {
1071  dataSize = col.data.str_col.size();
1072  geo_string_buffer_->reserve(dataSize);
1073  for (size_t i = 0; i < dataSize; i++) {
1074  if (col.nulls[i]) {
1075  // TODO: add support for NULL geo
1076  geo_string_buffer_->push_back(std::string());
1077  } else {
1078  geo_string_buffer_->push_back(col.data.str_col[i]);
1079  }
1080  }
1081  break;
1082  }
1083  case kARRAY: {
1084  dataSize = col.data.arr_col.size();
1085  if (IS_STRING(cd->columnType.get_subtype())) {
1086  for (size_t i = 0; i < dataSize; i++) {
1087  std::vector<std::string>& string_vec = addStringArray();
1088  if (!col.nulls[i]) {
1089  size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
1090  for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
1091  string_vec.push_back(col.data.arr_col[i].data.str_col[str_idx]);
1092  }
1093  }
1094  }
1095  } else {
1096  auto elem_ti = cd->columnType.get_subtype();
1097  switch (elem_ti) {
1098  case kBOOLEAN: {
1099  for (size_t i = 0; i < dataSize; i++) {
1100  if (col.nulls[i]) {
1101  addArray(NullArray(cd->columnType));
1102  } else {
1103  size_t len = col.data.arr_col[i].data.int_col.size();
1104  size_t byteSize = len * sizeof(int8_t);
1105  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1106  int8_t* p = buf;
1107  for (size_t j = 0; j < len; ++j) {
1108  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
1109  p += sizeof(bool);
1110  }
1111  addArray(ArrayDatum(byteSize, buf, false));
1112  }
1113  }
1114  break;
1115  }
1116  case kTINYINT: {
1117  for (size_t i = 0; i < dataSize; i++) {
1118  if (col.nulls[i]) {
1119  addArray(NullArray(cd->columnType));
1120  } else {
1121  size_t len = col.data.arr_col[i].data.int_col.size();
1122  size_t byteSize = len * sizeof(int8_t);
1123  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1124  int8_t* p = buf;
1125  for (size_t j = 0; j < len; ++j) {
1126  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
1127  p += sizeof(int8_t);
1128  }
1129  addArray(ArrayDatum(byteSize, buf, false));
1130  }
1131  }
1132  break;
1133  }
1134  case kSMALLINT: {
1135  for (size_t i = 0; i < dataSize; i++) {
1136  if (col.nulls[i]) {
1137  addArray(NullArray(cd->columnType));
1138  } else {
1139  size_t len = col.data.arr_col[i].data.int_col.size();
1140  size_t byteSize = len * sizeof(int16_t);
1141  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1142  int8_t* p = buf;
1143  for (size_t j = 0; j < len; ++j) {
1144  *(int16_t*)p =
1145  static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
1146  p += sizeof(int16_t);
1147  }
1148  addArray(ArrayDatum(byteSize, buf, false));
1149  }
1150  }
1151  break;
1152  }
1153  case kINT: {
1154  for (size_t i = 0; i < dataSize; i++) {
1155  if (col.nulls[i]) {
1156  addArray(NullArray(cd->columnType));
1157  } else {
1158  size_t len = col.data.arr_col[i].data.int_col.size();
1159  size_t byteSize = len * sizeof(int32_t);
1160  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1161  int8_t* p = buf;
1162  for (size_t j = 0; j < len; ++j) {
1163  *(int32_t*)p =
1164  static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
1165  p += sizeof(int32_t);
1166  }
1167  addArray(ArrayDatum(byteSize, buf, false));
1168  }
1169  }
1170  break;
1171  }
1172  case kBIGINT:
1173  case kNUMERIC:
1174  case kDECIMAL: {
1175  for (size_t i = 0; i < dataSize; i++) {
1176  if (col.nulls[i]) {
1177  addArray(NullArray(cd->columnType));
1178  } else {
1179  size_t len = col.data.arr_col[i].data.int_col.size();
1180  size_t byteSize = len * sizeof(int64_t);
1181  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1182  int8_t* p = buf;
1183  for (size_t j = 0; j < len; ++j) {
1184  *(int64_t*)p =
1185  static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1186  p += sizeof(int64_t);
1187  }
1188  addArray(ArrayDatum(byteSize, buf, false));
1189  }
1190  }
1191  break;
1192  }
1193  case kFLOAT: {
1194  for (size_t i = 0; i < dataSize; i++) {
1195  if (col.nulls[i]) {
1196  addArray(NullArray(cd->columnType));
1197  } else {
1198  size_t len = col.data.arr_col[i].data.real_col.size();
1199  size_t byteSize = len * sizeof(float);
1200  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1201  int8_t* p = buf;
1202  for (size_t j = 0; j < len; ++j) {
1203  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
1204  p += sizeof(float);
1205  }
1206  addArray(ArrayDatum(byteSize, buf, false));
1207  }
1208  }
1209  break;
1210  }
1211  case kDOUBLE: {
1212  for (size_t i = 0; i < dataSize; i++) {
1213  if (col.nulls[i]) {
1214  addArray(NullArray(cd->columnType));
1215  } else {
1216  size_t len = col.data.arr_col[i].data.real_col.size();
1217  size_t byteSize = len * sizeof(double);
1218  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1219  int8_t* p = buf;
1220  for (size_t j = 0; j < len; ++j) {
1221  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
1222  p += sizeof(double);
1223  }
1224  addArray(ArrayDatum(byteSize, buf, false));
1225  }
1226  }
1227  break;
1228  }
1229  case kTIME:
1230  case kTIMESTAMP:
1231  case kDATE: {
1232  for (size_t i = 0; i < dataSize; i++) {
1233  if (col.nulls[i]) {
1234  addArray(NullArray(cd->columnType));
1235  } else {
1236  size_t len = col.data.arr_col[i].data.int_col.size();
1237  size_t byteWidth = sizeof(int64_t);
1238  size_t byteSize = len * byteWidth;
1239  int8_t* buf = (int8_t*)checked_malloc(byteSize);
1240  int8_t* p = buf;
1241  for (size_t j = 0; j < len; ++j) {
1242  *reinterpret_cast<int64_t*>(p) =
1243  static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1244  p += sizeof(int64_t);
1245  }
1246  addArray(ArrayDatum(byteSize, buf, false));
1247  }
1248  }
1249  break;
1250  }
1251  default:
1252  throw std::runtime_error("Invalid Array Type");
1253  }
1254  }
1255  break;
1256  }
1257  default:
1258  throw std::runtime_error("Invalid Type");
1259  }
1260  return dataSize;
1261 }
1262 
1263 void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
1264  const TDatum& datum,
1265  const bool is_null,
1266  const int64_t replicate_count) {
1267  set_replicate_count(replicate_count);
1268  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
1269  : cd->columnType.get_type();
1270  switch (type) {
1271  case kBOOLEAN: {
1272  if (is_null) {
1273  if (cd->columnType.get_notnull()) {
1274  throw std::runtime_error("NULL for column " + cd->columnName);
1275  }
1276  addBoolean(inline_fixed_encoding_null_val(cd->columnType));
1277  } else {
1278  addBoolean((int8_t)datum.val.int_val);
1279  }
1280  break;
1281  }
1282  case kTINYINT:
1283  if (!is_null) {
1284  addTinyint((int8_t)datum.val.int_val);
1285  } else {
1286  if (cd->columnType.get_notnull()) {
1287  throw std::runtime_error("NULL for column " + cd->columnName);
1288  }
1289  addTinyint(inline_fixed_encoding_null_val(cd->columnType));
1290  }
1291  break;
1292  case kSMALLINT:
1293  if (!is_null) {
1294  addSmallint((int16_t)datum.val.int_val);
1295  } else {
1296  if (cd->columnType.get_notnull()) {
1297  throw std::runtime_error("NULL for column " + cd->columnName);
1298  }
1299  addSmallint(inline_fixed_encoding_null_val(cd->columnType));
1300  }
1301  break;
1302  case kINT:
1303  if (!is_null) {
1304  addInt((int32_t)datum.val.int_val);
1305  } else {
1306  if (cd->columnType.get_notnull()) {
1307  throw std::runtime_error("NULL for column " + cd->columnName);
1308  }
1309  addInt(inline_fixed_encoding_null_val(cd->columnType));
1310  }
1311  break;
1312  case kBIGINT:
1313  if (!is_null) {
1314  addBigint(datum.val.int_val);
1315  } else {
1316  if (cd->columnType.get_notnull()) {
1317  throw std::runtime_error("NULL for column " + cd->columnName);
1318  }
1319  addBigint(inline_fixed_encoding_null_val(cd->columnType));
1320  }
1321  break;
1322  case kFLOAT:
1323  if (!is_null) {
1324  addFloat((float)datum.val.real_val);
1325  } else {
1326  if (cd->columnType.get_notnull()) {
1327  throw std::runtime_error("NULL for column " + cd->columnName);
1328  }
1329  addFloat(NULL_FLOAT);
1330  }
1331  break;
1332  case kDOUBLE:
1333  if (!is_null) {
1334  addDouble(datum.val.real_val);
1335  } else {
1336  if (cd->columnType.get_notnull()) {
1337  throw std::runtime_error("NULL for column " + cd->columnName);
1338  }
1339  addDouble(NULL_DOUBLE);
1340  }
1341  break;
1342  case kTEXT:
1343  case kVARCHAR:
1344  case kCHAR: {
1345  // @TODO(wei) for now, use empty string for nulls
1346  if (is_null) {
1347  if (cd->columnType.get_notnull()) {
1348  throw std::runtime_error("NULL for column " + cd->columnName);
1349  }
1350  addString(std::string());
1351  } else {
1352  addString(datum.val.str_val);
1353  }
1354  break;
1355  }
1356  case kTIME:
1357  case kTIMESTAMP:
1358  case kDATE: {
1359  if (!is_null) {
1360  addBigint(datum.val.int_val);
1361  } else {
1362  if (cd->columnType.get_notnull()) {
1363  throw std::runtime_error("NULL for column " + cd->columnName);
1364  }
1365  addBigint(inline_fixed_encoding_null_val(cd->columnType));
1366  }
1367  break;
1368  }
1369  case kARRAY:
1370  if (is_null && cd->columnType.get_notnull()) {
1371  throw std::runtime_error("NULL for column " + cd->columnName);
1372  }
1373  if (IS_STRING(cd->columnType.get_subtype())) {
1374  std::vector<std::string>& string_vec = addStringArray();
1375  addBinaryStringArray(datum, string_vec);
1376  } else {
1377  if (!is_null) {
1378  addArray(TDatumToArrayDatum(datum, cd->columnType));
1379  } else {
1380  addArray(NullArray(cd->columnType));
1381  }
1382  }
1383  break;
1384  case kPOINT:
1385  case kLINESTRING:
1386  case kPOLYGON:
1387  case kMULTIPOLYGON:
1388  if (is_null) {
1389  if (cd->columnType.get_notnull()) {
1390  throw std::runtime_error("NULL for column " + cd->columnName);
1391  }
1392  addGeoString(std::string());
1393  } else {
1394  addGeoString(datum.val.str_val);
1395  }
1396  break;
1397  default:
1398  CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
1399  }
1400 }
1401 
1402 bool importGeoFromLonLat(double lon, double lat, std::vector<double>& coords) {
1403  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
1404  return false;
1405  }
1406  // we don't need to do any coordinate-system transformation
1407  // here (yet) so we don't need to use any OGR API or types
1408  // just use the values directly (assumed to be in 4326)
1409  coords.push_back(lon);
1410  coords.push_back(lat);
1411  return true;
1412 }
1413 
1414 uint64_t compress_coord(double coord, const SQLTypeInfo& ti, bool x) {
1415  if (ti.get_compression() == kENCODING_GEOINT && ti.get_comp_param() == 32) {
1416  return x ? Geo_namespace::compress_longitude_coord_geoint32(coord)
1417  : Geo_namespace::compress_lattitude_coord_geoint32(coord);
1418  }
1419  return *reinterpret_cast<uint64_t*>(may_alias_ptr(&coord));
1420 }
1421 
1422 std::vector<uint8_t> compress_coords(std::vector<double>& coords, const SQLTypeInfo& ti) {
1423  std::vector<uint8_t> compressed_coords;
1424  bool x = true;
1425  for (auto coord : coords) {
1426  auto coord_data_ptr = reinterpret_cast<uint64_t*>(&coord);
1427  uint64_t coord_data = *coord_data_ptr;
1428  size_t coord_data_size = sizeof(double);
1429 
1430  if (ti.get_output_srid() == 4326) {
1431  if (x) {
1432  if (coord < -180.0 || coord > 180.0) {
1433  throw std::runtime_error("WGS84 longitude " + std::to_string(coord) +
1434  " is out of bounds");
1435  }
1436  } else {
1437  if (coord < -90.0 || coord > 90.0) {
1438  throw std::runtime_error("WGS84 latitude " + std::to_string(coord) +
1439  " is out of bounds");
1440  }
1441  }
1442  if (ti.get_compression() == kENCODING_GEOINT && ti.get_comp_param() == 32) {
1443  coord_data = compress_coord(coord, ti, x);
1444  coord_data_size = ti.get_comp_param() / 8;
1445  }
1446  x = !x;
1447  }
1448 
1449  for (size_t i = 0; i < coord_data_size; i++) {
1450  compressed_coords.push_back(coord_data & 0xFF);
1451  coord_data >>= 8;
1452  }
1453  }
1454  return compressed_coords;
1455 }
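// e.g. a WGS84 POINT with GEOINT(32) compression stores lon/lat in 8 bytes
// total (4 bytes per coordinate, appended little-endian) instead of the
// 16 bytes needed for two raw doubles.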
1456 
1457 void Importer::set_geo_physical_import_buffer(
1458  const Catalog_Namespace::Catalog& catalog,
1459  const ColumnDescriptor* cd,
1460  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1461  size_t& col_idx,
1462  std::vector<double>& coords,
1463  std::vector<double>& bounds,
1464  std::vector<int>& ring_sizes,
1465  std::vector<int>& poly_rings,
1466  int render_group,
1467  const int64_t replicate_count) {
1468  const auto col_ti = cd->columnType;
1469  const auto col_type = col_ti.get_type();
1470  auto columnId = cd->columnId;
1471  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1472  std::vector<TDatum> td_coords_data;
1473  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
1474  for (auto cc : compressed_coords) {
1475  TDatum td_byte;
1476  td_byte.val.int_val = cc;
1477  td_coords_data.push_back(td_byte);
1478  }
1479  TDatum tdd_coords;
1480  tdd_coords.val.arr_val = td_coords_data;
1481  tdd_coords.is_null = false;
1482  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false, replicate_count);
1483 
1484  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1485  // Create ring_sizes array value and add it to the physical column
1486  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1487  std::vector<TDatum> td_ring_sizes;
1488  for (auto ring_size : ring_sizes) {
1489  TDatum td_ring_size;
1490  td_ring_size.val.int_val = ring_size;
1491  td_ring_sizes.push_back(td_ring_size);
1492  }
1493  TDatum tdd_ring_sizes;
1494  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1495  tdd_ring_sizes.is_null = false;
1496  import_buffers[col_idx++]->add_value(
1497  cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
1498  }
1499 
1500  if (col_type == kMULTIPOLYGON) {
1501  // Create poly_rings array value and add it to the physical column
1502  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1503  std::vector<TDatum> td_poly_rings;
1504  for (auto num_rings : poly_rings) {
1505  TDatum td_num_rings;
1506  td_num_rings.val.int_val = num_rings;
1507  td_poly_rings.push_back(td_num_rings);
1508  }
1509  TDatum tdd_poly_rings;
1510  tdd_poly_rings.val.arr_val = td_poly_rings;
1511  tdd_poly_rings.is_null = false;
1512  import_buffers[col_idx++]->add_value(
1513  cd_poly_rings, tdd_poly_rings, false, replicate_count);
1514  }
1515 
1516  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1517  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1518  std::vector<TDatum> td_bounds_data;
1519  for (auto b : bounds) {
1520  TDatum td_double;
1521  td_double.val.real_val = b;
1522  td_bounds_data.push_back(td_double);
1523  }
1524  TDatum tdd_bounds;
1525  tdd_bounds.val.arr_val = td_bounds_data;
1526  tdd_bounds.is_null = false;
1527  import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
1528  }
1529 
1530  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1531  // Create render_group value and add it to the physical column
1532  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1533  TDatum td_render_group;
1534  td_render_group.val.int_val = render_group;
1535  td_render_group.is_null = false;
1536  import_buffers[col_idx++]->add_value(
1537  cd_render_group, td_render_group, false, replicate_count);
1538  }
1539 }
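// For reference, the physical columns populated above, in catalog order:
// coords (all geo types), ring_sizes (POLYGON/MULTIPOLYGON), poly_rings
// (MULTIPOLYGON only), bounds (LINESTRING/POLYGON/MULTIPOLYGON) and
// render_group (POLYGON/MULTIPOLYGON).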
1540 
1541 void Importer::set_geo_physical_import_buffer_columnar(
1542  const Catalog_Namespace::Catalog& catalog,
1543  const ColumnDescriptor* cd,
1544  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1545  size_t& col_idx,
1546  std::vector<std::vector<double>>& coords_column,
1547  std::vector<std::vector<double>>& bounds_column,
1548  std::vector<std::vector<int>>& ring_sizes_column,
1549  std::vector<std::vector<int>>& poly_rings_column,
1550  int render_group,
1551  const int64_t replicate_count) {
1552  const auto col_ti = cd->columnType;
1553  const auto col_type = col_ti.get_type();
1554  auto columnId = cd->columnId;
1555 
1556  auto coords_row_count = coords_column.size();
1557  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1558  for (auto coords : coords_column) {
1559  std::vector<TDatum> td_coords_data;
1560  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
1561  for (auto cc : compressed_coords) {
1562  TDatum td_byte;
1563  td_byte.val.int_val = cc;
1564  td_coords_data.push_back(td_byte);
1565  }
1566  TDatum tdd_coords;
1567  tdd_coords.val.arr_val = td_coords_data;
1568  tdd_coords.is_null = false;
1569  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false, replicate_count);
1570  }
1571  col_idx++;
1572 
1573  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1574  if (ring_sizes_column.size() != coords_row_count) {
1575  CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
1576  }
1577  // Create ring_sizes array value and add it to the physical column
1578  auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1579  for (auto ring_sizes : ring_sizes_column) {
1580  std::vector<TDatum> td_ring_sizes;
1581  for (auto ring_size : ring_sizes) {
1582  TDatum td_ring_size;
1583  td_ring_size.val.int_val = ring_size;
1584  td_ring_sizes.push_back(td_ring_size);
1585  }
1586  TDatum tdd_ring_sizes;
1587  tdd_ring_sizes.val.arr_val = td_ring_sizes;
1588  tdd_ring_sizes.is_null = false;
1589  import_buffers[col_idx]->add_value(
1590  cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
1591  }
1592  col_idx++;
1593  }
1594 
1595  if (col_type == kMULTIPOLYGON) {
1596  if (poly_rings_column.size() != coords_row_count) {
1597  CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
1598  }
1599  // Create poly_rings array value and add it to the physical column
1600  auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1601  for (auto poly_rings : poly_rings_column) {
1602  std::vector<TDatum> td_poly_rings;
1603  for (auto num_rings : poly_rings) {
1604  TDatum td_num_rings;
1605  td_num_rings.val.int_val = num_rings;
1606  td_poly_rings.push_back(td_num_rings);
1607  }
1608  TDatum tdd_poly_rings;
1609  tdd_poly_rings.val.arr_val = td_poly_rings;
1610  tdd_poly_rings.is_null = false;
1611  import_buffers[col_idx]->add_value(
1612  cd_poly_rings, tdd_poly_rings, false, replicate_count);
1613  }
1614  col_idx++;
1615  }
1616 
1617  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1618  if (bounds_column.size() != coords_row_count) {
1619  CHECK(false) << "Geometry import columnar: bounds column size mismatch";
1620  }
1621  auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1622  for (auto bounds : bounds_column) {
1623  std::vector<TDatum> td_bounds_data;
1624  for (auto b : bounds) {
1625  TDatum td_double;
1626  td_double.val.real_val = b;
1627  td_bounds_data.push_back(td_double);
1628  }
1629  TDatum tdd_bounds;
1630  tdd_bounds.val.arr_val = td_bounds_data;
1631  tdd_bounds.is_null = false;
1632  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
1633  }
1634  col_idx++;
1635  }
1636 
1637  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1638  // Create render_group value and add it to the physical column
1639  auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
1640  TDatum td_render_group;
1641  td_render_group.val.int_val = render_group;
1642  td_render_group.is_null = false;
1643  for (decltype(coords_row_count) i = 0; i < coords_row_count; i++) {
1644  import_buffers[col_idx]->add_value(
1645  cd_render_group, td_render_group, false, replicate_count);
1646  }
1647  col_idx++;
1648  }
1649 }
1650 
1651 namespace {
1652 
1653 std::tuple<int, SQLTypes, std::string> explode_collections_step1(
1654  const std::list<const ColumnDescriptor*>& col_descs) {
1655  // validate the columns
1656  // for now we can only explode into a single destination column
1657  // which must be of the child type (POLYGON, LINESTRING, POINT)
1658  int collection_col_idx = -1;
1659  int col_idx = 0;
1660  std::string collection_col_name;
1661  SQLTypes collection_child_type = kNULLT;
1662  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1663  auto const& cd = *cd_it;
1664  auto const col_type = cd->columnType.get_type();
1665  if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
1666  if (collection_col_idx >= 0) {
1667  throw std::runtime_error(
1668  "Explode Collections: Found more than one destination column");
1669  }
1670  collection_col_idx = col_idx;
1671  collection_child_type = col_type;
1672  collection_col_name = cd->columnName;
1673  }
1674  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1675  ++cd_it;
1676  }
1677  col_idx++;
1678  }
1679  if (collection_col_idx < 0) {
1680  throw std::runtime_error(
1681  "Explode Collections: Failed to find a supported column type to explode "
1682  "into");
1683  }
1684  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
1685 }
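// explode_collections_step2() below walks a MULTI* source geometry and invokes
// the supplied import lambda once per child geometry, so each child lands in
// its own imported row.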
1686 
1687 int64_t explode_collections_step2(
1688  OGRGeometry* ogr_geometry,
1689  SQLTypes collection_child_type,
1690  const std::string& collection_col_name,
1691  size_t row_or_feature_idx,
1692  std::function<void(OGRGeometry*)> execute_import_lambda) {
1693  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1694  bool is_collection = false;
1695  switch (collection_child_type) {
1696  case kPOINT:
1697  switch (ogr_geometry_type) {
1698  case wkbMultiPoint:
1699  is_collection = true;
1700  break;
1701  case wkbPoint:
1702  break;
1703  default:
1704  throw std::runtime_error(
1705  "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1706  }
1707  break;
1708  case kLINESTRING:
1709  switch (ogr_geometry_type) {
1710  case wkbMultiLineString:
1711  is_collection = true;
1712  break;
1713  case wkbLineString:
1714  break;
1715  default:
1716  throw std::runtime_error(
1717  "Explode Collections: Source geo type must be MULTILINESTRING or "
1718  "LINESTRING");
1719  }
1720  break;
1721  case kPOLYGON:
1722  switch (ogr_geometry_type) {
1723  case wkbMultiPolygon:
1724  is_collection = true;
1725  break;
1726  case wkbPolygon:
1727  break;
1728  default:
1729  throw std::runtime_error(
1730  "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1731  }
1732  break;
1733  default:
1734  CHECK(false) << "Unsupported geo child type " << collection_child_type;
1735  }
1736 
1737  int64_t us = 0LL;
1738 
1739  // explode or just import
1740  if (is_collection) {
1741  // cast to collection
1742  OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1743  CHECK(collection_geometry);
1744 
1745 #if LOG_EXPLODE_COLLECTIONS
1746  // log number of children
1747  LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
1748  << explode_col_name << "' into " << collection_geometry->getNumGeometries()
1749  << " child rows";
1750 #endif
1751 
1752  // loop over children
1753  uint32_t child_geometry_count = 0;
1754  auto child_geometry_it = collection_geometry->begin();
1755  while (child_geometry_it != collection_geometry->end()) {
1756  // get and import this child
1757  OGRGeometry* import_geometry = *child_geometry_it;
1758  us += measure<std::chrono::microseconds>::execution(
1759  [&] { execute_import_lambda(import_geometry); });
1760 
1761  // next child
1762  child_geometry_it++;
1763  child_geometry_count++;
1764  }
1765  } else {
1766  // import non-collection row just once
1767  us += measure<std::chrono::microseconds>::execution(
1768  [&] { execute_import_lambda(ogr_geometry); });
1769  }
1770 
1771  // done
1772  return us;
1773 }
1774 
1775 } // namespace
1776 
1777 static ImportStatus import_thread_delimited(
1778  int thread_id,
1779  Importer* importer,
1780  std::unique_ptr<char[]> scratch_buffer,
1781  size_t begin_pos,
1782  size_t end_pos,
1783  size_t total_size,
1784  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
1785  size_t first_row_index_this_buffer) {
1786  ImportStatus import_status;
1787  int64_t total_get_row_time_us = 0;
1788  int64_t total_str_to_val_time_us = 0;
1789  CHECK(scratch_buffer);
1790  auto buffer = scratch_buffer.get();
1791  auto load_ms = measure<>::execution([]() {});
1792  auto ms = measure<>::execution([&]() {
1793  const CopyParams& copy_params = importer->get_copy_params();
1794  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
1795  size_t begin =
1796  DelimitedParserUtils::find_beginning(buffer, begin_pos, end_pos, copy_params);
1797  const char* thread_buf = buffer + begin_pos + begin;
1798  const char* thread_buf_end = buffer + end_pos;
1799  const char* buf_end = buffer + total_size;
1800  bool try_single_thread = false;
1801  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
1802  importer->get_import_buffers(thread_id);
1803  auto us = measure<std::chrono::microseconds>::execution([&]() {});
1804  int phys_cols = 0;
1805  int point_cols = 0;
1806  for (const auto cd : col_descs) {
1807  const auto& col_ti = cd->columnType;
1808  phys_cols += col_ti.get_physical_cols();
1809  if (cd->columnType.get_type() == kPOINT) {
1810  point_cols++;
1811  }
1812  }
1813  auto num_cols = col_descs.size() - phys_cols;
1814  for (const auto& p : import_buffers) {
1815  p->clear();
1816  }
1817  std::vector<std::string> row;
1818  size_t row_index_plus_one = 0;
1819  for (const char* p = thread_buf; p < thread_buf_end; p++) {
1820  row.clear();
1821  if (DEBUG_TIMING) {
1822  us = measure<std::chrono::microseconds>::execution([&]() {
1823  p = DelimitedParserUtils::get_row(p,
1824  thread_buf_end,
1825  buf_end,
1826  copy_params,
1827  importer->get_is_array(),
1828  row,
1829  try_single_thread);
1830  });
1831  total_get_row_time_us += us;
1832  } else {
1833  p = DelimitedParserUtils::get_row(p,
1834  thread_buf_end,
1835  buf_end,
1836  copy_params,
1837  importer->get_is_array(),
1838  row,
1839  try_single_thread);
1840  }
1841  row_index_plus_one++;
1842  // Each POINT could consume two separate coords instead of a single WKT
1843  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
1844  import_status.rows_rejected++;
1845  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
1846  << row.size() << "): " << row;
1847  if (import_status.rows_rejected > copy_params.max_reject) {
1848  break;
1849  }
1850  continue;
1851  }
1852 
1853  //
1854  // lambda for importing a row (perhaps multiple times if exploding a collection)
1855  //
1856 
1857  auto execute_import_row = [&](OGRGeometry* import_geometry) {
1858  size_t import_idx = 0;
1859  size_t col_idx = 0;
1860  try {
1861  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1862  auto cd = *cd_it;
1863  const auto& col_ti = cd->columnType;
1864  if (col_ti.get_physical_cols() == 0) {
1865  // not geo
1866 
1867  // store the string (possibly null)
1868  bool is_null =
1869  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
1870  // Note: default copy_params.null_str is "\N", but everyone uses "NULL".
1871  // So initially nullness may be missed and not passed to add_value,
1872  // which then might also check and still decide it's actually a NULL,
1873  // e.g. if kINT doesn't start with a digit or a '-' then it's considered
1874  // NULL. So "NULL" is not recognized as NULL but then it's not
1875  // recognized as a valid kINT, so it's a NULL after all. Checking for
1876  // "NULL" here too, as a widely accepted notation for NULL.
1877  if (!cd->columnType.is_string() && row[import_idx].empty()) {
1878  is_null = true;
1879  }
1880  import_buffers[col_idx]->add_value(
1881  cd, row[import_idx], is_null, copy_params);
1882 
1883  // next
1884  ++import_idx;
1885  ++col_idx;
1886  } else {
1887  // geo
1888 
1889  // store null string in the base column
1890  import_buffers[col_idx]->add_value(
1891  cd, copy_params.null_str, true, copy_params);
1892 
1893  // WKT from string we're not storing
1894  auto const& wkt = row[import_idx];
1895 
1896  // next
1897  ++import_idx;
1898  ++col_idx;
1899 
1900  SQLTypes col_type = col_ti.get_type();
1901  CHECK(IS_GEO(col_type));
1902 
1903  std::vector<double> coords;
1904  std::vector<double> bounds;
1905  std::vector<int> ring_sizes;
1906  std::vector<int> poly_rings;
1907  int render_group = 0;
1908 
1909  if (col_type == kPOINT && wkt.size() > 0 &&
1910  (wkt[0] == '.' || isdigit(wkt[0]) || wkt[0] == '-')) {
1911  // Invalid WKT, looks more like a scalar.
1912  // Try custom POINT import: from two separate scalars rather than WKT
1913  // string
1914  double lon = std::atof(wkt.c_str());
1915  double lat = NAN;
1916  std::string lat_str{row[import_idx]};
1917  ++import_idx;
1918  if (lat_str.size() > 0 &&
1919  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
1920  lat = std::atof(lat_str.c_str());
1921  }
1922  // Swap coordinates if this table uses a reverse order: lat/lon
1923  if (!copy_params.lonlat) {
1924  std::swap(lat, lon);
1925  }
1926  // TODO: should check if POINT column should have been declared with
1927  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
1928  // throw std::runtime_error("POINT column " + cd->columnName + " is
1929  // not WGS84, cannot insert lon/lat");
1930  // }
1931  if (!importGeoFromLonLat(lon, lat, coords)) {
1932  throw std::runtime_error(
1933  "Cannot read lon/lat to insert into POINT column " +
1934  cd->columnName);
1935  }
1936  } else {
1937  // import it
1938  SQLTypeInfo import_ti;
1939  if (import_geometry) {
1940  // geometry already exploded
1941  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
1942  import_geometry,
1943  import_ti,
1944  coords,
1945  bounds,
1946  ring_sizes,
1947  poly_rings,
1948  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
1949  std::string msg =
1950  "Failed to extract valid geometry from exploded row " +
1951  std::to_string(first_row_index_this_buffer + row_index_plus_one) +
1952  " for column " + cd->columnName;
1953  throw std::runtime_error(msg);
1954  }
1955  } else {
1956  // extract geometry directly from WKT
1957  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
1958  wkt,
1959  import_ti,
1960  coords,
1961  bounds,
1962  ring_sizes,
1963  poly_rings,
1964  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
1965  std::string msg =
1966  "Failed to extract valid geometry from row " +
1967  std::to_string(first_row_index_this_buffer + row_index_plus_one) +
1968  " for column " + cd->columnName;
1969  throw std::runtime_error(msg);
1970  }
1971  }
1972 
1973  // validate types
1974  if (col_type != import_ti.get_type()) {
1975  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
1976  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
1977  col_type == SQLTypes::kMULTIPOLYGON)) {
1978  throw std::runtime_error(
1979  "Imported geometry doesn't match the type of column " +
1980  cd->columnName);
1981  }
1982  }
1983 
1984  // assign render group?
1985  if (columnIdToRenderGroupAnalyzerMap.size()) {
1986  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1987  if (ring_sizes.size()) {
1988  // get a suitable render group for these poly coords
1989  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
1990  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
1991  render_group =
1992  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
1993  } else {
1994  // empty poly
1995  render_group = -1;
1996  }
1997  }
1998  }
1999  }
2000 
2001  // import extracted geo
2002  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2003  cd,
2004  import_buffers,
2005  col_idx,
2006  coords,
2007  bounds,
2008  ring_sizes,
2009  poly_rings,
2010  render_group);
2011 
2012  // skip remaining physical columns
2013  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2014  ++cd_it;
2015  }
2016  }
2017  }
2018  import_status.rows_completed++;
2019  } catch (const std::exception& e) {
2020  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2021  import_buffers[col_idx_to_pop]->pop_value();
2022  }
2023  import_status.rows_rejected++;
2024  LOG(ERROR) << "Input exception thrown: " << e.what()
2025  << ". Row discarded. Data: " << row;
2026  }
2027  };
2028 
2029  if (copy_params.geo_explode_collections) {
2030  // explode and import
2031  // @TODO(se) convert to structure-bindings when we can use C++17 here
2032  auto collection_idx_type_name = explode_collections_step1(col_descs);
2033  int collection_col_idx = std::get<0>(collection_idx_type_name);
2034  SQLTypes collection_child_type = std::get<1>(collection_idx_type_name);
2035  std::string collection_col_name = std::get<2>(collection_idx_type_name);
2036  // pull out the collection WKT
2037  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2038  auto const& collection_wkt = row[collection_col_idx];
2039  // convert to OGR
2040  OGRGeometry* ogr_geometry = nullptr;
2041  ScopeGuard destroy_ogr_geometry = [&] {
2042  if (ogr_geometry) {
2043  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2044  }
2045  };
2046  OGRErr ogr_status = OGRGeometryFactory::createFromWkt(
2047  collection_wkt.c_str(), nullptr, &ogr_geometry);
2048  if (ogr_status != OGRERR_NONE) {
2049  throw std::runtime_error("Failed to convert WKT to geometry");
2050  }
2051  // do the explode and import
2052  us = explode_collections_step2(ogr_geometry,
2053  collection_child_type,
2054  collection_col_name,
2055  first_row_index_this_buffer + row_index_plus_one,
2056  execute_import_row);
2057  } else {
2058  // import non-collection row just once
2059  us = measure<std::chrono::microseconds>::execution(
2060  [&] { execute_import_row(nullptr); });
2061  }
2062  total_str_to_val_time_us += us;
2063  } // end thread
2064  if (import_status.rows_completed > 0) {
2065  load_ms = measure<>::execution(
2066  [&]() { importer->load(import_buffers, import_status.rows_completed); });
2067  }
2068  });
2069  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2070  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2071  << import_status.rows_completed << " rows inserted in "
2072  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2073  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2074  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2075  << "sec" << std::endl;
2076  }
2077 
2078  import_status.thread_id = thread_id;
2079  // LOG(INFO) << " return " << import_status.thread_id << std::endl;
2080 
2081  return import_status;
2082 }
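
// A minimal sketch of the scalar-POINT fallback above: two numeric CSV
// fields are accepted in place of a "POINT (lon lat)" WKT string. The helper
// below is hypothetical (not used by the importer) and assumes the same
// leading-character test as the code above:
static bool example_parse_scalar_point(const std::string& first_field,
                                       const std::string& second_field,
                                       const bool lonlat,
                                       double& lon,
                                       double& lat) {
  auto looks_numeric = [](const std::string& s) {
    return !s.empty() && (s[0] == '.' || isdigit(s[0]) || s[0] == '-');
  };
  if (!looks_numeric(first_field) || !looks_numeric(second_field)) {
    return false;  // not a scalar pair; the caller should try WKT instead
  }
  lon = std::atof(first_field.c_str());
  lat = std::atof(second_field.c_str());
  if (!lonlat) {
    std::swap(lat, lon);  // the table declared lat/lon ordering
  }
  return true;
}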
2083 
2084 static ImportStatus import_thread_shapefile(
2085  int thread_id,
2086  Importer* importer,
2087  OGRSpatialReference* poGeographicSR,
2088  const FeaturePtrVector& features,
2089  size_t firstFeature,
2090  size_t numFeatures,
2091  const FieldNameToIndexMapType& fieldNameToIndexMap,
2092  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2093  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap) {
2094  ImportStatus import_status;
2095  const CopyParams& copy_params = importer->get_copy_params();
2096  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2097  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2098  importer->get_import_buffers(thread_id);
2099 
2100  for (const auto& p : import_buffers) {
2101  p->clear();
2102  }
2103 
2104  auto convert_timer = timer_start();
2105 
2106  // we create this on the fly based on the first feature's SR
2107  std::unique_ptr<OGRCoordinateTransformation> coordinate_transformation;
2108 
2109  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2110  if (!features[iFeature]) {
2111  continue;
2112  }
2113 
2114  // get this feature's geometry
2115  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2116  if (pGeometry) {
2117  // for geodatabase, we need to consider features with no geometry
2118  // as we still want to create a table, even if it has no geo column
2119 
2120  // transform it
2121  // avoid GDAL error if not transformable
2122  auto geometry_sr = pGeometry->getSpatialReference();
2123  if (geometry_sr) {
2124  // create an OGRCoordinateTransformation (CT) on the fly;
2125  // we must assume that all geo in this file has the same
2126  // source SR, so the one CT will be valid for all features;
2127  // transforming via a reusable CT is faster than via an SR
2128  if (coordinate_transformation == nullptr) {
2129  coordinate_transformation.reset(
2130  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR));
2131  if (coordinate_transformation == nullptr) {
2132  throw std::runtime_error(
2133  "Failed to create a GDAL CoordinateTransformation for incoming geo");
2134  }
2135  }
2136  pGeometry->transform(coordinate_transformation.get());
2137  }
2138  }
2139 
2140  //
2141  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2142  //
2143 
2144  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2145  size_t col_idx = 0;
2146  try {
2147  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2148  auto cd = *cd_it;
2149 
2150  // is this a geo column?
2151  const auto& col_ti = cd->columnType;
2152  if (col_ti.is_geometry()) {
2153  // some Shapefiles get us here, but the OGRGeometryRef is null
2154  if (!import_geometry) {
2155  std::string msg = "Geometry feature " +
2156  std::to_string(firstFeature + iFeature + 1) +
2157  " has null GeometryRef";
2158  throw std::runtime_error(msg);
2159  }
2160 
2161  // Note that this assumes there is one and only one geo column in the table.
2162  // Currently, the importer only supports reading a single geospatial feature
2163  // from an input shapefile / geojson file, but this code will need to be
2164  // modified if that changes
2165  SQLTypes col_type = col_ti.get_type();
2166 
2167  // store null string in the base column
2168  import_buffers[col_idx]->add_value(
2169  cd, copy_params.null_str, true, copy_params);
2170  ++col_idx;
2171 
2172  // the data we now need to extract for the other columns
2173  std::vector<double> coords;
2174  std::vector<double> bounds;
2175  std::vector<int> ring_sizes;
2176  std::vector<int> poly_rings;
2177  int render_group = 0;
2178 
2179  // extract it
2180  SQLTypeInfo import_ti;
2181 
2182  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
2183  import_geometry,
2184  import_ti,
2185  coords,
2186  bounds,
2187  ring_sizes,
2188  poly_rings,
2189  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2190  std::string msg = "Failed to extract valid geometry from feature " +
2191  std::to_string(firstFeature + iFeature + 1) +
2192  " for column " + cd->columnName;
2193  throw std::runtime_error(msg);
2194  }
2195 
2196  // validate types
2197  if (col_type != import_ti.get_type()) {
2198  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2199  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2200  col_type == SQLTypes::kMULTIPOLYGON)) {
2201  throw std::runtime_error(
2202  "Imported geometry doesn't match the type of column " +
2203  cd->columnName);
2204  }
2205  }
2206 
2207  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2208  if (ring_sizes.size()) {
2209  // get a suitable render group for these poly coords
2210  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2211  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2212  render_group = (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2213  } else {
2214  // empty poly
2215  render_group = -1;
2216  }
2217  }
2218 
2219  // create coords array value and add it to the physical column
2220  ++cd_it;
2221  auto cd_coords = *cd_it;
2222  std::vector<TDatum> td_coord_data;
2223  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
2224  for (auto cc : compressed_coords) {
2225  TDatum td_byte;
2226  td_byte.val.int_val = cc;
2227  td_coord_data.push_back(td_byte);
2228  }
2229  TDatum tdd_coords;
2230  tdd_coords.val.arr_val = td_coord_data;
2231  tdd_coords.is_null = false;
2232  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2233  ++col_idx;
2234 
2235  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2236  // Create ring_sizes array value and add it to the physical column
2237  ++cd_it;
2238  auto cd_ring_sizes = *cd_it;
2239  std::vector<TDatum> td_ring_sizes;
2240  for (auto ring_size : ring_sizes) {
2241  TDatum td_ring_size;
2242  td_ring_size.val.int_val = ring_size;
2243  td_ring_sizes.push_back(td_ring_size);
2244  }
2245  TDatum tdd_ring_sizes;
2246  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2247  tdd_ring_sizes.is_null = false;
2248  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2249  ++col_idx;
2250  }
2251 
2252  if (col_type == kMULTIPOLYGON) {
2253  // Create poly_rings array value and add it to the physical column
2254  ++cd_it;
2255  auto cd_poly_rings = *cd_it;
2256  std::vector<TDatum> td_poly_rings;
2257  for (auto num_rings : poly_rings) {
2258  TDatum td_num_rings;
2259  td_num_rings.val.int_val = num_rings;
2260  td_poly_rings.push_back(td_num_rings);
2261  }
2262  TDatum tdd_poly_rings;
2263  tdd_poly_rings.val.arr_val = td_poly_rings;
2264  tdd_poly_rings.is_null = false;
2265  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2266  ++col_idx;
2267  }
2268 
2269  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2270  col_type == kMULTIPOLYGON) {
2271  // Create bounds array value and add it to the physical column
2272  ++cd_it;
2273  auto cd_bounds = *cd_it;
2274  std::vector<TDatum> td_bounds_data;
2275  for (auto b : bounds) {
2276  TDatum td_double;
2277  td_double.val.real_val = b;
2278  td_bounds_data.push_back(td_double);
2279  }
2280  TDatum tdd_bounds;
2281  tdd_bounds.val.arr_val = td_bounds_data;
2282  tdd_bounds.is_null = false;
2283  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2284  ++col_idx;
2285  }
2286 
2287  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2288  // Create render_group value and add it to the physical column
2289  ++cd_it;
2290  auto cd_render_group = *cd_it;
2291  TDatum td_render_group;
2292  td_render_group.val.int_val = render_group;
2293  td_render_group.is_null = false;
2294  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2295  ++col_idx;
2296  }
2297  } else {
2298  // regular column
2299  // pull from GDAL metadata
2300  const auto cit = columnNameToSourceNameMap.find(cd->columnName);
2301  CHECK(cit != columnNameToSourceNameMap.end());
2302  const std::string& fieldName = cit->second;
2303  const auto fit = fieldNameToIndexMap.find(fieldName);
2304  CHECK(fit != fieldNameToIndexMap.end());
2305  size_t iField = fit->second;
2306  CHECK(iField < fieldNameToIndexMap.size());
2307  std::string fieldContents = features[iFeature]->GetFieldAsString(iField);
2308  import_buffers[col_idx]->add_value(cd, fieldContents, false, copy_params);
2309  ++col_idx;
2310  }
2311  }
2312  import_status.rows_completed++;
2313  } catch (const std::exception& e) {
2314  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2315  import_buffers[col_idx_to_pop]->pop_value();
2316  }
2317  import_status.rows_rejected++;
2318  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2319  }
2320  };
2321 
2322  if (pGeometry && copy_params.geo_explode_collections) {
2323  // explode and import
2324  // @TODO(se) convert to structure-bindings when we can use C++17 here
2325  auto collection_idx_type_name = explode_collections_step1(col_descs);
2326  SQLTypes collection_child_type = std::get<1>(collection_idx_type_name);
2327  std::string collection_col_name = std::get<2>(collection_idx_type_name);
2328  explode_collections_step2(pGeometry,
2329  collection_child_type,
2330  collection_col_name,
2331  firstFeature + iFeature + 1,
2332  execute_import_feature);
2333  } else {
2334  // import non-collection or null feature just once
2335  execute_import_feature(pGeometry);
2336  }
2337  } // end features
2338 
2339  float convert_ms =
2340  float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2341  convert_timer)) /
2342  1000.0f;
2343 
2344  float load_ms = 0.0f;
2345  if (import_status.rows_completed > 0) {
2346  auto load_timer = timer_start();
2347  importer->load(import_buffers, import_status.rows_completed);
2348  load_ms =
2349  float(
2350  timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2351  load_timer)) /
2352  1000.0f;
2353  }
2354 
2355  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2356  LOG(INFO) << "DEBUG: Process " << convert_ms << "ms";
2357  LOG(INFO) << "DEBUG: Load " << load_ms << "ms";
2358  }
2359 
2360  import_status.thread_id = thread_id;
2361 
2362  if (DEBUG_TIMING) {
2363  LOG(INFO) << "DEBUG: Total "
2364  << float(timer_stop<std::chrono::steady_clock::time_point,
2365  std::chrono::microseconds>(convert_timer)) /
2366  1000.0f
2367  << "ms";
2368  }
2369 
2370  return import_status;
2371 }
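
// A minimal sketch of the packing pattern used for the physical geo columns
// above: wrap every scalar in a TDatum, collect those into one array-valued
// TDatum, then add_value() it. The generic helper is hypothetical (for
// int-like payloads only -- compressed coord bytes, ring sizes, poly rings):
template <typename T>
static TDatum example_make_int_array_datum(const std::vector<T>& values) {
  TDatum array_datum;
  for (const auto v : values) {
    TDatum element;
    element.val.int_val = static_cast<int64_t>(v);
    array_datum.val.arr_val.push_back(element);
  }
  array_datum.is_null = false;
  return array_datum;
}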
2372 
2373 bool Loader::loadNoCheckpoint(
2374  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2375  size_t row_count) {
2376  return loadImpl(import_buffers, row_count, false);
2377 }
2378 
2379 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2380  size_t row_count) {
2381  return loadImpl(import_buffers, row_count, true);
2382 }
2383 
2384 namespace {
2385 
2386 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2387  const auto& ti = import_buffer.getTypeInfo();
2388  const int8_t* values_buffer{nullptr};
2389  if (ti.is_string()) {
2390  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2391  values_buffer = import_buffer.getStringDictBuffer();
2392  } else {
2393  values_buffer = import_buffer.getAsBytes();
2394  }
2395  CHECK(values_buffer);
2396  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2397  switch (logical_size) {
2398  case 1: {
2399  return values_buffer[index];
2400  }
2401  case 2: {
2402  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2403  }
2404  case 4: {
2405  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2406  }
2407  case 8: {
2408  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2409  }
2410  default:
2411  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2412  }
2413  UNREACHABLE();
2414  return 0;
2415 }
2416 
2417 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2418  const auto& ti = import_buffer.getTypeInfo();
2419  CHECK_EQ(kFLOAT, ti.get_type());
2420  const auto values_buffer = import_buffer.getAsBytes();
2421  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2422 }
2423 
2424 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2425  const auto& ti = import_buffer.getTypeInfo();
2426  CHECK_EQ(kDOUBLE, ti.get_type());
2427  const auto values_buffer = import_buffer.getAsBytes();
2428  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2429 }
2430 
2431 } // namespace
2432 
2433 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2434  std::vector<size_t>& all_shard_row_counts,
2435  const OneShardBuffers& import_buffers,
2436  const size_t row_count,
2437  const size_t shard_count) {
2438  all_shard_row_counts.resize(shard_count);
2439  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2440  all_shard_import_buffers.emplace_back();
2441  for (const auto& typed_import_buffer : import_buffers) {
2442  all_shard_import_buffers.back().emplace_back(
2443  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2444  typed_import_buffer->getStringDictionary()));
2445  }
2446  }
2447  CHECK_GT(table_desc_->shardedColumnId, 0);
2448  int col_idx{0};
2449  const ColumnDescriptor* shard_col_desc{nullptr};
2450  for (const auto col_desc : column_descs_) {
2451  ++col_idx;
2452  if (col_idx == table_desc_->shardedColumnId) {
2453  shard_col_desc = col_desc;
2454  break;
2455  }
2456  }
2457  CHECK(shard_col_desc);
2458  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2459  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2460  const auto& shard_col_ti = shard_col_desc->columnType;
2461  CHECK(shard_col_ti.is_integer() ||
2462  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2463  shard_col_ti.is_time());
2464  if (shard_col_ti.is_string()) {
2465  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2466  CHECK(payloads_ptr);
2467  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2468  }
2469 
2470  // for each replicated (ALTER-added) column, the number of rows in a shard is
2471  // inferred from that of the sharding column, not simply evenly distributed.
2472  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2473  // Here the loop count is overloaded. For normal imports, we loop through all
2474  // input values (rows), so the loop count is the number of input rows.
2475  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2476  // all shards, so the loop count is the number of shards.
2477  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
2478  for (size_t i = 0; i < loop_count; ++i) {
2479  const size_t shard =
2480  getReplicating()
2481  ? i
2482  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2483  auto& shard_output_buffers = all_shard_import_buffers[shard];
2484 
2485  // when replicating a column, populate 'rows' to all shards only once,
2486  // and fetch its value from the first (and only) row
2487  const auto row_index = getReplicating() ? 0 : i;
2488 
2489  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2490  const auto& input_buffer = import_buffers[col_idx];
2491  const auto& col_ti = input_buffer->getTypeInfo();
2492  const auto type =
2493  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2494 
2495  // for a replicated (added) column, populate rows_per_shard with the per-shard
2496  // replicate count, and bypass non-replicated columns.
2497  if (getReplicating()) {
2498  if (input_buffer->get_replicate_count() > 0) {
2499  shard_output_buffers[col_idx]->set_replicate_count(
2500  shard_tds[shard]->fragmenter->getNumRows());
2501  } else {
2502  continue;
2503  }
2504  }
2505 
2506  switch (type) {
2507  case kBOOLEAN:
2508  shard_output_buffers[col_idx]->addBoolean(
2509  int_value_at(*input_buffer, row_index));
2510  break;
2511  case kTINYINT:
2512  shard_output_buffers[col_idx]->addTinyint(
2513  int_value_at(*input_buffer, row_index));
2514  break;
2515  case kSMALLINT:
2516  shard_output_buffers[col_idx]->addSmallint(
2517  int_value_at(*input_buffer, row_index));
2518  break;
2519  case kINT:
2520  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2521  break;
2522  case kBIGINT:
2523  shard_output_buffers[col_idx]->addBigint(
2524  int_value_at(*input_buffer, row_index));
2525  break;
2526  case kFLOAT:
2527  shard_output_buffers[col_idx]->addFloat(
2528  float_value_at(*input_buffer, row_index));
2529  break;
2530  case kDOUBLE:
2531  shard_output_buffers[col_idx]->addDouble(
2532  double_value_at(*input_buffer, row_index));
2533  break;
2534  case kTEXT:
2535  case kVARCHAR:
2536  case kCHAR: {
2537  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2538  shard_output_buffers[col_idx]->addString(
2539  (*input_buffer->getStringBuffer())[row_index]);
2540  break;
2541  }
2542  case kTIME:
2543  case kTIMESTAMP:
2544  case kDATE:
2545  shard_output_buffers[col_idx]->addBigint(
2546  int_value_at(*input_buffer, row_index));
2547  break;
2548  case kARRAY:
2549  if (IS_STRING(col_ti.get_subtype())) {
2550  CHECK(input_buffer->getStringArrayBuffer());
2551  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2552  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2553  shard_output_buffers[col_idx]->addStringArray(input_arr);
2554  } else {
2555  shard_output_buffers[col_idx]->addArray(
2556  (*input_buffer->getArrayBuffer())[row_index]);
2557  }
2558  break;
2559  case kPOINT:
2560  case kLINESTRING:
2561  case kPOLYGON:
2562  case kMULTIPOLYGON: {
2563  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2564  shard_output_buffers[col_idx]->addGeoString(
2565  (*input_buffer->getGeoStringBuffer())[row_index]);
2566  break;
2567  }
2568  default:
2569  CHECK(false);
2570  }
2571  }
2572  ++all_shard_row_counts[shard];
2573  // when replicating a column, the row count of a shard equals the replicate
2574  // count of the column on that shard
2575  if (getReplicating()) {
2576  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2577  }
2578  }
2579 }
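
// A minimal sketch of the row-to-shard routing above: hash the shard-key
// value modulo the shard count (see Shared/shard_key.h for the real
// SHARD_FOR_KEY definition). Assuming a simple non-negative modulo:
static size_t example_shard_for_row(const int64_t shard_key_value,
                                    const size_t shard_count) {
  const uint64_t key = shard_key_value < 0
                           ? static_cast<uint64_t>(-shard_key_value)
                           : static_cast<uint64_t>(shard_key_value);
  return key % shard_count;
}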
2580 
2581 bool Loader::loadImpl(
2582  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2583  size_t row_count,
2584  bool checkpoint) {
2585  if (table_desc_->nShards) {
2586  std::vector<OneShardBuffers> all_shard_import_buffers;
2587  std::vector<size_t> all_shard_row_counts;
2588  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2589  distributeToShards(all_shard_import_buffers,
2590  all_shard_row_counts,
2591  import_buffers,
2592  row_count,
2593  shard_tables.size());
2594  bool success = true;
2595  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2596  if (!all_shard_row_counts[shard_idx]) {
2597  continue;
2598  }
2599  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2600  all_shard_row_counts[shard_idx],
2601  shard_tables[shard_idx],
2602  checkpoint);
2603  }
2604  return success;
2605  }
2606  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2607 }
2608 
2609 std::vector<DataBlockPtr> Loader::get_data_block_pointers(
2610  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2611  std::vector<DataBlockPtr> result(import_buffers.size());
2612  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2613  encoded_data_block_ptrs_futures;
2614  // make all async calls to string dictionary here and then continue execution
2615  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2616  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2617  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2618  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2619  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2620 
2621  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2622  buf_idx,
2623  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2624  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2625  return import_buffers[buf_idx]->getStringDictBuffer();
2626  })));
2627  }
2628  }
2629 
2630  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2631  DataBlockPtr p;
2632  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2633  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2634  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2635  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2636  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2637  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2638  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2639  p.stringsPtr = string_payload_ptr;
2640  } else {
2641  // This condition means we have a dictionary-ENCODED string column. We
2642  // already made an async request above to obtain the encoded integer
2643  // values, so skip this iteration and continue.
2644  continue;
2645  }
2646  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2647  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2648  p.stringsPtr = geo_payload_ptr;
2649  } else {
2650  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2651  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2652  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2653  import_buffers[buf_idx]->addDictEncodedStringArray(
2654  *import_buffers[buf_idx]->getStringArrayBuffer());
2655  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2656  } else {
2657  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2658  }
2659  }
2660  result[buf_idx] = p;
2661  }
2662 
2663  // wait for the async requests we made for string dictionary
2664  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2665  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2666  }
2667  return result;
2668 }
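
// A minimal sketch of the future-based overlap above: launch all dictionary
// encodes asynchronously, let the cheap pointer wiring for the other columns
// proceed, then join. The helper is hypothetical; JOB is any callable
// returning the encoded buffer pointer:
template <typename JOB>
static void example_overlap_async(std::vector<JOB>& jobs,
                                  std::vector<int8_t*>& results) {
  std::vector<std::pair<size_t, std::future<int8_t*>>> futures;
  for (size_t i = 0; i < jobs.size(); ++i) {
    futures.emplace_back(i, std::async(std::launch::async, jobs[i]));
  }
  // ... pointer wiring for non-dictionary columns can proceed here ...
  for (auto& f : futures) {
    results[f.first] = f.second.get();  // blocks until that encode is done
  }
}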
2669 
2670 bool Loader::loadToShard(
2671  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2672  size_t row_count,
2673  const TableDescriptor* shard_table,
2674  bool checkpoint) {
2675  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2676  // patch insert_data with new column
2677  if (this->getReplicating()) {
2678  for (const auto& import_buff : import_buffers) {
2679  insert_data_.replicate_count = import_buff->get_replicate_count();
2680  insert_data_.columnDescriptors[import_buff->getColumnDesc()->columnId] =
2681  import_buff->getColumnDesc();
2682  }
2683  }
2684 
2685  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2686  ins_data.numRows = row_count;
2687  bool success = true;
2688 
2689  ins_data.data = get_data_block_pointers(import_buffers);
2690 
2691  for (const auto& import_buffer : import_buffers) {
2692  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2693  }
2694 
2695  // release loader_lock so that in InsertOrderFragmenter::insertData
2696  // we can have multiple threads sort/shuffle InsertData
2697  loader_lock.unlock();
2698 
2699  {
2700  try {
2701  if (checkpoint) {
2702  shard_table->fragmenter->insertData(ins_data);
2703  } else {
2704  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2705  }
2706  } catch (std::exception& e) {
2707  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2708  success = false;
2709  }
2710  }
2711  return success;
2712 }
2713 
2714 void Loader::init() {
2715  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2716  insert_data_.tableId = table_desc_->tableId;
2717  for (auto cd : column_descs_) {
2718  insert_data_.columnIds.push_back(cd->columnId);
2719  if (cd->columnType.get_compression() == kENCODING_DICT) {
2720  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2721  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
2722  CHECK(dd);
2723  dict_map_[cd->columnId] = dd->stringDict.get();
2724  }
2725  }
2726  insert_data_.numRows = 0;
2727 }
2728 
2729 void Detector::init() {
2730  detect_row_delimiter();
2731  split_raw_data();
2732  find_best_sqltypes_and_headers();
2733 }
2734 
2735 ImportStatus Detector::importDelimited(const std::string& file_path,
2736  const bool decompressed) {
2737  if (!p_file) {
2738  p_file = fopen(file_path.c_str(), "rb");
2739  }
2740  if (!p_file) {
2741  throw std::runtime_error("failed to open file '" + file_path +
2742  "': " + strerror(errno));
2743  }
2744 
2745  // somehow clang does not support ext/stdio_filebuf.h, so we
2746  // need to DIY a readline with the customized copy_params.line_delim...
2747  char line[1 << 20];
2748  auto end_time = std::chrono::steady_clock::now() +
2749  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
2750  try {
2751  while (!feof(p_file)) {
2752  int c;
2753  size_t n = 0;
2754  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
2755  line[n++] = c;
2756  if (n >= sizeof(line) - 1) {
2757  break;
2758  }
2759  }
2760  if (0 == n) {
2761  break;
2762  }
2763  line[n] = 0;
2764  // remember the first line, which is possibly a header line, to
2765  // ignore identical header line(s) in 2nd+ files of an archive;
2766  // otherwise, a 2nd+ header may be mistaken for an all-string row
2767  // and so skew the final column types.
2768  if (line1.empty()) {
2769  line1 = line;
2770  } else if (line == line1) {
2771  continue;
2772  }
2773 
2774  raw_data += std::string(line, n);
2775  raw_data += copy_params.line_delim;
2776  ++import_status.rows_completed;
2777  if (std::chrono::steady_clock::now() > end_time) {
2778  if (import_status.rows_completed > 10000) {
2779  break;
2780  }
2781  }
2782  }
2783  } catch (std::exception& e) {
2784  }
2785 
2786  // as if load truncated
2787  import_status.load_truncated = true;
2788  load_failed = true;
2789 
2790  fclose(p_file);
2791  p_file = nullptr;
2792  return import_status;
2793 }
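
// A minimal sketch of the DIY readline above: consume bytes until the
// configured line delimiter (or the buffer capacity), NUL-terminate, and
// report the length. The helper is hypothetical, not used by the importer:
static size_t example_read_line(FILE* f,
                                const char line_delim,
                                char* buf,
                                const size_t capacity) {
  size_t n = 0;
  int c;
  while (EOF != (c = fgetc(f)) && line_delim != c && n < capacity - 1) {
    buf[n++] = static_cast<char>(c);
  }
  buf[n] = '\0';  // NUL-terminate the extracted line
  return n;
}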
2794 
2795 void Detector::read_file() {
2796  // this becomes analogous to Importer::import()
2797  (void)archivePlumber();
2798 }
2799 
2800 void Detector::detect_row_delimiter() {
2801  if (copy_params.delimiter == '\0') {
2802  copy_params.delimiter = ',';
2803  if (boost::filesystem::extension(file_path) == ".tsv") {
2804  copy_params.delimiter = '\t';
2805  }
2806  }
2807 }
2808 
2809 void Detector::split_raw_data() {
2810  const char* buf = raw_data.c_str();
2811  const char* buf_end = buf + raw_data.size();
2812  bool try_single_thread = false;
2813  for (const char* p = buf; p < buf_end; p++) {
2814  std::vector<std::string> row;
2815  p = DelimitedParserUtils::get_row(
2816  p, buf_end, buf_end, copy_params, nullptr, row, try_single_thread);
2817  raw_rows.push_back(row);
2818  if (try_single_thread) {
2819  break;
2820  }
2821  }
2822  if (try_single_thread) {
2823  copy_params.threads = 1;
2824  raw_rows.clear();
2825  for (const char* p = buf; p < buf_end; p++) {
2826  std::vector<std::string> row;
2827  p = DelimitedParserUtils::get_row(
2828  p, buf_end, buf_end, copy_params, nullptr, row, try_single_thread);
2829  raw_rows.push_back(row);
2830  }
2831  }
2832 }
2833 
2834 template <class T>
2835 bool try_cast(const std::string& str) {
2836  try {
2837  boost::lexical_cast<T>(str);
2838  } catch (const boost::bad_lexical_cast& e) {
2839  return false;
2840  }
2841  return true;
2842 }
2843 
2844 inline char* try_strptimes(const char* str, const std::vector<std::string>& formats) {
2845  std::tm tm_struct;
2846  char* buf;
2847  for (auto format : formats) {
2848  buf = strptime(str, format.c_str(), &tm_struct);
2849  if (buf) {
2850  return buf;
2851  }
2852  }
2853  return nullptr;
2854 }
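
// A minimal sketch of how try_strptimes composes: it returns a pointer just
// past the matched prefix, which lets the detector below chain a date parse
// into a time parse. The helper is hypothetical, not used by the importer:
static bool example_is_timestamp(const std::string& str) {
  char* rest = try_strptimes(str.c_str(), {"%Y-%m-%d", "%m/%d/%Y"});
  if (!rest) {
    return false;  // no leading date
  }
  if (*rest == 'T' || *rest == ' ' || *rest == ':') {
    rest++;  // skip the date/time separator
  }
  return try_strptimes(rest, {"%T %z", "%T", "%H%M%S", "%R"}) != nullptr;
}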
2855 
2856 SQLTypes Detector::detect_sqltype(const std::string& str) {
2857  SQLTypes type = kTEXT;
2858  if (try_cast<double>(str)) {
2859  type = kDOUBLE;
2860  /*if (try_cast<bool>(str)) {
2861  type = kBOOLEAN;
2862  }*/
2863  if (try_cast<int16_t>(str)) {
2864  type = kSMALLINT;
2865  } else if (try_cast<int32_t>(str)) {
2866  type = kINT;
2867  } else if (try_cast<int64_t>(str)) {
2868  type = kBIGINT;
2869  } else if (try_cast<float>(str)) {
2870  type = kFLOAT;
2871  }
2872  }
2873 
2874  // check for geo types
2875  if (type == kTEXT) {
2876  // convert to upper case
2877  std::string str_upper_case = str;
2878  std::transform(
2879  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
2880 
2881  // then test for leading words
2882  if (str_upper_case.find("POINT") == 0) {
2883  type = kPOINT;
2884  } else if (str_upper_case.find("LINESTRING") == 0) {
2885  type = kLINESTRING;
2886  } else if (str_upper_case.find("POLYGON") == 0) {
2887  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
2888  type = kMULTIPOLYGON;
2889  } else {
2890  type = kPOLYGON;
2891  }
2892  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
2893  type = kMULTIPOLYGON;
2894  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
2895  std::string::npos &&
2896  (str_upper_case.size() % 2) == 0) {
2897  // could be a WKB hex blob
2898  // we can't handle these yet
2899  // leave as TEXT for now
2900  // deliberate return here, as otherwise this would get matched as TIME
2901  // @TODO
2902  // implement WKB import
2903  return type;
2904  }
2905  }
2906 
2907  // check for time types
2908  if (type == kTEXT) {
2909  // @TODO
2910  // make these tests more robust so they don't match stuff they should not
2911  char* buf;
2912  buf = try_strptimes(str.c_str(), {"%Y-%m-%d", "%m/%d/%Y", "%d-%b-%y", "%d/%b/%Y"});
2913  if (buf) {
2914  type = kDATE;
2915  if (*buf == 'T' || *buf == ' ' || *buf == ':') {
2916  buf++;
2917  }
2918  }
2919  buf = try_strptimes(buf == nullptr ? str.c_str() : buf,
2920  {"%T %z", "%T", "%H%M%S", "%R"});
2921  if (buf) {
2922  if (type == kDATE) {
2923  type = kTIMESTAMP;
2924  } else {
2925  type = kTIME;
2926  }
2927  }
2928  }
2929 
2930  return type;
2931 }
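
// Worked examples for the detection ladder above (illustrative only,
// assuming default copy_params):
//   "42"                  -> kSMALLINT  (fits int16_t)
//   "3000000000"          -> kBIGINT    (fits int64_t but not int32_t)
//   "1.5"                 -> kFLOAT     (casts to float but to no integer)
//   "POINT (1 2)"         -> kPOINT     (leading-word geo match)
//   "2019-06-01 12:00:00" -> kTIMESTAMP (date parse chained into time parse)
//   "hello"               -> kTEXT      (nothing narrower matched)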
2932 
2933 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
2934  std::vector<SQLTypes> types(row.size());
2935  for (size_t i = 0; i < row.size(); i++) {
2936  types[i] = detect_sqltype(row[i]);
2937  }
2938  return types;
2939 }
2940 
2941 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
2942  static std::array<int, kSQLTYPE_LAST> typeorder;
2943  typeorder[kCHAR] = 0;
2944  typeorder[kBOOLEAN] = 2;
2945  typeorder[kSMALLINT] = 3;
2946  typeorder[kINT] = 4;
2947  typeorder[kBIGINT] = 5;
2948  typeorder[kFLOAT] = 6;
2949  typeorder[kDOUBLE] = 7;
2950  typeorder[kTIMESTAMP] = 8;
2951  typeorder[kTIME] = 9;
2952  typeorder[kDATE] = 10;
2953  typeorder[kPOINT] = 11;
2954  typeorder[kLINESTRING] = 11;
2955  typeorder[kPOLYGON] = 11;
2956  typeorder[kMULTIPOLYGON] = 11;
2957  typeorder[kTEXT] = 12;
2958 
2959  // note: b < a instead of a < b because the map is ordered most to least restrictive
2960  return typeorder[b] < typeorder[a];
2961 }
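
// A minimal sketch of how the ordering above is used: per-column types are
// folded row by row, so a value can only widen the running type, never
// narrow it (e.g. kSMALLINT, then kBIGINT, then kTEXT yields kTEXT).
// Assuming more_restrictive_sqltype is callable statically (it reads no
// instance state), the fold looks like:
static SQLTypes example_fold_column_type(const std::vector<SQLTypes>& seen) {
  SQLTypes best = kCHAR;  // the most restrictive starting point, as above
  for (const auto t : seen) {
    if (!Detector::more_restrictive_sqltype(best, t)) {
      best = t;  // widen to the less restrictive type
    }
  }
  return best;
}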
2962 
2963 void Detector::find_best_sqltypes_and_headers() {
2964  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
2965  best_encodings =
2966  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
2967  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
2968  switch (copy_params.has_header) {
2969  case ImportHeaderRow::AUTODETECT:
2970  has_headers = detect_headers(head_types, best_sqltypes);
2971  if (has_headers) {
2972  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
2973  } else {
2974  copy_params.has_header = ImportHeaderRow::NO_HEADER;
2975  }
2976  break;
2977  case ImportHeaderRow::NO_HEADER:
2978  has_headers = false;
2979  break;
2980  case ImportHeaderRow::HAS_HEADER:
2981  has_headers = true;
2982  break;
2983  }
2984 }
2985 
2985 
2986 void Detector::find_best_sqltypes() {
2987  best_sqltypes = find_best_sqltypes(raw_rows, copy_params);
2988 }
2989 
2990 std::vector<SQLTypes> Detector::find_best_sqltypes(
2991  const std::vector<std::vector<std::string>>& raw_rows,
2992  const CopyParams& copy_params) {
2993  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
2994 }
2995 
2996 std::vector<SQLTypes> Detector::find_best_sqltypes(
2997  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
2998  const std::vector<std::vector<std::string>>::const_iterator& row_end,
2999  const CopyParams& copy_params) {
3000  if (raw_rows.size() < 1) {
3001  throw std::runtime_error("No rows found in: " +
3002  boost::filesystem::basename(file_path));
3003  }
3004  auto end_time = std::chrono::steady_clock::now() + timeout;
3005  size_t num_cols = raw_rows.front().size();
3006  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3007  std::vector<size_t> non_null_col_counts(num_cols, 0);
3008  for (auto row = row_begin; row != row_end; row++) {
3009  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3010  best_types.push_back(kCHAR);
3011  non_null_col_counts.push_back(0);
3012  }
3013  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3014  // do not count nulls
3015  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3016  continue;
3017  }
3018  SQLTypes t = detect_sqltype(row->at(col_idx));
3019  non_null_col_counts[col_idx]++;
3020  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3021  best_types[col_idx] = t;
3022  }
3023  }
3024  if (std::chrono::steady_clock::now() > end_time) {
3025  break;
3026  }
3027  }
3028  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3029  // if we don't have any non-null values for this column make it text to be
3030  // safe b/c that is the least restrictive type
3031  if (non_null_col_counts[col_idx] == 0) {
3032  best_types[col_idx] = kTEXT;
3033  }
3034  }
3035 
3036  return best_types;
3037 }
3038 
3039 std::vector<EncodingType> Detector::find_best_encodings(
3040  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3041  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3042  const std::vector<SQLTypes>& best_types) {
3043  if (raw_rows.size() < 1) {
3044  throw std::runtime_error("No rows found in: " +
3045  boost::filesystem::basename(file_path));
3046  }
3047  size_t num_cols = best_types.size();
3048  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3049  std::vector<size_t> num_rows_per_col(num_cols, 1);
3050  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3051  for (auto row = row_begin; row != row_end; row++) {
3052  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3053  if (IS_STRING(best_types[col_idx])) {
3054  count_set[col_idx].insert(row->at(col_idx));
3055  num_rows_per_col[col_idx]++;
3056  }
3057  }
3058  }
3059  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3060  if (IS_STRING(best_types[col_idx])) {
3061  float uniqueRatio =
3062  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3063  if (uniqueRatio < 0.75) {
3064  best_encodes[col_idx] = kENCODING_DICT;
3065  }
3066  }
3067  }
3068  return best_encodes;
3069 }
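
// A minimal sketch of the dictionary heuristic above in isolation: suggest
// kENCODING_DICT when fewer than 75% of the sampled values of a string
// column are distinct. The helper is hypothetical, not used by the importer:
static bool example_should_dict_encode(const std::vector<std::string>& sample) {
  if (sample.empty()) {
    return false;
  }
  const std::unordered_set<std::string> distinct(sample.begin(), sample.end());
  const float unique_ratio =
      static_cast<float>(distinct.size()) / static_cast<float>(sample.size());
  return unique_ratio < 0.75f;
}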
3070 
3071 // detect_headers returns true if:
3072 // - all elements of the first argument are kTEXT
3073 // - there is at least one instance where tail_types is more restrictive than head_types
3074 // (ie, not kTEXT)
3075 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3076  const std::vector<SQLTypes>& tail_types) {
3077  if (head_types.size() != tail_types.size()) {
3078  return false;
3079  }
3080  bool has_headers = false;
3081  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3082  if (head_types[col_idx] != kTEXT) {
3083  return false;
3084  }
3085  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3086  }
3087  return has_headers;
3088 }
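
// Worked examples for detect_headers above (illustrative only):
//   head = {kTEXT, kTEXT},     tail = {kSMALLINT, kTEXT} -> true  (header row)
//   head = {kSMALLINT, kTEXT}, tail = {kSMALLINT, kTEXT} -> false (data row)
//   head = {kTEXT, kTEXT},     tail = {kTEXT, kTEXT}     -> false (ambiguous;
//     an all-text body gives no evidence that the first row is special)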
3089 
3090 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3091  n = std::min(n, raw_rows.size());
3092  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3093  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3094  raw_rows.begin() + n);
3095  return sample_rows;
3096 }
3097 
3098 std::vector<std::string> Detector::get_headers() {
3099  std::vector<std::string> headers(best_sqltypes.size());
3100  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3101  if (has_headers && i < raw_rows[0].size()) {
3102  headers[i] = raw_rows[0][i];
3103  } else {
3104  headers[i] = "column_" + std::to_string(i + 1);
3105  }
3106  }
3107  return headers;
3108 }
3109 
3110 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3111  size_t row_count) {
3112  if (!loader->loadNoCheckpoint(import_buffers, row_count)) {
3113  load_failed = true;
3114  }
3115 }
3116 
3117 void Importer::checkpoint(const int32_t start_epoch) {
3118  if (load_failed) {
3119  // rollback to starting epoch - undo all the added records
3120  loader->setTableEpoch(start_epoch);
3121  } else {
3122  loader->checkpoint();
3123  }
3124 
3125  if (loader->getTableDesc()->persistenceLevel ==
3126  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3127  auto ms = measure<>::execution([&]() {
3128  if (!load_failed) {
3129  for (auto& p : import_buffers_vec[0]) {
3130  if (!p->stringDictCheckpoint()) {
3131  LOG(ERROR) << "Checkpointing Dictionary for Column "
3132  << p->getColumnDesc()->columnName << " failed.";
3133  load_failed = true;
3134  break;
3135  }
3136  }
3137  }
3138  });
3139  if (DEBUG_TIMING) {
3140  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3141  << std::endl;
3142  }
3143  }
3144 }
3145 
3146 ImportStatus DataStreamSink::archivePlumber() {
3147  // in the generalized importing scheme, file_path may contain
3148  // a local file path, a url, or a wildcard of file paths.
3149  // see CopyTableStmt::execute.
3150  auto file_paths = mapd_glob(file_path);
3151  if (file_paths.size() == 0) {
3152  file_paths.push_back(file_path);
3153  }
3154 
3155  // sum up sizes of all local files -- only for local files. if
3156  // file_path is an s3 url, sizes will be obtained via S3Archive.
3157  for (const auto& file_path : file_paths) {
3158  total_file_size += get_filesize(file_path);
3159  }
3160 
3161 #ifdef ENABLE_IMPORT_PARQUET
3162  // s3 parquet goes a different route because the files do not use libarchive
3163  // but the parquet api, and they need to be landed like .7z files.
3164  //
3165  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3166  // because, for example, spark sql users may specify an output url w/o a file
3167  // extension, like this:
3168  // df.write
3169  // .mode("overwrite")
3170  // .parquet("s3://bucket/folder/parquet/mydata")
3171  // without the parameter, it means plain or compressed csv files.
3172  // note: .ORC and AVRO files should follow a similar path to Parquet?
3173  if (copy_params.file_type == FileType::PARQUET) {
3174  import_parquet(file_paths);
3175  } else
3176 #endif
3177  {
3178  import_compressed(file_paths);
3179  }
3180  return import_status;
3181 }
3182 
3183 #ifdef ENABLE_IMPORT_PARQUET
3184 inline auto open_parquet_table(const std::string& file_path,
3185  std::shared_ptr<arrow::io::ReadableFile>& infile,
3186  std::unique_ptr<parquet::arrow::FileReader>& reader,
3187  std::shared_ptr<arrow::Table>& table) {
3188  using namespace parquet::arrow;
3189  using ReadableFile = arrow::io::ReadableFile;
3190  auto mempool = arrow::default_memory_pool();
3191  PARQUET_THROW_NOT_OK(ReadableFile::Open(file_path, mempool, &infile));
3192  PARQUET_THROW_NOT_OK(OpenFile(infile, mempool, &reader));
3193  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3194  const auto num_row_groups = reader->num_row_groups();
3195  const auto num_columns = table->num_columns();
3196  const auto num_rows = table->num_rows();
3197  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3198  << " columns in " << num_row_groups << " groups.";
3199  return std::make_tuple(num_row_groups, num_columns, num_rows);
3200 }
3201 
3202 void Detector::import_local_parquet(const std::string& file_path) {
3203  std::shared_ptr<arrow::io::ReadableFile> infile;
3204  std::unique_ptr<parquet::arrow::FileReader> reader;
3205  std::shared_ptr<arrow::Table> table;
3206  int num_row_groups, num_columns;
3207  int64_t num_rows;
3208  std::tie(num_row_groups, num_columns, num_rows) =
3209  open_parquet_table(file_path, infile, reader, table);
3210  // make up a header line if there is none yet
3211  if (0 == raw_data.size()) {
3212  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3213  copy_params.line_delim = '\n';
3214  copy_params.delimiter = ',';
3215  // must quote values to skip any embedded delimiter
3216  copy_params.quoted = true;
3217  copy_params.quote = '"';
3218  copy_params.escape = '"';
3219  for (int c = 0; c < num_columns; ++c) {
3220  if (c) {
3221  raw_data += copy_params.delimiter;
3222  }
3223  raw_data += table->column(c)->name();
3224  }
3225  raw_data += copy_params.line_delim;
3226  }
3227  // make up raw data... row-wise...
3228  const ColumnDescriptor cd;
3229  for (int g = 0; g < num_row_groups; ++g) {
3230  // data is columnwise
3231  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3232  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3233  arrays.resize(num_columns);
3234  for (int c = 0; c < num_columns; ++c) {
3235  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3236  for (auto chunk : arrays[c]->chunks()) {
3237  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3238  }
3239  }
3240  for (int r = 0; r < num_rows; ++r) {
3241  for (int c = 0; c < num_columns; ++c) {
3242  std::vector<std::string> buffer;
3243  for (auto chunk : arrays[c]->chunks()) {
3244  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3245  if (c) {
3246  raw_data += copy_params.delimiter;
3247  }
3248  if (!chunk->IsNull(r)) {
3249  raw_data += copy_params.quote;
3250  raw_data += boost::replace_all_copy(
3251  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3252  raw_data += copy_params.quote;
3253  }
3254  }
3255  }
3256  raw_data += copy_params.line_delim;
3257  if (++import_status.rows_completed >= 10000) {
3258  // as if load truncated
3259  import_status.load_truncated = true;
3260  load_failed = true;
3261  return;
3262  }
3263  }
3264  }
3265 }
3266 
3267 template <typename DATA_TYPE>
3268 auto TypedImportBuffer::del_values(std::vector<DATA_TYPE>& buffer,
3269  Importer_NS::BadRowsTracker* const bad_rows_tracker) {
3270  const auto old_size = buffer.size();
3271  // erase backward to minimize memory movement overhead
3272  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3273  ++rit) {
3274  buffer.erase(buffer.begin() + *rit);
3275  }
3276  return std::make_tuple(old_size, buffer.size());
3277 }
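
// A minimal sketch of why the reverse erase above works: erasing from the
// back keeps the not-yet-erased (smaller) indices valid, so each recorded
// bad-row index can be used directly without adjustment. The helper is
// hypothetical, not used by the importer:
static void example_erase_bad_rows(std::vector<int>& buffer,
                                   const std::vector<size_t>& ascending_bad_rows) {
  for (auto rit = ascending_bad_rows.rbegin(); rit != ascending_bad_rows.rend();
       ++rit) {
    buffer.erase(buffer.begin() + *rit);  // later elements already removed
  }
}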
3278 
3279 auto TypedImportBuffer::del_values(const SQLTypes type,
3280  BadRowsTracker* const bad_rows_tracker) {
3281  switch (type) {
3282  case kBOOLEAN:
3283  return del_values(*bool_buffer_, bad_rows_tracker);
3284  case kTINYINT:
3285  return del_values(*tinyint_buffer_, bad_rows_tracker);
3286  case kSMALLINT:
3287  return del_values(*smallint_buffer_, bad_rows_tracker);
3288  case kINT:
3289  return del_values(*int_buffer_, bad_rows_tracker);
3290  case kBIGINT:
3291  case kNUMERIC:
3292  case kDECIMAL:
3293  case kTIME:
3294  case kTIMESTAMP:
3295  case kDATE:
3296  return del_values(*bigint_buffer_, bad_rows_tracker);
3297  case kFLOAT:
3298  return del_values(*float_buffer_, bad_rows_tracker);
3299  case kDOUBLE:
3300  return del_values(*double_buffer_, bad_rows_tracker);
3301  case kTEXT:
3302  case kVARCHAR:
3303  case kCHAR:
3304  return del_values(*string_buffer_, bad_rows_tracker);
3305  case kPOINT:
3306  case kLINESTRING:
3307  case kPOLYGON:
3308  case kMULTIPOLYGON:
3309  return del_values(*geo_string_buffer_, bad_rows_tracker);
3310  case kARRAY:
3311  return del_values(*array_buffer_, bad_rows_tracker);
3312  default:
3313  throw std::runtime_error("Invalid Type");
3314  }
3315 }
3316 
3317 void Importer::import_local_parquet(const std::string& file_path) {
3318  std::shared_ptr<arrow::io::ReadableFile> infile;
3319  std::unique_ptr<parquet::arrow::FileReader> reader;
3320  std::shared_ptr<arrow::Table> table;
3321  int num_row_groups, num_columns;
3322  int64_t nrow_in_file;
3323  std::tie(num_row_groups, num_columns, nrow_in_file) =
3324  open_parquet_table(file_path, infile, reader, table);
3325  // column_list has no $deleted
3326  const auto& column_list = get_column_descs();
3327  // for now geo columns expect a wkt string
3328  std::vector<const ColumnDescriptor*> cds;
3329  int num_physical_cols = 0;
3330  for (auto& cd : column_list) {
3331  cds.push_back(cd);
3332  num_physical_cols += cd->columnType.get_physical_cols();
3333  }
3334  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3335  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3336  std::to_string(num_columns) + " columns in file vs " +
3337  std::to_string(column_list.size() - num_physical_cols) +
3338  " columns in table.");
3339  // slice each group to import slower columns faster, e.g. geo or string
3340  max_threads = copy_params.threads ? copy_params.threads : cpu_threads();
3341  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3342  // init row estimate for this file
3343  const auto filesize = get_filesize(file_path);
3344  size_t nrow_completed{0};
3345  file_offsets.push_back(0);
3346  // map logic column index to physical column index
3347  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3348  int physical_col_idx = 0;
3349  for (int i = 0; i < logic_col_idx; ++i) {
3350  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3351  }
3352  return physical_col_idx;
3353  };
3354  // load a file = nested iteration of row groups, row slices and logical columns
3355  auto ms_load_a_file = measure<>::execution([&]() {
3356  for (int row_group = 0; row_group < num_row_groups && !load_failed; ++row_group) {
3357  // a sliced row group will be handled like a (logical) parquet file, with
3358  // an entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
3359  import_buffers_vec.resize(num_slices);
3360  for (int slice = 0; slice < num_slices; slice++) {
3361  import_buffers_vec[slice].clear();
3362  for (const auto cd : cds) {
3363  import_buffers_vec[slice].emplace_back(
3364  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3365  }
3366  }
3367  /*
3368  * A caveat here: Parquet files or arrow data are imported column-wise.
3369  * Unlike importing row-wise csv files, an error on any row of any column
3370  * forces us to give up the entire row group of all columns, unless there
3371  * is a sophisticated method to trace erroneous rows in individual columns
3372  * so that we can union the bad rows and drop them from the corresponding
3373  * import_buffers_vec; otherwise, we may easily exceed the maximum number
3374  * of truncated rows even with very sparse errors in the files.
3375  */
3376  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3377  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3378  auto& bad_rows_tracker = bad_rows_trackers[slice];
3379  bad_rows_tracker.file_name = file_path;
3380  bad_rows_tracker.row_group = slice;
3381  bad_rows_tracker.importer = this;
3382  }
3383  // process arrow arrays to import buffers
3384  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3385  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3386  const auto cd = cds[physical_col_idx];
3387  std::shared_ptr<arrow::ChunkedArray> array;
3388  PARQUET_THROW_NOT_OK(
3389  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3390  const size_t array_size = array->length();
3391  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3392  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3393  for (int slice = 0; slice < num_slices; ++slice) {
3394  thread_controller.startThread([&, slice] {
3395  const auto slice_offset = slice % num_slices;
3396  ArraySliceRange slice_range(
3397  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3398  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3399  auto& bad_rows_tracker = bad_rows_trackers[slice];
3400  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3401  import_buffer->import_buffers = &import_buffers_vec[slice];
3402  import_buffer->col_idx = physical_col_idx + 1;
3403  for (auto chunk : array->chunks()) {
3404  import_buffer->add_arrow_values(
3405  cd, *chunk, false, slice_range, &bad_rows_tracker);
3406  }
3407  });
3408  }
3409  thread_controller.finish();
3410  }
3411  std::vector<size_t> nrow_in_slice_raw(num_slices);
3412  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3413  // trim bad rows from import buffers
3414  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3415  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3416  const auto cd = cds[physical_col_idx];
3417  for (int slice = 0; slice < num_slices; ++slice) {
3418  auto& bad_rows_tracker = bad_rows_trackers[slice];
3419  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3420  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3421  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3422  }
3423  }
3424  // flush slices of this row group to chunks
3425  for (int slice = 0; slice < num_slices; ++slice) {
3426  load(import_buffers_vec[slice], nrow_in_slice_successfully_loaded[slice]);
3427  }
3428  // update import stats
3429  const auto nrow_original =
3430  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3431  const auto nrow_imported =
3432  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3433  nrow_in_slice_successfully_loaded.end(),
3434  0);
3435  const auto nrow_dropped = nrow_original - nrow_imported;
3436  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3437  << " rows, drop " << nrow_dropped << " rows.";
3438  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3439  import_status.rows_completed += nrow_imported;
3440  import_status.rows_rejected += nrow_dropped;
3441  if (import_status.rows_rejected > copy_params.max_reject) {
3442  import_status.load_truncated = true;
3443  load_failed = true;
3444  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3445  << ") rows rejected exceeded. Halting load.";
3446  }
3447  // row estimate
3448  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3449  nrow_completed += nrow_imported;
3450  file_offsets.back() =
3451  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3452  // sum up current total file offsets
3453  const auto total_file_offset =
3454  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
3455  // estimate number of rows per current total file offset
3456  if (total_file_offset) {
3457  import_status.rows_estimated =
3458  (float)total_file_size / total_file_offset * import_status.rows_completed;
3459  VLOG(3) << "rows_completed " << import_status.rows_completed
3460  << ", rows_estimated " << import_status.rows_estimated
3461  << ", total_file_size " << total_file_size << ", total_file_offset "
3462  << total_file_offset;
3463  }
3464  }
3465  });
3466  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
3467  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
3468 }
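
// A minimal sketch of the slice ranges used above: an Arrow array of
// array_size elements is split into num_slices nearly equal [begin, end)
// chunks, clamping at the end. The helper is hypothetical:
static std::pair<size_t, size_t> example_slice_range(const size_t slice,
                                                     const size_t num_slices,
                                                     const size_t array_size) {
  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
  return {std::min(slice * slice_size, array_size),
          std::min((slice + 1) * slice_size, array_size)};
}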
3469 
3470 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths) {
3471  auto importer = dynamic_cast<Importer*>(this);
3472  auto start_epoch = importer ? importer->getLoader()->getTableEpoch() : 0;
3473  try {
3474  std::exception_ptr teptr;
3475  // file_paths may contain one local file path, a list of local file paths,
3476  // or an s3/hdfs/... url that may translate to one or more remote object keys.
3477  for (auto const& file_path : file_paths) {
3478  std::map<int, std::string> url_parts;
3479  Archive::parse_url(file_path, url_parts);
3480 
3481  // for an s3 url we need to know the object keys that it comprises
3482  std::vector<std::string> objkeys;
3483  std::unique_ptr<S3ParquetArchive> us3arch;
3484  if ("s3" == url_parts[2]) {
3485 #ifdef HAVE_AWS_S3
3486  us3arch.reset(new S3ParquetArchive(file_path,
3487  copy_params.s3_access_key,
3488  copy_params.s3_secret_key,
3489  copy_params.s3_region,
3490  copy_params.s3_endpoint,
3491  copy_params.plain_text));
3492  us3arch->init_for_read();
3493  total_file_size += us3arch->get_total_file_size();
3494  objkeys = us3arch->get_objkeys();
3495 #else
3496  throw std::runtime_error("AWS S3 support not available");
3497 #endif // HAVE_AWS_S3
3498  } else {
3499  objkeys.emplace_back(file_path);
3500  }
3501 
3502  // each object key of an s3 url must be landed (downloaded) before
3503  // it can be imported, just as with a 'local file'.
3504  for (auto const& objkey : objkeys) {
3505  try {
3506  auto file_path =
3507  us3arch
3508  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
3509  : objkey;
3510  import_local_parquet(file_path);
3511  if (us3arch) {
3512  us3arch->vacuum(objkey);
3513  }
3514  } catch (...) {
3515  if (us3arch) {
3516  us3arch->vacuum(objkey);
3517  }
3518  throw;
3519  }
3520  if (import_status.load_truncated) {
3521  break;
3522  }
3523  }
3524  }
3525  // rethrow any exception that happened above
3526  if (teptr) {
3527  std::rethrow_exception(teptr);
3528  }
3529  } catch (...) {
3530  load_failed = true;
3531  if (!import_status.load_truncated) {
3532  throw;
3533  }
3534  }
3535 
3536  if (importer) {
3537  importer->checkpoint(start_epoch);
3538  }
3539 }
3540 #endif // ENABLE_IMPORT_PARQUET
3541 
3542 void DataStreamSink::import_compressed(std::vector<std::string>& file_paths) {
3543  // a new requirement is to have one single input stream into
3544  // Importer::importDelimited, so the pipe-related stuff
3545  // has to move to the outermost block.
3546  int fd[2];
3547  if (pipe(fd) < 0) {
3548  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
3549  }
3550  signal(SIGPIPE, SIG_IGN);
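  // SIGPIPE is ignored so that a write() into a pipe whose read end has closed
  // returns -1 with errno set (handled by the write loops below) instead of
  // terminating the process via the default signal action.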
3551 
3552  std::exception_ptr teptr;
3553  // create a thread to read the uncompressed byte stream out of the pipe
3554  // and feed it into importDelimited()
3555  ImportStatus ret;
3556  auto th_pipe_reader = std::thread([&]() {
3557  try {
3558  // importDelimited will read from FILE* p_file
3559  if (0 == (p_file = fdopen(fd[0], "r"))) {
3560  throw std::runtime_error(std::string("failed to open a pipe: ") +
3561  strerror(errno));
3562  }
3563 
3564  // in the future, depending on the data types of this uncompressed stream,
3565  // it could be fed into other functions such as importParquet, etc.
3566  ret = importDelimited(file_path, true);
3567  } catch (...) {
3568  if (!teptr) { // no replace
3569  teptr = std::current_exception();
3570  }
3571  }
3572 
3573  if (p_file) {
3574  fclose(p_file);
3575  }
3576  p_file = 0;
3577  });
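  // `ret` receives the ImportStatus produced by importDelimited; any exception
  // from this reader thread is parked in `teptr` and rethrown after both
  // threads are joined below.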
3578 
3579  // create a thread to iterate all files (in all archives) and
3580  // forward the uncompressed byte stream to fd[1], which is
3581  // then fed into importDelimited, importParquet, etc.
3582  auto th_pipe_writer = std::thread([&]() {
3583  std::unique_ptr<S3Archive> us3arch;
3584  bool stop = false;
3585  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
3586  try {
3587  auto file_path = file_paths[fi];
3588  std::unique_ptr<Archive> uarch;
3589  std::map<int, std::string> url_parts;
3590  Archive::parse_url(file_path, url_parts);
3591  const std::string S3_objkey_url_scheme = "s3ok";
3592  if ("file" == url_parts[2] || "" == url_parts[2]) {
3593  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3594  } else if ("s3" == url_parts[2]) {
3595 #ifdef HAVE_AWS_S3
3596  // construct an S3Archive with a shared s3 client.
3597  // should be safe b/c there is no wildcard in an s3 url
3598  us3arch.reset(new S3Archive(file_path,
3599  copy_params.s3_access_key,
3600  copy_params.s3_secret_key,
3601  copy_params.s3_region,
3602  copy_params.s3_endpoint,
3603  copy_params.plain_text));
3604  us3arch->init_for_read();
3605  total_file_size += us3arch->get_total_file_size();
3606  // don't land all files here; land them one by one in the following iterations
3607  for (const auto& objkey : us3arch->get_objkeys()) {
3608  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
3609  }
3610  continue;
3611 #else
3612  throw std::runtime_error("AWS S3 support not available");
3613 #endif // HAVE_AWS_S3
3614  } else if (S3_objkey_url_scheme == url_parts[2]) {
3615 #ifdef HAVE_AWS_S3
3616  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
3617  auto file_path =
3618  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
3619  if (0 == file_path.size()) {
3620  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
3621  }
3622  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3623  // file not removed until file closed
3624  us3arch->vacuum(objkey);
3625 #else
3626  throw std::runtime_error("AWS S3 support not available");
3627 #endif // HAVE_AWS_S3
3628  }
3629 #if 0 // TODO(ppan): implement and enable any other archive class
3630  else
3631  if ("hdfs" == url_parts[2])
3632  uarch.reset(new HdfsArchive(file_path));
3633 #endif
3634  else {
3635  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
3636  }
3637 
3638  // init the archive for read
3639  auto& arch = *uarch;
3640 
3641  // at this point, the archive at the url should be ready to be read, unarchived
3642  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
3643  const void* buf;
3644  size_t size;
3645  bool just_saw_archive_header;
3646  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
3647  bool first_text_header_skipped = false;
3648  // start reading uncompressed bytes of this archive from libarchive
3649  // note! this archive may contain more than one file!
3650  file_offsets.push_back(0);
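  // each archive gets its own slot in file_offsets; the progress estimators
  // sum all slots and compare the total against total_file_size.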
3651  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
3652  bool insert_line_delim_after_this_file = false;
3653  while (!stop) {
3654  int64_t offset{-1};
3655  auto ok = arch.read_data_block(&buf, &size, &offset);
3656  // can't use (uncompressed) size, so track (max) file offset.
3657  // also we want to capture offset even on e.o.f.
3658  if (offset > 0) {
3659  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3660  file_offsets.back() = offset;
3661  }
3662  if (!ok) {
3663  break;
3664  }
3665  // one subtle point here: we concatenate all files into a single FILE
3666  // stream, with which we call importDelimited only once. importDelimited
3667  // would then assume that only one header line comes with this 'single'
3668  // stream, while actually we may have one header line for each of the
3669  // concatenated files.
3670  // so we need to skip header lines here instead of in importDelimited.
3671  const char* buf2 = (const char*)buf;
3672  int size2 = size;
3673  if (copy_params.has_header != ImportHeaderRow::NO_HEADER &&
3674  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
3675  while (size2-- > 0) {
3676  if (*buf2++ == copy_params.line_delim) {
3677  break;
3678  }
3679  }
3680  if (size2 <= 0) {
3681  LOG(WARNING) << "No line delimiter in block." << std::endl;
3682  } else {
3683  just_saw_archive_header = false;
3684  first_text_header_skipped = true;
3685  }
3686  }
3687  // On very rare occasions the write pipe somehow operates in a mode similar to
3688  // non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which means
3689  // blocking mode. With such unreliable blocking, a possible fix is to loop
3690  // writing till no bytes are left; otherwise we get the annoying `failed to
3691  // write pipe: Success`...
3692  if (size2 > 0) {
3693  int nremaining = size2;
3694  while (nremaining > 0) {
3695  // try to write the entire remainder of the buffer to the pipe
3696  int nwritten = write(fd[1], buf2, nremaining);
3697  // how did we do?
3698  if (nwritten < 0) {
3699  // something bad happened
3700  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3701  // ignore these, assume nothing written, try again
3702  nwritten = 0;
3703  } else {
3704  // a real error
3705  throw std::runtime_error(
3706  std::string("failed or interrupted write to pipe: ") +
3707  strerror(errno));
3708  }
3709  } else if (nwritten == nremaining) {
3710  // we wrote everything; we're done
3711  break;
3712  }
3713  // only wrote some (or nothing), try again
3714  nremaining -= nwritten;
3715  buf2 += nwritten;
3716  // no exception when too many rejected
3717  // @simon.eves how would this get set? from the other thread? mutex
3718  // needed?
3719  if (import_status.load_truncated) {
3720  stop = true;
3721  break;
3722  }
3723  }
3724  // check that this file (buf for size) ended with a line delim
3725  if (size > 0) {
3726  const char* plast = static_cast<const char*>(buf) + (size - 1);
3727  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
3728  }
3729  }
3730  }
3731  // if that file didn't end with a line delim, we insert one here to terminate
3732  // that file's stream. use a loop for the same reason as above
3733  if (insert_line_delim_after_this_file) {
3734  while (true) {
3735  // write the delim char to the pipe
3736  int nwritten = write(fd[1], &copy_params.line_delim, 1);
3737  // how did we do?
3738  if (nwritten < 0) {
3739  // something bad happened
3740  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3741  // ignore these, assume nothing written, try again
3742  nwritten = 0;
3743  } else {
3744  // a real error
3745  throw std::runtime_error(
3746  std::string("failed or interrupted write to pipe: ") +
3747  strerror(errno));
3748  }
3749  } else if (nwritten == 1) {
3750  // we wrote it; we're done
3751  break;
3752  }
3753  }
3754  }
3755  }
3756  } catch (...) {
3757  // when an import is aborted because of too many data errors or because a
3758  // detection has finished, any exception thrown by the s3 sdk or libarchive
3759  // is okay and should be suppressed.
3760  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
3761  if (import_status.load_truncated) {
3762  break;
3763  }
3764  if (import_status.rows_completed > 0) {
3765  if (nullptr != dynamic_cast<Detector*>(this)) {
3766  break;
3767  }
3768  }
3769  if (!teptr) { // no replace
3770  teptr = std::current_exception();
3771  }
3772  break;
3773  }
3774  }
3775  // close writer end
3776  close(fd[1]);
3777  });
3778 
3779  th_pipe_reader.join();
3780  th_pipe_writer.join();
3781 
3782  // rethrow any exception that happened above
3783  if (teptr) {
3784  std::rethrow_exception(teptr);
3785  }
3786 }
3787 
3788 ImportStatus Importer::import() {
3789  return DataStreamSink::archivePlumber();
3790 }
3791 
3792 ImportStatus Importer::importDelimited(const std::string& file_path,
3793  const bool decompressed) {
3794  bool load_truncated = false;
3795  set_import_status(import_id, import_status);
3796 
3797  if (!p_file) {
3798  p_file = fopen(file_path.c_str(), "rb");
3799  }
3800  if (!p_file) {
3801  throw std::runtime_error("failed to open file '" + file_path +
3802  "': " + strerror(errno));
3803  }
3804 
3805  if (!decompressed) {
3806  (void)fseek(p_file, 0, SEEK_END);
3807  file_size = ftell(p_file);
3808  }
3809 
3810  if (copy_params.threads == 0) {
3811  max_threads = static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
3812  } else {
3813  max_threads = static_cast<size_t>(copy_params.threads);
3814  }
3815 
3816  // deal with small files
3817  size_t alloc_size = copy_params.buffer_size;
3818  if (!decompressed && file_size < alloc_size) {
3819  alloc_size = file_size;
3820  }
3821 
3822  for (size_t i = 0; i < max_threads; i++) {
3823  import_buffers_vec.emplace_back();
3824  for (const auto cd : loader->get_column_descs()) {
3825  import_buffers_vec[i].push_back(std::unique_ptr<TypedImportBuffer>(
3826  new TypedImportBuffer(cd, loader->getStringDict(cd))));
3827  }
3828  }
3829 
3830  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
3831  size_t current_pos = 0;
3832  size_t end_pos;
3833  size_t begin_pos = 0;
3834 
3835  (void)fseek(p_file, current_pos, SEEK_SET);
3836  size_t size =
3837  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
3838 
3839  // make render group analyzers for each poly column
3840  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
3841  if (copy_params.geo_assign_render_groups) {
3842  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
3843  loader->getTableDesc()->tableId, false, false, false);
3844  for (auto cd : columnDescriptors) {
3845  SQLTypes ct = cd->columnType.get_type();
3846  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
3847  auto rga = std::make_shared<RenderGroupAnalyzer>();
3848  rga->seedFromExistingTableContents(loader, cd->columnName);
3849  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
3850  }
3851  }
3852  }
3853 
3854  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
3855  loader->getTableDesc()->tableId};
3856  auto start_epoch = loader->getTableEpoch();
3857  {
3858  std::list<std::future<ImportStatus>> threads;
3859 
3860  // use a stack to track thread_ids which must not overlap among threads
3861  // because thread_id is used to index import_buffers_vec[]
3862  std::stack<size_t> stack_thread_ids;
3863  for (size_t i = 0; i < max_threads; i++) {
3864  stack_thread_ids.push(i);
3865  }
3866  // added for true row index on error
3867  size_t first_row_index_this_buffer = 0;
3868 
3869  while (size > 0) {
3870  unsigned int num_rows_this_buffer = 0;
3871  CHECK(scratch_buffer);
3872  end_pos = find_end(
3873  scratch_buffer.get(), size, copy_params, num_rows_this_buffer);
3874 
3875  // unput residual
3876  int nresidual = size - end_pos;
3877  std::unique_ptr<char[]> unbuf;
3878  if (nresidual > 0) {
3879  unbuf = std::make_unique<char[]>(nresidual);
3880  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
3881  }
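  // the bytes after the last complete row (between end_pos and the end of the
  // buffer) are saved in unbuf and copied to the front of the next scratch
  // buffer below, so rows that straddle a buffer boundary are parsed whole.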
3882 
3883  // get a thread_id not in use
3884  auto thread_id = stack_thread_ids.top();
3885  stack_thread_ids.pop();
3886  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
3887 
3888  threads.push_back(std::async(std::launch::async,
3889  import_thread_delimited,
3890  thread_id,
3891  this,
3892  std::move(scratch_buffer),
3893  begin_pos,
3894  end_pos,
3895  end_pos,
3896  columnIdToRenderGroupAnalyzerMap,
3897  first_row_index_this_buffer));
3898 
3899  first_row_index_this_buffer += num_rows_this_buffer;
3900 
3901  current_pos += end_pos;
3902  scratch_buffer = std::make_unique<char[]>(alloc_size);
3903  CHECK(scratch_buffer);
3904  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
3905  size = nresidual + fread(scratch_buffer.get() + nresidual,
3906  1,
3907  copy_params.buffer_size - nresidual,
3908  p_file);
3909 
3910  begin_pos = 0;
3911 
3912  while (threads.size() > 0) {
3913  int nready = 0;
3914  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
3915  it != threads.end();) {
3916  auto& p = *it;
3917  std::chrono::milliseconds span(
3918  0); //(std::distance(it, threads.end()) == 1? 1: 0);
3919  if (p.wait_for(span) == std::future_status::ready) {
3920  auto ret_import_status = p.get();
3921  import_status += ret_import_status;
3922  // sum up current total file offsets
3923  size_t total_file_offset{0};
3924  if (decompressed) {
3925  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3926  for (const auto file_offset : file_offsets) {
3927  total_file_offset += file_offset;
3928  }
3929  }
3930  // estimate number of rows per current total file offset
3931  if (decompressed ? total_file_offset : current_pos) {
3932  import_status.rows_estimated =
3933  (decompressed ? (float)total_file_size / total_file_offset
3934  : (float)file_size / current_pos) *
3935  import_status.rows_completed;
3936  }
3937  VLOG(3) << "rows_completed " << import_status.rows_completed
3938  << ", rows_estimated " << import_status.rows_estimated
3939  << ", total_file_size " << total_file_size << ", total_file_offset "
3940  << total_file_offset;
3941  set_import_status(import_id, import_status);
3942  // recall thread_id for reuse
3943  stack_thread_ids.push(ret_import_status.thread_id);
3944  threads.erase(it++);
3945  ++nready;
3946  } else {
3947  ++it;
3948  }
3949  }
3950 
3951  if (nready == 0) {
3952  std::this_thread::yield();
3953  }
3954 
3955  // on eof, wait all threads to finish
3956  if (0 == size) {
3957  continue;
3958  }
3959 
3960  // keep reading if there is any free thread slot
3961  // this is one of the major differences from the old threading model!!
3962  if (threads.size() < max_threads) {
3963  break;
3964  }
3965  }
3966 
3967  if (import_status.rows_rejected > copy_params.max_reject) {
3968  load_truncated = true;
3969  load_failed = true;
3970  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
3971  break;
3972  }
3973  if (load_failed) {
3974  load_truncated = true;
3975  LOG(ERROR) << "A call to Loader::load failed. Please review the logs for "
3976  "more details";
3977  break;
3978  }
3979  }
3980 
3981  // join dangling threads in case of LOG(ERROR) above
3982  for (auto& p : threads) {
3983  p.wait();
3984  }
3985  }
3986 
3987  checkpoint(start_epoch);
3988 
3989  // must set import_status.load_truncated before closing this end of pipe
3990  // otherwise, the thread on the other end would throw an unwanted 'write()'
3991  // exception
3992  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3993  import_status.load_truncated = load_truncated;
3994 
3995  fclose(p_file);
3996  p_file = nullptr;
3997  return import_status;
3998 }
3999 
4000 void Loader::checkpoint() {
4001  if (getTableDesc()->persistenceLevel ==
4002  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4003  getCatalog().checkpoint(getTableDesc()->tableId);
4004  }
4005 }
4006 
4007 int32_t Loader::getTableEpoch() {
4008  return getCatalog().getTableEpoch(getCatalog().getCurrentDB().dbId,
4009  getTableDesc()->tableId);
4010 }
4011 
4012 void Loader::setTableEpoch(const int32_t start_epoch) {
4013  getCatalog().setTableEpoch(
4014  getCatalog().getCurrentDB().dbId, getTableDesc()->tableId, start_epoch);
4015 }
4016 
4017 void GDALErrorHandler(CPLErr eErrClass, int err_no, const char* msg) {
4018  CHECK(eErrClass >= CE_None && eErrClass <= CE_Fatal);
4019  static const char* errClassStrings[5] = {
4020  "Info",
4021  "Debug",
4022  "Warning",
4023  "Failure",
4024  "Fatal",
4025  };
4026  std::string log_msg = std::string("GDAL ") + errClassStrings[eErrClass] +
4027  std::string(": ") + msg + std::string(" (") +
4028  std::to_string(err_no) + std::string(")");
4029  if (eErrClass >= CE_Failure) {
4030  throw std::runtime_error(log_msg);
4031  } else {
4032  LOG(INFO) << log_msg;
4033  }
4034 }
4035 
4036 /* static */
4037 std::mutex Importer::init_gdal_mutex;
4038 
4039 /* static */
4040 void Importer::initGDAL() {
4041  // this should not be called from multiple threads, but...
4042  std::lock_guard<std::mutex> guard(Importer::init_gdal_mutex);
4043  // init under mutex
4044  static bool gdal_initialized = false;
4045  if (!gdal_initialized) {
4046  // FIXME(andrewseidl): investigate if CPLPushFinderLocation can be public
4047  setenv("GDAL_DATA",
4048  std::string(mapd_root_abs_path() + "/ThirdParty/gdal-data").c_str(),
4049  true);
4050 
4051  // configure SSL certificate path (per S3Archive::init_for_read)
4052  // in a production build, GDAL and Curl will have been built on
4053  // CentOS, so the baked-in system path will be wrong for Ubuntu
4054  // and other Linux distros. Unless the user is deliberately
4055  // overriding it by setting SSL_CERT_FILE explicitly in the server
4056  // environment, we set it to whichever CA bundle directory exists
4057  // on the machine we're running on
4058  std::list<std::string> v_known_ca_paths({
4059  "/etc/ssl/certs/ca-certificates.crt",
4060  "/etc/pki/tls/certs/ca-bundle.crt",
4061  "/usr/share/ssl/certs/ca-bundle.crt",
4062  "/usr/local/share/certs/ca-root.crt",
4063  "/etc/ssl/cert.pem",
4064  "/etc/ssl/ca-bundle.pem",
4065  });
4066  for (const auto& known_ca_path : v_known_ca_paths) {
4067  if (boost::filesystem::exists(known_ca_path)) {
4068  LOG(INFO) << "GDAL SSL Certificate path: " << known_ca_path;
4069  setenv("SSL_CERT_FILE", known_ca_path.c_str(), false); // no overwrite
4070  break;
4071  }
4072  }
4073 
4074  GDALAllRegister();
4075  OGRRegisterAll();
4076  CPLSetErrorHandler(*GDALErrorHandler);
4077  LOG(INFO) << "GDAL Initialized: " << GDALVersionInfo("--version");
4078  gdal_initialized = true;
4079  }
4080 }
4081 
4082 bool Importer::hasGDALLibKML() {
4083  return GetGDALDriverManager()->GetDriverByName("libkml") != nullptr;
4084 }
4085 
4086 /* static */
4087 void Importer::setGDALAuthorizationTokens(const CopyParams& copy_params) {
4088  // for now we only support S3
4089  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4090  // only set if non-empty to allow GDAL defaults to persist
4091  // explicitly clear if empty to revert to default and not reuse a previous session's
4092  // keys
4093  if (copy_params.s3_region.size()) {
4094 #if DEBUG_AWS_AUTHENTICATION
4095  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4096 #endif
4097  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4098  } else {
4099 #if DEBUG_AWS_AUTHENTICATION
4100  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4101 #endif
4102  CPLSetConfigOption("AWS_REGION", nullptr);
4103  }
4104  if (copy_params.s3_endpoint.size()) {
4105 #if DEBUG_AWS_AUTHENTICATION
4106  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4107 #endif
4108  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4109  } else {
4110 #if DEBUG_AWS_AUTHENTICATION
4111  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4112 #endif
4113  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4114  }
4115  if (copy_params.s3_access_key.size()) {
4116 #if DEBUG_AWS_AUTHENTICATION
4117  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4118  << "'";
4119 #endif
4120  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4121  } else {
4122 #if DEBUG_AWS_AUTHENTICATION
4123  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4124 #endif
4125  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4126  }
4127  if (copy_params.s3_secret_key.size()) {
4128 #if DEBUG_AWS_AUTHENTICATION
4129  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4130  << "'";
4131 #endif
4132  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4133  } else {
4134 #if DEBUG_AWS_AUTHENTICATION
4135  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4136 #endif
4137  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4138  }
4139 
4140 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4141  // if we haven't set keys, we need to disable signed access
4142  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4143 #if DEBUG_AWS_AUTHENTICATION
4144  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4145 #endif
4146  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4147  } else {
4148 #if DEBUG_AWS_AUTHENTICATION
4149  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4150 #endif
4151  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4152  }
4153 #endif
4154 }
4155 
4156 /* static */
4157 OGRDataSource* Importer::openGDALDataset(const std::string& file_name,
4158  const CopyParams& copy_params) {
4159  // lazy init GDAL
4160  initGDAL();
4161 
4162  // set authorization tokens
4163  setGDALAuthorizationTokens(copy_params);
4164 
4165  // open the file
4166  OGRDataSource* poDS;
4167 #if GDAL_VERSION_MAJOR == 1
4168  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4169 #else
4170  poDS = (OGRDataSource*)GDALOpenEx(
4171  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4172  if (poDS == nullptr) {
4173  poDS = (OGRDataSource*)GDALOpenEx(
4174  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4175  if (poDS) {
4176  LOG(INFO) << "openGDALDataset had to open as read-only";
4177  }
4178  }
4179 #endif
4180  if (poDS == nullptr) {
4181  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4182  }
4183  // NOTE(adb): If extending this function, refactor to ensure any errors will not result
4184  // in a memory leak if GDAL successfully opened the input dataset.
4185  return poDS;
4186 }
4187 
4188 namespace {
4189 
4190 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4191  const OGRDataSourceUqPtr& poDS,
4192  const std::string& file_name) {
4193  // get layer with specified name, or default to first layer
4194  OGRLayer* poLayer = nullptr;
4195  if (geo_layer_name.size()) {
4196  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4197  if (poLayer == nullptr) {
4198  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4199  file_name);
4200  }
4201  } else {
4202  poLayer = poDS->GetLayer(0);
4203  if (poLayer == nullptr) {
4204  throw std::runtime_error("No layers found in " + file_name);
4205  }
4206  }
4207  return *poLayer;
4208 }
4209 
4210 } // namespace
4211 
4212 /* static */
4213 void Importer::readMetadataSampleGDAL(
4214  const std::string& file_name,
4215  const std::string& geo_column_name,
4216  std::map<std::string, std::vector<std::string>>& metadata,
4217  int rowLimit,
4218  const CopyParams& copy_params) {
4219  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4220  if (poDS == nullptr) {
4221  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4222  file_name);
4223  }
4224 
4225  OGRLayer& layer =
4226  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4227 
4228  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4229  CHECK(poFDefn);
4230 
4231  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4232  auto nFeats = layer.GetFeatureCount();
4233  size_t numFeatures =
4234  std::max(static_cast<decltype(nFeats)>(0),
4235  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
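  // GetFeatureCount() can return a negative value when the count is unknown,
  // so the feature count is clamped to the range [0, rowLimit].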
4236  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4237  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4238  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4239  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4240  }
4241  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4242  layer.ResetReading();
4243  size_t iFeature = 0;
4244  while (iFeature < numFeatures) {
4245  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4246  if (!poFeature) {
4247  break;
4248  }
4249 
4250  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4251  if (poGeometry != nullptr) {
4252  // validate geom type (again?)
4253  switch (wkbFlatten(poGeometry->getGeometryType())) {
4254  case wkbPoint:
4255  case wkbLineString:
4256  case wkbPolygon:
4257  case wkbMultiPolygon:
4258  break;
4259  default:
4260  throw std::runtime_error("Unsupported geometry type: " +
4261  std::string(poGeometry->getGeometryName()));
4262  }
4263 
4264  // populate metadata for regular fields
4265  for (auto i : metadata) {
4266  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4267  if (iField >= 0) { // geom is -1
4268  metadata[i.first].at(iFeature) =
4269  std::string(poFeature->GetFieldAsString(iField));
4270  }
4271  }
4272 
4273  // populate metadata for geo column with WKT string
4274  char* wkts = nullptr;
4275  poGeometry->exportToWkt(&wkts);
4276  CHECK(wkts);
4277  metadata[geo_column_name].at(iFeature) = wkts;
4278  CPLFree(wkts);
4279  }
4280  iFeature++;
4281  }
4282 }
4283 
4284 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4285  switch (ogr_type) {
4286  case OFTInteger:
4287  return std::make_pair(kINT, false);
4288  case OFTIntegerList:
4289  return std::make_pair(kINT, true);
4290 #if GDAL_VERSION_MAJOR > 1
4291  case OFTInteger64:
4292  return std::make_pair(kBIGINT, false);
4293  case OFTInteger64List:
4294  return std::make_pair(kBIGINT, true);
4295 #endif
4296  case OFTReal:
4297  return std::make_pair(kDOUBLE, false);
4298  case OFTRealList:
4299  return std::make_pair(kDOUBLE, true);
4300  case OFTString:
4301  return std::make_pair(kTEXT, false);
4302  case OFTStringList:
4303  return std::make_pair(kTEXT, true);
4304  case OFTDate:
4305  return std::make_pair(kDATE, false);
4306  case OFTTime:
4307  return std::make_pair(kTIME, false);
4308  case OFTDateTime:
4309  return std::make_pair(kTIMESTAMP, false);
4310  case OFTBinary:
4311  default:
4312  break;
4313  }
4314  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4315 }
4316 
4317 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4318  switch (ogr_type) {
4319  case wkbPoint:
4320  return kPOINT;
4321  case wkbLineString:
4322  return kLINESTRING;
4323  case wkbPolygon:
4324  return kPOLYGON;
4325  case wkbMultiPolygon:
4326  return kMULTIPOLYGON;
4327  default:
4328  break;
4329  }
4330  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4331 }
4332 
4333 /* static */
4334 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4335  const std::string& file_name,
4336  const std::string& geo_column_name,
4337  const CopyParams& copy_params) {
4338  std::list<ColumnDescriptor> cds;
4339 
4340  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4341  if (poDS == nullptr) {
4342  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4343  file_name);
4344  }
4345 
4346  OGRLayer& layer =
4347  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4348 
4349  layer.ResetReading();
4350  // TODO(andrewseidl): support multiple features
4351  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4352  if (poFeature == nullptr) {
4353  throw std::runtime_error("No features found in " + file_name);
4354  }
4355  // get fields as regular columns
4356  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4357  CHECK(poFDefn);
4358  int iField;
4359  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4360  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4361  auto typePair = ogr_to_type(poFieldDefn->GetType());
4362  ColumnDescriptor cd;
4363  cd.columnName = poFieldDefn->GetNameRef();
4364  cd.sourceName = poFieldDefn->GetNameRef();
4365  SQLTypeInfo ti;
4366  if (typePair.second) {
4367  ti.set_type(kARRAY);
4368  ti.set_subtype(typePair.first);
4369  } else {
4370  ti.set_type(typePair.first);
4371  }
4372  if (typePair.first == kTEXT) {
4373  ti.set_compression(kENCODING_DICT);
4374  ti.set_comp_param(32);
4375  }
4376  ti.set_fixed_size();
4377  cd.columnType = ti;
4378  cds.push_back(cd);
4379  }
4380  // get geo column, if any
4381  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4382  if (poGeometry) {
4383  ColumnDescriptor cd;
4384  cd.columnName = geo_column_name;
4385  cd.sourceName = geo_column_name;
4386 
4387  // get GDAL type
4388  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4389 
4390  // if exploding, override any collection type to child type
4391  if (copy_params.geo_explode_collections) {
4392  if (ogr_type == wkbMultiPolygon) {
4393  ogr_type = wkbPolygon;
4394  } else if (ogr_type == wkbMultiLineString) {
4395  ogr_type = wkbLineString;
4396  } else if (ogr_type == wkbMultiPoint) {
4397  ogr_type = wkbPoint;
4398  }
4399  }
4400 
4401  // convert to internal type
4402  SQLTypes geoType = ogr_to_type(ogr_type);
4403 
4404  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4405  if (!copy_params.geo_explode_collections) {
4406  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4407  }
4408 
4409  // build full internal type
4410  SQLTypeInfo ti;
4411  ti.set_type(geoType);
4412  ti.set_subtype(copy_params.geo_coords_type);
4413  ti.set_input_srid(copy_params.geo_coords_srid);
4414  ti.set_output_srid(copy_params.geo_coords_srid);
4415  ti.set_compression(copy_params.geo_coords_encoding);
4416  ti.set_comp_param(copy_params.geo_coords_comp_param);
4417  cd.columnType = ti;
4418 
4419  cds.push_back(cd);
4420  }
4421  return cds;
4422 }
4423 
4424 bool Importer::gdalStatInternal(const std::string& path,
4425  const CopyParams& copy_params,
4426  bool also_dir) {
4427  // lazy init GDAL
4428  initGDAL();
4429 
4430  // set authorization tokens
4431  setGDALAuthorizationTokens(copy_params);
4432 
4433 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4434  // clear GDAL stat cache
4435  // without this, file existence will be cached, even if authentication changes
4436  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4437  VSICurlClearCache();
4438 #endif
4439 
4440  // stat path
4441  VSIStatBufL sb;
4442  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4443  if (result < 0) {
4444  return false;
4445  }
4446 
4447  // exists?
4448  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4449  return true;
4450  } else if (VSI_ISREG(sb.st_mode)) {
4451  return true;
4452  }
4453  return false;
4454 }
4455 
4456 /* static */
4457 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
4458  return gdalStatInternal(path, copy_params, false);
4459 }
4460 
4461 /* static */
4462 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
4463  const CopyParams& copy_params) {
4464  return gdalStatInternal(path, copy_params, true);
4465 }
4466 
4467 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
4468  std::vector<std::string>& files) {
4469  // prepare to gather subdirectories
4470  std::vector<std::string> subdirectories;
4471 
4472  // get entries
4473  char** entries = VSIReadDir(archive_path.c_str());
4474  if (!entries) {
4475  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
4476  return;
4477  }
4478 
4479  // force scope
4480  {
4481  // request clean-up
4482  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
4483 
4484  // check all the entries
4485  int index = 0;
4486  while (true) {
4487  // get next entry, or drop out if there isn't one
4488  char* entry_c = entries[index++];
4489  if (!entry_c) {
4490  break;
4491  }
4492  std::string entry(entry_c);
4493 
4494  // ignore '.' and '..'
4495  if (entry == "." || entry == "..") {
4496  continue;
4497  }
4498 
4499  // build the full path
4500  std::string entry_path = archive_path + std::string("/") + entry;
4501 
4502  // is it a file or a sub-folder
4503  VSIStatBufL sb;
4504  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
4505  if (result < 0) {
4506  break;
4507  }
4508 
4509  if (VSI_ISDIR(sb.st_mode)) {
4510  // a directory that ends with .gdb could be a Geodatabase bundle
4511  // arguably dangerous to decide this purely by name, but any further
4512  // validation would be very complex especially at this scope
4513  if (boost::iends_with(entry_path, ".gdb")) {
4514  // add the directory as if it was a file and don't recurse into it
4515  files.push_back(entry_path);
4516  } else {
4517  // add subdirectory to be recursed into
4518  subdirectories.push_back(entry_path);
4519  }
4520  } else {
4521  // add this file
4522  files.push_back(entry_path);
4523  }
4524  }
4525  }
4526 
4527  // recurse into each subdirectories we found
4528  // recurse into each subdirectory we found
4529  gdalGatherFilesInArchiveRecursive(subdirectory, files);
4530  }
4531 }
4532 
4533 /* static */
4534 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
4535  const std::string& archive_path,
4536  const CopyParams& copy_params) {
4537  // lazy init GDAL
4538  initGDAL();
4539 
4540  // set authorization tokens
4541  setGDALAuthorizationTokens(copy_params);
4542 
4543  // prepare to gather files
4544  std::vector<std::string> files;
4545 
4546  // gather the files recursively
4547  gdalGatherFilesInArchiveRecursive(archive_path, files);
4548 
4549  // convert to relative paths inside archive
4550  for (auto& file : files) {
4551  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4552  }
4553 
4554  // done
4555  return files;
4556 }
4557 
4558 /* static */
4559 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
4560  const std::string& file_name,
4561  const CopyParams& copy_params) {
4562  // lazy init GDAL
4563  initGDAL();
4564 
4565  // set authorization tokens
4566  setGDALAuthorizationTokens(copy_params);
4567 
4568  // prepare to gather layer info
4569  std::vector<GeoFileLayerInfo> layer_info;
4570 
4571  // open the data set
4572  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4573  if (poDS == nullptr) {
4574  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4575  file_name);
4576  }
4577 
4578  // enumerate the layers
4579  for (auto&& poLayer : poDS->GetLayers()) {
4580  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
4581  // prepare to read this layer
4582  poLayer->ResetReading();
4583  // skip layer if empty
4584  if (poLayer->GetFeatureCount() > 0) {
4585  // get first feature
4586  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4587  CHECK(first_feature);
4588  // check feature for geometry
4589  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4590  if (!geometry) {
4591  // layer has no geometry
4592  contents = GeoFileLayerContents::NON_GEO;
4593  } else {
4594  // check the geometry type
4595  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4596  switch (wkbFlatten(geometry_type)) {
4597  case wkbPoint:
4598  case wkbLineString:
4599  case wkbPolygon:
4600  case wkbMultiPolygon:
4601  // layer has supported geo
4602  contents = GeoFileLayerContents::GEO;
4603  break;
4604  default:
4605  // layer has unsupported geometry
4606  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
4607  break;
4608  }
4609  }
4610  }
4611  // store info for this layer
4612  layer_info.emplace_back(poLayer->GetName(), contents);
4613  }
4614 
4615  // done
4616  return layer_info;
4617 }
4618 
4619 /* static */
4620 bool Importer::gdalSupportsNetworkFileAccess() {
4621 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 2)
4622  return true;
4623 #else
4624  return false;
4625 #endif
4626 }
4627 
4628 ImportStatus Importer::importGDAL(
4629  ColumnNameToSourceNameMapType columnNameToSourceNameMap) {
4630  // initial status
4631  bool load_truncated = false;
4632  set_import_status(import_id, import_status);
4633 
4634  OGRDataSourceUqPtr poDS(openGDALDataset(file_path, copy_params));
4635  if (poDS == nullptr) {
4636  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4637  file_path);
4638  }
4639 
4640  OGRLayer& layer =
4641  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
4642 
4643  // get the number of features in this layer
4644  size_t numFeatures = layer.GetFeatureCount();
4645 
4646  // build map of metadata field (additional columns) name to index
4647  // use shared_ptr since we need to pass it to the worker
4648  FieldNameToIndexMapType fieldNameToIndexMap;
4649  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4650  CHECK(poFDefn);
4651  size_t numFields = poFDefn->GetFieldCount();
4652  for (size_t iField = 0; iField < numFields; iField++) {
4653  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4654  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4655  }
4656 
4657  // the geographic spatial reference we want to put everything in
4658  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4659  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4660 
4661 #if GDAL_VERSION_MAJOR >= 3
4662  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order;
4663  // this results in X and Y being transposed for angle-based
4664  // coordinate systems. This restores the previous behavior.
4665  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4666 #endif
4667 
4668 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4669  // just one "thread"
4670  size_t max_threads = 1;
4671 #else
4672  // how many threads to use
4673  size_t max_threads = 0;
4674  if (copy_params.threads == 0) {
4675  max_threads = sysconf(_SC_NPROCESSORS_CONF);
4676  } else {
4677  max_threads = copy_params.threads;
4678  }
4679 #endif
4680 
4681  // make an import buffer for each thread
4682  CHECK_EQ(import_buffers_vec.size(), 0u);
4683  import_buffers_vec.resize(max_threads);
4684  for (size_t i = 0; i < max_threads; i++) {
4685  for (const auto cd : loader->get_column_descs()) {
4686  import_buffers_vec[i].emplace_back(
4687  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4688  }
4689  }
4690 
4691  // make render group analyzers for each poly column
4692  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4693  if (copy_params.geo_assign_render_groups) {
4694  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4695  loader->getTableDesc()->tableId, false, false, false);
4696  for (auto cd : columnDescriptors) {
4697  SQLTypes ct = cd->columnType.get_type();
4698  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4699  auto rga = std::make_shared<RenderGroupAnalyzer>();
4700  rga->seedFromExistingTableContents(loader, cd->columnName);
4701  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4702  }
4703  }
4704  }
4705 
4706 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4707  // threads
4708  std::list<std::future<ImportStatus>> threads;
4709 
4710  // use a stack to track thread_ids which must not overlap among threads
4711  // because thread_id is used to index import_buffers_vec[]
4712  std::stack<size_t> stack_thread_ids;
4713  for (size_t i = 0; i < max_threads; i++) {
4714  stack_thread_ids.push(i);
4715  }
4716 #endif
4717 
4718  // checkpoint the table
4719  auto start_epoch = loader->getTableEpoch();
4720 
4721  // reset the layer
4722  layer.ResetReading();
4723 
4724  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
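  // features are read from the layer on this thread and handed to workers in
  // chunks of at most MAX_FEATURES_PER_CHUNK; in the multi-threaded path each
  // worker indexes import_buffers_vec[] with a thread_id drawn from the stack.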
4725 
4726  // make a features buffer for each thread
4727  std::vector<FeaturePtrVector> features(max_threads);
4728 
4729  // for each feature...
4730  size_t firstFeatureThisChunk = 0;
4731  while (firstFeatureThisChunk < numFeatures) {
4732  // how many features this chunk
4733  size_t numFeaturesThisChunk =
4734  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
4735 
4736 // get a thread_id not in use
4737 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4738  size_t thread_id = 0;
4739 #else
4740  auto thread_id = stack_thread_ids.top();
4741  stack_thread_ids.pop();
4742  CHECK(thread_id < max_threads);
4743 #endif
4744 
4745  // fill features buffer for new thread
4746  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
4747  features[thread_id].emplace_back(layer.GetNextFeature());
4748  }
4749 
4750 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4751  // call worker function directly
4752  auto ret_import_status = import_thread_shapefile(0,
4753  this,
4754  poGeographicSR.get(),
4755  std::move(features[thread_id]),
4756  firstFeatureThisChunk,
4757  numFeaturesThisChunk,
4758  fieldNameToIndexMap,
4759  columnNameToSourceNameMap,
4760  columnIdToRenderGroupAnalyzerMap);
4761  import_status += ret_import_status;
4762  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
4763  import_status.rows_completed;
4764  set_import_status(import_id, import_status);
4765 #else
4766  // fire up that thread to import this geometry
4767  threads.push_back(std::async(std::launch::async,
4768  import_thread_shapefile,
4769  thread_id,
4770  this,
4771  poGeographicSR.get(),
4772  std::move(features[thread_id]),
4773  firstFeatureThisChunk,
4774  numFeaturesThisChunk,
4775  fieldNameToIndexMap,
4776  columnNameToSourceNameMap,
4777  columnIdToRenderGroupAnalyzerMap));
4778 
4779  // let the threads run
4780  while (threads.size() > 0) {
4781  int nready = 0;
4782  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4783  it != threads.end();) {
4784  auto& p = *it;
4785  std::chrono::milliseconds span(
4786  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4787  if (p.wait_for(span) == std::future_status::ready) {
4788  auto ret_import_status = p.get();
4789  import_status += ret_import_status;
4790  import_status.rows_estimated =
4791  ((float)firstFeatureThisChunk / (float)numFeatures) *
4792  import_status.rows_completed;
4793  set_import_status(import_id, import_status);
4794 
4795  // recall thread_id for reuse
4796  stack_thread_ids.push(ret_import_status.thread_id);
4797 
4798  threads.erase(it++);
4799  ++nready;
4800  } else {
4801  ++it;
4802  }
4803  }
4804 
4805  if (nready == 0) {
4806  std::this_thread::yield();
4807  }
4808 
4809  // keep reading if there is any free thread slot
4810  // this is one of the major differences from the old threading model!!
4811  if (threads.size() < max_threads) {
4812  break;
4813  }
4814  }
4815 #endif
4816 
4817  // out of rows?
4818  if (import_status.rows_rejected > copy_params.max_reject) {
4819  load_truncated = true;
4820  load_failed = true;
4821  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4822  break;
4823  }
4824 
4825  // failed?
4826  if (load_failed) {
4827  load_truncated = true;
4828  LOG(ERROR)
4829  << "A call to Loader::load failed. Please review the logs for more details";
4830  break;
4831  }
4832 
4833  firstFeatureThisChunk += numFeaturesThisChunk;
4834  }
4835 
4836 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4837  // wait for any remaining threads
4838  if (threads.size()) {
4839  for (auto& p : threads) {
4840  // wait for the thread
4841  p.wait();
4842  // get the result and update the final import status
4843  auto ret_import_status = p.get();
4844  import_status += ret_import_status;
4845  import_status.rows_estimated = import_status.rows_completed;
4846  set_import_status(import_id, import_status);
4847  }
4848  }
4849 #endif
4850 
4851  checkpoint(start_epoch);
4852 
4853  // must set import_status.load_truncated before closing this end of pipe
4854  // otherwise, the thread on the other end would throw an unwanted 'write()'
4855  // exception
4856  import_status.load_truncated = load_truncated;
4857  return import_status;
4858 }
4859 
4860 //
4861 // class RenderGroupAnalyzer
4862 //
4863 
4865  const std::unique_ptr<Loader>& loader,
4866  const std::string& geoColumnBaseName) {
4867  // start timer
4868  auto seedTimer = timer_start();
4869 
4870  // get the table descriptor
4871  const auto& cat = loader->getCatalog();
4872  const std::string& tableName = loader->getTableDesc()->tableName;
4873  const auto td = cat.getMetadataForTable(tableName);
4874  CHECK(td);
4875  CHECK(td->fragmenter);
4876 
4877  // start with a fresh tree
4878  _rtree = nullptr;
4879  _numRenderGroups = 0;
4880 
4881  // if the table is empty, just make an empty tree
4882  if (td->fragmenter->getFragmentsForQuery().getPhysicalNumTuples() == 0) {
4883  if (DEBUG_RENDER_GROUP_ANALYZER) {
4884  LOG(INFO) << "DEBUG: Table is empty!";
4885  }
4886  _rtree = std::make_unique<RTree>();
4887  CHECK(_rtree);
4888  return;
4889  }
4890 
4891  // no seeding possible without these two columns
4892  const auto cd_bounds =
4893  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_bounds");
4894  const auto cd_render_group =
4895  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_render_group");
4896  if (!cd_bounds || !cd_render_group) {
4897  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4898  " doesn't have bounds or render_group columns!");
4899  }
4900 
4901  // and validate their types
4902  if (cd_bounds->columnType.get_type() != kARRAY ||
4903  cd_bounds->columnType.get_subtype() != kDOUBLE) {
4904  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4905  " bounds column is wrong type!");
4906  }
4907  if (cd_render_group->columnType.get_type() != kINT) {
4908  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4909  " render_group column is wrong type!");
4910  }
4911 
4912  // get chunk accessor table
4913  auto chunkAccessorTable = getChunkAccessorTable(
4914  cat, td, {geoColumnBaseName + "_bounds", geoColumnBaseName + "_render_group"});
4915  const auto table_count = std::get<0>(chunkAccessorTable.back());
4916 
4917  if (DEBUG_RENDER_GROUP_ANALYZER) {
4918  LOG(INFO) << "DEBUG: Scanning existing table geo column set '" << geoColumnBaseName
4919  << "'";
4920  }
4921 
4922  std::vector<Node> nodes;
4923  try {
4924  nodes.resize(table_count);
4925  } catch (const std::exception& e) {
4926  throw std::runtime_error("RenderGroupAnalyzer failed to reserve memory for " +
4927  std::to_string(table_count) + " rows");
4928  }
4929 
4930  for (size_t row = 0; row < table_count; row++) {
4931  ArrayDatum ad;
4932  VarlenDatum vd;
4933  bool is_end;
4934 
4935  // get ChunkIters and fragment row offset
4936  size_t rowOffset = 0;
4937  auto& chunkIters = getChunkItersAndRowOffset(chunkAccessorTable, row, rowOffset);
4938  auto& boundsChunkIter = chunkIters[0];
4939  auto& renderGroupChunkIter = chunkIters[1];
4940 
4941  // get bounds values
4942  ChunkIter_get_nth(&boundsChunkIter, row - rowOffset, &ad, &is_end);
4943  CHECK(!is_end);
4944  CHECK(ad.pointer);
4945  int numBounds = (int)(ad.length / sizeof(double));
4946  CHECK(numBounds == 4);
4947 
4948  // convert to bounding box
4949  double* bounds = reinterpret_cast<double*>(ad.pointer);
4950  BoundingBox bounding_box;
4951  boost::geometry::assign_inverse(bounding_box);
4952  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
4953  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
4954 
4955  // get render group
4956  ChunkIter_get_nth(&renderGroupChunkIter, row - rowOffset, false, &vd, &is_end);
4957  CHECK(!is_end);
4958  CHECK(vd.pointer);
4959  int renderGroup = *reinterpret_cast<int32_t*>(vd.pointer);
4960  CHECK_GE(renderGroup, 0);
4961 
4962  // store
4963  nodes[row] = std::make_pair(bounding_box, renderGroup);
4964 
4965  // how many render groups do we have now?
4966  if (renderGroup >= _numRenderGroups) {
4967  _numRenderGroups = renderGroup + 1;
4968  }
4969 
4970  if (DEBUG_RENDER_GROUP_ANALYZER) {
4971  LOG(INFO) << "DEBUG: Existing row " << row << " has Render Group " << renderGroup;
4972  }
4973  }
4974 
4975  // bulk-load the tree
4976  auto bulk_load_timer = timer_start();
4977  _rtree = std::make_unique<RTree>(nodes);
4978  CHECK(_rtree);
4979  LOG(INFO) << "Scanning render groups of poly column '" << geoColumnBaseName
4980  << "' of table '" << tableName << "' took " << timer_stop(seedTimer) << "ms ("
4981  << timer_stop(bulk_load_timer) << " ms for tree)";
4982 
4983  if (DEBUG_RENDER_GROUP_ANALYZER) {
4984  LOG(INFO) << "DEBUG: Done! Now have " << _numRenderGroups << " Render Groups";
4985  }
4986 }
4987 
4988 int RenderGroupAnalyzer::insertBoundsAndReturnRenderGroup(
4989  const std::vector<double>& bounds) {
4990  // validate
4991  CHECK(bounds.size() == 4);
4992 
4993  // get bounds
4994  BoundingBox bounding_box;
4995  boost::geometry::assign_inverse(bounding_box);
4996  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
4997  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
4998 
4999  // remainder under mutex to allow this to be multi-threaded
5000  std::lock_guard<std::mutex> guard(_rtreeMutex);
5001 
5002  // get the intersecting nodes
5003  std::vector<Node> intersects;
5004  _rtree->query(boost::geometry::index::intersects(bounding_box),
5005  std::back_inserter(intersects));
5006 
5007  // build bitset of render groups of the intersecting rectangles
5008  // clear bit means available, allows use of find_first()
5009  boost::dynamic_bitset<> bits(_numRenderGroups);
5010  bits.set();
5011  for (const auto& intersection : intersects) {
5012  CHECK(intersection.second < _numRenderGroups);
5013  bits.reset(intersection.second);
5014  }
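  // for illustration: with _numRenderGroups == 3 and intersecting groups
  // {0, 2}, bits ends up with only bit 1 set, so find_first() returns 1 and
  // render group 1 is assigned below.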
5015 
5016  // find first available group
5017  int firstAvailableRenderGroup = 0;
5018  size_t firstSetBit = bits.find_first();
5019  if (firstSetBit == boost::dynamic_bitset<>::npos) {
5020  // all known groups represented, add a new one
5021  firstAvailableRenderGroup = _numRenderGroups;
5022  _numRenderGroups++;
5023  } else {
5024  firstAvailableRenderGroup = (int)firstSetBit;
5025  }
5026 
5027  // insert new node
5028  _rtree->insert(std::make_pair(bounding_box, firstAvailableRenderGroup));
5029 
5030  // return it
5031  return firstAvailableRenderGroup;
5032 }
5033 
5034 ImportDriver::ImportDriver(std::shared_ptr<Catalog_Namespace::Catalog> cat,
5035  const Catalog_Namespace::UserMetadata& user,
5036  const ExecutorDeviceType dt)
5037  : QueryRunner(std::make_unique<Catalog_Namespace::SessionInfo>(cat, user, dt, "")) {}
5038 
5039 void ImportDriver::importGeoTable(const std::string& file_path,
5040  const std::string& table_name,
5041  const bool compression,
5042  const bool create_table,
5043  const bool explode_collections) {
5044  CHECK(session_info_);
5045  const std::string geo_column_name(OMNISCI_GEO_PREFIX);
5046 
5047  CopyParams copy_params;
5048  if (compression) {
5049  copy_params.geo_coords_encoding = EncodingType::kENCODING_GEOINT;
5050  copy_params.geo_coords_comp_param = 32;
5051  } else {
5052  copy_params.geo_coords_encoding = EncodingType::kENCODING_NONE;
5053  copy_params.geo_coords_comp_param = 0;
5054  }
5055  copy_params.geo_assign_render_groups = true;
5056  copy_params.geo_explode_collections = explode_collections;
5057 
5058  auto cds = Importer::gdalToColumnDescriptors(file_path, geo_column_name, copy_params);
5059  std::map<std::string, std::string> colname_to_src;
5060  for (auto& cd : cds) {
5061  const auto col_name_sanitized = ImportHelpers::sanitize_name(cd.columnName);
5062  const auto ret =
5063  colname_to_src.insert(std::make_pair(col_name_sanitized, cd.columnName));
5064  CHECK(ret.second);
5065  cd.columnName = col_name_sanitized;
5066  }
5067 
5068  auto& cat = session_info_->getCatalog();
5069 
5070  if (create_table) {
5071  const auto td = cat.getMetadataForTable(table_name);
5072  if (td != nullptr) {
5073  throw std::runtime_error("Error: Table " + table_name +
5074  " already exists. Possible failure to correctly re-create "
5075  "mapd_data directory.");
5076  }
5077  if (table_name != ImportHelpers::sanitize_name(table_name)) {
5078  throw std::runtime_error("Invalid characters in table name: " + table_name);
5079  }
5080 
5081  std::string stmt{"CREATE TABLE " + table_name};
5082  std::vector<std::string> col_stmts;
5083 
5084  for (auto& cd : cds) {
5085  if (cd.columnType.get_type() == SQLTypes::kINTERVAL_DAY_TIME ||
5086  cd.columnType.get_type() == SQLTypes::kINTERVAL_YEAR_MONTH) {
5087  throw std::runtime_error(
5088  "Unsupported type: INTERVAL_DAY_TIME or INTERVAL_YEAR_MONTH for col " +
5089  cd.columnName + " (table: " + table_name + ")");
5090  }
5091 
5092  if (cd.columnType.get_type() == SQLTypes::kDECIMAL) {
5093  if (cd.columnType.get_precision() == 0 && cd.columnType.get_scale() == 0) {
5094  cd.columnType.set_precision(14);
5095  cd.columnType.set_scale(7);
5096  }
5097  }
5098 
5099  std::string col_stmt;
5100  col_stmt.append(cd.columnName + " " + cd.columnType.get_type_name() + " ");
5101 
5102  if (cd.columnType.get_compression() != EncodingType::kENCODING_NONE) {
5103  col_stmt.append("ENCODING " + cd.columnType.get_compression_name() + " ");
5104  } else {
5105  if (cd.columnType.is_string()) {
5106  col_stmt.append("ENCODING NONE");
5107  } else if (cd.columnType.is_geometry()) {
5108  if (cd.columnType.get_output_srid() == 4326) {
5109  col_stmt.append("ENCODING NONE");
5110  }
5111  }
5112  }
5113  col_stmts.push_back(col_stmt);
5114  }
5115 
5116  stmt.append(" (" + boost::algorithm::join(col_stmts, ",") + ");");
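  // this yields DDL of the form:
  //   CREATE TABLE <table_name> (<col> <type> [ENCODING <encoding>], ...);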
5117  runDDLStatement(stmt);
5118 
5119  LOG(INFO) << "Created table: " << table_name;
5120  } else {
5121  LOG(INFO) << "Not creating table: " << table_name;
5122  }
5123 
5124  const auto td = cat.getMetadataForTable(table_name);
5125  if (td == nullptr) {
5126  throw std::runtime_error("Error: Failed to create table " + table_name);
5127  }
5128 
5129  Importer_NS::Importer importer(cat, td, file_path, copy_params);
5130  auto ms = measure<>::execution([&]() { importer.importGDAL(colname_to_src); });
5131  LOG(INFO) << "Import Time for " << table_name << ": " << (double)ms / 1000.0 << " s";
5132 }
5133 
5134 } // namespace Importer_NS
std::chrono::steady_clock::time_point end
Definition: Importer.h:611
~Importer() override
Definition: Importer.cpp:196
std::string mapd_root_abs_path()
Definition: mapdpath.h:30
HOST DEVICE int get_scale() const
Definition: sqltypes.h:331
bool try_cast(const std::string &str)
Definition: Importer.cpp:2835
std::string join(T const &container, std::string const &delim)
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Importer.cpp:1422
void set_input_srid(int d)
Definition: sqltypes.h:420
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
Definition: Importer.cpp:431
#define UNREACHABLE()
Definition: Logger.h:234
HOST DEVICE int get_size() const
Definition: sqltypes.h:336
#define CHECK_GE(x, y)
Definition: Logger.h:203
std::vector< std::string > mapd_glob(const std::string &pattern)
Definition: mapd_glob.cpp:22
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:595
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:46
virtual bool load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
Definition: Importer.cpp:2379
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:834
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2125
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:416
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:591
void set_fixed_size()
Definition: sqltypes.h:425
int8_t * getAsBytes() const
Definition: Importer.h:311
boost::geometry::model::point< double, 2, boost::geometry::cs::cartesian > Point
Definition: Importer.h:726
void addInt(const int32_t v)
Definition: Importer.h:231
auto value_getter(const Array &array, const ColumnDescriptor *cd, Importer_NS::BadRowsTracker *const bad_rows_tracker)
const SQLTypeInfo & getTypeInfo() const
Definition: Importer.h:305
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:93
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:102
void set_scale(int s)
Definition: sqltypes.h:421
std::chrono::steady_clock::time_point start
Definition: Importer.h:610
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2581
#define CHECK_GT(x, y)
Definition: Logger.h:202
void set_compression(EncodingType c)
Definition: sqltypes.h:426
DEVICE void ChunkIter_get_nth(ChunkIter *it, int n, bool uncompress, VarlenDatum *result, bool *is_end)
Definition: ChunkIter.cpp:181
int32_t intval
Definition: sqltypes.h:128
static const std::string trim_space(const char *field, const size_t len)
Definition: Importer.cpp:220
std::string sourceName
ArrayDatum StringToArray(const std::string &s, const SQLTypeInfo &ti, const CopyParams &copy_params)
Definition: Importer.cpp:350
std::string to_string(char const *&&v)
std::map< int, const ColumnDescriptor * > columnDescriptors
Definition: Fragmenter.h:69
void set_output_srid(int s)
Definition: sqltypes.h:422
int8_t * pointer
Definition: sqltypes.h:75
bool is_number() const
Definition: sqltypes.h:482
std::string line1
Definition: Importer.h:710
std::string get_type_name() const
Definition: sqltypes.h:429
EncodingType geo_coords_encoding
Definition: CopyParams.h:73
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4462
#define DEBUG_TIMING
Definition: Importer.cpp:138
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4334
void set_replicate_count(const int64_t replicate_count)
Definition: Importer.h:497
char * try_strptimes(const char *str, const std::vector< std::string > &formats)
Definition: Importer.cpp:2844
Datum NullDatum(SQLTypeInfo &ti)
Definition: Importer.cpp:268
void addBoolean(const int8_t v)
Definition: Importer.h:225
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
float floatval
Definition: sqltypes.h:130
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed)=0
int64_t explode_collections_step2(OGRGeometry *ogr_geometry, SQLTypes collection_child_type, const std::string &collection_col_name, size_t row_or_feature_idx, std::function< void(OGRGeometry *)> execute_import_lambda)
Definition: Importer.cpp:1687
virtual void setTableEpoch(const int32_t new_epoch)
Definition: Importer.cpp:4012
int8_t * appendDatum(int8_t *buf, Datum d, const SQLTypeInfo &ti)
Definition: Importer.cpp:232
std::chrono::duration< size_t, std::milli > elapsed
Definition: Importer.h:615
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:417
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:146
void set_precision(int d)
Definition: sqltypes.h:419
std::vector< int8_t > * tinyint_buffer_
Definition: Importer.h:515
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:794
ChunkIterVector & getChunkItersAndRowOffset(ChunkAccessorTable &table, size_t rowid, size_t &rowOffset)
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4190
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
Definition: Importer.cpp:3090
void addFloat(const float v)
Definition: Importer.h:235
void seedFromExistingTableContents(const std::unique_ptr< Loader > &loader, const std::string &geoColumnBaseName)
Definition: Importer.cpp:4864
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:832
CHECK(cgen_state)
std::vector< std::string > * string_buffer_
Definition: Importer.h:521
boost::geometry::model::box< Point > BoundingBox
Definition: Importer.h:727
std::string s3_access_key
Definition: CopyParams.h:62
bool is_time() const
Definition: sqltypes.h:483
static bool hasGDALLibKML()
Definition: