Importer.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * @file Importer.cpp
 * @author Wei Hong <wei@mapd.com>
 * @brief Functions for Importer class
 */

#include "Import/Importer.h"

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <gdal.h>
#include <ogrsf_frmts.h>
#include <unistd.h>

#include <boost/algorithm/string.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/filesystem.hpp>
#include <boost/variant.hpp>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <future>
#include <iomanip>
#include <list>
#include <memory>
#include <mutex>
#include <numeric>
#include <stack>
#include <stdexcept>
#include <thread>
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "../Archive/PosixFileArchive.h"
#include "../Archive/S3Archive.h"
#include "../QueryEngine/TypePunning.h"
#include "../Shared/geo_compression.h"
#include "../Shared/geo_types.h"
#include "../Shared/geosupport.h"
#include "../Shared/import_helpers.h"
#include "../Shared/mapd_glob.h"
#include "../Shared/mapdpath.h"
#include "../Shared/measure.h"
#include "../Shared/scope.h"
#include "../Shared/shard_key.h"
#include "../Shared/thread_count.h"
#include "ArrowImporter.h"
#include "Import/DelimitedParserUtils.h"
#include "Shared/Logger.h"
#include "Shared/SqlTypesLayout.h"
#include "gen-cpp/MapD.h"

size_t g_archive_read_buf_size = 1 << 20;

inline auto get_filesize(const std::string& file_path) {
  boost::filesystem::path boost_file_path{file_path};
  boost::system::error_code ec;
  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
  return ec ? 0 : filesize;
}
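
// Editorial usage sketch (not from the original source): the error_code
// overload of boost::filesystem::file_size reports failure through `ec`
// instead of throwing, so a missing file simply comes back as size 0 here:
//
//   auto sz = get_filesize("/no/such/file.csv");  // sz == 0, nothing thrown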

namespace {

struct OGRDataSourceDeleter {
  void operator()(OGRDataSource* datasource) {
    if (datasource) {
      GDALClose(datasource);
    }
  }
};
using OGRDataSourceUqPtr = std::unique_ptr<OGRDataSource, OGRDataSourceDeleter>;

struct OGRFeatureDeleter {
  void operator()(OGRFeature* feature) {
    if (feature) {
      OGRFeature::DestroyFeature(feature);
    }
  }
};
using OGRFeatureUqPtr = std::unique_ptr<OGRFeature, OGRFeatureDeleter>;

struct OGRSpatialReferenceDeleter {
  void operator()(OGRSpatialReference* ref) {
    if (ref) {
      OGRSpatialReference::DestroySpatialReference(ref);
    }
  }
};
using OGRSpatialReferenceUqPtr =
    std::unique_ptr<OGRSpatialReference, OGRSpatialReferenceDeleter>;

}  // namespace
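
// Editorial usage sketch (not from the original source): these RAII aliases
// tie GDAL/OGR object lifetimes to scope so every exit path releases them.
// The dataset path below is a hypothetical placeholder:
//
//   OGRDataSourceUqPtr datasource(static_cast<OGRDataSource*>(GDALOpenEx(
//       "example.geojson", GDAL_OF_VECTOR, nullptr, nullptr, nullptr)));
//   if (datasource) {
//     OGRFeatureUqPtr feature(datasource->GetLayer(0)->GetNextFeature());
//     // ... GDALClose()/DestroyFeature() run automatically on scope exit
//   }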

// For logging std::vector<std::string> row.
namespace boost {
namespace log {
formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
  out << '[';
  for (size_t i = 0; i < row.size(); ++i) {
    out << (i ? ", " : "") << row[i];
  }
  out << ']';
  return out;
}
}  // namespace log
}  // namespace boost

namespace Importer_NS {

using FieldNameToIndexMapType = std::map<std::string, size_t>;
using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
using ColumnIdToRenderGroupAnalyzerMapType =
    std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
using FeaturePtrVector = std::vector<OGRFeatureUqPtr>;

#define DEBUG_TIMING false
#define DEBUG_RENDER_GROUP_ANALYZER 0
#define DEBUG_AWS_AUTHENTICATION 0

#define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0

static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;

static mapd_shared_mutex status_mutex;
static std::map<std::string, ImportStatus> import_status_map;

Importer::Importer(Catalog_Namespace::Catalog& c,
                   const TableDescriptor* t,
                   const std::string& f,
                   const CopyParams& p)
    : Importer(new Loader(c, t), f, p) {}

Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
    : DataStreamSink(p, f), loader(providedLoader) {
  import_id = boost::filesystem::path(file_path).filename().string();
  file_size = 0;
  max_threads = 0;
  p_file = nullptr;
  buffer[0] = nullptr;
  buffer[1] = nullptr;
  // we may be overallocating a little more memory here due to dropping phy cols.
  // it shouldn't be an issue because iteration of it is not supposed to go OOB.
  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
  int i = 0;
  bool has_array = false;
  // TODO: replace this ugly way of skipping phy cols once if isPhyGeo is defined
  int skip_physical_cols = 0;
  for (auto& p : loader->get_column_descs()) {
    // phy geo columns can't be in input file
    if (skip_physical_cols-- > 0) {
      continue;
    }
    // neither are rowid or $deleted$
    // note: columns can be added after rowid/$deleted$
    if (p->isVirtualCol || p->isDeletedCol) {
      continue;
    }
    skip_physical_cols = p->columnType.get_physical_cols();
    if (p->columnType.get_type() == kARRAY) {
      is_array.get()[i] = true;
      has_array = true;
    } else {
      is_array.get()[i] = false;
    }
    ++i;
  }
  if (has_array) {
    is_array_a = std::unique_ptr<bool[]>(is_array.release());
  } else {
    is_array_a = std::unique_ptr<bool[]>(nullptr);
  }
}

Importer::~Importer() {
  if (p_file != nullptr) {
    fclose(p_file);
  }
  if (buffer[0] != nullptr) {
    free(buffer[0]);
  }
  if (buffer[1] != nullptr) {
    free(buffer[1]);
  }
}

ImportStatus Importer::get_import_status(const std::string& import_id) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
  return import_status_map.at(import_id);
}

void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
  is.end = std::chrono::steady_clock::now();
  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
  import_status_map[import_id] = is;
}

static const std::string trim_space(const char* field, const size_t len) {
  size_t i = 0;
  size_t j = len;
  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
    i++;
  }
  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
    j--;
  }
  return std::string(field + i, j - i);
}
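
// Editorial example (not from the original source): only leading/trailing
// spaces and carriage returns are trimmed, so embedded whitespace survives:
//
//   trim_space("  a b \r", 7)  // yields "a b"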

int8_t* appendDatum(int8_t* buf, Datum d, const SQLTypeInfo& ti) {
  switch (ti.get_type()) {
    case kBOOLEAN:
      *(bool*)buf = d.boolval;
      return buf + sizeof(bool);
    case kNUMERIC:
    case kDECIMAL:
    case kBIGINT:
      *(int64_t*)buf = d.bigintval;
      return buf + sizeof(int64_t);
    case kINT:
      *(int32_t*)buf = d.intval;
      return buf + sizeof(int32_t);
    case kSMALLINT:
      *(int16_t*)buf = d.smallintval;
      return buf + sizeof(int16_t);
    case kTINYINT:
      *(int8_t*)buf = d.tinyintval;
      return buf + sizeof(int8_t);
    case kFLOAT:
      *(float*)buf = d.floatval;
      return buf + sizeof(float);
    case kDOUBLE:
      *(double*)buf = d.doubleval;
      return buf + sizeof(double);
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      *reinterpret_cast<int64_t*>(buf) = d.bigintval;
      return buf + sizeof(int64_t);
    default:
      return NULL;
  }
  return NULL;
}
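
// Editorial sketch (not from the original source): appendDatum packs
// fixed-width scalars back to back and returns the advanced write pointer,
// so callers can chain calls over one allocation:
//
//   int8_t buf[sizeof(int32_t) + sizeof(double)];
//   Datum i_val, d_val;
//   i_val.intval = 7;
//   d_val.doubleval = 2.5;
//   int8_t* p = appendDatum(buf, i_val, SQLTypeInfo(kINT, false));
//   p = appendDatum(p, d_val, SQLTypeInfo(kDOUBLE, false));  // p == buf + 12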

Datum NullDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullDatum.");
  }
  return d;
}

Datum NullArrayDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_ARRAY_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_ARRAY_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
  }
  return d;
}

ArrayDatum StringToArray(const std::string& s,
                         const SQLTypeInfo& ti,
                         const CopyParams& copy_params) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
    return ArrayDatum(0, NULL, true);
  }
  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
    LOG(WARNING) << "Malformed array: " << s;
    return ArrayDatum(0, NULL, true);
  }
  std::vector<std::string> elem_strs;
  size_t last = 1;
  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
       i = s.find(copy_params.array_delim, last)) {
    elem_strs.push_back(s.substr(last, i - last));
    last = i + 1;
  }
  if (last + 1 <= s.size()) {
    elem_strs.push_back(s.substr(last, s.size() - 1 - last));
  }
  if (elem_strs.size() == 1) {
    auto str = elem_strs.front();
    auto str_trimmed = trim_space(str.c_str(), str.length());
    if (str_trimmed == "") {
      elem_strs.clear();  // Empty array
    }
  }
  if (!elem_ti.is_string()) {
    size_t len = elem_strs.size() * elem_ti.get_size();
    int8_t* buf = (int8_t*)checked_malloc(len);
    int8_t* p = buf;
    for (auto& es : elem_strs) {
      auto e = trim_space(es.c_str(), es.length());
      bool is_null = (e == copy_params.null_str) || e == "NULL";
      if (!elem_ti.is_string() && e == "") {
        is_null = true;
      }
      if (elem_ti.is_number() || elem_ti.is_time()) {
        if (!isdigit(e[0]) && e[0] != '-') {
          is_null = true;
        }
      }
      Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
      p = appendDatum(p, d, elem_ti);
    }
    return ArrayDatum(len, buf, false);
  }
  // must not be called for array of strings
  CHECK(false);
  return ArrayDatum(0, NULL, true);
}
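
// Editorial example (not from the original source), using the default
// CopyParams markers: arrays arrive as "{1,2,3}" with ',' as the delimiter.
// The type below is a hypothetical INT[] column:
//
//   SQLTypeInfo ti(kARRAY, false);
//   ti.set_subtype(kINT);
//   auto arr = StringToArray("{1,2,3}", ti, CopyParams());
//   // arr.length == 3 * sizeof(int32_t); "NULL", the null_str or "" give is_null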

ArrayDatum NullArray(const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  auto len = ti.get_size();

  if (elem_ti.is_string()) {
    // must not be called for array of strings
    CHECK(false);
    return ArrayDatum(0, NULL, true);
  }

  if (len > 0) {
    // Compose a NULL fixlen array
    int8_t* buf = (int8_t*)checked_malloc(len);
    // First scalar is a NULL_ARRAY sentinel
    Datum d = NullArrayDatum(elem_ti);
    int8_t* p = appendDatum(buf, d, elem_ti);
    // Rest is filled with normal NULL sentinels
    Datum d0 = NullDatum(elem_ti);
    while ((p - buf) < len) {
      p = appendDatum(p, d0, elem_ti);
    }
    CHECK((p - buf) == len);
    return ArrayDatum(len, buf, true);
  }
  // NULL varlen array
  return ArrayDatum(0, NULL, true);
}

ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
  return NullArray(ti);
}

void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
  const auto& arr = datum.val.arr_val;
  for (const auto& elem_datum : arr) {
    string_vec.push_back(elem_datum.val.str_val);
  }
}

Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kBIGINT:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kINT:
      d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kSMALLINT:
      d.smallintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kTINYINT:
      d.tinyintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kFLOAT:
      d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
      break;
    case kDOUBLE:
      d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
  }
  return d;
}

ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();

  CHECK(!elem_ti.is_string());

  if (datum.is_null) {
    return NullArray(ti);
  }

  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
  int8_t* buf = (int8_t*)checked_malloc(len);
  int8_t* p = buf;
  for (auto& e : datum.val.arr_val) {
    p = appendDatum(p, TDatumToDatum(e, elem_ti), elem_ti);
  }

  return ArrayDatum(len, buf, false);
}

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const std::string& val,
                                  const bool is_null,
                                  const CopyParams& copy_params,
                                  const int64_t replicate_count) {
  set_replicate_count(replicate_count);
  const auto type = cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBoolean((int8_t)d.boolval);
      }
      break;
    }
    case kTINYINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        addTinyint(std::stoi(val));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        addSmallint(std::stoi(val));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        addInt(std::stoi(val));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        addBigint(std::stoll(val));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        SQLTypeInfo ti(kNUMERIC, 0, 0, false);
        Datum d = StringToDatum(val, ti);
        const auto converted_decimal_value =
            convert_decimal_value_to_scale(d.bigintval, ti, cd->columnType);
        addBigint(converted_decimal_value);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addFloat((float)std::atof(val.c_str()));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addDouble(std::atof(val.c_str()));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        if (val.length() > StringDictionary::MAX_STRLEN) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(StringDictionary::MAX_STRLEN));
        }
        addString(val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      SQLTypeInfo ti = cd->columnType;
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        Importer_NS::DelimitedParserUtils::parseStringArray(val, copy_params, string_vec);
        if (!is_null) {
          // TODO: add support for NULL string arrays
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          addStringArray(string_vec);
        } else {
          if (ti.get_size() > 0) {
            // TODO: remove once NULL fixlen arrays are allowed
            throw std::runtime_error("Fixed length array column " + cd->columnName +
                                     " currently cannot accept NULL arrays");
          }
          // TODO: add support for NULL string arrays, replace with addStringArray(),
          //       for now add whatever parseStringArray() outputs for NULLs ("NULL")
          addStringArray(string_vec);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(val, ti, copy_params);
          if (d.is_null) {  // val could be "NULL"
            addArray(NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + val);
            }
            addArray(d);
          }
        } else {
          addArray(NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      addGeoString(val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}
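
// Editorial note (not from the original source): add_value() is the
// row-oriented entry point used by the delimited-file path below. A sketch of
// the NULL convention, with `cd` a hypothetical nullable kINT column:
//
//   buffer.add_value(cd, "42", false, copy_params);   // stores 42
//   buffer.add_value(cd, "\\N", true, copy_params);   // stores the NULL sentinel
//   buffer.add_value(cd, "abc", false, copy_params);  // no digit/'-': NULL too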

void TypedImportBuffer::pop_value() {
  const auto type = column_desc_->columnType.is_decimal()
                        ? decimal_to_int_type(column_desc_->columnType)
                        : column_desc_->columnType.get_type();
  switch (type) {
    case kBOOLEAN:
      bool_buffer_->pop_back();
      break;
    case kTINYINT:
      tinyint_buffer_->pop_back();
      break;
    case kSMALLINT:
      smallint_buffer_->pop_back();
      break;
    case kINT:
      int_buffer_->pop_back();
      break;
    case kBIGINT:
      bigint_buffer_->pop_back();
      break;
    case kFLOAT:
      float_buffer_->pop_back();
      break;
    case kDOUBLE:
      double_buffer_->pop_back();
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      string_buffer_->pop_back();
      break;
    case kDATE:
    case kTIME:
    case kTIMESTAMP:
      bigint_buffer_->pop_back();
      break;
    case kARRAY:
      if (IS_STRING(column_desc_->columnType.get_subtype())) {
        string_array_buffer_->pop_back();
      } else {
        array_buffer_->pop_back();
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->pop_back();
      break;
    default:
      CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
  }
}

struct GeoImportException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
template <typename DATA_TYPE>
size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
    const ColumnDescriptor* cd,
    const Array& array,
    std::vector<DATA_TYPE>& buffer,
    const ArraySliceRange& slice_range,
    Importer_NS::BadRowsTracker* const bad_rows_tracker) {
  auto data =
      std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
  if (bad_rows_tracker && cd->columnType.is_geometry()) {
    f_add_geo_phy_cols = [&](const int64_t row) {
      // Populate physical columns (ref. MapDHandler::load_table)
      std::vector<double> coords, bounds;
      std::vector<int> ring_sizes, poly_rings;
      int render_group = 0;
      SQLTypeInfo ti;
      // replace any unexpected exception from getGeoColumns or other
      // on this path with a GeoImportException so that we wont over
      // push a null to the logical column...
      try {
        arrow_throw_if<GeoImportException>(
            array.IsNull(row) ||
                !Geo_namespace::GeoTypesFactory::getGeoColumns(
                    f_value_getter(array, row),
                    ti,
                    coords,
                    bounds,
                    ring_sizes,
                    poly_rings,
                    false),
            error_context(cd, bad_rows_tracker) + "Invalid geometry");
        arrow_throw_if<GeoImportException>(
            cd->columnType.get_type() != ti.get_type(),
            error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
        auto col_idx_workpad = col_idx;  // what a pitfall!!
        Importer_NS::Importer::set_geo_physical_import_buffer(
            bad_rows_tracker->importer->getCatalog(),
            cd,
            bad_rows_tracker->importer->get_import_buffers(0),
            col_idx_workpad,
            coords,
            bounds,
            ring_sizes,
            poly_rings,
            render_group);
      } catch (GeoImportException&) {
        throw;
      } catch (std::runtime_error& e) {
        throw GeoImportException(e.what());
      } catch (const std::exception& e) {
        throw GeoImportException(e.what());
      } catch (...) {
        throw GeoImportException("unknown exception");
      }
    };
  }
  auto f_mark_a_bad_row = [&](const auto row) {
    std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
    bad_rows_tracker->rows.insert(row - slice_range.first);
  };
  buffer.reserve(slice_range.second - slice_range.first);
  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
    try {
      *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
      f_add_geo_phy_cols(row);
    } catch (GeoImportException&) {
      f_mark_a_bad_row(row);
    } catch (ArrowImporterException&) {
      // trace bad rows of each column; otherwise rethrow.
      if (bad_rows_tracker) {
        *data << nullptr;
        f_mark_a_bad_row(row);
      } else {
        throw;
      }
    }
  }
  return buffer.size();
}

size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
                                           const Array& col,
                                           const bool exact_type_match,
                                           const ArraySliceRange& slice_range,
                                           BadRowsTracker* const bad_rows_tracker) {
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
  }

  switch (type) {
    case kBOOLEAN:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
    case kTINYINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
    case kSMALLINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
    case kINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *int_buffer_, slice_range, bad_rows_tracker);
    case kBIGINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kFLOAT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *float_buffer_, slice_range, bad_rows_tracker);
    case kDOUBLE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *double_buffer_, slice_range, bad_rows_tracker);
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                       "Expected string type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *string_buffer_, slice_range, bad_rows_tracker);
    case kTIME:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
                       "Expected time32 or time64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kTIMESTAMP:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kDATE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
                       "Expected date32 or date64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                     "Expected string type");
      return convert_arrow_val_to_import_buffer(
          cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
    case kARRAY:
      throw std::runtime_error("Arrow array appends not yet supported");
    default:
      throw std::runtime_error("Invalid Type");
  }
}

// this is exclusively used by load_table_binary_columnar
size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
  size_t dataSize = 0;
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
      throw std::runtime_error("NULL for column " + cd->columnName);
    }
  }

  switch (cd->columnType.get_type()) {
    case kBOOLEAN: {
      dataSize = col.data.int_col.size();
      bool_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bool_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kTINYINT: {
      dataSize = col.data.int_col.size();
      tinyint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kSMALLINT: {
      dataSize = col.data.int_col.size();
      smallint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kINT: {
      dataSize = col.data.int_col.size();
      int_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          int_buffer_->push_back((int32_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kFLOAT: {
      dataSize = col.data.real_col.size();
      float_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          float_buffer_->push_back(NULL_FLOAT);
        } else {
          float_buffer_->push_back((float)col.data.real_col[i]);
        }
      }
      break;
    }
    case kDOUBLE: {
      dataSize = col.data.real_col.size();
      double_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          double_buffer_->push_back(NULL_DOUBLE);
        } else {
          double_buffer_->push_back((double)col.data.real_col[i]);
        }
      }
      break;
    }
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // TODO: for now, use empty string for nulls
      dataSize = col.data.str_col.size();
      string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          string_buffer_->push_back(std::string());
        } else {
          string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON: {
      dataSize = col.data.str_col.size();
      geo_string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          // TODO: add support for NULL geo
          geo_string_buffer_->push_back(std::string());
        } else {
          geo_string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kARRAY: {
      dataSize = col.data.arr_col.size();
      if (IS_STRING(cd->columnType.get_subtype())) {
        for (size_t i = 0; i < dataSize; i++) {
          std::vector<std::string>& string_vec = addStringArray();
          if (!col.nulls[i]) {
            size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
            for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
              string_vec.push_back(col.data.arr_col[i].data.str_col[str_idx]);
            }
          }
        }
      } else {
        auto elem_ti = cd->columnType.get_subtype();
        switch (elem_ti) {
          case kBOOLEAN: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(bool);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTINYINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int8_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kSMALLINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int16_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int16_t*)p =
                      static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int16_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int32_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int32_t*)p =
                      static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int32_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kBIGINT:
          case kNUMERIC:
          case kDECIMAL: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int64_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int64_t*)p =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kFLOAT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(float);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(float);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kDOUBLE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(double);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(double);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTIME:
          case kTIMESTAMP:
          case kDATE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteWidth = sizeof(int64_t);
                size_t byteSize = len * byteWidth;
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *reinterpret_cast<int64_t*>(p) =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          default:
            throw std::runtime_error("Invalid Array Type");
        }
      }
      break;
    }
    default:
      throw std::runtime_error("Invalid Type");
  }
  return dataSize;
}

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const TDatum& datum,
                                  const bool is_null,
                                  const int64_t replicate_count) {
  set_replicate_count(replicate_count);
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        addBoolean((int8_t)datum.val.int_val);
      }
      break;
    }
    case kTINYINT:
      if (!is_null) {
        addTinyint((int8_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kSMALLINT:
      if (!is_null) {
        addSmallint((int16_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kINT:
      if (!is_null) {
        addInt((int32_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kBIGINT:
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kFLOAT:
      if (!is_null) {
        addFloat((float)datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        addDouble(datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        addString(datum.val.str_val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kARRAY:
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      if (IS_STRING(cd->columnType.get_subtype())) {
        std::vector<std::string>& string_vec = addStringArray();
        addBinaryStringArray(datum, string_vec);
      } else {
        if (!is_null) {
          addArray(TDatumToArrayDatum(datum, cd->columnType));
        } else {
          addArray(NullArray(cd->columnType));
        }
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addGeoString(std::string());
      } else {
        addGeoString(datum.val.str_val);
      }
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

bool importGeoFromLonLat(double lon, double lat, std::vector<double>& coords) {
  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
    return false;
  }
  // we don't need to do any coordinate-system transformation
  // here (yet) so we don't need to use any OGR API or types
  // just use the values directly (assumed to be in 4326)
  coords.push_back(lon);
  coords.push_back(lat);
  return true;
}
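
// Editorial example (not from the original source): coordinates are appended
// lon-first and assumed to already be in EPSG:4326:
//
//   std::vector<double> coords;
//   importGeoFromLonLat(-122.4, 37.8, coords);  // coords == {-122.4, 37.8}
//   importGeoFromLonLat(NAN, 37.8, coords);     // returns false, coords unchanged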

uint64_t compress_coord(double coord, const SQLTypeInfo& ti, bool x) {
  if (ti.get_compression() == kENCODING_GEOINT && ti.get_comp_param() == 32) {
    return x ? Geo_namespace::compress_longitude_coord_geoint32(coord)
             : Geo_namespace::compress_lattitude_coord_geoint32(coord);
  }
  return *reinterpret_cast<uint64_t*>(may_alias_ptr(&coord));
}

std::vector<uint8_t> compress_coords(std::vector<double>& coords, const SQLTypeInfo& ti) {
  std::vector<uint8_t> compressed_coords;
  bool x = true;
  for (auto coord : coords) {
    auto coord_data_ptr = reinterpret_cast<uint64_t*>(&coord);
    uint64_t coord_data = *coord_data_ptr;
    size_t coord_data_size = sizeof(double);

    if (ti.get_output_srid() == 4326) {
      if (x) {
        if (coord < -180.0 || coord > 180.0) {
          throw std::runtime_error("WGS84 longitude " + std::to_string(coord) +
                                   " is out of bounds");
        }
      } else {
        if (coord < -90.0 || coord > 90.0) {
          throw std::runtime_error("WGS84 latitude " + std::to_string(coord) +
                                   " is out of bounds");
        }
      }
      if (ti.get_compression() == kENCODING_GEOINT && ti.get_comp_param() == 32) {
        coord_data = compress_coord(coord, ti, x);
        coord_data_size = ti.get_comp_param() / 8;
      }
      x = !x;
    }

    for (size_t i = 0; i < coord_data_size; i++) {
      compressed_coords.push_back(coord_data & 0xFF);
      coord_data >>= 8;
    }
  }
  return compressed_coords;
}
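
// Editorial sketch (not from the original source): with GEOINT(32) compression
// each double coordinate is packed into 4 little-endian bytes instead of 8,
// so an uncompressed WGS84 POINT (16 bytes of coords) serializes to 8 bytes:
//
//   SQLTypeInfo ti(kPOINT, false);
//   // assume ti is configured as GEOMETRY(POINT, 4326) ENCODING COMPRESSED(32),
//   // i.e. kENCODING_GEOINT with comp_param 32 and output SRID 4326
//   std::vector<double> pt{-122.4, 37.8};
//   auto bytes = compress_coords(pt, ti);  // bytes.size() == 8, not 16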

void Importer::set_geo_physical_import_buffer(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<double>& coords,
    std::vector<double>& bounds,
    std::vector<int>& ring_sizes,
    std::vector<int>& poly_rings,
    int render_group,
    const int64_t replicate_count) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  std::vector<TDatum> td_coords_data;
  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
  for (auto cc : compressed_coords) {
    TDatum td_byte;
    td_byte.val.int_val = cc;
    td_coords_data.push_back(td_byte);
  }
  TDatum tdd_coords;
  tdd_coords.val.arr_val = td_coords_data;
  tdd_coords.is_null = false;
  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false, replicate_count);

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    std::vector<TDatum> td_ring_sizes;
    for (auto ring_size : ring_sizes) {
      TDatum td_ring_size;
      td_ring_size.val.int_val = ring_size;
      td_ring_sizes.push_back(td_ring_size);
    }
    TDatum tdd_ring_sizes;
    tdd_ring_sizes.val.arr_val = td_ring_sizes;
    tdd_ring_sizes.is_null = false;
    import_buffers[col_idx++]->add_value(
        cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
  }

  if (col_type == kMULTIPOLYGON) {
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    std::vector<TDatum> td_poly_rings;
    for (auto num_rings : poly_rings) {
      TDatum td_num_rings;
      td_num_rings.val.int_val = num_rings;
      td_poly_rings.push_back(td_num_rings);
    }
    TDatum tdd_poly_rings;
    tdd_poly_rings.val.arr_val = td_poly_rings;
    tdd_poly_rings.is_null = false;
    import_buffers[col_idx++]->add_value(
        cd_poly_rings, tdd_poly_rings, false, replicate_count);
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    std::vector<TDatum> td_bounds_data;
    for (auto b : bounds) {
      TDatum td_double;
      td_double.val.real_val = b;
      td_bounds_data.push_back(td_double);
    }
    TDatum tdd_bounds;
    tdd_bounds.val.arr_val = td_bounds_data;
    tdd_bounds.is_null = false;
    import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = false;
    import_buffers[col_idx++]->add_value(
        cd_render_group, td_render_group, false, replicate_count);
  }
}
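
// Editorial note (not from the original source): the ++columnId walk above
// mirrors the physical column layout created for a geo column, in order:
//   1. coords        (all geo types)
//   2. ring_sizes    (POLYGON, MULTIPOLYGON)
//   3. poly_rings    (MULTIPOLYGON only)
//   4. bounds        (LINESTRING, POLYGON, MULTIPOLYGON)
//   5. render_group  (POLYGON, MULTIPOLYGON)
// so col_idx must advance through import_buffers in exactly this order.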

void Importer::set_geo_physical_import_buffer_columnar(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<std::vector<double>>& coords_column,
    std::vector<std::vector<double>>& bounds_column,
    std::vector<std::vector<int>>& ring_sizes_column,
    std::vector<std::vector<int>>& poly_rings_column,
    int render_group,
    const int64_t replicate_count) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;

  auto coords_row_count = coords_column.size();
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  for (auto coords : coords_column) {
    std::vector<TDatum> td_coords_data;
    std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
    for (auto cc : compressed_coords) {
      TDatum td_byte;
      td_byte.val.int_val = cc;
      td_coords_data.push_back(td_byte);
    }
    TDatum tdd_coords;
    tdd_coords.val.arr_val = td_coords_data;
    tdd_coords.is_null = false;
    import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false, replicate_count);
  }
  col_idx++;

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (ring_sizes_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
    }
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto ring_sizes : ring_sizes_column) {
      std::vector<TDatum> td_ring_sizes;
      for (auto ring_size : ring_sizes) {
        TDatum td_ring_size;
        td_ring_size.val.int_val = ring_size;
        td_ring_sizes.push_back(td_ring_size);
      }
      TDatum tdd_ring_sizes;
      tdd_ring_sizes.val.arr_val = td_ring_sizes;
      tdd_ring_sizes.is_null = false;
      import_buffers[col_idx]->add_value(
          cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kMULTIPOLYGON) {
    if (poly_rings_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
    }
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto poly_rings : poly_rings_column) {
      std::vector<TDatum> td_poly_rings;
      for (auto num_rings : poly_rings) {
        TDatum td_num_rings;
        td_num_rings.val.int_val = num_rings;
        td_poly_rings.push_back(td_num_rings);
      }
      TDatum tdd_poly_rings;
      tdd_poly_rings.val.arr_val = td_poly_rings;
      tdd_poly_rings.is_null = false;
      import_buffers[col_idx]->add_value(
          cd_poly_rings, tdd_poly_rings, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (bounds_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: bounds column size mismatch";
    }
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto bounds : bounds_column) {
      std::vector<TDatum> td_bounds_data;
      for (auto b : bounds) {
        TDatum td_double;
        td_double.val.real_val = b;
        td_bounds_data.push_back(td_double);
      }
      TDatum tdd_bounds;
      tdd_bounds.val.arr_val = td_bounds_data;
      tdd_bounds.is_null = false;
      import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = false;
    for (decltype(coords_row_count) i = 0; i < coords_row_count; i++) {
      import_buffers[col_idx]->add_value(
          cd_render_group, td_render_group, false, replicate_count);
    }
    col_idx++;
  }
}

namespace {

std::tuple<int, SQLTypes, std::string> explode_collections_step1(
    const std::list<const ColumnDescriptor*>& col_descs) {
  // validate the columns
  // for now we can only explode into a single destination column
  // which must be of the child type (POLYGON, LINESTRING, POINT)
  int collection_col_idx = -1;
  int col_idx = 0;
  std::string collection_col_name;
  SQLTypes collection_child_type = kNULLT;
  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
    auto const& cd = *cd_it;
    auto const col_type = cd->columnType.get_type();
    if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
      if (collection_col_idx >= 0) {
        throw std::runtime_error(
            "Explode Collections: Found more than one destination column");
      }
      collection_col_idx = col_idx;
      collection_child_type = col_type;
      collection_col_name = cd->columnName;
    }
    for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
      ++cd_it;
    }
    col_idx++;
  }
  if (collection_col_idx < 0) {
    throw std::runtime_error(
        "Explode Collections: Failed to find a supported column type to explode "
        "into");
  }
  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
}
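
// Editorial example (not from the original source): for a table declared as
//   (name TEXT, trip LINESTRING)
// step1 returns {1, kLINESTRING, "trip"}; a MULTILINESTRING source row is then
// exploded by step2 below into one imported row per child LINESTRING.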

int64_t explode_collections_step2(
    OGRGeometry* ogr_geometry,
    SQLTypes collection_child_type,
    const std::string& collection_col_name,
    size_t row_or_feature_idx,
    std::function<void(OGRGeometry*)> execute_import_lambda) {
  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
  bool is_collection = false;
  switch (collection_child_type) {
    case kPOINT:
      switch (ogr_geometry_type) {
        case wkbMultiPoint:
          is_collection = true;
          break;
        case wkbPoint:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTIPOINT or POINT");
      }
      break;
    case kLINESTRING:
      switch (ogr_geometry_type) {
        case wkbMultiLineString:
          is_collection = true;
          break;
        case wkbLineString:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTILINESTRING or "
              "LINESTRING");
      }
      break;
    case kPOLYGON:
      switch (ogr_geometry_type) {
        case wkbMultiPolygon:
          is_collection = true;
          break;
        case wkbPolygon:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
      }
      break;
    default:
      CHECK(false) << "Unsupported geo child type " << collection_child_type;
  }

  int64_t us = 0LL;

  // explode or just import
  if (is_collection) {
    // cast to collection
    OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
    CHECK(collection_geometry);

#if LOG_EXPLODE_COLLECTIONS
    // log number of children
    LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
              << collection_col_name << "' into "
              << collection_geometry->getNumGeometries() << " child rows";
#endif

    // loop over children
    uint32_t child_geometry_count = 0;
    auto child_geometry_it = collection_geometry->begin();
    while (child_geometry_it != collection_geometry->end()) {
      // get and import this child
      OGRGeometry* import_geometry = *child_geometry_it;
      us += measure<std::chrono::microseconds>::execution(
          [&] { execute_import_lambda(import_geometry); });

      // next child
      child_geometry_it++;
      child_geometry_count++;
    }
  } else {
    // import non-collection row just once
    us = measure<std::chrono::microseconds>::execution(
        [&] { execute_import_lambda(ogr_geometry); });
  }

  // done
  return us;
}

}  // namespace

1778  int thread_id,
1779  Importer* importer,
1780  std::unique_ptr<char[]> scratch_buffer,
1781  size_t begin_pos,
1782  size_t end_pos,
1783  size_t total_size,
1784  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
1785  size_t first_row_index_this_buffer) {
1786  ImportStatus import_status;
1787  int64_t total_get_row_time_us = 0;
1788  int64_t total_str_to_val_time_us = 0;
1789  CHECK(scratch_buffer);
1790  auto buffer = scratch_buffer.get();
1791  auto load_ms = measure<>::execution([]() {});
1792  auto ms = measure<>::execution([&]() {
1793  const CopyParams& copy_params = importer->get_copy_params();
1794  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
1795  size_t begin =
1796  DelimitedParserUtils::find_beginning(buffer, begin_pos, end_pos, copy_params);
1797  const char* thread_buf = buffer + begin_pos + begin;
1798  const char* thread_buf_end = buffer + end_pos;
1799  const char* buf_end = buffer + total_size;
1800  bool try_single_thread = false;
1801  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
1802  importer->get_import_buffers(thread_id);
1804  int phys_cols = 0;
1805  int point_cols = 0;
1806  for (const auto cd : col_descs) {
1807  const auto& col_ti = cd->columnType;
1808  phys_cols += col_ti.get_physical_cols();
1809  if (cd->columnType.get_type() == kPOINT) {
1810  point_cols++;
1811  }
1812  }
1813  auto num_cols = col_descs.size() - phys_cols;
1814  for (const auto& p : import_buffers) {
1815  p->clear();
1816  }
1817  std::vector<std::string> row;
1818  size_t row_index_plus_one = 0;
1819  for (const char* p = thread_buf; p < thread_buf_end; p++) {
1820  row.clear();
1821  if (DEBUG_TIMING) {
1824  thread_buf_end,
1825  buf_end,
1826  copy_params,
1827  importer->get_is_array(),
1828  row,
1829  try_single_thread);
1830  });
1831  total_get_row_time_us += us;
1832  } else {
1834  thread_buf_end,
1835  buf_end,
1836  copy_params,
1837  importer->get_is_array(),
1838  row,
1839  try_single_thread);
1840  }
1841  row_index_plus_one++;
1842  // Each POINT could consume two separate coords instead of a single WKT
1843  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
1844  import_status.rows_rejected++;
1845  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
1846  << row.size() << "): " << row;
1847  if (import_status.rows_rejected > copy_params.max_reject) {
1848  break;
1849  }
1850  continue;
1851  }
1852 
1853  //
1854  // lambda for importing a row (perhaps multiple times if exploding a collection)
1855  //
1856 
1857  auto execute_import_row = [&](OGRGeometry* import_geometry) {
1858  size_t import_idx = 0;
1859  size_t col_idx = 0;
1860  try {
1861  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1862  auto cd = *cd_it;
1863  const auto& col_ti = cd->columnType;
1864  if (col_ti.get_physical_cols() == 0) {
1865  // not geo
1866 
1867  // store the string (possibly null)
1868  bool is_null =
1869  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
1870  // Note: default copy_params.null_str is "\N", but everyone uses "NULL".
1871  // So initially nullness may be missed and not passed to add_value,
1872  // which then might also check and still decide it's actually a NULL,
1873  // e.g. if kINT doesn't start with a digit or a '-' then it's considered
1874  // NULL. So "NULL" is not recognized as NULL but then it's not
1875  // recognized as a valid kINT, so it's a NULL after all. Checking for
1876  // "NULL" here too, as a widely accepted notation for NULL.
1877  if (!cd->columnType.is_string() && row[import_idx].empty()) {
1878  is_null = true;
1879  }
1880  import_buffers[col_idx]->add_value(
1881  cd, row[import_idx], is_null, copy_params);
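  // Illustrative consequence of the null handling above: with the default
  // null_str "\N", an empty field or the literal "NULL" in a kINT column is
  // flagged as NULL here, while an empty string in a TEXT column remains a
  // valid, non-NULL value.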
1882 
1883  // next
1884  ++import_idx;
1885  ++col_idx;
1886  } else {
1887  // geo
1888 
1889  // store null string in the base column
1890  import_buffers[col_idx]->add_value(
1891  cd, copy_params.null_str, true, copy_params);
1892 
1893  // WKT from string we're not storing
1894  auto const& wkt = row[import_idx];
1895 
1896  // next
1897  ++import_idx;
1898  ++col_idx;
1899 
1900  SQLTypes col_type = col_ti.get_type();
1901  CHECK(IS_GEO(col_type));
1902 
1903  std::vector<double> coords;
1904  std::vector<double> bounds;
1905  std::vector<int> ring_sizes;
1906  std::vector<int> poly_rings;
1907  int render_group = 0;
1908 
1909  if (col_type == kPOINT && wkt.size() > 0 &&
1910  (wkt[0] == '.' || isdigit(wkt[0]) || wkt[0] == '-')) {
1911  // Invalid WKT, looks more like a scalar.
1912  // Try custom POINT import: from two separate scalars rather than WKT
1913  // string
1914  double lon = std::atof(wkt.c_str());
1915  double lat = NAN;
1916  std::string lat_str{row[import_idx]};
1917  ++import_idx;
1918  if (lat_str.size() > 0 &&
1919  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
1920  lat = std::atof(lat_str.c_str());
1921  }
1922  // Swap coordinates if this table uses a reverse order: lat/lon
1923  if (!copy_params.lonlat) {
1924  std::swap(lat, lon);
1925  }
1926  // TODO: should check if POINT column should have been declared with
1927  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
1928  // throw std::runtime_error("POINT column " + cd->columnName + " is
1929  // not WGS84, cannot insert lon/lat");
1930  // }
1931  if (!importGeoFromLonLat(lon, lat, coords)) {
1932  throw std::runtime_error(
1933  "Cannot read lon/lat to insert into POINT column " +
1934  cd->columnName);
1935  }
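  // Worked example (illustrative only): with copy_params.lonlat == true, the
  // two scalars "-122.4,37.6" are read as lon = -122.4, lat = 37.6; with
  // lonlat == false they arrive as lat,lon and are swapped above before
  // importGeoFromLonLat() builds the POINT coords.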
1936  } else {
1937  // import it
1938  SQLTypeInfo import_ti;
1939  if (import_geometry) {
1940  // geometry already exploded
1941  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
1942  import_geometry,
1943  import_ti,
1944  coords,
1945  bounds,
1946  ring_sizes,
1947  poly_rings,
1948  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
1949  std::string msg =
1950  "Failed to extract valid geometry from exploded row " +
1951  std::to_string(first_row_index_this_buffer + row_index_plus_one) +
1952  " for column " + cd->columnName;
1953  throw std::runtime_error(msg);
1954  }
1955  } else {
1956  // extract geometry directly from WKT
1957  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
1958  wkt,
1959  import_ti,
1960  coords,
1961  bounds,
1962  ring_sizes,
1963  poly_rings,
1964  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
1965  std::string msg =
1966  "Failed to extract valid geometry from row " +
1967  std::to_string(first_row_index_this_buffer + row_index_plus_one) +
1968  " for column " + cd->columnName;
1969  throw std::runtime_error(msg);
1970  }
1971  }
1972 
1973  // validate types
1974  if (col_type != import_ti.get_type()) {
1975  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
1976  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
1977  col_type == SQLTypes::kMULTIPOLYGON)) {
1978  throw std::runtime_error(
1979  "Imported geometry doesn't match the type of column " +
1980  cd->columnName);
1981  }
1982  }
1983 
1984  // assign render group?
1985  if (columnIdToRenderGroupAnalyzerMap.size()) {
1986  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1987  if (ring_sizes.size()) {
1988  // get a suitable render group for these poly coords
1989  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
1990  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
1991  render_group =
1992  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
1993  } else {
1994  // empty poly
1995  render_group = -1;
1996  }
1997  }
1998  }
1999  }
2000 
2001  // import extracted geo
2002  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2003  cd,
2004  import_buffers,
2005  col_idx,
2006  coords,
2007  bounds,
2008  ring_sizes,
2009  poly_rings,
2010  render_group);
2011 
2012  // skip remaining physical columns
2013  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2014  ++cd_it;
2015  }
2016  }
2017  }
2018  import_status.rows_completed++;
2019  } catch (const std::exception& e) {
2020  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2021  import_buffers[col_idx_to_pop]->pop_value();
2022  }
2023  import_status.rows_rejected++;
2024  LOG(ERROR) << "Input exception thrown: " << e.what()
2025  << ". Row discarded. Data: " << row;
2026  }
2027  };
2028 
2029  if (copy_params.geo_explode_collections) {
2030  // explode and import
2031  // @TODO(se) convert to structured bindings when we can use C++17 here
2032  auto collection_idx_type_name = explode_collections_step1(col_descs);
2033  int collection_col_idx = std::get<0>(collection_idx_type_name);
2034  SQLTypes collection_child_type = std::get<1>(collection_idx_type_name);
2035  std::string collection_col_name = std::get<2>(collection_idx_type_name);
2036  // pull out the collection WKT
2037  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2038  auto const& collection_wkt = row[collection_col_idx];
2039  // convert to OGR
2040  OGRGeometry* ogr_geometry = nullptr;
2041  ScopeGuard destroy_ogr_geometry = [&] {
2042  if (ogr_geometry) {
2043  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2044  }
2045  };
2046  OGRErr ogr_status = OGRGeometryFactory::createFromWkt(
2047  collection_wkt.c_str(), nullptr, &ogr_geometry);
2048  if (ogr_status != OGRERR_NONE) {
2049  throw std::runtime_error("Failed to convert WKT to geometry");
2050  }
2051  // do the explode and import
2052  us = explode_collections_step2(ogr_geometry,
2053  collection_child_type,
2054  collection_col_name,
2055  first_row_index_this_buffer + row_index_plus_one,
2056  execute_import_row);
2057  } else {
2058  // import non-collection row just once
2059  us = measure<std::chrono::microseconds>::execution(
2060  [&] { execute_import_row(nullptr); });
2061  }
2062  total_str_to_val_time_us += us;
2063  } // end thread
2064  if (import_status.rows_completed > 0) {
2065  load_ms = measure<>::execution(
2066  [&]() { importer->load(import_buffers, import_status.rows_completed); });
2067  }
2068  });
2069  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2070  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2071  << import_status.rows_completed << " rows inserted in "
2072  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2073  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2074  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2075  << "sec" << std::endl;
2076  }
2077 
2078  import_status.thread_id = thread_id;
2079  // LOG(INFO) << " return " << import_status.thread_id << std::endl;
2080 
2081  return import_status;
2082 }
2083 
2084 static ImportStatus import_thread_shapefile(
2085  int thread_id,
2086  Importer* importer,
2087  OGRSpatialReference* poGeographicSR,
2088  const FeaturePtrVector& features,
2089  size_t firstFeature,
2090  size_t numFeatures,
2091  const FieldNameToIndexMapType& fieldNameToIndexMap,
2092  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2093  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap) {
2094  ImportStatus import_status;
2095  const CopyParams& copy_params = importer->get_copy_params();
2096  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2097  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2098  importer->get_import_buffers(thread_id);
2099 
2100  for (const auto& p : import_buffers) {
2101  p->clear();
2102  }
2103 
2104  auto convert_timer = timer_start();
2105 
2106  // we create this on the fly based on the first feature's SR
2107  std::unique_ptr<OGRCoordinateTransformation> coordinate_transformation;
2108 
2109  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2110  if (!features[iFeature]) {
2111  continue;
2112  }
2113 
2114  // get this feature's geometry
2115  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2116  if (pGeometry) {
2117  // for geodatabase, we need to consider features with no geometry
2118  // as we still want to create a table, even if it has no geo column
2119 
2120  // transform it
2121  // avoid GDAL error if not transformable
2122  auto geometry_sr = pGeometry->getSpatialReference();
2123  if (geometry_sr) {
2124  // create an OGRCoordinateTransformation (CT) on the fly
2125  // we must assume that all geo in this file will have
2126  // the same source SR, so the CT will be valid for all geometries;
2127  // transforming with a reusable CT is faster than transforming to an SR each time
2128  if (coordinate_transformation == nullptr) {
2129  coordinate_transformation.reset(
2130  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR));
2131  if (coordinate_transformation == nullptr) {
2132  throw std::runtime_error(
2133  "Failed to create a GDAL CoordinateTransformation for incoming geo");
2134  }
2135  }
2136  pGeometry->transform(coordinate_transformation.get());
2137  }
2138  }
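  // A minimal sketch of the reuse pattern above (hypothetical SRs, for
  // illustration only; not part of the import path):
  //   OGRSpatialReference src, dst;
  //   src.importFromEPSG(3857);                        // source SR of the file
  //   dst.importFromEPSG(4326);                        // WGS84 target
  //   auto* ct = OGRCreateCoordinateTransformation(&src, &dst);
  //   geometry->transform(ct);                         // reused per feature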
2139 
2140  //
2141  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2142  //
2143 
2144  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2145  size_t col_idx = 0;
2146  try {
2147  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2148  auto cd = *cd_it;
2149 
2150  // is this a geo column?
2151  const auto& col_ti = cd->columnType;
2152  if (col_ti.is_geometry()) {
2153  // some Shapefiles get us here, but the OGRGeometryRef is null
2154  if (!import_geometry) {
2155  std::string msg = "Geometry feature " +
2156  std::to_string(firstFeature + iFeature + 1) +
2157  " has null GeometryRef";
2158  throw std::runtime_error(msg);
2159  }
2160 
2161  // Note that this assumes there is one and only one geo column in the table.
2162  // Currently, the importer only supports reading a single geospatial feature
2163  // from an input shapefile / geojson file, but this code will need to be
2164  // modified if that changes
2165  SQLTypes col_type = col_ti.get_type();
2166 
2167  // store null string in the base column
2168  import_buffers[col_idx]->add_value(
2169  cd, copy_params.null_str, true, copy_params);
2170  ++col_idx;
2171 
2172  // the data we now need to extract for the other columns
2173  std::vector<double> coords;
2174  std::vector<double> bounds;
2175  std::vector<int> ring_sizes;
2176  std::vector<int> poly_rings;
2177  int render_group = 0;
2178 
2179  // extract it
2180  SQLTypeInfo import_ti;
2181 
2182  if (!Geo_namespace::GeoTypesFactory::getGeoColumns(
2183  import_geometry,
2184  import_ti,
2185  coords,
2186  bounds,
2187  ring_sizes,
2188  poly_rings,
2189  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2190  std::string msg = "Failed to extract valid geometry from feature " +
2191  std::to_string(firstFeature + iFeature + 1) +
2192  " for column " + cd->columnName;
2193  throw std::runtime_error(msg);
2194  }
2195 
2196  // validate types
2197  if (col_type != import_ti.get_type()) {
2198  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2199  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2200  col_type == SQLTypes::kMULTIPOLYGON)) {
2201  throw std::runtime_error(
2202  "Imported geometry doesn't match the type of column " +
2203  cd->columnName);
2204  }
2205  }
2206 
2207  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2208  if (ring_sizes.size()) {
2209  // get a suitable render group for these poly coords
2210  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2211  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2212  render_group = (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2213  } else {
2214  // empty poly
2215  render_group = -1;
2216  }
2217  }
2218 
2219  // create coords array value and add it to the physical column
2220  ++cd_it;
2221  auto cd_coords = *cd_it;
2222  std::vector<TDatum> td_coord_data;
2223  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
2224  for (auto cc : compressed_coords) {
2225  TDatum td_byte;
2226  td_byte.val.int_val = cc;
2227  td_coord_data.push_back(td_byte);
2228  }
2229  TDatum tdd_coords;
2230  tdd_coords.val.arr_val = td_coord_data;
2231  tdd_coords.is_null = false;
2232  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2233  ++col_idx;
2234 
2235  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2236  // Create ring_sizes array value and add it to the physical column
2237  ++cd_it;
2238  auto cd_ring_sizes = *cd_it;
2239  std::vector<TDatum> td_ring_sizes;
2240  for (auto ring_size : ring_sizes) {
2241  TDatum td_ring_size;
2242  td_ring_size.val.int_val = ring_size;
2243  td_ring_sizes.push_back(td_ring_size);
2244  }
2245  TDatum tdd_ring_sizes;
2246  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2247  tdd_ring_sizes.is_null = false;
2248  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2249  ++col_idx;
2250  }
2251 
2252  if (col_type == kMULTIPOLYGON) {
2253  // Create poly_rings array value and add it to the physical column
2254  ++cd_it;
2255  auto cd_poly_rings = *cd_it;
2256  std::vector<TDatum> td_poly_rings;
2257  for (auto num_rings : poly_rings) {
2258  TDatum td_num_rings;
2259  td_num_rings.val.int_val = num_rings;
2260  td_poly_rings.push_back(td_num_rings);
2261  }
2262  TDatum tdd_poly_rings;
2263  tdd_poly_rings.val.arr_val = td_poly_rings;
2264  tdd_poly_rings.is_null = false;
2265  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2266  ++col_idx;
2267  }
2268 
2269  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2270  col_type == kMULTIPOLYGON) {
2271  // Create bounds array value and add it to the physical column
2272  ++cd_it;
2273  auto cd_bounds = *cd_it;
2274  std::vector<TDatum> td_bounds_data;
2275  for (auto b : bounds) {
2276  TDatum td_double;
2277  td_double.val.real_val = b;
2278  td_bounds_data.push_back(td_double);
2279  }
2280  TDatum tdd_bounds;
2281  tdd_bounds.val.arr_val = td_bounds_data;
2282  tdd_bounds.is_null = false;
2283  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2284  ++col_idx;
2285  }
2286 
2287  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2288  // Create render_group value and add it to the physical column
2289  ++cd_it;
2290  auto cd_render_group = *cd_it;
2291  TDatum td_render_group;
2292  td_render_group.val.int_val = render_group;
2293  td_render_group.is_null = false;
2294  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2295  ++col_idx;
2296  }
2297  } else {
2298  // regular column
2299  // pull from GDAL metadata
2300  const auto cit = columnNameToSourceNameMap.find(cd->columnName);
2301  CHECK(cit != columnNameToSourceNameMap.end());
2302  const std::string& fieldName = cit->second;
2303  const auto fit = fieldNameToIndexMap.find(fieldName);
2304  CHECK(fit != fieldNameToIndexMap.end());
2305  size_t iField = fit->second;
2306  CHECK(iField < fieldNameToIndexMap.size());
2307  std::string fieldContents = features[iFeature]->GetFieldAsString(iField);
2308  import_buffers[col_idx]->add_value(cd, fieldContents, false, copy_params);
2309  ++col_idx;
2310  }
2311  }
2312  import_status.rows_completed++;
2313  } catch (const std::exception& e) {
2314  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2315  import_buffers[col_idx_to_pop]->pop_value();
2316  }
2317  import_status.rows_rejected++;
2318  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2319  }
2320  };
2321 
2322  if (pGeometry && copy_params.geo_explode_collections) {
2323  // explode and import
2324  // @TODO(se) convert to structured bindings when we can use C++17 here
2325  auto collection_idx_type_name = explode_collections_step1(col_descs);
2326  SQLTypes collection_child_type = std::get<1>(collection_idx_type_name);
2327  std::string collection_col_name = std::get<2>(collection_idx_type_name);
2328  explode_collections_step2(pGeometry,
2329  collection_child_type,
2330  collection_col_name,
2331  firstFeature + iFeature + 1,
2332  execute_import_feature);
2333  } else {
2334  // import non-collection or null feature just once
2335  execute_import_feature(pGeometry);
2336  }
2337  } // end features
2338 
2339  float convert_ms =
2340  float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2341  convert_timer)) /
2342  1000.0f;
2343 
2344  float load_ms = 0.0f;
2345  if (import_status.rows_completed > 0) {
2346  auto load_timer = timer_start();
2347  importer->load(import_buffers, import_status.rows_completed);
2348  load_ms =
2349  float(
2350  timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2351  load_timer)) /
2352  1000.0f;
2353  }
2354 
2355  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2356  LOG(INFO) << "DEBUG: Process " << convert_ms << "ms";
2357  LOG(INFO) << "DEBUG: Load " << load_ms << "ms";
2358  }
2359 
2360  import_status.thread_id = thread_id;
2361 
2362  if (DEBUG_TIMING) {
2363  LOG(INFO) << "DEBUG: Total "
2364  << float(timer_stop<std::chrono::steady_clock::time_point,
2365  std::chrono::microseconds>(convert_timer)) /
2366  1000.0f
2367  << "ms";
2368  }
2369 
2370  return import_status;
2371 }
2372 
2373 bool Loader::loadNoCheckpoint(
2374  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2375  size_t row_count) {
2376  return loadImpl(import_buffers, row_count, false);
2377 }
2378 
2379 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2380  size_t row_count) {
2381  return loadImpl(import_buffers, row_count, true);
2382 }
2383 
2384 namespace {
2385 
2386 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2387  const auto& ti = import_buffer.getTypeInfo();
2388  const int8_t* values_buffer{nullptr};
2389  if (ti.is_string()) {
2390  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2391  values_buffer = import_buffer.getStringDictBuffer();
2392  } else {
2393  values_buffer = import_buffer.getAsBytes();
2394  }
2395  CHECK(values_buffer);
2396  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2397  switch (logical_size) {
2398  case 1: {
2399  return values_buffer[index];
2400  }
2401  case 2: {
2402  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2403  }
2404  case 4: {
2405  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2406  }
2407  case 8: {
2408  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2409  }
2410  default:
2411  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2412  }
2413  UNREACHABLE();
2414  return 0;
2415 }
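// Illustrative: for a SMALLINT shard key the logical size is 2, so the buffer
// is read as int16_t[] and element [index] is widened to int64_t; dictionary-
// encoded string keys are read from the dictionary id buffer the same way.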
2416 
2417 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2418  const auto& ti = import_buffer.getTypeInfo();
2419  CHECK_EQ(kFLOAT, ti.get_type());
2420  const auto values_buffer = import_buffer.getAsBytes();
2421  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2422 }
2423 
2424 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2425  const auto& ti = import_buffer.getTypeInfo();
2426  CHECK_EQ(kDOUBLE, ti.get_type());
2427  const auto values_buffer = import_buffer.getAsBytes();
2428  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2429 }
2430 
2431 } // namespace
2432 
2433 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2434  std::vector<size_t>& all_shard_row_counts,
2435  const OneShardBuffers& import_buffers,
2436  const size_t row_count,
2437  const size_t shard_count) {
2438  all_shard_row_counts.resize(shard_count);
2439  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2440  all_shard_import_buffers.emplace_back();
2441  for (const auto& typed_import_buffer : import_buffers) {
2442  all_shard_import_buffers.back().emplace_back(
2443  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2444  typed_import_buffer->getStringDictionary()));
2445  }
2446  }
2447  CHECK_GT(table_desc_->shardedColumnId, 0);
2448  int col_idx{0};
2449  const ColumnDescriptor* shard_col_desc{nullptr};
2450  for (const auto col_desc : column_descs_) {
2451  ++col_idx;
2452  if (col_idx == table_desc_->shardedColumnId) {
2453  shard_col_desc = col_desc;
2454  break;
2455  }
2456  }
2457  CHECK(shard_col_desc);
2458  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2459  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2460  const auto& shard_col_ti = shard_col_desc->columnType;
2461  CHECK(shard_col_ti.is_integer() ||
2462  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2463  shard_col_ti.is_time());
2464  if (shard_col_ti.is_string()) {
2465  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2466  CHECK(payloads_ptr);
2467  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2468  }
2469 
2470  // for each replicated (ALTER ADDed) column, the number of rows in a shard is
2471  // inferred from that of the sharding column, not simply evenly distributed.
2472  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2473  // Here the loop count is overloaded. For normal imports, we loop thru all
2474  // input values (rows), so the loop count is the number of input rows.
2475  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2476  // all shards, so the loop count is the number of shards.
2477  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
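  // e.g. (illustrative): importing 1000 rows into a table with nShards == 4
  // loops 1000 times, routing each row by SHARD_FOR_KEY(key, 4) (conceptually
  // a hash of the key modulo the shard count); an ALTER ADD COLUMN replicate
  // pass instead loops 4 times, once per shard, with row_index pinned to 0.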
2478  for (size_t i = 0; i < loop_count; ++i) {
2479  const size_t shard =
2480  getReplicating()
2481  ? i
2482  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2483  auto& shard_output_buffers = all_shard_import_buffers[shard];
2484 
2485  // when replicating a column, populate 'rows' to all shards only once;
2486  // the value is fetched from the first (and only) row
2487  const auto row_index = getReplicating() ? 0 : i;
2488 
2489  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2490  const auto& input_buffer = import_buffers[col_idx];
2491  const auto& col_ti = input_buffer->getTypeInfo();
2492  const auto type =
2493  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2494 
2495  // for a replicated (added) column, populate rows_per_shard as the per-shard
2496  // replicate count, and bypass non-replicated columns.
2497  if (getReplicating()) {
2498  if (input_buffer->get_replicate_count() > 0) {
2499  shard_output_buffers[col_idx]->set_replicate_count(
2500  shard_tds[shard]->fragmenter->getNumRows());
2501  } else {
2502  continue;
2503  }
2504  }
2505 
2506  switch (type) {
2507  case kBOOLEAN:
2508  shard_output_buffers[col_idx]->addBoolean(
2509  int_value_at(*input_buffer, row_index));
2510  break;
2511  case kTINYINT:
2512  shard_output_buffers[col_idx]->addTinyint(
2513  int_value_at(*input_buffer, row_index));
2514  break;
2515  case kSMALLINT:
2516  shard_output_buffers[col_idx]->addSmallint(
2517  int_value_at(*input_buffer, row_index));
2518  break;
2519  case kINT:
2520  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2521  break;
2522  case kBIGINT:
2523  shard_output_buffers[col_idx]->addBigint(
2524  int_value_at(*input_buffer, row_index));
2525  break;
2526  case kFLOAT:
2527  shard_output_buffers[col_idx]->addFloat(
2528  float_value_at(*input_buffer, row_index));
2529  break;
2530  case kDOUBLE:
2531  shard_output_buffers[col_idx]->addDouble(
2532  double_value_at(*input_buffer, row_index));
2533  break;
2534  case kTEXT:
2535  case kVARCHAR:
2536  case kCHAR: {
2537  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2538  shard_output_buffers[col_idx]->addString(
2539  (*input_buffer->getStringBuffer())[row_index]);
2540  break;
2541  }
2542  case kTIME:
2543  case kTIMESTAMP:
2544  case kDATE:
2545  shard_output_buffers[col_idx]->addBigint(
2546  int_value_at(*input_buffer, row_index));
2547  break;
2548  case kARRAY:
2549  if (IS_STRING(col_ti.get_subtype())) {
2550  CHECK(input_buffer->getStringArrayBuffer());
2551  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2552  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2553  shard_output_buffers[col_idx]->addStringArray(input_arr);
2554  } else {
2555  shard_output_buffers[col_idx]->addArray(
2556  (*input_buffer->getArrayBuffer())[row_index]);
2557  }
2558  break;
2559  case kPOINT:
2560  case kLINESTRING:
2561  case kPOLYGON:
2562  case kMULTIPOLYGON: {
2563  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2564  shard_output_buffers[col_idx]->addGeoString(
2565  (*input_buffer->getGeoStringBuffer())[row_index]);
2566  break;
2567  }
2568  default:
2569  CHECK(false);
2570  }
2571  }
2572  ++all_shard_row_counts[shard];
2573  // when replicating a column, the row count of a shard equals the replicate
2574  // count of the column on that shard
2575  if (getReplicating()) {
2576  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2577  }
2578  }
2579 }
2580 
2581 bool Loader::loadImpl(
2582  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2583  size_t row_count,
2584  bool checkpoint) {
2585  if (table_desc_->nShards) {
2586  std::vector<OneShardBuffers> all_shard_import_buffers;
2587  std::vector<size_t> all_shard_row_counts;
2588  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2589  distributeToShards(all_shard_import_buffers,
2590  all_shard_row_counts,
2591  import_buffers,
2592  row_count,
2593  shard_tables.size());
2594  bool success = true;
2595  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2596  if (!all_shard_row_counts[shard_idx]) {
2597  continue;
2598  }
2599  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2600  all_shard_row_counts[shard_idx],
2601  shard_tables[shard_idx],
2602  checkpoint);
2603  }
2604  return success;
2605  }
2606  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2607 }
2608 
2609 std::vector<DataBlockPtr> Loader::get_data_block_pointers(
2610  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2611  std::vector<DataBlockPtr> result(import_buffers.size());
2612  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2613  encoded_data_block_ptrs_futures;
2614  // make all async calls to string dictionary here and then continue execution
2615  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2616  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2617  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2618  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2619  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2620 
2621  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2622  buf_idx,
2623  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2624  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2625  return import_buffers[buf_idx]->getStringDictBuffer();
2626  })));
2627  }
2628  }
2629 
2630  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2631  DataBlockPtr p;
2632  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2633  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2634  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2635  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2636  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2637  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2638  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2639  p.stringsPtr = string_payload_ptr;
2640  } else {
2641  // This condition means we have a column which is a dict-ENCODED string. We
2642  // already made an async request to obtain the encoded integer values above,
2643  // so we skip this iteration and continue.
2644  continue;
2645  }
2646  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2647  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2648  p.stringsPtr = geo_payload_ptr;
2649  } else {
2650  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2651  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2652  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2653  import_buffers[buf_idx]->addDictEncodedStringArray(
2654  *import_buffers[buf_idx]->getStringArrayBuffer());
2655  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2656  } else {
2657  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2658  }
2659  }
2660  result[buf_idx] = p;
2661  }
2662 
2663  // wait for the async requests we made for string dictionary
2664  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2665  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2666  }
2667  return result;
2668 }
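// The function above overlaps dictionary encoding with pointer collection:
// each dict-encoded string column kicks off a std::async task, the other
// columns are wired up synchronously, and the futures are joined at the end.
// A minimal sketch of the same idiom (illustrative; encode_column is a
// hypothetical helper, not part of this file):
//   std::future<int8_t*> fut = std::async(std::launch::async, encode_column);
//   /* ... collect pointers for the non-encoded columns ... */
//   result[buf_idx].numbersPtr = fut.get();  // blocks only if still encoding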
2669 
2670 bool Loader::loadToShard(
2671  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2672  size_t row_count,
2673  const TableDescriptor* shard_table,
2674  bool checkpoint) {
2675  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2676  // patch insert_data with new column
2677  if (this->getReplicating()) {
2678  for (const auto& import_buff : import_buffers) {
2679  insert_data_.replicate_count = import_buff->get_replicate_count();
2680  insert_data_.columnDescriptors[import_buff->getColumnDesc()->columnId] =
2681  import_buff->getColumnDesc();
2682  }
2683  }
2684 
2685  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2686  ins_data.numRows = row_count;
2687  bool success = true;
2688 
2689  ins_data.data = get_data_block_pointers(import_buffers);
2690 
2691  for (const auto& import_buffer : import_buffers) {
2692  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2693  }
2694 
2695  // release loader_lock so that in InsertOrderFragmenter::insertData
2696  // we can have multiple threads sort/shuffle InsertData
2697  loader_lock.unlock();
2698 
2699  {
2700  try {
2701  if (checkpoint) {
2702  shard_table->fragmenter->insertData(ins_data);
2703  } else {
2704  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2705  }
2706  } catch (std::exception& e) {
2707  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2708  success = false;
2709  }
2710  }
2711  return success;
2712 }
2713 
2714 void Loader::init() {
2715  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2716  insert_data_.tableId = table_desc_->tableId;
2717  for (auto cd : column_descs_) {
2718  insert_data_.columnIds.push_back(cd->columnId);
2719  if (cd->columnType.get_compression() == kENCODING_DICT) {
2720  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2721  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
2722  CHECK(dd);
2723  dict_map_[cd->columnId] = dd->stringDict.get();
2724  }
2725  }
2726  insert_data_.numRows = 0;
2727 }
2728 
2729 void Detector::init() {
2730  detect_row_delimiter();
2731  split_raw_data();
2732  find_best_sqltypes_and_headers();
2733 }
2734 
2735 ImportStatus Detector::importDelimited(const std::string& file_path,
2736  const bool decompressed) {
2737  if (!p_file) {
2738  p_file = fopen(file_path.c_str(), "rb");
2739  }
2740  if (!p_file) {
2741  throw std::runtime_error("failed to open file '" + file_path +
2742  "': " + strerror(errno));
2743  }
2744 
2745  // somehow clang does not support ext/stdio_filebuf.h, so we
2746  // need to DIY a readline with a customized copy_params.line_delim...
2747  std::string line;
2748  line.reserve(1 * 1024 * 1024);
2749  auto end_time = std::chrono::steady_clock::now() +
2750  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
2751  try {
2752  while (!feof(p_file)) {
2753  int c;
2754  size_t n = 0;
2755  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
2756  if (n++ >= line.capacity()) {
2757  break;
2758  }
2759  line += c;
2760  }
2761  if (0 == n) {
2762  break;
2763  }
2764  // remember the first line, which is possibly a header line, so we can
2765  // ignore identical header line(s) in 2nd+ files of an archive;
2766  // otherwise, a 2nd+ header may be mistaken for an all-string row
2767  // and so skew the final column types.
2768  if (line1.empty()) {
2769  line1 = line;
2770  } else if (line == line1) {
2771  line.clear();
2772  continue;
2773  }
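  // e.g. (illustrative): for an archive of part1.csv and part2.csv that both
  // start with "id,name", only part1's header survives; part2's identical
  // first line is dropped here before type detection can see it.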
2774 
2775  raw_data += line;
2776  raw_data += copy_params.line_delim;
2777  line.clear();
2778  ++import_status.rows_completed;
2779  if (std::chrono::steady_clock::now() > end_time) {
2780  if (import_status.rows_completed > 10000) {
2781  break;
2782  }
2783  }
2784  }
2785  } catch (std::exception& e) {
2786  }
2787 
2788  // as if load truncated
2789  import_status.load_truncated = true;
2790  load_failed = true;
2791 
2792  fclose(p_file);
2793  p_file = nullptr;
2794  return import_status;
2795 }
2796 
2797 void Detector::read_file() {
2798  // this becomes analogous to Importer::import()
2799  (void)DataStreamSink::archivePlumber();
2800 }
2801 
2802 void Detector::detect_row_delimiter() {
2803  if (copy_params.delimiter == '\0') {
2804  copy_params.delimiter = ',';
2805  if (boost::filesystem::extension(file_path) == ".tsv") {
2806  copy_params.delimiter = '\t';
2807  }
2808  }
2809 }
2810 
2811 void Detector::split_raw_data() {
2812  const char* buf = raw_data.c_str();
2813  const char* buf_end = buf + raw_data.size();
2814  bool try_single_thread = false;
2815  for (const char* p = buf; p < buf_end; p++) {
2816  std::vector<std::string> row;
2817  p = DelimitedParserUtils::get_row(
2818  p, buf_end, buf_end, copy_params, nullptr, row, try_single_thread);
2819  raw_rows.push_back(row);
2820  if (try_single_thread) {
2821  break;
2822  }
2823  }
2824  if (try_single_thread) {
2825  copy_params.threads = 1;
2826  raw_rows.clear();
2827  for (const char* p = buf; p < buf_end; p++) {
2828  std::vector<std::string> row;
2829  p = DelimitedParserUtils::get_row(
2830  p, buf_end, buf_end, copy_params, nullptr, row, try_single_thread);
2831  raw_rows.push_back(row);
2832  }
2833  }
2834 }
2835 
2836 template <class T>
2837 bool try_cast(const std::string& str) {
2838  try {
2839  boost::lexical_cast<T>(str);
2840  } catch (const boost::bad_lexical_cast& e) {
2841  return false;
2842  }
2843  return true;
2844 }
2845 
2846 inline char* try_strptimes(const char* str, const std::vector<std::string>& formats) {
2847  std::tm tm_struct;
2848  char* buf;
2849  for (auto format : formats) {
2850  buf = strptime(str, format.c_str(), &tm_struct);
2851  if (buf) {
2852  return buf;
2853  }
2854  }
2855  return nullptr;
2856 }
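// e.g. (illustrative): try_strptimes("2019-07-04 12:30:00", {"%Y-%m-%d", ...})
// matches "%Y-%m-%d" and returns a pointer to the unparsed remainder
// " 12:30:00", which detect_sqltype() below probes with the time formats.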
2857 
2858 SQLTypes Detector::detect_sqltype(const std::string& str) {
2859  SQLTypes type = kTEXT;
2860  if (try_cast<double>(str)) {
2861  type = kDOUBLE;
2862  /*if (try_cast<bool>(str)) {
2863  type = kBOOLEAN;
2864  }*/
2865  if (try_cast<int16_t>(str)) {
2866  type = kSMALLINT;
2867  } else if (try_cast<int32_t>(str)) {
2868  type = kINT;
2869  } else if (try_cast<int64_t>(str)) {
2870  type = kBIGINT;
2871  } else if (try_cast<float>(str)) {
2872  type = kFLOAT;
2873  }
2874  }
2875 
2876  // check for geo types
2877  if (type == kTEXT) {
2878  // convert to upper case
2879  std::string str_upper_case = str;
2880  std::transform(
2881  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
2882 
2883  // then test for leading words
2884  if (str_upper_case.find("POINT") == 0) {
2885  type = kPOINT;
2886  } else if (str_upper_case.find("LINESTRING") == 0) {
2887  type = kLINESTRING;
2888  } else if (str_upper_case.find("POLYGON") == 0) {
2889  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
2890  type = kMULTIPOLYGON;
2891  } else {
2892  type = kPOLYGON;
2893  }
2894  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
2895  type = kMULTIPOLYGON;
2896  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
2897  std::string::npos &&
2898  (str_upper_case.size() % 2) == 0) {
2899  // could be a WKB hex blob
2900  // we can't handle these yet
2901  // leave as TEXT for now
2902  // deliberate return here, as otherwise this would get matched as TIME
2903  // @TODO
2904  // implement WKB import
2905  return type;
2906  }
2907  }
2908 
2909  // check for time types
2910  if (type == kTEXT) {
2911  // @TODO
2912  // make these tests more robust so they don't match stuff they should not
2913  char* buf;
2914  buf = try_strptimes(str.c_str(), {"%Y-%m-%d", "%m/%d/%Y", "%d-%b-%y", "%d/%b/%Y"});
2915  if (buf) {
2916  type = kDATE;
2917  if (*buf == 'T' || *buf == ' ' || *buf == ':') {
2918  buf++;
2919  }
2920  }
2921  buf = try_strptimes(buf == nullptr ? str.c_str() : buf,
2922  {"%T %z", "%T", "%H%M%S", "%R"});
2923  if (buf) {
2924  if (type == kDATE) {
2925  type = kTIMESTAMP;
2926  } else {
2927  type = kTIME;
2928  }
2929  }
2930  }
2931 
2932  return type;
2933 }
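// Illustrative detections under the rules above: "123" -> kSMALLINT,
// "70000" -> kINT, "1.5" -> kFLOAT, "POINT (1 2)" -> kPOINT,
// "2019-07-04" -> kDATE, "12:30:00" -> kTIME, anything else -> kTEXT.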
2934 
2935 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
2936  std::vector<SQLTypes> types(row.size());
2937  for (size_t i = 0; i < row.size(); i++) {
2938  types[i] = detect_sqltype(row[i]);
2939  }
2940  return types;
2941 }
2942 
2943 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
2944  static std::array<int, kSQLTYPE_LAST> typeorder;
2945  typeorder[kCHAR] = 0;
2946  typeorder[kBOOLEAN] = 2;
2947  typeorder[kSMALLINT] = 3;
2948  typeorder[kINT] = 4;
2949  typeorder[kBIGINT] = 5;
2950  typeorder[kFLOAT] = 6;
2951  typeorder[kDOUBLE] = 7;
2952  typeorder[kTIMESTAMP] = 8;
2953  typeorder[kTIME] = 9;
2954  typeorder[kDATE] = 10;
2955  typeorder[kPOINT] = 11;
2956  typeorder[kLINESTRING] = 11;
2957  typeorder[kPOLYGON] = 11;
2958  typeorder[kMULTIPOLYGON] = 11;
2959  typeorder[kTEXT] = 12;
2960 
2961  // note: b < a instead of a < b because the map is ordered most to least restrictive
2962  return typeorder[b] < typeorder[a];
2963 }
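// e.g. more_restrictive_sqltype(kTEXT, kINT) is true (typeorder 4 < 12), so a
// column already widened to kTEXT is never narrowed back to kINT by the
// detection loop in find_best_sqltypes() below.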
2964 
2965 void Detector::find_best_sqltypes_and_headers() {
2966  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
2967  best_encodings =
2968  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
2969  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
2970  switch (copy_params.has_header) {
2971  case ImportHeaderRow::AUTODETECT:
2972  has_headers = detect_headers(head_types, best_sqltypes);
2973  if (has_headers) {
2974  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
2975  } else {
2976  copy_params.has_header = ImportHeaderRow::NO_HEADER;
2977  }
2978  break;
2979  case ImportHeaderRow::NO_HEADER:
2980  has_headers = false;
2981  break;
2982  case ImportHeaderRow::HAS_HEADER:
2983  has_headers = true;
2984  break;
2985  }
2986 }
2987 
2988 void Detector::find_best_sqltypes() {
2989  best_sqltypes = find_best_sqltypes(raw_rows, copy_params);
2990 }
2991 
2992 std::vector<SQLTypes> Detector::find_best_sqltypes(
2993  const std::vector<std::vector<std::string>>& raw_rows,
2994  const CopyParams& copy_params) {
2995  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
2996 }
2997 
2998 std::vector<SQLTypes> Detector::find_best_sqltypes(
2999  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3000  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3001  const CopyParams& copy_params) {
3002  if (raw_rows.size() < 1) {
3003  throw std::runtime_error("No rows found in: " +
3004  boost::filesystem::basename(file_path));
3005  }
3006  auto end_time = std::chrono::steady_clock::now() + timeout;
3007  size_t num_cols = raw_rows.front().size();
3008  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3009  std::vector<size_t> non_null_col_counts(num_cols, 0);
3010  for (auto row = row_begin; row != row_end; row++) {
3011  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3012  best_types.push_back(kCHAR);
3013  non_null_col_counts.push_back(0);
3014  }
3015  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3016  // do not count nulls
3017  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3018  continue;
3019  }
3020  SQLTypes t = detect_sqltype(row->at(col_idx));
3021  non_null_col_counts[col_idx]++;
3022  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3023  best_types[col_idx] = t;
3024  }
3025  }
3026  if (std::chrono::steady_clock::now() > end_time) {
3027  break;
3028  }
3029  }
3030  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3031  // if we don't have any non-null values for this column, make it text to be
3032  // safe b/c that is the least restrictive type
3033  if (non_null_col_counts[col_idx] == 0) {
3034  best_types[col_idx] = kTEXT;
3035  }
3036  }
3037 
3038  return best_types;
3039 }
3040 
3041 std::vector<EncodingType> Detector::find_best_encodings(
3042  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3043  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3044  const std::vector<SQLTypes>& best_types) {
3045  if (raw_rows.size() < 1) {
3046  throw std::runtime_error("No rows found in: " +
3047  boost::filesystem::basename(file_path));
3048  }
3049  size_t num_cols = best_types.size();
3050  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3051  std::vector<size_t> num_rows_per_col(num_cols, 1);
3052  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3053  for (auto row = row_begin; row != row_end; row++) {
3054  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3055  if (IS_STRING(best_types[col_idx])) {
3056  count_set[col_idx].insert(row->at(col_idx));
3057  num_rows_per_col[col_idx]++;
3058  }
3059  }
3060  }
3061  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3062  if (IS_STRING(best_types[col_idx])) {
3063  float uniqueRatio =
3064  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3065  if (uniqueRatio < 0.75) {
3066  best_encodes[col_idx] = kENCODING_DICT;
3067  }
3068  }
3069  }
3070  return best_encodes;
3071 }
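// e.g. (illustrative): a "state" column with 50 distinct values across 10000
// sampled rows has uniqueRatio 0.005 < 0.75 and gets kENCODING_DICT, while a
// near-unique "uuid" column stays kENCODING_NONE.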
3072 
3073 // detect_headers returns true if:
3074 // - all elements of the first argument are kTEXT
3075 // - there is at least one instance where tail_types is more restrictive than head_types
3076  // (i.e., not kTEXT)
3077 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3078  const std::vector<SQLTypes>& tail_types) {
3079  if (head_types.size() != tail_types.size()) {
3080  return false;
3081  }
3082  bool has_headers = false;
3083  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3084  if (head_types[col_idx] != kTEXT) {
3085  return false;
3086  }
3087  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3088  }
3089  return has_headers;
3090 }
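// Illustrative: head_types [kTEXT, kTEXT] with tail_types [kINT, kTEXT]
// returns true (an all-text first row over a stricter body); head_types
// [kINT, kTEXT] returns false, since a header row cannot contain a number.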
3091 
3092 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3093  n = std::min(n, raw_rows.size());
3094  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3095  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3096  raw_rows.begin() + n);
3097  return sample_rows;
3098 }
3099 
3100 std::vector<std::string> Detector::get_headers() {
3101  std::vector<std::string> headers(best_sqltypes.size());
3102  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3103  if (has_headers && i < raw_rows[0].size()) {
3104  headers[i] = raw_rows[0][i];
3105  } else {
3106  headers[i] = "column_" + std::to_string(i + 1);
3107  }
3108  }
3109  return headers;
3110 }
3111 
3112 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3113  size_t row_count) {
3114  if (!loader->loadNoCheckpoint(import_buffers, row_count)) {
3115  load_failed = true;
3116  }
3117 }
3118 
3119 void Importer::checkpoint(const int32_t start_epoch) {
3120  if (load_failed) {
3121  // rollback to starting epoch - undo all the added records
3122  loader->setTableEpoch(start_epoch);
3123  } else {
3124  loader->checkpoint();
3125  }
3126 
3127  if (loader->getTableDesc()->persistenceLevel ==
3128  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3129  auto ms = measure<>::execution([&]() {
3130  if (!load_failed) {
3131  for (auto& p : import_buffers_vec[0]) {
3132  if (!p->stringDictCheckpoint()) {
3133  LOG(ERROR) << "Checkpointing Dictionary for Column "
3134  << p->getColumnDesc()->columnName << " failed.";
3135  load_failed = true;
3136  break;
3137  }
3138  }
3139  }
3140  });
3141  if (DEBUG_TIMING) {
3142  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3143  << std::endl;
3144  }
3145  }
3146 }
3147 
3148 ImportStatus DataStreamSink::archivePlumber() {
3149  // in the generalized importing scheme, file_path may by now contain
3150  // a single file path, a url, or a wildcard of file paths.
3151  // see CopyTableStmt::execute.
3152  auto file_paths = mapd_glob(file_path);
3153  if (file_paths.size() == 0) {
3154  file_paths.push_back(file_path);
3155  }
3156 
3157  // sum up sizes of all local files -- only for local files. if
3158  // file_path is a s3 url, sizes will be obtained via S3Archive.
3159  for (const auto& file_path : file_paths) {
3160  total_file_size += get_filesize(file_path);
3161  }
3162 
3163 #ifdef ENABLE_IMPORT_PARQUET
3164  // s3 parquet goes a different route because the files do not use libarchive
3165  // but the parquet api, and they need to be landed like .7z files.
3166  //
3167  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3168  // because, for example, spark sql users may specify an output url w/o a file
3169  // extension like this:
3170  // df.write
3171  // .mode("overwrite")
3172  // .parquet("s3://bucket/folder/parquet/mydata")
3173  // without the parameter, it means plain or compressed csv files.
3174  // note: .ORC and AVRO files should follow a similar path to Parquet?
3175  if (copy_params.file_type == FileType::PARQUET) {
3176  import_parquet(file_paths);
3177  } else
3178 #endif
3179  {
3180  import_compressed(file_paths);
3181  }
3182  return import_status;
3183 }
3184 
3185 #ifdef ENABLE_IMPORT_PARQUET
3186 inline auto open_parquet_table(const std::string& file_path,
3187  std::shared_ptr<arrow::io::ReadableFile>& infile,
3188  std::unique_ptr<parquet::arrow::FileReader>& reader,
3189  std::shared_ptr<arrow::Table>& table) {
3190  using namespace parquet::arrow;
3191  using ReadableFile = arrow::io::ReadableFile;
3192  auto mempool = arrow::default_memory_pool();
3193  PARQUET_THROW_NOT_OK(ReadableFile::Open(file_path, mempool, &infile));
3194  PARQUET_THROW_NOT_OK(OpenFile(infile, mempool, &reader));
3195  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3196  const auto num_row_groups = reader->num_row_groups();
3197  const auto num_columns = table->num_columns();
3198  const auto num_rows = table->num_rows();
3199  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3200  << " columns in " << num_row_groups << " groups.";
3201  return std::make_tuple(num_row_groups, num_columns, num_rows);
3202 }
3203 
3204 void Detector::import_local_parquet(const std::string& file_path) {
3205  std::shared_ptr<arrow::io::ReadableFile> infile;
3206  std::unique_ptr<parquet::arrow::FileReader> reader;
3207  std::shared_ptr<arrow::Table> table;
3208  int num_row_groups, num_columns;
3209  int64_t num_rows;
3210  std::tie(num_row_groups, num_columns, num_rows) =
3211  open_parquet_table(file_path, infile, reader, table);
3212  // make up a header line if we don't have one yet
3213  if (0 == raw_data.size()) {
3214  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3215  copy_params.line_delim = '\n';
3216  copy_params.delimiter = ',';
3217  // must quote values to skip any embedded delimiter
3218  copy_params.quoted = true;
3219  copy_params.quote = '"';
3220  copy_params.escape = '"';
3221  for (int c = 0; c < num_columns; ++c) {
3222  if (c) {
3223  raw_data += copy_params.delimiter;
3224  }
3225  raw_data += table->column(c)->name();
3226  }
3227  raw_data += copy_params.line_delim;
3228  }
3229  // make up raw data... row-wise...
3230  const ColumnDescriptor cd;
3231  for (int g = 0; g < num_row_groups; ++g) {
3232  // data is columnwise
3233  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3234  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3235  arrays.resize(num_columns);
3236  for (int c = 0; c < num_columns; ++c) {
3237  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3238  for (auto chunk : arrays[c]->chunks()) {
3239  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3240  }
3241  }
3242  for (int r = 0; r < num_rows; ++r) {
3243  for (int c = 0; c < num_columns; ++c) {
3244  std::vector<std::string> buffer;
3245  for (auto chunk : arrays[c]->chunks()) {
3246  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3247  if (c) {
3248  raw_data += copy_params.delimiter;
3249  }
3250  if (!chunk->IsNull(r)) {
3251  raw_data += copy_params.quote;
3252  raw_data += boost::replace_all_copy(
3253  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3254  raw_data += copy_params.quote;
3255  }
3256  }
3257  }
3258  raw_data += copy_params.line_delim;
3259  if (++import_status.rows_completed >= 10000) {
3260  // as if load truncated
3261  import_status.load_truncated = true;
3262  load_failed = true;
3263  return;
3264  }
3265  }
3266  }
3267 }
3268 
3269 template <typename DATA_TYPE>
3270 auto TypedImportBuffer::del_values(std::vector<DATA_TYPE>& buffer,
3271  Importer_NS::BadRowsTracker* const bad_rows_tracker) {
3272  const auto old_size = buffer.size();
3273  // erase backward to minimize memory movement overhead
3274  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3275  ++rit) {
3276  buffer.erase(buffer.begin() + *rit);
3277  }
3278  return std::make_tuple(old_size, buffer.size());
3279 }
3280 
3281 auto TypedImportBuffer::del_values(const SQLTypes type,
3282  BadRowsTracker* const bad_rows_tracker) {
3283  switch (type) {
3284  case kBOOLEAN:
3285  return del_values(*bool_buffer_, bad_rows_tracker);
3286  case kTINYINT:
3287  return del_values(*tinyint_buffer_, bad_rows_tracker);
3288  case kSMALLINT:
3289  return del_values(*smallint_buffer_, bad_rows_tracker);
3290  case kINT:
3291  return del_values(*int_buffer_, bad_rows_tracker);
3292  case kBIGINT:
3293  case kNUMERIC:
3294  case kDECIMAL:
3295  case kTIME:
3296  case kTIMESTAMP:
3297  case kDATE:
3298  return del_values(*bigint_buffer_, bad_rows_tracker);
3299  case kFLOAT:
3300  return del_values(*float_buffer_, bad_rows_tracker);
3301  case kDOUBLE:
3302  return del_values(*double_buffer_, bad_rows_tracker);
3303  case kTEXT:
3304  case kVARCHAR:
3305  case kCHAR:
3306  return del_values(*string_buffer_, bad_rows_tracker);
3307  case kPOINT:
3308  case kLINESTRING:
3309  case kPOLYGON:
3310  case kMULTIPOLYGON:
3311  return del_values(*geo_string_buffer_, bad_rows_tracker);
3312  case kARRAY:
3313  return del_values(*array_buffer_, bad_rows_tracker);
3314  default:
3315  throw std::runtime_error("Invalid Type");
3316  }
3317 }
3318 
3319 void Importer::import_local_parquet(const std::string& file_path) {
3320  std::shared_ptr<arrow::io::ReadableFile> infile;
3321  std::unique_ptr<parquet::arrow::FileReader> reader;
3322  std::shared_ptr<arrow::Table> table;
3323  int num_row_groups, num_columns;
3324  int64_t nrow_in_file;
3325  std::tie(num_row_groups, num_columns, nrow_in_file) =
3326  open_parquet_table(file_path, infile, reader, table);
3327  // column_list has no $deleted
3328  const auto& column_list = get_column_descs();
3329  // for now geo columns expect a wkt string
3330  std::vector<const ColumnDescriptor*> cds;
3331  int num_physical_cols = 0;
3332  for (auto& cd : column_list) {
3333  cds.push_back(cd);
3334  num_physical_cols += cd->columnType.get_physical_cols();
3335  }
3336  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3337  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3338  std::to_string(num_columns) + " columns in file vs " +
3339  std::to_string(column_list.size() - num_physical_cols) +
3340  " columns in table.");
3341  // slice each group to import slower columns faster, e.g. geo or string
3343  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3344  // init row estimate for this file
3345  const auto filesize = get_filesize(file_path);
3346  size_t nrow_completed{0};
3347  file_offsets.push_back(0);
3348  // map logic column index to physical column index
3349  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3350  int physical_col_idx = 0;
3351  for (int i = 0; i < logic_col_idx; ++i) {
3352  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3353  }
3354  return physical_col_idx;
3355  };
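  // e.g. (illustrative): for a table (a INT, geo POLYGON, b TEXT), the POLYGON
  // column carries physical children (coords, ring_sizes, bounds,
  // render_group), so logical index 2 ("b") maps to physical index
  // 1 + (1 + 4) = 6.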
3356  // load a file = nested iteration of row groups, row slices and logical columns
3357  auto ms_load_a_file = measure<>::execution([&]() {
3358  for (int row_group = 0; row_group < num_row_groups && !load_failed; ++row_group) {
3359  // a sliced row group will be handled like a (logical) parquet file, with
3360  // an entirely clean set of bad_rows_tracker, import_buffers_vec, ... etc
3361  import_buffers_vec.resize(num_slices);
3362  for (int slice = 0; slice < num_slices; slice++) {
3363  import_buffers_vec[slice].clear();
3364  for (const auto cd : cds) {
3365  import_buffers_vec[slice].emplace_back(
3366  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3367  }
3368  }
3369  /*
3370  * A caveat here is: Parquet files or arrow data are imported column-wise.
3371  * Unlike importing row-wise csv files, an error on any row of any column
3372  * forces us to give up the entire row group of all columns, unless there is a
3373  * sophisticated method to trace erroneous rows in individual columns so
3374  * that we can union bad rows and drop them from the corresponding
3375  * import_buffers_vec; otherwise, we may exceed the maximum number of
3376  * truncated rows easily even with very sparse errors in the files.
3377  */
3378  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3379  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3380  auto& bad_rows_tracker = bad_rows_trackers[slice];
3381  bad_rows_tracker.file_name = file_path;
3382  bad_rows_tracker.row_group = slice;
3383  bad_rows_tracker.importer = this;
3384  }
3385  // process arrow arrays to import buffers
3386  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3387  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3388  const auto cd = cds[physical_col_idx];
3389  std::shared_ptr<arrow::ChunkedArray> array;
3390  PARQUET_THROW_NOT_OK(
3391  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3392  const size_t array_size = array->length();
3393  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
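      // e.g. (illustrative): array_size = 10 and num_slices = 4 gives
      // slice_size = ceil(10 / 4) = 3, so the slices below cover rows
      // [0,3), [3,6), [6,9) and [9,10).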
3394  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3395  for (int slice = 0; slice < num_slices; ++slice) {
3396  thread_controller.startThread([&, slice] {
3397  const auto slice_offset = slice % num_slices;
3398  ArraySliceRange slice_range(
3399  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3400  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3401  auto& bad_rows_tracker = bad_rows_trackers[slice];
3402  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3403  import_buffer->import_buffers = &import_buffers_vec[slice];
3404  import_buffer->col_idx = physical_col_idx + 1;
3405  for (auto chunk : array->chunks()) {
3406  import_buffer->add_arrow_values(
3407  cd, *chunk, false, slice_range, &bad_rows_tracker);
3408  }
3409  });
3410  }
3411  thread_controller.finish();
3412  }
3413  std::vector<size_t> nrow_in_slice_raw(num_slices);
3414  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3415  // trim bad rows from import buffers
3416  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3417  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3418  const auto cd = cds[physical_col_idx];
3419  for (int slice = 0; slice < num_slices; ++slice) {
3420  auto& bad_rows_tracker = bad_rows_trackers[slice];
3421  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3422  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3423  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3424  }
3425  }
3426  // flush slices of this row group to chunks
3427  for (int slice = 0; slice < num_slices; ++slice) {
3428  load(import_buffers_vec[slice], nrow_in_slice_successfully_loaded[slice]);
3429  }
3430  // update import stats
3431  const auto nrow_original =
3432  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3433  const auto nrow_imported =
3434  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3435  nrow_in_slice_successfully_loaded.end(),
3436  0);
3437  const auto nrow_dropped = nrow_original - nrow_imported;
3438  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3439  << " rows, drop " << nrow_dropped << " rows.";
3440  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3441  import_status.rows_completed += nrow_imported;
3442  import_status.rows_rejected += nrow_dropped;
3443  if (import_status.rows_rejected > copy_params.max_reject) {
3444  import_status.load_truncated = true;
3445  load_failed = true;
3446  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3447  << ") rows rejected exceeded. Halting load.";
3448  }
3449  // row estimate
3450  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3451  nrow_completed += nrow_imported;
3452  file_offsets.back() =
3453  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3454  // sum up current total file offsets
3455  const auto total_file_offset =
3456  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
3457  // estimate number of rows per current total file offset
3458  if (total_file_offset) {
3459  import_status.rows_estimated =
3460  (float)total_file_size / total_file_offset * import_status.rows_completed;
3461  VLOG(3) << "rows_completed " << import_status.rows_completed
3462  << ", rows_estimated " << import_status.rows_estimated
3463  << ", total_file_size " << total_file_size << ", total_file_offset "
3464  << total_file_offset;
3465  }
3466  }
3467  });
3468  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
3469  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
3470 }
3471 
3472 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths) {
3473  auto importer = dynamic_cast<Importer*>(this);
3474  auto start_epoch = importer ? importer->getLoader()->getTableEpoch() : 0;
3475  try {
3476  std::exception_ptr teptr;
3477  // file_paths may contain one local file path, a list of local file paths,
3478  // or an s3/hdfs/... url that may translate to one or more remote object keys.
3479  for (auto const& file_path : file_paths) {
3480  std::map<int, std::string> url_parts;
3481  Archive::parse_url(file_path, url_parts);
3482 
3483  // for an s3 url we need to know the obj keys that it comprises
3484  std::vector<std::string> objkeys;
3485  std::unique_ptr<S3ParquetArchive> us3arch;
3486  if ("s3" == url_parts[2]) {
3487 #ifdef HAVE_AWS_S3
3488  us3arch.reset(new S3ParquetArchive(file_path,
3489  copy_params.s3_access_key,
3490  copy_params.s3_secret_key,
3491  copy_params.s3_region,
3492  copy_params.s3_endpoint,
3493  copy_params.plain_text));
3494  us3arch->init_for_read();
3495  total_file_size += us3arch->get_total_file_size();
3496  objkeys = us3arch->get_objkeys();
3497 #else
3498  throw std::runtime_error("AWS S3 support not available");
3499 #endif // HAVE_AWS_S3
3500  } else {
3501  objkeys.emplace_back(file_path);
3502  }
3503 
3504  // for each obj key of an s3 url we need to land it before
3505  // importing it, just as we do with a 'local file'.
3506  for (auto const& objkey : objkeys) {
3507  try {
3508  auto file_path =
3509  us3arch
3510  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
3511  : objkey;
3512  import_local_parquet(file_path);
3513  if (us3arch) {
3514  us3arch->vacuum(objkey);
3515  }
3516  } catch (...) {
3517  if (us3arch) {
3518  us3arch->vacuum(objkey);
3519  }
3520  throw;
3521  }
3522  if (import_status.load_truncated) {
3523  break;
3524  }
3525  }
3526  }
3527  // rethrow any exception that happened before this point
3528  if (teptr) {
3529  std::rethrow_exception(teptr);
3530  }
3531  } catch (...) {
3532  load_failed = true;
3533  if (!import_status.load_truncated) {
3534  throw;
3535  }
3536  }
3537 
3538  if (importer) {
3539  importer->checkpoint(start_epoch);
3540  }
3541 }
3542 #endif // ENABLE_IMPORT_PARQUET
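// Archive::parse_url (declared in the Archive headers) fills url_parts so
// that url_parts[2] holds the URL scheme, which is why import_parquet above
// and import_compressed below test url_parts[2] against "s3", "file", or "".
// A hedged sketch of that contract using std::regex submatch indices (the
// real implementation may differ; parse_url_sketch is an illustrative name):

#include <map>
#include <regex>
#include <string>

// Split "scheme://rest" so parts[2] holds the scheme ("s3", "file", ...);
// a bare local path leaves parts[2] empty, matching the "" == url_parts[2]
// check in import_compressed.
void parse_url_sketch(const std::string& url, std::map<int, std::string>& parts) {
  static const std::regex url_regex("^(([a-z0-9]+)://)?(.*)$");
  std::smatch match;
  if (std::regex_match(url, match, url_regex)) {
    for (size_t i = 0; i < match.size(); ++i) {
      parts[static_cast<int>(i)] = match[i].str();
    }
  }
}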
3543 
3544 void DataStreamSink::import_compressed(std::vector<std::string>& file_paths) {
3545  // a new requirement is to have one single input stream into
3546  // Importer::importDelimited, so we need to move the pipe-related
3547  // code to the outermost block.
3548  int fd[2];
3549  if (pipe(fd) < 0) {
3550  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
3551  }
3552  signal(SIGPIPE, SIG_IGN);
3553 
3554  std::exception_ptr teptr;
3555  // create a thread to read uncompressed byte stream out of pipe and
3556  // feed into importDelimited()
3557  ImportStatus ret;
3558  auto th_pipe_reader = std::thread([&]() {
3559  try {
3560  // importDelimited will read from FILE* p_file
3561  if (0 == (p_file = fdopen(fd[0], "r"))) {
3562  throw std::runtime_error(std::string("failed to open a pipe: ") +
3563  strerror(errno));
3564  }
3565 
3566  // in the future, depending on the data types of this uncompressed stream,
3567  // it can be fed into other functions such as importParquet, etc.
3568  ret = importDelimited(file_path, true);
3569  } catch (...) {
3570  if (!teptr) { // no replace
3571  teptr = std::current_exception();
3572  }
3573  }
3574 
3575  if (p_file) {
3576  fclose(p_file);
3577  }
3578  p_file = 0;
3579  });
3580 
3581  // create a thread to iterate all files (in all archives) and
3582  // forward the uncompressed byte stream to fd[1] which is
3583  // then feed into importDelimited, importParquet, and etc.
3584  auto th_pipe_writer = std::thread([&]() {
3585  std::unique_ptr<S3Archive> us3arch;
3586  bool stop = false;
3587  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
3588  try {
3589  auto file_path = file_paths[fi];
3590  std::unique_ptr<Archive> uarch;
3591  std::map<int, std::string> url_parts;
3592  Archive::parse_url(file_path, url_parts);
3593  const std::string S3_objkey_url_scheme = "s3ok";
3594  if ("file" == url_parts[2] || "" == url_parts[2]) {
3595  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3596  } else if ("s3" == url_parts[2]) {
3597 #ifdef HAVE_AWS_S3
3598  // construct an S3Archive with a shared s3 client.
3599  // should be safe b/c there is no wildcard with an s3 url
3600  us3arch.reset(new S3Archive(file_path,
3601  copy_params.s3_access_key,
3602  copy_params.s3_secret_key,
3603  copy_params.s3_region,
3604  copy_params.s3_endpoint,
3605  copy_params.plain_text));
3606  us3arch->init_for_read();
3607  total_file_size += us3arch->get_total_file_size();
3608  // don't land all files here; land them one by one in the following iterations
3609  for (const auto& objkey : us3arch->get_objkeys()) {
3610  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
3611  }
3612  continue;
3613 #else
3614  throw std::runtime_error("AWS S3 support not available");
3615 #endif // HAVE_AWS_S3
3616  } else if (S3_objkey_url_scheme == url_parts[2]) {
3617 #ifdef HAVE_AWS_S3
3618  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
3619  auto file_path =
3620  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
3621  if (0 == file_path.size()) {
3622  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
3623  }
3624  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3625  // file not removed until file closed
3626  us3arch->vacuum(objkey);
3627 #else
3628  throw std::runtime_error("AWS S3 support not available");
3629 #endif // HAVE_AWS_S3
3630  }
3631 #if 0 // TODO(ppan): implement and enable any other archive class
3632  else
3633  if ("hdfs" == url_parts[2])
3634  uarch.reset(new HdfsArchive(file_path));
3635 #endif
3636  else {
3637  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
3638  }
3639 
3640  // init the archive for read
3641  auto& arch = *uarch;
3642 
3643  // at this point, the archive at the url should be ready to be read, unarchived
3644  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
3645  const void* buf;
3646  size_t size;
3647  bool just_saw_archive_header;
3648  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
3649  bool first_text_header_skipped = false;
3650  // start reading uncompressed bytes of this archive from libarchive
3651  // note! this archive may contain more than one file!
3652  file_offsets.push_back(0);
3653  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
3654  bool insert_line_delim_after_this_file = false;
3655  while (!stop) {
3656  int64_t offset{-1};
3657  auto ok = arch.read_data_block(&buf, &size, &offset);
3658  // can't use (uncompressed) size, so track (max) file offset.
3659  // also we want to capture offset even on e.o.f.
3660  if (offset > 0) {
3661  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3662  file_offsets.back() = offset;
3663  }
3664  if (!ok) {
3665  break;
3666  }
3667  // one subtle point: we concatenate all files into a single FILE
3668  // stream, with which we call importDelimited only once. this would
3669  // make it assume that only one header line comes with this 'single'
3670  // stream, while we may actually have one header line for each of
3671  // the files. so we need to skip header lines here instead of in
3672  // importDelimited.
3673  const char* buf2 = (const char*)buf;
3674  int size2 = size;
3675  if (copy_params.has_header != ImportHeaderRow::NO_HEADER &&
3676  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
3677  while (size2-- > 0) {
3678  if (*buf2++ == copy_params.line_delim) {
3679  break;
3680  }
3681  }
3682  if (size2 <= 0) {
3683  LOG(WARNING) << "No line delimiter in block." << std::endl;
3684  } else {
3685  just_saw_archive_header = false;
3686  first_text_header_skipped = true;
3687  }
3688  }
3689  // On very rare occasions the write pipe somehow operates in a mode similar to
3690  // non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which means
3691  // blocking mode. Under such unreliable blocking behavior, a possible fix is to
3692  // loop writing till no bytes are left, otherwise we get the annoying `failed
3693  // to write pipe: Success`...
3694  if (size2 > 0) {
3695  int nremaining = size2;
3696  while (nremaining > 0) {
3697  // try to write the entire remainder of the buffer to the pipe
3698  int nwritten = write(fd[1], buf2, nremaining);
3699  // how did we do?
3700  if (nwritten < 0) {
3701  // something bad happened
3702  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3703  // ignore these, assume nothing written, try again
3704  nwritten = 0;
3705  } else {
3706  // a real error
3707  throw std::runtime_error(
3708  std::string("failed or interrupted write to pipe: ") +
3709  strerror(errno));
3710  }
3711  } else if (nwritten == nremaining) {
3712  // we wrote everything; we're done
3713  break;
3714  }
3715  // only wrote some (or nothing), try again
3716  nremaining -= nwritten;
3717  buf2 += nwritten;
3718  // no exception when too many rows are rejected
3719  // @simon.eves how would this get set? from the other thread? mutex
3720  // needed?
3721  if (import_status.load_truncated) {
3722  stop = true;
3723  break;
3724  }
3725  }
3726  // check that this file (buf for size) ended with a line delim
3727  if (size > 0) {
3728  const char* plast = static_cast<const char*>(buf) + (size - 1);
3729  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
3730  }
3731  }
3732  }
3733  // if that file didn't end with a line delim, we insert one here to terminate
3734  // that file's stream; use a loop for the same reason as above
3735  if (insert_line_delim_after_this_file) {
3736  while (true) {
3737  // write the delim char to the pipe
3738  int nwritten = write(fd[1], &copy_params.line_delim, 1);
3739  // how did we do?
3740  if (nwritten < 0) {
3741  // something bad happened
3742  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3743  // ignore these, assume nothing written, try again
3744  nwritten = 0;
3745  } else {
3746  // a real error
3747  throw std::runtime_error(
3748  std::string("failed or interrupted write to pipe: ") +
3749  strerror(errno));
3750  }
3751  } else if (nwritten == 1) {
3752  // we wrote it; we're done
3753  break;
3754  }
3755  }
3756  }
3757  }
3758  } catch (...) {
3759  // when an import is aborted because of too many data errors or because a
3760  // detection pass has ended, any exception thrown by the s3 sdk or libarchive
3761  // is okay and should be suppressed.
3762  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
3763  if (import_status.load_truncated) {
3764  break;
3765  }
3766  if (import_status.rows_completed > 0) {
3767  if (nullptr != dynamic_cast<Detector*>(this)) {
3768  break;
3769  }
3770  }
3771  if (!teptr) { // no replace
3772  teptr = std::current_exception();
3773  }
3774  break;
3775  }
3776  }
3777  // close writer end
3778  close(fd[1]);
3779  });
3780 
3781  th_pipe_reader.join();
3782  th_pipe_writer.join();
3783 
3784  // rethrow any exception that happened before this point
3785  if (teptr) {
3786  std::rethrow_exception(teptr);
3787  }
3788 }
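// The two retry loops above implement the classic POSIX "write all bytes"
// idiom: treat EINTR/EAGAIN/EWOULDBLOCK as zero progress and retry, throw on
// any other errno, and advance the buffer on short writes. Factored out, the
// idiom looks like this (write_all is an illustrative sketch, not a helper
// that exists in this file):

#include <unistd.h>

#include <cerrno>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <string>

// Write exactly `size` bytes of `buf` to `fd`, as the pipe-writer thread
// above does inline for each uncompressed data block.
void write_all(int fd, const char* buf, size_t size) {
  while (size > 0) {
    const ssize_t nwritten = write(fd, buf, size);
    if (nwritten < 0) {
      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
        continue;  // transient; nothing was written, so just try again
      }
      throw std::runtime_error(std::string("failed or interrupted write to pipe: ") +
                               strerror(errno));
    }
    buf += nwritten;
    size -= static_cast<size_t>(nwritten);
  }
}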
3789 
3790 ImportStatus Importer::import() {
3791  return DataStreamSink::archivePlumber();
3792 }
3793 
3794 ImportStatus Importer::importDelimited(const std::string& file_path,
3795  const bool decompressed) {
3796  bool load_truncated = false;
3798 
3799  if (!p_file) {
3800  p_file = fopen(file_path.c_str(), "rb");
3801  }
3802  if (!p_file) {
3803  throw std::runtime_error("failed to open file '" + file_path +
3804  "': " + strerror(errno));
3805  }
3806 
3807  if (!decompressed) {
3808  (void)fseek(p_file, 0, SEEK_END);
3809  file_size = ftell(p_file);
3810  }
3811 
3812  if (copy_params.threads == 0) {
3813  max_threads = static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
3814  } else {
3815  max_threads = static_cast<size_t>(copy_params.threads);
3816  }
3817 
3818  // deal with small files
3819  size_t alloc_size = copy_params.buffer_size;
3820  if (!decompressed && file_size < alloc_size) {
3821  alloc_size = file_size;
3822  }
3823 
3824  for (size_t i = 0; i < max_threads; i++) {
3825  import_buffers_vec.emplace_back();
3826  for (const auto cd : loader->get_column_descs()) {
3827  import_buffers_vec[i].push_back(std::unique_ptr<TypedImportBuffer>(
3828  new TypedImportBuffer(cd, loader->getStringDict(cd))));
3829  }
3830  }
3831 
3832  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
3833  size_t current_pos = 0;
3834  size_t end_pos;
3835  size_t begin_pos = 0;
3836 
3837  (void)fseek(p_file, current_pos, SEEK_SET);
3838  size_t size =
3839  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
3840 
3841  // make render group analyzers for each poly column
3842  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
3843  if (copy_params.geo_assign_render_groups) {
3844  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
3845  loader->getTableDesc()->tableId, false, false, false);
3846  for (auto cd : columnDescriptors) {
3847  SQLTypes ct = cd->columnType.get_type();
3848  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
3849  auto rga = std::make_shared<RenderGroupAnalyzer>();
3850  rga->seedFromExistingTableContents(loader, cd->columnName);
3851  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
3852  }
3853  }
3854  }
3855 
3856  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
3857  loader->getTableDesc()->tableId};
3858  auto start_epoch = loader->getTableEpoch();
3859  {
3860  std::list<std::future<ImportStatus>> threads;
3861 
3862  // use a stack to track thread_ids which must not overlap among threads
3863  // because thread_id is used to index import_buffers_vec[]
3864  std::stack<size_t> stack_thread_ids;
3865  for (size_t i = 0; i < max_threads; i++) {
3866  stack_thread_ids.push(i);
3867  }
3868  // added for true row index on error
3869  size_t first_row_index_this_buffer = 0;
3870 
3871  while (size > 0) {
3872  unsigned int num_rows_this_buffer = 0;
3873  CHECK(scratch_buffer);
3874  end_pos = find_end(
3875  scratch_buffer.get(), size, copy_params, num_rows_this_buffer);
3876 
3877  // unput the residual bytes (an incomplete last row) for the next buffer
3878  int nresidual = size - end_pos;
3879  std::unique_ptr<char[]> unbuf;
3880  if (nresidual > 0) {
3881  unbuf = std::make_unique<char[]>(nresidual);
3882  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
3883  }
3884 
3885  // get a thread_id not in use
3886  auto thread_id = stack_thread_ids.top();
3887  stack_thread_ids.pop();
3888  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
3889 
3890  threads.push_back(std::async(std::launch::async,
3891  import_thread_delimited,
3892  thread_id,
3893  this,
3894  std::move(scratch_buffer),
3895  begin_pos,
3896  end_pos,
3897  end_pos,
3898  columnIdToRenderGroupAnalyzerMap,
3899  first_row_index_this_buffer));
3900 
3901  first_row_index_this_buffer += num_rows_this_buffer;
3902 
3903  current_pos += end_pos;
3904  scratch_buffer = std::make_unique<char[]>(alloc_size);
3905  CHECK(scratch_buffer);
3906  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
3907  size = nresidual + fread(scratch_buffer.get() + nresidual,
3908  1,
3909  copy_params.buffer_size - nresidual,
3910  p_file);
3911 
3912  begin_pos = 0;
3913 
3914  while (threads.size() > 0) {
3915  int nready = 0;
3916  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
3917  it != threads.end();) {
3918  auto& p = *it;
3919  std::chrono::milliseconds span(
3920  0); //(std::distance(it, threads.end()) == 1? 1: 0);
3921  if (p.wait_for(span) == std::future_status::ready) {
3922  auto ret_import_status = p.get();
3923  import_status += ret_import_status;
3924  // sum up current total file offsets
3925  size_t total_file_offset{0};
3926  if (decompressed) {
3927  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3928  for (const auto file_offset : file_offsets) {
3929  total_file_offset += file_offset;
3930  }
3931  }
3932  // estimate number of rows per current total file offset
3933  if (decompressed ? total_file_offset : current_pos) {
3934  import_status.rows_estimated =
3935  (decompressed ? (float)total_file_size / total_file_offset
3936  : (float)file_size / current_pos) *
3937  import_status.rows_completed;
3938  }
3939  VLOG(3) << "rows_completed " << import_status.rows_completed
3940  << ", rows_estimated " << import_status.rows_estimated
3941  << ", total_file_size " << total_file_size << ", total_file_offset "
3942  << total_file_offset;
3943  set_import_status(import_id, import_status);
3944  // recall thread_id for reuse
3945  stack_thread_ids.push(ret_import_status.thread_id);
3946  threads.erase(it++);
3947  ++nready;
3948  } else {
3949  ++it;
3950  }
3951  }
3952 
3953  if (nready == 0) {
3954  std::this_thread::yield();
3955  }
3956 
3957  // on eof, wait for all threads to finish
3958  if (0 == size) {
3959  continue;
3960  }
3961 
3962  // keep reading if there is any free thread slot
3963  // this is one of the major differences from the old threading model !!
3964  if (threads.size() < max_threads) {
3965  break;
3966  }
3967  }
3968 
3969  if (import_status.rows_rejected > copy_params.max_reject) {
3970  load_truncated = true;
3971  load_failed = true;
3972  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
3973  break;
3974  }
3975  if (load_failed) {
3976  load_truncated = true;
3977  LOG(ERROR) << "A call to Loader::load failed. Please review the logs for "
3978  "more details";
3979  break;
3980  }
3981  }
3982 
3983  // join dangling threads in case of LOG(ERROR) above
3984  for (auto& p : threads) {
3985  p.wait();
3986  }
3987  }
3988 
3989  checkpoint(start_epoch);
3990 
3991  // must set import_status.load_truncated before closing this end of pipe
3992  // otherwise, the thread on the other end would throw an unwanted 'write()'
3993  // exception
3994  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3995  import_status.load_truncated = load_truncated;
3996 
3997  fclose(p_file);
3998  p_file = nullptr;
3999  return import_status;
4000 }
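// importDelimited above and importGDAL below share the same ad-hoc worker
// pool: thread ids live on a stack so each worker gets an exclusive
// import_buffers_vec slot, futures are polled with a zero-timeout wait_for,
// and a finished future returns its id to the stack before the next chunk is
// dispatched. Stripped of the import specifics, the control flow looks like
// this (run_pool and work are illustrative names, not code from this file):

#include <chrono>
#include <cstddef>
#include <future>
#include <list>
#include <stack>
#include <thread>

// `work(id)` stands in for import_thread_delimited / import_thread_shapefile
// and returns its thread id so the buffer slot can be reused.
void run_pool(size_t max_threads, size_t num_chunks, size_t (*work)(size_t)) {
  std::stack<size_t> free_ids;
  for (size_t i = 0; i < max_threads; ++i) {
    free_ids.push(i);
  }
  std::list<std::future<size_t>> threads;
  for (size_t chunk = 0; chunk < num_chunks; ++chunk) {
    const size_t id = free_ids.top();  // invariant: a slot is free here
    free_ids.pop();
    threads.push_back(std::async(std::launch::async, work, id));
    // poll until a slot frees up, so at most max_threads workers run
    while (threads.size() >= max_threads) {
      for (auto it = threads.begin(); it != threads.end();) {
        if (it->wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) {
          free_ids.push(it->get());  // recall the thread id for reuse
          it = threads.erase(it);
        } else {
          ++it;
        }
      }
      std::this_thread::yield();
    }
  }
  for (auto& t : threads) {
    t.wait();  // drain the stragglers
  }
}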
4001 
4002 void Loader::checkpoint() {
4003  if (getTableDesc()->persistenceLevel ==
4004  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
4005  getCatalog().checkpoint(getTableDesc()->tableId);
4006  }
4007 }
4008 
4009 int32_t Loader::getTableEpoch() {
4010  return getCatalog().getTableEpoch(getCatalog().getCurrentDB().dbId,
4011  getTableDesc()->tableId);
4012 }
4013 
4014 void Loader::setTableEpoch(const int32_t start_epoch) {
4015  getCatalog().setTableEpoch(
4016  getCatalog().getCurrentDB().dbId, getTableDesc()->tableId, start_epoch);
4017 }
4018 
4019 void GDALErrorHandler(CPLErr eErrClass, int err_no, const char* msg) {
4020  CHECK(eErrClass >= CE_None && eErrClass <= CE_Fatal);
4021  static const char* errClassStrings[5] = {
4022  "Info",
4023  "Debug",
4024  "Warning",
4025  "Failure",
4026  "Fatal",
4027  };
4028  std::string log_msg = std::string("GDAL ") + errClassStrings[eErrClass] +
4029  std::string(": ") + msg + std::string(" (") +
4030  std::to_string(err_no) + std::string(")");
4031  if (eErrClass >= CE_Failure) {
4032  throw std::runtime_error(log_msg);
4033  } else {
4034  LOG(INFO) << log_msg;
4035  }
4036 }
4037 
4038 /* static */
4039 std::mutex Importer::init_gdal_mutex;
4040 
4041 /* static */
4042 void Importer::initGDAL() {
4043  // this should not be called from multiple threads, but...
4044  std::lock_guard<std::mutex> guard(Importer::init_gdal_mutex);
4045  // init under mutex
4046  static bool gdal_initialized = false;
4047  if (!gdal_initialized) {
4048  // FIXME(andrewseidl): investigate if CPLPushFinderLocation can be public
4049  setenv("GDAL_DATA",
4050  std::string(mapd_root_abs_path() + "/ThirdParty/gdal-data").c_str(),
4051  true);
4052 
4053  // configure SSL certificate path (per S3Archive::init_for_read)
4054  // in a production build, GDAL and Curl will have been built on
4055  // CentOS, so the baked-in system path will be wrong for Ubuntu
4056  // and other Linux distros. Unless the user is deliberately
4057  // overriding it by setting SSL_CERT_FILE explicitly in the server
4058  // environment, we set it to whichever CA bundle directory exists
4059  // on the machine we're running on
4060  std::list<std::string> v_known_ca_paths({
4061  "/etc/ssl/certs/ca-certificates.crt",
4062  "/etc/pki/tls/certs/ca-bundle.crt",
4063  "/usr/share/ssl/certs/ca-bundle.crt",
4064  "/usr/local/share/certs/ca-root.crt",
4065  "/etc/ssl/cert.pem",
4066  "/etc/ssl/ca-bundle.pem",
4067  });
4068  for (const auto& known_ca_path : v_known_ca_paths) {
4069  if (boost::filesystem::exists(known_ca_path)) {
4070  LOG(INFO) << "GDAL SSL Certificate path: " << known_ca_path;
4071  setenv("SSL_CERT_FILE", known_ca_path.c_str(), false); // no overwrite
4072  break;
4073  }
4074  }
4075 
4076  GDALAllRegister();
4077  OGRRegisterAll();
4078  CPLSetErrorHandler(*GDALErrorHandler);
4079  LOG(INFO) << "GDAL Initialized: " << GDALVersionInfo("--version");
4080  gdal_initialized = true;
4081  }
4082 }
4083 
4084 bool Importer::hasGDALLibKML() {
4085  return GetGDALDriverManager()->GetDriverByName("libkml") != nullptr;
4086 }
4087 
4088 /* static */
4089 void Importer::setGDALAuthorizationTokens(const CopyParams& copy_params) {
4090  // for now we only support S3
4091  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4092  // only set if non-empty to allow GDAL defaults to persist
4093  // explicitly clear if empty to revert to default and not reuse a previous session's
4094  // keys
4095  if (copy_params.s3_region.size()) {
4096 #if DEBUG_AWS_AUTHENTICATION
4097  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4098 #endif
4099  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4100  } else {
4101 #if DEBUG_AWS_AUTHENTICATION
4102  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4103 #endif
4104  CPLSetConfigOption("AWS_REGION", nullptr);
4105  }
4106  if (copy_params.s3_endpoint.size()) {
4107 #if DEBUG_AWS_AUTHENTICATION
4108  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4109 #endif
4110  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4111  } else {
4112 #if DEBUG_AWS_AUTHENTICATION
4113  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4114 #endif
4115  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4116  }
4117  if (copy_params.s3_access_key.size()) {
4118 #if DEBUG_AWS_AUTHENTICATION
4119  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4120  << "'";
4121 #endif
4122  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4123  } else {
4124 #if DEBUG_AWS_AUTHENTICATION
4125  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4126 #endif
4127  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4128  }
4129  if (copy_params.s3_secret_key.size()) {
4130 #if DEBUG_AWS_AUTHENTICATION
4131  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4132  << "'";
4133 #endif
4134  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4135  } else {
4136 #if DEBUG_AWS_AUTHENTICATION
4137  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4138 #endif
4139  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4140  }
4141 
4142 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4143  // if we haven't set keys, we need to disable signed access
4144  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4145 #if DEBUG_AWS_AUTHENTICATION
4146  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4147 #endif
4148  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4149  } else {
4150 #if DEBUG_AWS_AUTHENTICATION
4151  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4152 #endif
4153  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4154  }
4155 #endif
4156 }
4157 
4158 /* static */
4159 OGRDataSource* Importer::openGDALDataset(const std::string& file_name,
4160  const CopyParams& copy_params) {
4161  // lazy init GDAL
4162  initGDAL();
4163 
4164  // set authorization tokens
4165  setGDALAuthorizationTokens(copy_params);
4166 
4167  // open the file
4168  OGRDataSource* poDS;
4169 #if GDAL_VERSION_MAJOR == 1
4170  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4171 #else
4172  poDS = (OGRDataSource*)GDALOpenEx(
4173  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4174  if (poDS == nullptr) {
4175  poDS = (OGRDataSource*)GDALOpenEx(
4176  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4177  if (poDS) {
4178  LOG(INFO) << "openGDALDataset had to open as read-only";
4179  }
4180  }
4181 #endif
4182  if (poDS == nullptr) {
4183  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4184  }
4185  // NOTE(adb): If extending this function, refactor to ensure any errors will not result
4186  // in a memory leak if GDAL successfully opened the input dataset.
4187  return poDS;
4188 }
4189 
4190 namespace {
4191 
4192 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4193  const OGRDataSourceUqPtr& poDS,
4194  const std::string& file_name) {
4195  // get layer with specified name, or default to first layer
4196  OGRLayer* poLayer = nullptr;
4197  if (geo_layer_name.size()) {
4198  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4199  if (poLayer == nullptr) {
4200  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4201  file_name);
4202  }
4203  } else {
4204  poLayer = poDS->GetLayer(0);
4205  if (poLayer == nullptr) {
4206  throw std::runtime_error("No layers found in " + file_name);
4207  }
4208  }
4209  return *poLayer;
4210 }
4211 
4212 } // namespace
4213 
4214 /* static */
4215 void Importer::readMetadataSampleGDAL(
4216  const std::string& file_name,
4217  const std::string& geo_column_name,
4218  std::map<std::string, std::vector<std::string>>& metadata,
4219  int rowLimit,
4220  const CopyParams& copy_params) {
4221  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4222  if (poDS == nullptr) {
4223  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4224  file_name);
4225  }
4226 
4227  OGRLayer& layer =
4228  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4229 
4230  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4231  CHECK(poFDefn);
4232 
4233  // the return type of GetFeatureCount() differs between GDAL 1.x (int32_t) and 2.x (int64_t)
4234  auto nFeats = layer.GetFeatureCount();
4235  size_t numFeatures =
4236  std::max(static_cast<decltype(nFeats)>(0),
4237  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4238  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4239  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4240  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4241  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4242  }
4243  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4244  layer.ResetReading();
4245  size_t iFeature = 0;
4246  while (iFeature < numFeatures) {
4247  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4248  if (!poFeature) {
4249  break;
4250  }
4251 
4252  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4253  if (poGeometry != nullptr) {
4254  // validate geom type (again?)
4255  switch (wkbFlatten(poGeometry->getGeometryType())) {
4256  case wkbPoint:
4257  case wkbLineString:
4258  case wkbPolygon:
4259  case wkbMultiPolygon:
4260  break;
4261  default:
4262  throw std::runtime_error("Unsupported geometry type: " +
4263  std::string(poGeometry->getGeometryName()));
4264  }
4265 
4266  // populate metadata for regular fields
4267  for (auto i : metadata) {
4268  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4269  if (iField >= 0) { // geom is -1
4270  metadata[i.first].at(iFeature) =
4271  std::string(poFeature->GetFieldAsString(iField));
4272  }
4273  }
4274 
4275  // populate metadata for geo column with WKT string
4276  char* wkts = nullptr;
4277  poGeometry->exportToWkt(&wkts);
4278  CHECK(wkts);
4279  metadata[geo_column_name].at(iFeature) = wkts;
4280  CPLFree(wkts);
4281  }
4282  iFeature++;
4283  }
4284 }
4285 
4286 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4287  switch (ogr_type) {
4288  case OFTInteger:
4289  return std::make_pair(kINT, false);
4290  case OFTIntegerList:
4291  return std::make_pair(kINT, true);
4292 #if GDAL_VERSION_MAJOR > 1
4293  case OFTInteger64:
4294  return std::make_pair(kBIGINT, false);
4295  case OFTInteger64List:
4296  return std::make_pair(kBIGINT, true);
4297 #endif
4298  case OFTReal:
4299  return std::make_pair(kDOUBLE, false);
4300  case OFTRealList:
4301  return std::make_pair(kDOUBLE, true);
4302  case OFTString:
4303  return std::make_pair(kTEXT, false);
4304  case OFTStringList:
4305  return std::make_pair(kTEXT, true);
4306  case OFTDate:
4307  return std::make_pair(kDATE, false);
4308  case OFTTime:
4309  return std::make_pair(kTIME, false);
4310  case OFTDateTime:
4311  return std::make_pair(kTIMESTAMP, false);
4312  case OFTBinary:
4313  default:
4314  break;
4315  }
4316  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4317 }
4318 
4319 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4320  switch (ogr_type) {
4321  case wkbPoint:
4322  return kPOINT;
4323  case wkbLineString:
4324  return kLINESTRING;
4325  case wkbPolygon:
4326  return kPOLYGON;
4327  case wkbMultiPolygon:
4328  return kMULTIPOLYGON;
4329  default:
4330  break;
4331  }
4332  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4333 }
4334 
4335 /* static */
4336 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4337  const std::string& file_name,
4338  const std::string& geo_column_name,
4339  const CopyParams& copy_params) {
4340  std::list<ColumnDescriptor> cds;
4341 
4342  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4343  if (poDS == nullptr) {
4344  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4345  file_name);
4346  }
4347 
4348  OGRLayer& layer =
4349  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4350 
4351  layer.ResetReading();
4352  // TODO(andrewseidl): support multiple features
4353  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4354  if (poFeature == nullptr) {
4355  throw std::runtime_error("No features found in " + file_name);
4356  }
4357  // get fields as regular columns
4358  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4359  CHECK(poFDefn);
4360  int iField;
4361  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4362  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4363  auto typePair = ogr_to_type(poFieldDefn->GetType());
4364  ColumnDescriptor cd;
4365  cd.columnName = poFieldDefn->GetNameRef();
4366  cd.sourceName = poFieldDefn->GetNameRef();
4367  SQLTypeInfo ti;
4368  if (typePair.second) {
4369  ti.set_type(kARRAY);
4370  ti.set_subtype(typePair.first);
4371  } else {
4372  ti.set_type(typePair.first);
4373  }
4374  if (typePair.first == kTEXT) {
4375  ti.set_compression(kENCODING_DICT);
4376  ti.set_comp_param(32);
4377  }
4378  ti.set_fixed_size();
4379  cd.columnType = ti;
4380  cds.push_back(cd);
4381  }
4382  // get geo column, if any
4383  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4384  if (poGeometry) {
4385  ColumnDescriptor cd;
4386  cd.columnName = geo_column_name;
4387  cd.sourceName = geo_column_name;
4388 
4389  // get GDAL type
4390  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4391 
4392  // if exploding, override any collection type to child type
4393  if (copy_params.geo_explode_collections) {
4394  if (ogr_type == wkbMultiPolygon) {
4395  ogr_type = wkbPolygon;
4396  } else if (ogr_type == wkbMultiLineString) {
4397  ogr_type = wkbLineString;
4398  } else if (ogr_type == wkbMultiPoint) {
4399  ogr_type = wkbPoint;
4400  }
4401  }
4402 
4403  // convert to internal type
4404  SQLTypes geoType = ogr_to_type(ogr_type);
4405 
4406  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4407  if (!copy_params.geo_explode_collections) {
4408  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4409  }
4410 
4411  // build full internal type
4412  SQLTypeInfo ti;
4413  ti.set_type(geoType);
4414  ti.set_subtype(copy_params.geo_coords_type);
4415  ti.set_input_srid(copy_params.geo_coords_srid);
4416  ti.set_output_srid(copy_params.geo_coords_srid);
4417  ti.set_compression(copy_params.geo_coords_encoding);
4418  ti.set_comp_param(copy_params.geo_coords_comp_param);
4419  cd.columnType = ti;
4420 
4421  cds.push_back(cd);
4422  }
4423  return cds;
4424 }
4425 
4426 bool Importer::gdalStatInternal(const std::string& path,
4427  const CopyParams& copy_params,
4428  bool also_dir) {
4429  // lazy init GDAL
4430  initGDAL();
4431 
4432  // set authorization tokens
4433  setGDALAuthorizationTokens(copy_params);
4434 
4435 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4436  // clear GDAL stat cache
4437  // without this, file existence will be cached, even if authentication changes
4438  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4439  VSICurlClearCache();
4440 #endif
4441 
4442  // stat path
4443  VSIStatBufL sb;
4444  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4445  if (result < 0) {
4446  return false;
4447  }
4448 
4449  // exists?
4450  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4451  return true;
4452  } else if (VSI_ISREG(sb.st_mode)) {
4453  return true;
4454  }
4455  return false;
4456 }
4457 
4458 /* static */
4459 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
4460  return gdalStatInternal(path, copy_params, false);
4461 }
4462 
4463 /* static */
4464 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
4465  const CopyParams& copy_params) {
4466  return gdalStatInternal(path, copy_params, true);
4467 }
4468 
4469 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
4470  std::vector<std::string>& files) {
4471  // prepare to gather subdirectories
4472  std::vector<std::string> subdirectories;
4473 
4474  // get entries
4475  char** entries = VSIReadDir(archive_path.c_str());
4476  if (!entries) {
4477  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
4478  return;
4479  }
4480 
4481  // force scope
4482  {
4483  // request clean-up
4484  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
4485 
4486  // check all the entries
4487  int index = 0;
4488  while (true) {
4489  // get next entry, or drop out if there isn't one
4490  char* entry_c = entries[index++];
4491  if (!entry_c) {
4492  break;
4493  }
4494  std::string entry(entry_c);
4495 
4496  // ignore '.' and '..'
4497  if (entry == "." || entry == "..") {
4498  continue;
4499  }
4500 
4501  // build the full path
4502  std::string entry_path = archive_path + std::string("/") + entry;
4503 
4504  // is it a file or a sub-folder
4505  VSIStatBufL sb;
4506  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
4507  if (result < 0) {
4508  break;
4509  }
4510 
4511  if (VSI_ISDIR(sb.st_mode)) {
4512  // a directory that ends with .gdb could be a Geodatabase bundle
4513  // arguably dangerous to decide this purely by name, but any further
4514  // validation would be very complex especially at this scope
4515  if (boost::iends_with(entry_path, ".gdb")) {
4516  // add the directory as if it was a file and don't recurse into it
4517  files.push_back(entry_path);
4518  } else {
4519  // add subdirectory to be recursed into
4520  subdirectories.push_back(entry_path);
4521  }
4522  } else {
4523  // add this file
4524  files.push_back(entry_path);
4525  }
4526  }
4527  }
4528 
4529  // recurse into each subdirectory we found
4530  for (const auto& subdirectory : subdirectories) {
4531  gdalGatherFilesInArchiveRecursive(subdirectory, files);
4532  }
4533 }
4534 
4535 /* static */
4536 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
4537  const std::string& archive_path,
4538  const CopyParams& copy_params) {
4539  // lazy init GDAL
4540  initGDAL();
4541 
4542  // set authorization tokens
4543  setGDALAuthorizationTokens(copy_params);
4544 
4545  // prepare to gather files
4546  std::vector<std::string> files;
4547 
4548  // gather the files recursively
4549  gdalGatherFilesInArchiveRecursive(archive_path, files);
4550 
4551  // convert to relative paths inside archive
4552  for (auto& file : files) {
4553  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4554  }
4555 
4556  // done
4557  return files;
4558 }
4559 
4560 /* static */
4561 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
4562  const std::string& file_name,
4563  const CopyParams& copy_params) {
4564  // lazy init GDAL
4565  initGDAL();
4566 
4567  // set authorization tokens
4568  setGDALAuthorizationTokens(copy_params);
4569 
4570  // prepare to gather layer info
4571  std::vector<GeoFileLayerInfo> layer_info;
4572 
4573  // open the data set
4574  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4575  if (poDS == nullptr) {
4576  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4577  file_name);
4578  }
4579 
4580  // enumerate the layers
4581  for (auto&& poLayer : poDS->GetLayers()) {
4583  // prepare to read this layer
4584  poLayer->ResetReading();
4585  // skip layer if empty
4586  if (poLayer->GetFeatureCount() > 0) {
4587  // get first feature
4588  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4589  CHECK(first_feature);
4590  // check feature for geometry
4591  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4592  if (!geometry) {
4593  // layer has no geometry
4594  contents = GeoFileLayerContents::NON_GEO;
4595  } else {
4596  // check the geometry type
4597  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4598  switch (wkbFlatten(geometry_type)) {
4599  case wkbPoint:
4600  case wkbLineString:
4601  case wkbPolygon:
4602  case wkbMultiPolygon:
4603  // layer has supported geo
4604  contents = GeoFileLayerContents::GEO;
4605  break;
4606  default:
4607  // layer has unsupported geometry
4608  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
4609  break;
4610  }
4611  }
4612  }
4613  // store info for this layer
4614  layer_info.emplace_back(poLayer->GetName(), contents);
4615  }
4616 
4617  // done
4618  return layer_info;
4619 }
4620 
4621 /* static */
4622 bool Importer::gdalSupportsNetworkFileAccess() {
4623 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 2)
4624  return true;
4625 #else
4626  return false;
4627 #endif
4628 }
4629 
4630 ImportStatus Importer::importGDAL(
4631  ColumnNameToSourceNameMapType columnNameToSourceNameMap) {
4632  // initial status
4633  bool load_truncated = false;
4634  set_import_status(import_id, import_status);
4635 
4636  OGRDataSourceUqPtr poDS(openGDALDataset(file_path, copy_params));
4637  if (poDS == nullptr) {
4638  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4639  file_path);
4640  }
4641 
4642  OGRLayer& layer =
4643  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
4644 
4645  // get the number of features in this layer
4646  size_t numFeatures = layer.GetFeatureCount();
4647 
4648  // build map of metadata field (additional columns) name to index
4649  // use shared_ptr since we need to pass it to the worker
4650  FieldNameToIndexMapType fieldNameToIndexMap;
4651  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4652  CHECK(poFDefn);
4653  size_t numFields = poFDefn->GetFieldCount();
4654  for (size_t iField = 0; iField < numFields; iField++) {
4655  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4656  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4657  }
4658 
4659  // the geographic spatial reference we want to put everything in
4660  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4661  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4662 
4663 #if GDAL_VERSION_MAJOR >= 3
4664  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order
4665  // this results in X and Y being transposed for angle-based
4666  // coordinate systems. This restores the previous behavior.
4667  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4668 #endif
4669 
4670 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4671  // just one "thread"
4672  size_t max_threads = 1;
4673 #else
4674  // how many threads to use
4675  size_t max_threads = 0;
4676  if (copy_params.threads == 0) {
4677  max_threads = sysconf(_SC_NPROCESSORS_CONF);
4678  } else {
4679  max_threads = copy_params.threads;
4680  }
4681 #endif
4682 
4683  // make an import buffer for each thread
4684  CHECK_EQ(import_buffers_vec.size(), 0u);
4685  import_buffers_vec.resize(max_threads);
4686  for (size_t i = 0; i < max_threads; i++) {
4687  for (const auto cd : loader->get_column_descs()) {
4688  import_buffers_vec[i].emplace_back(
4689  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4690  }
4691  }
4692 
4693  // make render group analyzers for each poly column
4694  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4695  if (copy_params.geo_assign_render_groups) {
4696  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4697  loader->getTableDesc()->tableId, false, false, false);
4698  for (auto cd : columnDescriptors) {
4699  SQLTypes ct = cd->columnType.get_type();
4700  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4701  auto rga = std::make_shared<RenderGroupAnalyzer>();
4702  rga->seedFromExistingTableContents(loader, cd->columnName);
4703  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4704  }
4705  }
4706  }
4707 
4708 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4709  // threads
4710  std::list<std::future<ImportStatus>> threads;
4711 
4712  // use a stack to track thread_ids which must not overlap among threads
4713  // because thread_id is used to index import_buffers_vec[]
4714  std::stack<size_t> stack_thread_ids;
4715  for (size_t i = 0; i < max_threads; i++) {
4716  stack_thread_ids.push(i);
4717  }
4718 #endif
4719 
4720  // checkpoint the table
4721  auto start_epoch = loader->getTableEpoch();
4722 
4723  // reset the layer
4724  layer.ResetReading();
4725 
4726  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
4727 
4728  // make a features buffer for each thread
4729  std::vector<FeaturePtrVector> features(max_threads);
4730 
4731  // for each feature...
4732  size_t firstFeatureThisChunk = 0;
4733  while (firstFeatureThisChunk < numFeatures) {
4734  // how many features this chunk
4735  size_t numFeaturesThisChunk =
4736  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
4737 
4738 // get a thread_id not in use
4739 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4740  size_t thread_id = 0;
4741 #else
4742  auto thread_id = stack_thread_ids.top();
4743  stack_thread_ids.pop();
4744  CHECK(thread_id < max_threads);
4745 #endif
4746 
4747  // fill features buffer for new thread
4748  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
4749  features[thread_id].emplace_back(layer.GetNextFeature());
4750  }
4751 
4752 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4753  // call worker function directly
4754  auto ret_import_status = import_thread_shapefile(0,
4755  this,
4756  poGeographicSR.get(),
4757  std::move(features[thread_id]),
4758  firstFeatureThisChunk,
4759  numFeaturesThisChunk,
4760  fieldNameToIndexMap,
4761  columnNameToSourceNameMap,
4762  columnIdToRenderGroupAnalyzerMap);
4763  import_status += ret_import_status;
4764  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
4765  import_status.rows_completed;
4766  set_import_status(import_id, import_status);
4767 #else
4768  // fire up that thread to import this geometry
4769  threads.push_back(std::async(std::launch::async,
4770  import_thread_shapefile,
4771  thread_id,
4772  this,
4773  poGeographicSR.get(),
4774  std::move(features[thread_id]),
4775  firstFeatureThisChunk,
4776  numFeaturesThisChunk,
4777  fieldNameToIndexMap,
4778  columnNameToSourceNameMap,
4779  columnIdToRenderGroupAnalyzerMap));
4780 
4781  // let the threads run
4782  while (threads.size() > 0) {
4783  int nready = 0;
4784  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4785  it != threads.end();) {
4786  auto& p = *it;
4787  std::chrono::milliseconds span(
4788  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4789  if (p.wait_for(span) == std::future_status::ready) {
4790  auto ret_import_status = p.get();
4791  import_status += ret_import_status;
4792  import_status.rows_estimated =
4793  ((float)firstFeatureThisChunk / (float)numFeatures) *
4794  import_status.rows_completed;
4795  set_import_status(import_id, import_status);
4796 
4797  // recall thread_id for reuse
4798  stack_thread_ids.push(ret_import_status.thread_id);
4799 
4800  threads.erase(it++);
4801  ++nready;
4802  } else {
4803  ++it;
4804  }
4805  }
4806 
4807  if (nready == 0) {
4808  std::this_thread::yield();
4809  }
4810 
4811  // keep reading if there is any free thread slot
4812  // this is one of the major differences from the old threading model !!
4813  if (threads.size() < max_threads) {
4814  break;
4815  }
4816  }
4817 #endif
4818 
4819  // out of rows?
4820  if (import_status.rows_rejected > copy_params.max_reject) {
4821  load_truncated = true;
4822  load_failed = true;
4823  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4824  break;
4825  }
4826 
4827  // failed?
4828  if (load_failed) {
4829  load_truncated = true;
4830  LOG(ERROR)
4831  << "A call to Loader::load failed. Please review the logs for more details";
4832  break;
4833  }
4834 
4835  firstFeatureThisChunk += numFeaturesThisChunk;
4836  }
4837 
4838 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4839  // wait for any remaining threads
4840  if (threads.size()) {
4841  for (auto& p : threads) {
4842  // wait for the thread
4843  p.wait();
4844  // get the result and update the final import status
4845  auto ret_import_status = p.get();
4846  import_status += ret_import_status;
4847  import_status.rows_estimated = import_status.rows_completed;
4848  set_import_status(import_id, import_status);
4849  }
4850  }
4851 #endif
4852 
4853  checkpoint(start_epoch);
4854 
4855  // must set import_status.load_truncated before closing this end of pipe
4856  // otherwise, the thread on the other end would throw an unwanted 'write()'
4857  // exception
4858  import_status.load_truncated = load_truncated;
4859  return import_status;
4860 }
4861 
4862 //
4863 // class RenderGroupAnalyzer
4864 //
4865 
4866 void RenderGroupAnalyzer::seedFromExistingTableContents(
4867  const std::unique_ptr<Loader>& loader,
4868  const std::string& geoColumnBaseName) {
4869  // start timer
4870  auto seedTimer = timer_start();
4871 
4872  // get the table descriptor
4873  const auto& cat = loader->getCatalog();
4874  const std::string& tableName = loader->getTableDesc()->tableName;
4875  const auto td = cat.getMetadataForTable(tableName);
4876  CHECK(td);
4877  CHECK(td->fragmenter);
4878 
4879  // start with a fresh tree
4880  _rtree = nullptr;
4881  _numRenderGroups = 0;
4882 
4883  // if the table is empty, just make an empty tree
4884  if (td->fragmenter->getFragmentsForQuery().getPhysicalNumTuples() == 0) {
4885  if (DEBUG_RENDER_GROUP_ANALYZER) {
4886  LOG(INFO) << "DEBUG: Table is empty!";
4887  }
4888  _rtree = std::make_unique<RTree>();
4889  CHECK(_rtree);
4890  return;
4891  }
4892 
4893  // no seeding possible without these two columns
4894  const auto cd_bounds =
4895  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_bounds");
4896  const auto cd_render_group =
4897  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_render_group");
4898  if (!cd_bounds || !cd_render_group) {
4899  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4900  " doesn't have bounds or render_group columns!");
4901  }
4902 
4903  // and validate their types
4904  if (cd_bounds->columnType.get_type() != kARRAY ||
4905  cd_bounds->columnType.get_subtype() != kDOUBLE) {
4906  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4907  " bounds column is wrong type!");
4908  }
4909  if (cd_render_group->columnType.get_type() != kINT) {
4910  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4911  " render_group column is wrong type!");
4912  }
4913 
4914  // get chunk accessor table
4915  auto chunkAccessorTable = getChunkAccessorTable(
4916  cat, td, {geoColumnBaseName + "_bounds", geoColumnBaseName + "_render_group"});
4917  const auto table_count = std::get<0>(chunkAccessorTable.back());
4918 
4919  if (DEBUG_RENDER_GROUP_ANALYZER) {
4920  LOG(INFO) << "DEBUG: Scanning existing table geo column set '" << geoColumnBaseName
4921  << "'";
4922  }
4923 
4924  std::vector<Node> nodes;
4925  try {
4926  nodes.resize(table_count);
4927  } catch (const std::exception& e) {
4928  throw std::runtime_error("RenderGroupAnalyzer failed to reserve memory for " +
4929  std::to_string(table_count) + " rows");
4930  }
4931 
4932  for (size_t row = 0; row < table_count; row++) {
4933  ArrayDatum ad;
4934  VarlenDatum vd;
4935  bool is_end;
4936 
4937  // get ChunkIters and fragment row offset
4938  size_t rowOffset = 0;
4939  auto& chunkIters = getChunkItersAndRowOffset(chunkAccessorTable, row, rowOffset);
4940  auto& boundsChunkIter = chunkIters[0];
4941  auto& renderGroupChunkIter = chunkIters[1];
4942 
4943  // get bounds values
4944  ChunkIter_get_nth(&boundsChunkIter, row - rowOffset, &ad, &is_end);
4945  CHECK(!is_end);
4946  CHECK(ad.pointer);
4947  int numBounds = (int)(ad.length / sizeof(double));
4948  CHECK(numBounds == 4);
4949 
4950  // convert to bounding box
4951  double* bounds = reinterpret_cast<double*>(ad.pointer);
4952  BoundingBox bounding_box;
4953  boost::geometry::assign_inverse(bounding_box);
4954  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
4955  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
4956 
4957  // get render group
4958  ChunkIter_get_nth(&renderGroupChunkIter, row - rowOffset, false, &vd, &is_end);
4959  CHECK(!is_end);
4960  CHECK(vd.pointer);
4961  int renderGroup = *reinterpret_cast<int32_t*>(vd.pointer);
4962  CHECK_GE(renderGroup, 0);
4963 
4964  // store
4965  nodes[row] = std::make_pair(bounding_box, renderGroup);
4966 
4967  // how many render groups do we have now?
4968  if (renderGroup >= _numRenderGroups) {
4969  _numRenderGroups = renderGroup + 1;
4970  }
4971 
4972  if (DEBUG_RENDER_GROUP_ANALYZER) {
4973  LOG(INFO) << "DEBUG: Existing row " << row << " has Render Group " << renderGroup;
4974  }
4975  }
4976 
4977  // bulk-load the tree
4978  auto bulk_load_timer = timer_start();
4979  _rtree = std::make_unique<RTree>(nodes);
4980  CHECK(_rtree);
4981  LOG(INFO) << "Scanning render groups of poly column '" << geoColumnBaseName
4982  << "' of table '" << tableName << "' took " << timer_stop(seedTimer) << "ms ("
4983  << timer_stop(bulk_load_timer) << " ms for tree)";
4984 
4985  if (DEBUG_RENDER_GROUP_ANALYZER) {
4986  LOG(INFO) << "DEBUG: Done! Now have " << _numRenderGroups << " Render Groups";
4987  }
4988 }
4989 
4990 int RenderGroupAnalyzer::insertBoundsAndReturnRenderGroup(
4991  const std::vector<double>& bounds) {
4992  // validate
4993  CHECK(bounds.size() == 4);
4994 
4995  // get bounds
4996  BoundingBox bounding_box;
4997  boost::geometry::assign_inverse(bounding_box);
4998  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
4999  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5000 
5001  // remainder under mutex to allow this to be multi-threaded
5002  std::lock_guard<std::mutex> guard(_rtreeMutex);
5003 
5004  // get the intersecting nodes
5005  std::vector<Node> intersects;
5006  _rtree->query(boost::geometry::index::intersects(bounding_box),
5007  std::back_inserter(intersects));
5008 
5009  // build a bitset over the render groups of the intersecting rectangles;
5010  // a set bit means available, which allows use of find_first()
5011  boost::dynamic_bitset<> bits(_numRenderGroups);
5012  bits.set();
5013  for (const auto& intersection : intersects) {
5014  CHECK(intersection.second < _numRenderGroups);
5015  bits.reset(intersection.second);
5016  }
5017 
5018  // find first available group
5019  int firstAvailableRenderGroup = 0;
5020  size_t firstSetBit = bits.find_first();
5021  if (firstSetBit == boost::dynamic_bitset<>::npos) {
5022  // all known groups represented, add a new one
5023  firstAvailableRenderGroup = _numRenderGroups;
5024  _numRenderGroups++;
5025  } else {
5026  firstAvailableRenderGroup = (int)firstSetBit;
5027  }
5028 
5029  // insert new node
5030  _rtree->insert(std::make_pair(bounding_box, firstAvailableRenderGroup));
5031 
5032  // return it
5033  return firstAvailableRenderGroup;
5034 }
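// The bitset trick above inverts the usual sense: start with every bit set
// ("available"), reset the bit of each render group already used by an
// intersecting polygon, and find_first() then yields the smallest free
// group, with npos meaning all existing groups are taken and a new one must
// be allocated. The same idea in isolation (first_free_render_group is an
// illustrative name, not code from this file):

#include <boost/dynamic_bitset.hpp>

#include <vector>

// Smallest render group not used by any intersecting neighbor; grows the
// group count when every existing group is taken.
int first_free_render_group(const std::vector<int>& used_groups, int& num_groups) {
  boost::dynamic_bitset<> available(num_groups);
  available.set();  // every group starts out available
  for (const int group : used_groups) {
    available.reset(group);  // clear groups taken by neighbors
  }
  const auto first = available.find_first();
  if (first == boost::dynamic_bitset<>::npos) {
    return num_groups++;  // all taken: allocate a brand-new group
  }
  return static_cast<int>(first);
}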
5035 
5036 ImportDriver::ImportDriver(std::shared_ptr<Catalog_Namespace::Catalog> cat,
5037  const Catalog_Namespace::UserMetadata& user,
5038  const ExecutorDeviceType dt)
5039  : QueryRunner(std::make_unique<Catalog_Namespace::SessionInfo>(cat, user, dt, "")) {}
5040 
5041 void ImportDriver::importGeoTable(const std::string& file_path,
5042  const std::string& table_name,
5043  const bool compression,
5044  const bool create_table,
5045  const bool explode_collections) {
5047  const std::string geo_column_name(OMNISCI_GEO_PREFIX);
5048 
5049  CopyParams copy_params;
5050  if (compression) {
5051  copy_params.geo_coords_encoding = EncodingType::kENCODING_GEOINT;
5052  copy_params.geo_coords_comp_param = 32;
5053  } else {
5054  copy_params.geo_coords_encoding = EncodingType::kENCODING_NONE;
5055  copy_params.geo_coords_comp_param = 0;
5056  }
5057  copy_params.geo_assign_render_groups = true;
5058  copy_params.geo_explode_collections = explode_collections;
5059 
5060  auto cds = Importer::gdalToColumnDescriptors(file_path, geo_column_name, copy_params);
5061  std::map<std::string, std::string> colname_to_src;
5062  for (auto& cd : cds) {
5063  const auto col_name_sanitized = ImportHelpers::sanitize_name(cd.columnName);
5064  const auto ret =
5065  colname_to_src.insert(std::make_pair(col_name_sanitized, cd.columnName));
5066  CHECK(ret.second);
5067  cd.columnName = col_name_sanitized;
5068  }
5069 
5070  auto& cat = session_info_->getCatalog();
5071 
5072  if (create_table) {
5073  const auto td = cat.getMetadataForTable(table_name);
5074  if (td != nullptr) {
5075  throw std::runtime_error("Error: Table " + table_name +
5076  " already exists. Possible failure to correctly re-create "
5077  "mapd_data directory.");
5078  }
5079  if (table_name != ImportHelpers::sanitize_name(table_name)) {
5080  throw std::runtime_error("Invalid characters in table name: " + table_name);
5081  }
5082 
5083  std::string stmt{"CREATE TABLE " + table_name};
5084  std::vector<std::string> col_stmts;
5085 
5086  for (auto& cd : cds) {
5087  if (cd.columnType.get_type() == SQLTypes::kINTERVAL_DAY_TIME ||
5088  cd.columnType.get_type() == SQLTypes::kINTERVAL_YEAR_MONTH) {
5089  throw std::runtime_error(
5090  "Unsupported type: INTERVAL_DAY_TIME or INTERVAL_YEAR_MONTH for col " +
5091  cd.columnName + " (table: " + table_name + ")");
5092  }
5093 
5094  if (cd.columnType.get_type() == SQLTypes::kDECIMAL) {
5095  if (cd.columnType.get_precision() == 0 && cd.columnType.get_scale() == 0) {
5096  cd.columnType.set_precision(14);
5097  cd.columnType.set_scale(7);
5098  }
5099  }
5100 
5101  std::string col_stmt;
5102  col_stmt.append(cd.columnName + " " + cd.columnType.get_type_name() + " ");
5103 
5104  if (cd.columnType.get_compression() != kENCODING_NONE) {
5105  col_stmt.append("ENCODING " + cd.columnType.get_compression_name() + " ");
5106  } else {
5107  if (cd.columnType.is_string()) {
5108  col_stmt.append("ENCODING NONE");
5109  } else if (cd.columnType.is_geometry()) {
5110  if (cd.columnType.get_output_srid() == 4326) {
5111  col_stmt.append("ENCODING NONE");
5112  }
5113  }
5114  }
5115  col_stmts.push_back(col_stmt);
5116  }
5117 
5118  stmt.append(" (" + boost::algorithm::join(col_stmts, ",") + ");");
5119  runDDLStatement(stmt);
5120 
5121  LOG(INFO) << "Created table: " << table_name;
5122  } else {
5123  LOG(INFO) << "Not creating table: " << table_name;
5124  }
5125 
5126  const auto td = cat.getMetadataForTable(table_name);
5127  if (td == nullptr) {
5128  throw std::runtime_error("Error: Failed to create table " + table_name);
5129  }
5130 
5131  Importer_NS::Importer importer(cat, td, file_path, copy_params);
5132  auto ms = measure<>::execution([&]() { importer.importGDAL(colname_to_src); });
5133  LOG(INFO) << "Import Time for " << table_name << ": " << (double)ms / 1000.0 << " s";
5134 }
5135 
5136 } // namespace Importer_NS
std::chrono::steady_clock::time_point end
Definition: Importer.h:611
~Importer() override
Definition: Importer.cpp:196
std::string mapd_root_abs_path()
Definition: mapdpath.h:30
HOST DEVICE int get_scale() const
Definition: sqltypes.h:331
bool try_cast(const std::string &str)
Definition: Importer.cpp:2837
std::string join(T const &container, std::string const &delim)
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Importer.cpp:1422
void set_input_srid(int d)
Definition: sqltypes.h:420
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
Definition: Importer.cpp:431
#define UNREACHABLE()
Definition: Logger.h:237
HOST DEVICE int get_size() const
Definition: sqltypes.h:336
#define CHECK_GE(x, y)
Definition: Logger.h:206
std::vector< std::string > mapd_glob(const std::string &pattern)
Definition: mapd_glob.cpp:22
std::map< int, StringDictionary * > dict_map_
Definition: Importer.h:595
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:46
virtual bool load(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, const size_t row_count)
Definition: Importer.cpp:2379
std::unique_ptr< bool[]> is_array_a
Definition: Importer.h:834
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2257
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:416
Catalog_Namespace::Catalog & catalog_
Definition: Importer.h:591
void set_fixed_size()
Definition: sqltypes.h:425
int8_t * getAsBytes() const
Definition: Importer.h:311
boost::geometry::model::point< double, 2, boost::geometry::cs::cartesian > Point
Definition: Importer.h:726
void addInt(const int32_t v)
Definition: Importer.h:231
auto value_getter(const Array &array, const ColumnDescriptor *cd, Importer_NS::BadRowsTracker *const bad_rows_tracker)
const SQLTypeInfo & getTypeInfo() const
Definition: Importer.h:305
std::unique_ptr< OGRDataSource, OGRDataSourceDeleter > OGRDataSourceUqPtr
Definition: Importer.cpp:93
std::unique_ptr< OGRFeature, OGRFeatureDeleter > OGRFeatureUqPtr
Definition: Importer.cpp:102
void set_scale(int s)
Definition: sqltypes.h:421
std::chrono::steady_clock::time_point start
Definition: Importer.h:610
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2581
#define CHECK_GT(x, y)
Definition: Logger.h:205
void set_compression(EncodingType c)
Definition: sqltypes.h:426
DEVICE void ChunkIter_get_nth(ChunkIter *it, int n, bool uncompress, VarlenDatum *result, bool *is_end)
Definition: ChunkIter.cpp:181
int32_t intval
Definition: sqltypes.h:128
static const std::string trim_space(const char *field, const size_t len)
Definition: Importer.cpp:220
std::string sourceName
ArrayDatum StringToArray(const std::string &s, const SQLTypeInfo &ti, const CopyParams &copy_params)
Definition: Importer.cpp:350
std::string to_string(char const *&&v)
std::map< int, const ColumnDescriptor * > columnDescriptors
Definition: Fragmenter.h:69
void set_output_srid(int s)
Definition: sqltypes.h:422
int8_t * pointer
Definition: sqltypes.h:75
bool is_number() const
Definition: sqltypes.h:482
std::string line1
Definition: Importer.h:710
std::string get_type_name() const
Definition: sqltypes.h:429
EncodingType geo_coords_encoding
Definition: CopyParams.h:73
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4464
#define DEBUG_TIMING
Definition: Importer.cpp:138
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4336
void set_replicate_count(const int64_t replicate_count)
Definition: Importer.h:497
char * try_strptimes(const char *str, const std::vector< std::string > &formats)
Definition: Importer.cpp:2846
Datum NullDatum(SQLTypeInfo &ti)
Definition: Importer.cpp:268
void addBoolean(const int8_t v)
Definition: Importer.h:225
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
float floatval
Definition: sqltypes.h:130
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed)=0
int64_t explode_collections_step2(OGRGeometry *ogr_geometry, SQLTypes collection_child_type, const std::string &collection_col_name, size_t row_or_feature_idx, std::function< void(OGRGeometry *)> execute_import_lambda)
Definition: Importer.cpp:1687
virtual void setTableEpoch(const int32_t new_epoch)
Definition: Importer.cpp:4014
int8_t * appendDatum(int8_t *buf, Datum d, const SQLTypeInfo &ti)
Definition: Importer.cpp:232
std::chrono::duration< size_t, std::milli > elapsed
Definition: Importer.h:615
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:417
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:146
void set_precision(int d)
Definition: sqltypes.h:419
std::vector< int8_t > * tinyint_buffer_
Definition: Importer.h:515
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:794
ChunkIterVector & getChunkItersAndRowOffset(ChunkAccessorTable &table, size_t rowid, size_t &rowOffset)
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4192
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
Definition: Importer.cpp:3092
void addFloat(const float v)
Definition: Importer.h:235
void seedFromExistingTableContents(const std::unique_ptr< Loader > &loader, const std::string &geoColumnBaseName)
Definition: Importer.cpp:4866
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:832
CHECK(cgen_state)
std::vector< std::string > * string_buffer_
Definition: Importer.h:521
boost::geometry::model::box< Point > BoundingBox
Definition: Importer.h:727
std::string s3_access_key
Definition: CopyParams.h:62
bool is_time() const
Definition: sqltypes.h:483