OmniSciDB  85c2d10cdc
Importer.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * @file Importer.cpp
 * @author Wei Hong <wei@mapd.com>
 * @brief Functions for Importer class
 */

#include "ImportExport/Importer.h"

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <gdal.h>
#include <ogrsf_frmts.h>
#include <boost/algorithm/string.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/filesystem.hpp>
#include <boost/geometry.hpp>
#include <boost/variant.hpp>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <future>
#include <iomanip>
#include <list>
#include <memory>
#include <mutex>
#include <numeric>
#include <stack>
#include <stdexcept>
#include <thread>
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "Archive/S3Archive.h"
#include "ArrowImporter.h"
#include "Geospatial/Compression.h"
#include "Geospatial/GDAL.h"
#include "Geospatial/Transforms.h"
#include "Geospatial/Types.h"
#include "ImportExport/DelimitedParserUtils.h"
#include "Logger/Logger.h"
#include "QueryEngine/Execute.h"
#include "RenderGroupAnalyzer.h"
#include "Shared/DateTimeParser.h"
#include "Shared/SqlTypesLayout.h"
#include "Shared/import_helpers.h"
#include "Shared/likely.h"
#include "Shared/measure.h"
#include "Shared/misc.h"
#include "Shared/scope.h"
#include "Shared/shard_key.h"
#include "Shared/thread_count.h"

#include "gen-cpp/OmniSci.h"

#ifdef _WIN32
#include <fcntl.h>
#include <io.h>
#endif

size_t g_max_import_threads =
    32;  // Max number of default import threads to use (num hardware threads will be
         // used if lower, and can also be explicitly overridden in copy statement
         // with threads option)
size_t g_archive_read_buf_size = 1 << 20;

inline auto get_filesize(const std::string& file_path) {
  boost::filesystem::path boost_file_path{file_path};
  boost::system::error_code ec;
  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
  return ec ? 0 : filesize;
}

namespace {
bool checkInterrupt(const QuerySessionId& query_session, Executor* executor) {
  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
    mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor->getSessionLock());
    const auto isInterrupted =
        executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
    return isInterrupted;
  }
  return false;
}

struct OGRDataSourceDeleter {
  void operator()(OGRDataSource* datasource) {
    if (datasource) {
      GDALClose(datasource);
    }
  }
};
using OGRDataSourceUqPtr = std::unique_ptr<OGRDataSource, OGRDataSourceDeleter>;

struct OGRFeatureDeleter {
  void operator()(OGRFeature* feature) {
    if (feature) {
      OGRFeature::DestroyFeature(feature);
    }
  }
};
using OGRFeatureUqPtr = std::unique_ptr<OGRFeature, OGRFeatureDeleter>;

struct OGRSpatialReferenceDeleter {
  void operator()(OGRSpatialReference* ref) {
    if (ref) {
      OGRSpatialReference::DestroySpatialReference(ref);
    }
  }
};
using OGRSpatialReferenceUqPtr =
    std::unique_ptr<OGRSpatialReference, OGRSpatialReferenceDeleter>;

}  // namespace

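// Illustrative usage of the RAII aliases above (example code, not from the
// original file): binding an OGR datasource to OGRDataSourceUqPtr guarantees
// GDALClose() runs on scope exit, even if the import path throws.
//
//   OGRDataSourceUqPtr datasource(static_cast<OGRDataSource*>(GDALOpenEx(
//       file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr)));
//   if (!datasource) {
//     throw std::runtime_error("failed to open " + file_name);
//   }
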
// For logging std::vector<std::string> row.
namespace boost {
namespace log {
formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
  out << '[';
  for (size_t i = 0; i < row.size(); ++i) {
    out << (i ? ", " : "") << row[i];
  }
  out << ']';
  return out;
}
}  // namespace log
}  // namespace boost

namespace import_export {

using FieldNameToIndexMapType = std::map<std::string, size_t>;
using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
using ColumnIdToRenderGroupAnalyzerMapType =
    std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
using FeaturePtrVector = std::vector<OGRFeatureUqPtr>;

#define DEBUG_TIMING false
#define DEBUG_RENDER_GROUP_ANALYZER 0
#define DEBUG_AWS_AUTHENTICATION 0

#define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0

static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;

static mapd_shared_mutex status_mutex;
static std::map<std::string, ImportStatus> import_status_map;

Importer::Importer(Catalog_Namespace::Catalog& c,
                   const TableDescriptor* t,
                   const std::string& f,
                   const CopyParams& p)
    : Importer(new Loader(c, t), f, p) {}

Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
    : DataStreamSink(p, f), loader(providedLoader) {
  import_id = boost::filesystem::path(file_path).filename().string();
  file_size = 0;
  max_threads = 0;
  p_file = nullptr;
  buffer[0] = nullptr;
  buffer[1] = nullptr;
  // We may overallocate a little here because physical (phy) columns are
  // dropped, but it shouldn't be an issue since iteration is never supposed
  // to go out of bounds.
  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
  int i = 0;
  bool has_array = false;
  // TODO: replace this ugly way of skipping phy cols once isPhyGeo is defined
  int skip_physical_cols = 0;
  for (auto& p : loader->get_column_descs()) {
    // phy geo columns can't be in input file
    if (skip_physical_cols-- > 0) {
      continue;
    }
    // neither are rowid or $deleted$
    // note: columns can be added after rowid/$deleted$
    if (p->isVirtualCol || p->isDeletedCol) {
      continue;
    }
    skip_physical_cols = p->columnType.get_physical_cols();
    if (p->columnType.get_type() == kARRAY) {
      is_array.get()[i] = true;
      has_array = true;
    } else {
      is_array.get()[i] = false;
    }
    ++i;
  }
  if (has_array) {
    is_array_a = std::unique_ptr<bool[]>(is_array.release());
  } else {
    is_array_a = std::unique_ptr<bool[]>(nullptr);
  }
}

Importer::~Importer() {
  if (p_file != nullptr) {
    fclose(p_file);
  }
  if (buffer[0] != nullptr) {
    free(buffer[0]);
  }
  if (buffer[1] != nullptr) {
    free(buffer[1]);
  }
}

ImportStatus Importer::get_import_status(const std::string& import_id) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
  return import_status_map.at(import_id);
}

void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
  is.end = std::chrono::steady_clock::now();
  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
  import_status_map[import_id] = is;
}

static const std::string trim_space(const char* field, const size_t len) {
  size_t i = 0;
  size_t j = len;
  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
    i++;
  }
  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
    j--;
  }
  return std::string(field + i, j - i);
}

Datum NullDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullDatum.");
  }
  return d;
}

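// For reference: for a plain (non-fixed-encoding) BIGINT column the sentinel
// produced above is std::numeric_limits<int64_t>::min(), while narrower fixed
// encodings get correspondingly narrower sentinels from
// inline_fixed_encoding_null_val(); floats and doubles use the dedicated
// NULL_FLOAT / NULL_DOUBLE constants.
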
Datum NullArrayDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_ARRAY_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_ARRAY_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
  }
  return d;
}

ArrayDatum StringToArray(const std::string& s,
                         const SQLTypeInfo& ti,
                         const CopyParams& copy_params) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
    return ArrayDatum(0, NULL, true);
  }
  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
    LOG(WARNING) << "Malformed array: " << s;
    return ArrayDatum(0, NULL, true);
  }
  std::vector<std::string> elem_strs;
  size_t last = 1;
  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
       i = s.find(copy_params.array_delim, last)) {
    elem_strs.push_back(s.substr(last, i - last));
    last = i + 1;
  }
  if (last + 1 <= s.size()) {
    elem_strs.push_back(s.substr(last, s.size() - 1 - last));
  }
  if (elem_strs.size() == 1) {
    auto str = elem_strs.front();
    auto str_trimmed = trim_space(str.c_str(), str.length());
    if (str_trimmed == "") {
      elem_strs.clear();  // Empty array
    }
  }
  if (!elem_ti.is_string()) {
    size_t len = elem_strs.size() * elem_ti.get_size();
    int8_t* buf = (int8_t*)checked_malloc(len);
    int8_t* p = buf;
    for (auto& es : elem_strs) {
      auto e = trim_space(es.c_str(), es.length());
      bool is_null = (e == copy_params.null_str) || e == "NULL";
      if (!elem_ti.is_string() && e == "") {
        is_null = true;
      }
      if (elem_ti.is_number() || elem_ti.is_time()) {
        if (!isdigit(e[0]) && e[0] != '-') {
          is_null = true;
        }
      }
      Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
      p = appendDatum(p, d, elem_ti);
    }
    return ArrayDatum(len, buf, false);
  }
  // must not be called for array of strings
  CHECK(false);
  return ArrayDatum(0, NULL, true);
}
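
// Illustrative behavior (assuming the default CopyParams array delimiters
// '{', ',' and '}'): StringToArray("{1,2,3}", ti, copy_params) for an INT[]
// column yields a 12-byte ArrayDatum holding three int32 values; "{}" yields
// an empty (zero-length) array, and the configured null string yields a NULL
// ArrayDatum.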
388 
390  SQLTypeInfo elem_ti = ti.get_elem_type();
391  auto len = ti.get_size();
392 
393  if (elem_ti.is_string()) {
394  // must not be called for array of strings
395  CHECK(false);
396  return ArrayDatum(0, NULL, true);
397  }
398 
399  if (len > 0) {
400  // Compose a NULL fixlen array
401  int8_t* buf = (int8_t*)checked_malloc(len);
402  // First scalar is a NULL_ARRAY sentinel
403  Datum d = NullArrayDatum(elem_ti);
404  int8_t* p = appendDatum(buf, d, elem_ti);
405  // Rest is filled with normal NULL sentinels
406  Datum d0 = NullDatum(elem_ti);
407  while ((p - buf) < len) {
408  p = appendDatum(p, d0, elem_ti);
409  }
410  CHECK((p - buf) == len);
411  return ArrayDatum(len, buf, true);
412  }
413  // NULL varlen array
414  return ArrayDatum(0, NULL, true);
415 }
416 
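// Example of the fixlen encoding above: a NULL value for an INT[2] column is
// materialized as {NULL_ARRAY_INT, NULL_INT} - the leading NULL_ARRAY sentinel
// marks the whole array as NULL, and the remaining slots carry ordinary NULL
// sentinels.
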
ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
  return NullArray(ti);
}

void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
  const auto& arr = datum.val.arr_val;
  for (const auto& elem_datum : arr) {
    string_vec.push_back(elem_datum.val.str_val);
  }
}

Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kBIGINT:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kINT:
      d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kSMALLINT:
      d.smallintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kTINYINT:
      d.tinyintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kFLOAT:
      d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
      break;
    case kDOUBLE:
      d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
  }
  return d;
}

ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();

  CHECK(!elem_ti.is_string());

  if (datum.is_null) {
    return NullArray(ti);
  }

  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
  int8_t* buf = (int8_t*)checked_malloc(len);
  int8_t* p = buf;
  for (auto& e : datum.val.arr_val) {
    p = appendDatum(p, TDatumToDatum(e, elem_ti), elem_ti);
  }

  return ArrayDatum(len, buf, false);
}

void TypedImportBuffer::addDictEncodedString(const std::vector<std::string>& string_vec) {
  CHECK(string_dict_);
  std::vector<std::string_view> string_view_vec;
  string_view_vec.reserve(string_vec.size());
  for (const auto& str : string_vec) {
    if (str.size() > StringDictionary::MAX_STRLEN) {
      std::ostringstream oss;
      oss << "while processing dictionary for column " << getColumnDesc()->columnName
          << " a string too long for encoding was detected, string length = "
          << str.size() << ", first 100 characters are '" << str.substr(0, 100) << "'";
      throw std::runtime_error(oss.str());
    }
    string_view_vec.push_back(str);
  }
  try {
    switch (column_desc_->columnType.get_size()) {
      case 1:
        string_dict_i8_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i8_buffer_->data());
        break;
      case 2:
        string_dict_i16_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i16_buffer_->data());
        break;
      case 4:
        string_dict_i32_buffer_->resize(string_view_vec.size());
        string_dict_->getOrAddBulk(string_view_vec, string_dict_i32_buffer_->data());
        break;
      default:
        CHECK(false);
    }
  } catch (std::exception& e) {
    std::ostringstream oss;
    oss << "while processing dictionary for column " << getColumnDesc()->columnName
        << " : " << e.what();
    LOG(ERROR) << oss.str();
    throw std::runtime_error(oss.str());
  }
}

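// Note: the 1/2/4-byte buffer chosen above mirrors the column's declared
// dictionary width, e.g. TEXT ENCODING DICT(8) lands in the i8 buffer and
// plain TEXT ENCODING DICT (32-bit) in the i32 buffer.
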
void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const std::string_view val,
                                  const bool is_null,
                                  const CopyParams& copy_params) {
  const auto type = cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBoolean(static_cast<int8_t>(d.boolval));
      }
      break;
    }
    case kTINYINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addTinyint(d.tinyintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addSmallint(d.smallintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addInt(d.intval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        auto ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        SQLTypeInfo ti(kNUMERIC, 0, 0, false);
        Datum d = StringToDatum(val, ti);
        const auto converted_decimal_value =
            convert_decimal_value_to_scale(d.bigintval, ti, cd->columnType);
        addBigint(converted_decimal_value);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addDouble(std::atof(std::string(val).c_str()));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        if (val.length() > StringDictionary::MAX_STRLEN) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(StringDictionary::MAX_STRLEN));
        }
        addString(val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      SQLTypeInfo ti = cd->columnType;
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImportHelpers::parseStringArray(std::string(val), copy_params, string_vec);
        if (!is_null) {
          // TODO: add support for NULL string arrays
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          addStringArray(string_vec);
        } else {
          if (ti.get_size() > 0) {
            // TODO: remove once NULL fixlen arrays are allowed
            throw std::runtime_error("Fixed length array column " + cd->columnName +
                                     " currently cannot accept NULL arrays");
          }
          // TODO: add support for NULL string arrays, replace with addStringArray(),
          // for now add whatever parseStringArray() outputs for NULLs ("NULL")
          addStringArray(string_vec);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(std::string(val), ti, copy_params);
          if (d.is_null) {  // val could be "NULL"
            addArray(NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + std::string(val));
            }
            addArray(d);
          }
        } else {
          addArray(NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      addGeoString(val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

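// Minimal usage sketch for the field-level path above (illustrative only;
// `buffer` and the literal values are hypothetical):
//
//   // cd describes a nullable INT column; copy_params carries "\N" as null_str
//   buffer->add_value(cd, std::string_view("42"), /*is_null=*/false, copy_params);
//   buffer->add_value(cd, copy_params.null_str, /*is_null=*/true, copy_params);
//
// Non-string fields that are empty or fail the leading digit/'-' test fall
// through to the NULL branch of the matching case.
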
void TypedImportBuffer::pop_value() {
  const auto type = column_desc_->columnType.is_decimal()
                        ? decimal_to_int_type(column_desc_->columnType)
                        : column_desc_->columnType.get_type();
  switch (type) {
    case kBOOLEAN:
      bool_buffer_->pop_back();
      break;
    case kTINYINT:
      tinyint_buffer_->pop_back();
      break;
    case kSMALLINT:
      smallint_buffer_->pop_back();
      break;
    case kINT:
      int_buffer_->pop_back();
      break;
    case kBIGINT:
      bigint_buffer_->pop_back();
      break;
    case kFLOAT:
      float_buffer_->pop_back();
      break;
    case kDOUBLE:
      double_buffer_->pop_back();
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      string_buffer_->pop_back();
      break;
    case kDATE:
    case kTIME:
    case kTIMESTAMP:
      bigint_buffer_->pop_back();
      break;
    case kARRAY:
      if (IS_STRING(column_desc_->columnType.get_subtype())) {
        string_array_buffer_->pop_back();
      } else {
        array_buffer_->pop_back();
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->pop_back();
      break;
    default:
      CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
  }
}

struct GeoImportException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
template <typename DATA_TYPE>
size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
    const ColumnDescriptor* cd,
    const Array& array,
    std::vector<DATA_TYPE>& buffer,
    const ArraySliceRange& slice_range,
    import_export::BadRowsTracker* const bad_rows_tracker) {
  auto data =
      std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
  if (bad_rows_tracker && cd->columnType.is_geometry()) {
    f_add_geo_phy_cols = [&](const int64_t row) {
      // Populate physical columns (ref. DBHandler::load_table)
      std::vector<double> coords, bounds;
      std::vector<int> ring_sizes, poly_rings;
      int render_group = 0;
      SQLTypeInfo ti;
      // replace any unexpected exception from getGeoColumns or other
      // on this path with a GeoImportException so that we won't over-
      // push a null to the logical column...
      try {
        SQLTypeInfo import_ti{ti};
        if (array.IsNull(row)) {
          Geospatial::GeoTypesFactory::getNullGeoColumns(
              import_ti, coords, bounds, ring_sizes, poly_rings, false);
        } else {
          arrow_throw_if<GeoImportException>(
              !Geospatial::GeoTypesFactory::getGeoColumns(geo_string_buffer_->back(),
                                                          ti,
                                                          coords,
                                                          bounds,
                                                          ring_sizes,
                                                          poly_rings,
                                                          false),
              error_context(cd, bad_rows_tracker) + "Invalid geometry");
          arrow_throw_if<GeoImportException>(
              cd->columnType.get_type() != ti.get_type(),
              error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
        }
        auto col_idx_workpad = col_idx;  // what a pitfall!!
        Importer::set_geo_physical_import_buffer(
            bad_rows_tracker->importer->getCatalog(),
            cd,
            *import_buffers,
            col_idx_workpad,
            coords,
            bounds,
            ring_sizes,
            poly_rings,
            render_group);
      } catch (GeoImportException&) {
        throw;
      } catch (std::runtime_error& e) {
        throw GeoImportException(e.what());
      } catch (const std::exception& e) {
        throw GeoImportException(e.what());
      } catch (...) {
        throw GeoImportException("unknown exception");
      }
    };
  }
  auto f_mark_a_bad_row = [&](const auto row) {
    std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
    bad_rows_tracker->rows.insert(row - slice_range.first);
  };
  buffer.reserve(slice_range.second - slice_range.first);
  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
    try {
      *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
      f_add_geo_phy_cols(row);
    } catch (GeoImportException&) {
      f_mark_a_bad_row(row);
    } catch (ArrowImporterException&) {
      // trace bad rows of each column; otherwise rethrow.
      if (bad_rows_tracker) {
        *data << nullptr;
        f_mark_a_bad_row(row);
      } else {
        throw;
      }
    }
  }
  return buffer.size();
}

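// Note on the error path above: with a BadRowsTracker attached, a malformed
// Arrow value does not abort the import. A null placeholder is streamed so the
// column buffers stay aligned, and the offending row index (relative to
// slice_range.first) is recorded for later removal; without a tracker the
// exception propagates to the caller.
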
size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
                                           const Array& col,
                                           const bool exact_type_match,
                                           const ArraySliceRange& slice_range,
                                           BadRowsTracker* const bad_rows_tracker) {
  const auto type = cd->columnType.get_type();
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
  }

  switch (type) {
    case kBOOLEAN:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
      }
      return convert_arrow_val_to_import_buffer<int8_t>(
          cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
    case kTINYINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
      }
      return convert_arrow_val_to_import_buffer<int8_t>(
          cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
    case kSMALLINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
      }
      return convert_arrow_val_to_import_buffer<int16_t>(
          cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
    case kINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
      }
      return convert_arrow_val_to_import_buffer<int32_t>(
          cd, col, *int_buffer_, slice_range, bad_rows_tracker);
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kFLOAT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
      }
      return convert_arrow_val_to_import_buffer<float>(
          cd, col, *float_buffer_, slice_range, bad_rows_tracker);
    case kDOUBLE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
      }
      return convert_arrow_val_to_import_buffer<double>(
          cd, col, *double_buffer_, slice_range, bad_rows_tracker);
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                       "Expected string type");
      }
      return convert_arrow_val_to_import_buffer<std::string>(
          cd, col, *string_buffer_, slice_range, bad_rows_tracker);
    case kTIME:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
                       "Expected time32 or time64 type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kTIMESTAMP:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kDATE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
                       "Expected date32 or date64 type");
      }
      return convert_arrow_val_to_import_buffer<int64_t>(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                     "Expected string type");
      return convert_arrow_val_to_import_buffer<std::string>(
          cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
    case kARRAY:
      throw std::runtime_error("Arrow array appends not yet supported");
    default:
      throw std::runtime_error("Invalid Type");
  }
}

// this is exclusively used by load_table_binary_columnar
size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
  size_t dataSize = 0;
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
      throw std::runtime_error("NULL for column " + cd->columnName);
    }
  }

  switch (cd->columnType.get_type()) {
    case kBOOLEAN: {
      dataSize = col.data.int_col.size();
      bool_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bool_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kTINYINT: {
      dataSize = col.data.int_col.size();
      tinyint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kSMALLINT: {
      dataSize = col.data.int_col.size();
      smallint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kINT: {
      dataSize = col.data.int_col.size();
      int_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          int_buffer_->push_back((int32_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kFLOAT: {
      dataSize = col.data.real_col.size();
      float_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          float_buffer_->push_back(NULL_FLOAT);
        } else {
          float_buffer_->push_back((float)col.data.real_col[i]);
        }
      }
      break;
    }
    case kDOUBLE: {
      dataSize = col.data.real_col.size();
      double_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          double_buffer_->push_back(NULL_DOUBLE);
        } else {
          double_buffer_->push_back((double)col.data.real_col[i]);
        }
      }
      break;
    }
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // TODO: for now, use empty string for nulls
      dataSize = col.data.str_col.size();
      string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          string_buffer_->push_back(std::string());
        } else {
          string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON: {
      dataSize = col.data.str_col.size();
      geo_string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          // TODO: add support for NULL geo
          geo_string_buffer_->push_back(std::string());
        } else {
          geo_string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kARRAY: {
      dataSize = col.data.arr_col.size();
      if (IS_STRING(cd->columnType.get_subtype())) {
        for (size_t i = 0; i < dataSize; i++) {
          std::vector<std::string>& string_vec = addStringArray();
          if (!col.nulls[i]) {
            size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
            for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
              string_vec.push_back(col.data.arr_col[i].data.str_col[str_idx]);
            }
          }
        }
      } else {
        auto elem_ti = cd->columnType.get_subtype();
        switch (elem_ti) {
          case kBOOLEAN: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(bool);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTINYINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int8_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kSMALLINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int16_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int16_t*)p =
                      static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int16_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int32_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int32_t*)p =
                      static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int32_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kBIGINT:
          case kNUMERIC:
          case kDECIMAL: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int64_t);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int64_t*)p =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kFLOAT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(float);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(float);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kDOUBLE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(double);
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(double);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTIME:
          case kTIMESTAMP:
          case kDATE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteWidth = sizeof(int64_t);
                size_t byteSize = len * byteWidth;
                int8_t* buf = (int8_t*)checked_malloc(byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *reinterpret_cast<int64_t*>(p) =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          default:
            throw std::runtime_error("Invalid Array Type");
        }
      }
      break;
    }
    default:
      throw std::runtime_error("Invalid Type");
  }
  return dataSize;
}

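// Illustrative TColumn layout for the columnar path above: an INT column
// holding {1, NULL, 3} arrives as data.int_col = {1, <don't care>, 3} with
// nulls = {0, 1, 0}; the null slot's wire value is ignored and replaced by
// the type's inline null sentinel.
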
void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const TDatum& datum,
                                  const bool is_null) {
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        addBoolean((int8_t)datum.val.int_val);
      }
      break;
    }
    case kTINYINT:
      if (!is_null) {
        addTinyint((int8_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kSMALLINT:
      if (!is_null) {
        addSmallint((int16_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kINT:
      if (!is_null) {
        addInt((int32_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kBIGINT:
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kFLOAT:
      if (!is_null) {
        addFloat((float)datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        addDouble(datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        addString(datum.val.str_val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kARRAY:
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      if (IS_STRING(cd->columnType.get_subtype())) {
        std::vector<std::string>& string_vec = addStringArray();
        addBinaryStringArray(datum, string_vec);
      } else {
        if (!is_null) {
          addArray(TDatumToArrayDatum(datum, cd->columnType));
        } else {
          addArray(NullArray(cd->columnType));
        }
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addGeoString(std::string());
      } else {
        addGeoString(datum.val.str_val);
      }
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

bool importGeoFromLonLat(double lon,
                         double lat,
                         std::vector<double>& coords,
                         SQLTypeInfo& ti) {
  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
    return false;
  }
  if (ti.transforms()) {
    Geospatial::GeoPoint pt{std::vector<double>{lon, lat}};
    if (!pt.transform(ti)) {
      return false;
    }
    pt.getColumns(coords);
    return true;
  }
  coords.push_back(lon);
  coords.push_back(lat);
  return true;
}
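
// Example (illustrative): importGeoFromLonLat(-122.4, 37.8, coords, ti)
// appends {-122.4, 37.8} untransformed for an ordinary SRID 4326 point
// column; when ti carries an output transform (e.g. to Web Mercator), the
// point is reprojected through Geospatial::GeoPoint first.

// The physical (hidden) geo columns populated below always directly follow
// the logical geo column in columnId order: coords, then ring_sizes
// (POLYGON/MULTIPOLYGON), then poly_rings (MULTIPOLYGON), then bounds
// (LINESTRING/POLYGON/MULTIPOLYGON), then render_group (POLYGON/
// MULTIPOLYGON); the repeated ++columnId lookups rely on that ordering.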

void Importer::set_geo_physical_import_buffer(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<double>& coords,
    std::vector<double>& bounds,
    std::vector<int>& ring_sizes,
    std::vector<int>& poly_rings,
    int render_group) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  bool is_null_geo = false;
  bool is_null_point = false;
  if (!col_ti.get_notnull()) {
    // Check for NULL geo
    if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
      is_null_point = true;
      coords.clear();
    }
    is_null_geo = coords.empty();
    if (is_null_point) {
      coords.push_back(NULL_ARRAY_DOUBLE);
      coords.push_back(NULL_DOUBLE);
      // Treating POINT coords as notnull, need to store actual encoding
      // [un]compressed+[not]null
      is_null_geo = false;
    }
  }
  TDatum tdd_coords;
  // Get the raw data representing [optionally compressed] non-NULL geo's coords.
  // One exception - NULL POINT geo: coords need to be processed to encode nullness
  // in a fixlen array, compressed and uncompressed.
  if (!is_null_geo) {
    std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(coords, col_ti);
    tdd_coords.val.arr_val.reserve(compressed_coords.size());
    for (auto cc : compressed_coords) {
      tdd_coords.val.arr_val.emplace_back();
      tdd_coords.val.arr_val.back().val.int_val = cc;
    }
  }
  tdd_coords.is_null = is_null_geo;
  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false);

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_ring_sizes;
    tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
    if (!is_null_geo) {
      for (auto ring_size : ring_sizes) {
        tdd_ring_sizes.val.arr_val.emplace_back();
        tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
      }
    }
    tdd_ring_sizes.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
  }

  if (col_type == kMULTIPOLYGON) {
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_poly_rings;
    tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
    if (!is_null_geo) {
      for (auto num_rings : poly_rings) {
        tdd_poly_rings.val.arr_val.emplace_back();
        tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
      }
    }
    tdd_poly_rings.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings, false);
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum tdd_bounds;
    tdd_bounds.val.arr_val.reserve(bounds.size());
    if (!is_null_geo) {
      for (auto b : bounds) {
        tdd_bounds.val.arr_val.emplace_back();
        tdd_bounds.val.arr_val.back().val.real_val = b;
      }
    }
    tdd_bounds.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false);
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = is_null_geo;
    import_buffers[col_idx++]->add_value(cd_render_group, td_render_group, false);
  }
}

void Importer::set_geo_physical_import_buffer_columnar(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<std::vector<double>>& coords_column,
    std::vector<std::vector<double>>& bounds_column,
    std::vector<std::vector<int>>& ring_sizes_column,
    std::vector<std::vector<int>>& poly_rings_column,
    std::vector<int>& render_groups_column) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;

  auto coords_row_count = coords_column.size();
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  for (auto& coords : coords_column) {
    bool is_null_geo = false;
    bool is_null_point = false;
    if (!col_ti.get_notnull()) {
      // Check for NULL geo
      if (col_type == kPOINT && (coords.empty() || coords[0] == NULL_ARRAY_DOUBLE)) {
        is_null_point = true;
        coords.clear();
      }
      is_null_geo = coords.empty();
      if (is_null_point) {
        coords.push_back(NULL_ARRAY_DOUBLE);
        coords.push_back(NULL_DOUBLE);
        // Treating POINT coords as notnull, need to store actual encoding
        // [un]compressed+[not]null
        is_null_geo = false;
      }
    }
    std::vector<TDatum> td_coords_data;
    if (!is_null_geo) {
      std::vector<uint8_t> compressed_coords =
          Geospatial::compress_coords(coords, col_ti);
      for (auto const& cc : compressed_coords) {
        TDatum td_byte;
        td_byte.val.int_val = cc;
        td_coords_data.push_back(td_byte);
      }
    }
    TDatum tdd_coords;
    tdd_coords.val.arr_val = td_coords_data;
    tdd_coords.is_null = is_null_geo;
    import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
  }
  col_idx++;

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (ring_sizes_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
    }
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& ring_sizes : ring_sizes_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = ring_sizes.empty();
      }
      std::vector<TDatum> td_ring_sizes;
      for (auto const& ring_size : ring_sizes) {
        TDatum td_ring_size;
        td_ring_size.val.int_val = ring_size;
        td_ring_sizes.push_back(td_ring_size);
      }
      TDatum tdd_ring_sizes;
      tdd_ring_sizes.val.arr_val = td_ring_sizes;
      tdd_ring_sizes.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
    }
    col_idx++;
  }

  if (col_type == kMULTIPOLYGON) {
    if (poly_rings_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
    }
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& poly_rings : poly_rings_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = poly_rings.empty();
      }
      std::vector<TDatum> td_poly_rings;
      for (auto const& num_rings : poly_rings) {
        TDatum td_num_rings;
        td_num_rings.val.int_val = num_rings;
        td_poly_rings.push_back(td_num_rings);
      }
      TDatum tdd_poly_rings;
      tdd_poly_rings.val.arr_val = td_poly_rings;
      tdd_poly_rings.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
    }
    col_idx++;
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (bounds_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: bounds column size mismatch";
    }
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& bounds : bounds_column) {
      bool is_null_geo = false;
      if (!col_ti.get_notnull()) {
        // Check for NULL geo
        is_null_geo = (bounds.empty() || bounds[0] == NULL_ARRAY_DOUBLE);
      }
      std::vector<TDatum> td_bounds_data;
      for (auto const& b : bounds) {
        TDatum td_double;
        td_double.val.real_val = b;
        td_bounds_data.push_back(td_double);
      }
      TDatum tdd_bounds;
      tdd_bounds.val.arr_val = td_bounds_data;
      tdd_bounds.is_null = is_null_geo;
      import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
    }
    col_idx++;
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto const& render_group : render_groups_column) {
      TDatum td_render_group;
      td_render_group.val.int_val = render_group;
      td_render_group.is_null = false;
      import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
    }
    col_idx++;
  }
}

namespace {

std::tuple<int, SQLTypes, std::string> explode_collections_step1(
    const std::list<const ColumnDescriptor*>& col_descs) {
  // validate the columns
  // for now we can only explode into a single destination column
  // which must be of the child type (POLYGON, LINESTRING, POINT)
  int collection_col_idx = -1;
  int col_idx = 0;
  std::string collection_col_name;
  SQLTypes collection_child_type = kNULLT;
  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
    auto const& cd = *cd_it;
    auto const col_type = cd->columnType.get_type();
    if (col_type == kPOLYGON || col_type == kLINESTRING || col_type == kPOINT) {
      if (collection_col_idx >= 0) {
        throw std::runtime_error(
            "Explode Collections: Found more than one destination column");
      }
      collection_col_idx = col_idx;
      collection_child_type = col_type;
      collection_col_name = cd->columnName;
    }
    for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
      ++cd_it;
    }
    col_idx++;
  }
  if (collection_col_idx < 0) {
    throw std::runtime_error(
        "Explode Collections: Failed to find a supported column type to explode "
        "into");
  }
  return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
}

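// Illustrative walk-through of the two-step explode: importing a MULTIPOLYGON
// source into a POLYGON column makes step1 return {col_idx, kPOLYGON, name};
// step2 below then detects the collection, casts it, and invokes the import
// lambda once per child geometry, producing one output row per polygon.
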
int64_t explode_collections_step2(
    OGRGeometry* ogr_geometry,
    SQLTypes collection_child_type,
    const std::string& collection_col_name,
    size_t row_or_feature_idx,
    std::function<void(OGRGeometry*)> execute_import_lambda) {
  auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
  bool is_collection = false;
  switch (collection_child_type) {
    case kPOINT:
      switch (ogr_geometry_type) {
        case wkbMultiPoint:
          is_collection = true;
          break;
        case wkbPoint:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTIPOINT or POINT");
      }
      break;
    case kLINESTRING:
      switch (ogr_geometry_type) {
        case wkbMultiLineString:
          is_collection = true;
          break;
        case wkbLineString:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTILINESTRING or "
              "LINESTRING");
      }
      break;
    case kPOLYGON:
      switch (ogr_geometry_type) {
        case wkbMultiPolygon:
          is_collection = true;
          break;
        case wkbPolygon:
          break;
        default:
          throw std::runtime_error(
              "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
      }
      break;
    default:
      CHECK(false) << "Unsupported geo child type " << collection_child_type;
  }

  int64_t us = 0LL;

  // explode or just import
  if (is_collection) {
    // cast to collection
    OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
    CHECK(collection_geometry);

#if LOG_EXPLODE_COLLECTIONS
    // log number of children
    LOG(INFO) << "Exploding row/feature " << row_or_feature_idx << " for column '"
              << collection_col_name << "' into "
              << collection_geometry->getNumGeometries() << " child rows";
#endif

    // loop over children
    uint32_t child_geometry_count = 0;
    auto child_geometry_it = collection_geometry->begin();
    while (child_geometry_it != collection_geometry->end()) {
      // get and import this child
      OGRGeometry* import_geometry = *child_geometry_it;
      us += measure<std::chrono::microseconds>::execution(
          [&] { execute_import_lambda(import_geometry); });

      // next child
      child_geometry_it++;
      child_geometry_count++;
    }
  } else {
    // import non-collection row just once
    us = measure<std::chrono::microseconds>::execution(
        [&] { execute_import_lambda(ogr_geometry); });
  }

  // done
  return us;
}

}  // namespace

1827  int thread_id,
1828  Importer* importer,
1829  std::unique_ptr<char[]> scratch_buffer,
1830  size_t begin_pos,
1831  size_t end_pos,
1832  size_t total_size,
1833  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
1834  size_t first_row_index_this_buffer,
1835  const Catalog_Namespace::SessionInfo* session_info,
1836  Executor* executor) {
1837  ImportStatus thread_import_status;
1838  int64_t total_get_row_time_us = 0;
1839  int64_t total_str_to_val_time_us = 0;
1840  auto query_session = session_info ? session_info->get_session_id() : "";
1841  CHECK(scratch_buffer);
1842  auto buffer = scratch_buffer.get();
1843  auto load_ms = measure<>::execution([]() {});
1844 
1845  thread_import_status.thread_id = thread_id;
1846 
1847  auto ms = measure<>::execution([&]() {
1848  const CopyParams& copy_params = importer->get_copy_params();
1849  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
1850  size_t begin =
1851  delimited_parser::find_beginning(buffer, begin_pos, end_pos, copy_params);
1852  const char* thread_buf = buffer + begin_pos + begin;
1853  const char* thread_buf_end = buffer + end_pos;
1854  const char* buf_end = buffer + total_size;
1855  bool try_single_thread = false;
1856  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
1857  importer->get_import_buffers(thread_id);
1859  int phys_cols = 0;
1860  int point_cols = 0;
1861  for (const auto cd : col_descs) {
1862  const auto& col_ti = cd->columnType;
1863  phys_cols += col_ti.get_physical_cols();
1864  if (cd->columnType.get_type() == kPOINT) {
1865  point_cols++;
1866  }
1867  }
1868  auto num_cols = col_descs.size() - phys_cols;
1869  for (const auto& p : import_buffers) {
1870  p->clear();
1871  }
1872  std::vector<std::string_view> row;
1873  size_t row_index_plus_one = 0;
1874  for (const char* p = thread_buf; p < thread_buf_end; p++) {
1875  row.clear();
1876  std::vector<std::unique_ptr<char[]>>
1877  tmp_buffers; // holds string w/ removed escape chars, etc
1878  if (DEBUG_TIMING) {
1879  us = measure<std::chrono::microseconds>::execution([&]() {
1880  p = import_export::delimited_parser::get_row(p,
1881  thread_buf_end,
1882  buf_end,
1883  copy_params,
1884  importer->get_is_array(),
1885  row,
1886  tmp_buffers,
1887  try_single_thread,
1888  true);
1889  });
1890  total_get_row_time_us += us;
1891  } else {
1892  p = import_export::delimited_parser::get_row(p,
1893  thread_buf_end,
1894  buf_end,
1895  copy_params,
1896  importer->get_is_array(),
1897  row,
1898  tmp_buffers,
1899  try_single_thread,
1900  true);
1901  }
1902  row_index_plus_one++;
1903  // Each POINT could consume two separate coords instead of a single WKT
1904  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
1905  thread_import_status.rows_rejected++;
1906  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
1907  << row.size() << "): " << shared::printContainer(row);
1908  if (thread_import_status.rows_rejected > copy_params.max_reject) {
1909  break;
1910  }
1911  continue;
1912  }
1913 
1914  //
1915  // lambda for importing a row (perhaps multiple times if exploding a collection)
1916  //
1917 
1918  auto execute_import_row = [&](OGRGeometry* import_geometry) {
1919  size_t import_idx = 0;
1920  size_t col_idx = 0;
1921  try {
1922  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1923  auto cd = *cd_it;
1924  const auto& col_ti = cd->columnType;
1925 
1926  bool is_null =
1927  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
1928  // Note: default copy_params.null_str is "\N", but everyone uses "NULL".
1929  // So initially nullness may be missed and not passed to add_value,
1930  // which then might also check and still decide it's actually a NULL, e.g.
1931  // if kINT doesn't start with a digit or a '-' then it's considered NULL.
1932  // So "NULL" is not recognized as NULL but then it's not recognized as
1933  // a valid kINT, so it's a NULL after all.
1934  // Checking for "NULL" here too, as a widely accepted notation for NULL.
1935 
1936  // Treating empty as NULL
1937  if (!cd->columnType.is_string() && row[import_idx].empty()) {
1938  is_null = true;
1939  }
1940 
1941  if (col_ti.get_physical_cols() == 0) {
1942  // not geo
1943 
1944  import_buffers[col_idx]->add_value(
1945  cd, row[import_idx], is_null, copy_params);
1946 
1947  // next
1948  ++import_idx;
1949  ++col_idx;
1950  } else {
1951  // geo
1952 
1953  // store null string in the base column
1954  import_buffers[col_idx]->add_value(
1955  cd, copy_params.null_str, true, copy_params);
1956 
1957  // WKT from string we're not storing
1958  auto const& geo_string = row[import_idx];
1959 
1960  // next
1961  ++import_idx;
1962  ++col_idx;
1963 
1964  SQLTypes col_type = col_ti.get_type();
1965  CHECK(IS_GEO(col_type));
1966 
1967  std::vector<double> coords;
1968  std::vector<double> bounds;
1969  std::vector<int> ring_sizes;
1970  std::vector<int> poly_rings;
1971  int render_group = 0;
1972 
1973  // if this is a POINT column, and the field is not null, and
1974  // looks like a scalar numeric value (and not a hex blob)
1975  // attempt to import two columns as lon/lat (or lat/lon)
1976  if (col_type == kPOINT && !is_null && geo_string.size() > 0 &&
1977  (geo_string[0] == '.' || isdigit(geo_string[0]) ||
1978  geo_string[0] == '-') &&
1979  geo_string.find_first_of("ABCDEFabcdef") == std::string::npos) {
1980  double lon = std::atof(std::string(geo_string).c_str());
1981  double lat = NAN;
1982  auto lat_str = row[import_idx];
1983  ++import_idx;
1984  if (lat_str.size() > 0 &&
1985  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
1986  lat = std::atof(std::string(lat_str).c_str());
1987  }
1988  // Swap coordinates if this table uses a reverse order: lat/lon
1989  if (!copy_params.lonlat) {
1990  std::swap(lat, lon);
1991  }
1992  // TODO: should check if POINT column should have been declared with
1993  // SRID WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
1994  // throw std::runtime_error("POINT column " + cd->columnName + " is
1995  // not WGS84, cannot insert lon/lat");
1996  // }
1997  SQLTypeInfo import_ti{col_ti};
1998  if (copy_params.file_type == FileType::DELIMITED &&
1999  import_ti.get_output_srid() == 4326) {
2000  auto srid0 = copy_params.source_srid;
2001  if (srid0 > 0) {
2002  // srid0 -> 4326 transform is requested on import
2003  import_ti.set_input_srid(srid0);
2004  }
2005  }
2006  if (!importGeoFromLonLat(lon, lat, coords, import_ti)) {
2007  throw std::runtime_error(
2008  "Cannot read lon/lat to insert into POINT column " +
2009  cd->columnName);
2010  }
2011  } else {
2012  // import it
2013  SQLTypeInfo import_ti{col_ti};
2014  if (copy_params.file_type == FileType::DELIMITED &&
2015  import_ti.get_output_srid() == 4326) {
2016  auto srid0 = copy_params.source_srid;
2017  if (srid0 > 0) {
2018  // srid0 -> 4326 transform is requested on import
2019  import_ti.set_input_srid(srid0);
2020  }
2021  }
2022  if (is_null) {
2023  if (col_ti.get_notnull()) {
2024  throw std::runtime_error("NULL geo for column " + cd->columnName);
2025  }
2026  Geospatial::GeoTypesFactory::getNullGeoColumns(
2027  import_ti,
2028  coords,
2029  bounds,
2030  ring_sizes,
2031  poly_rings,
2032  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2033  } else {
2034  if (import_geometry) {
2035  // geometry already exploded
2036  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2037  import_geometry,
2038  import_ti,
2039  coords,
2040  bounds,
2041  ring_sizes,
2042  poly_rings,
2043  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2044  std::string msg =
2045  "Failed to extract valid geometry from exploded row " +
2046  std::to_string(first_row_index_this_buffer +
2047  row_index_plus_one) +
2048  " for column " + cd->columnName;
2049  throw std::runtime_error(msg);
2050  }
2051  } else {
2052  // extract geometry directly from WKT
2053  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2054  std::string(geo_string),
2055  import_ti,
2056  coords,
2057  bounds,
2058  ring_sizes,
2059  poly_rings,
2060  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2061  std::string msg = "Failed to extract valid geometry from row " +
2062  std::to_string(first_row_index_this_buffer +
2063  row_index_plus_one) +
2064  " for column " + cd->columnName;
2065  throw std::runtime_error(msg);
2066  }
2067  }
2068 
2069  // validate types
2070  if (col_type != import_ti.get_type()) {
2071  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2072  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2073  col_type == SQLTypes::kMULTIPOLYGON)) {
2074  throw std::runtime_error(
2075  "Imported geometry doesn't match the type of column " +
2076  cd->columnName);
2077  }
2078  }
2079  }
2080 
2081  // assign render group?
2082  if (columnIdToRenderGroupAnalyzerMap.size()) {
2083  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2084  if (ring_sizes.size()) {
2085  // get a suitable render group for these poly coords
2086  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2087  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2088  render_group =
2089  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2090  } else {
2091  // empty poly
2092  render_group = -1;
2093  }
2094  }
2095  }
2096  }
2097 
2098  // import extracted geo
2099  Importer::set_geo_physical_import_buffer(importer->getCatalog(),
2100  cd,
2101  import_buffers,
2102  col_idx,
2103  coords,
2104  bounds,
2105  ring_sizes,
2106  poly_rings,
2107  render_group);
2108 
2109  // skip remaining physical columns
2110  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2111  ++cd_it;
2112  }
2113  }
2114  }
2115  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2116  checkInterrupt(query_session, executor))) {
2117  thread_import_status.load_failed = true;
2118  thread_import_status.load_msg =
2119  "Table load was cancelled via Query Interrupt";
2120  return;
2121  }
2122  thread_import_status.rows_completed++;
2123  } catch (const std::exception& e) {
2124  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2125  import_buffers[col_idx_to_pop]->pop_value();
2126  }
2127  thread_import_status.rows_rejected++;
2128  LOG(ERROR) << "Input exception thrown: " << e.what()
2129  << ". Row discarded. Data: " << shared::printContainer(row);
2130  if (thread_import_status.rows_rejected > copy_params.max_reject) {
2131  LOG(ERROR) << "Load was cancelled due to max reject rows being reached";
2132  thread_import_status.load_failed = true;
2133  thread_import_status.load_msg =
2134  "Load was cancelled due to max reject rows being reached";
2135  }
2136  }
2137  }; // End of lambda
2138 
2139  if (copy_params.geo_explode_collections) {
2140  // explode and import
2141  auto const [collection_col_idx, collection_child_type, collection_col_name] =
2142  explode_collections_step1(col_descs);
2143  // pull out the collection WKT or WKB hex
2144  CHECK_LT(collection_col_idx, (int)row.size()) << "column index out of range";
2145  auto const& collection_geo_string = row[collection_col_idx];
2146  // convert to OGR
2147  OGRGeometry* ogr_geometry = nullptr;
2148  ScopeGuard destroy_ogr_geometry = [&] {
2149  if (ogr_geometry) {
2150  OGRGeometryFactory::destroyGeometry(ogr_geometry);
2151  }
2152  };
2153  ogr_geometry = Geospatial::GeoTypesFactory::createOGRGeometry(
2154  std::string(collection_geo_string));
2155  // do the explode and import
2156  us = explode_collections_step2(ogr_geometry,
2157  collection_child_type,
2158  collection_col_name,
2159  first_row_index_this_buffer + row_index_plus_one,
2160  execute_import_row);
2161  } else {
2162  // import non-collection row just once
2163  us = measure<std::chrono::microseconds>::execution(
2164  [&] { execute_import_row(nullptr); });
2165  }
2166 
2167  if (thread_import_status.load_failed) {
2168  break;
2169  }
2170  } // end thread
2171  total_str_to_val_time_us += us;
2172  if (!thread_import_status.load_failed && thread_import_status.rows_completed > 0) {
2173  load_ms = measure<>::execution([&]() {
2174  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2175  });
2176  }
2177  }); // end execution
2178 
2179  if (DEBUG_TIMING && !thread_import_status.load_failed &&
2180  thread_import_status.rows_completed > 0) {
2181  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2182  << thread_import_status.rows_completed << " rows inserted in "
2183  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2184  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2185  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2186  << "sec" << std::endl;
2187  }
2188 
2189  return thread_import_status;
2190 }
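/*
 * Aside: a standalone sketch (not from this file) of the scalar-vs-WKT test
 * used above for POINT columns. A field that starts with '.', '-' or a digit
 * and contains no hex letters is taken as one coordinate of a lon/lat pair;
 * anything else falls through to the WKT/WKB-hex geometry parser:
 *
 *   #include <cctype>
 *   #include <string>
 *
 *   bool looks_like_scalar_coord(const std::string& s) {
 *     if (s.empty()) {
 *       return false;
 *     }
 *     const unsigned char c0 = s[0];
 *     if (!(c0 == '.' || c0 == '-' || std::isdigit(c0))) {
 *       return false;
 *     }
 *     return s.find_first_of("ABCDEFabcdef") == std::string::npos;
 *   }
 *
 *   // looks_like_scalar_coord("-122.4")          -> true  (lon; next field is lat)
 *   // looks_like_scalar_coord("POINT(1 2)")      -> false (parsed as WKT)
 *   // looks_like_scalar_coord("0101000000F0...") -> false (hex letters -> WKB blob)
 */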
2191 
2192 static ImportStatus import_thread_shapefile(
2193  int thread_id,
2194  Importer* importer,
2195  OGRSpatialReference* poGeographicSR,
2196  const FeaturePtrVector& features,
2197  size_t firstFeature,
2198  size_t numFeatures,
2199  const FieldNameToIndexMapType& fieldNameToIndexMap,
2200  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2201  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
2202  const Catalog_Namespace::SessionInfo* session_info,
2203  Executor* executor) {
2204  ImportStatus thread_import_status;
2205  const CopyParams& copy_params = importer->get_copy_params();
2206  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2207  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2208  importer->get_import_buffers(thread_id);
2209  auto query_session = session_info ? session_info->get_session_id() : "";
2210  for (const auto& p : import_buffers) {
2211  p->clear();
2212  }
2213 
2214  auto convert_timer = timer_start();
2215 
2216  // we create this on the fly based on the first feature's SR
2217  std::unique_ptr<OGRCoordinateTransformation> coordinate_transformation;
2218  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2219  if (!features[iFeature]) {
2220  continue;
2221  }
2222 
2223  // get this feature's geometry
2224  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2225  if (pGeometry) {
2226  // for geodatabase, we need to consider features with no geometry
2227  // as we still want to create a table, even if it has no geo column
2228 
2229  // transform it
2230  // avoid GDAL error if not transformable
2231  auto geometry_sr = pGeometry->getSpatialReference();
2232  if (geometry_sr) {
2233  // create an OGRCoordinateTransformation (CT) on the fly
2234  // we must assume that all geo in this file will have
2235  // the same source SR, so the CT will be valid for all
2236  // transforming to a reusable CT is faster than to an SR
2237  if (coordinate_transformation == nullptr) {
2238  coordinate_transformation.reset(
2239  OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR));
2240  if (coordinate_transformation == nullptr) {
2241  throw std::runtime_error(
2242  "Failed to create a GDAL CoordinateTransformation for incoming geo");
2243  }
2244  }
2245  pGeometry->transform(coordinate_transformation.get());
2246  }
2247  }
2248 
2249  //
2250  // lambda for importing a feature (perhaps multiple times if exploding a collection)
2251  //
2252 
2253  auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2254  size_t col_idx = 0;
2255  try {
2256  if (UNLIKELY((thread_import_status.rows_completed & 0xFFFF) == 0 &&
2257  checkInterrupt(query_session, executor))) {
2258  thread_import_status.load_failed = true;
2259  thread_import_status.load_msg = "Table load was cancelled via Query Interrupt";
2260  return;
2261  }
2262  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2263  auto cd = *cd_it;
2264 
2265  // is this a geo column?
2266  const auto& col_ti = cd->columnType;
2267  if (col_ti.is_geometry()) {
2268  // Note that this assumes there is one and only one geo column in the
2269  // table. Currently, the importer only supports reading a single
2270  // geospatial feature from an input shapefile / geojson file, but this
2271  // code will need to be modified if that changes
2272  SQLTypes col_type = col_ti.get_type();
2273 
2274  // store null string in the base column
2275  import_buffers[col_idx]->add_value(
2276  cd, copy_params.null_str, true, copy_params);
2277  ++col_idx;
2278 
2279  // the data we now need to extract for the other columns
2280  std::vector<double> coords;
2281  std::vector<double> bounds;
2282  std::vector<int> ring_sizes;
2283  std::vector<int> poly_rings;
2284  int render_group = 0;
2285 
2286  // extract it
2287  SQLTypeInfo import_ti{col_ti};
2288  bool is_null_geo = !import_geometry;
2289  if (is_null_geo) {
2290  if (col_ti.get_notnull()) {
2291  throw std::runtime_error("NULL geo for column " + cd->columnName);
2292  }
2293  Geospatial::GeoTypesFactory::getNullGeoColumns(
2294  import_ti,
2295  coords,
2296  bounds,
2297  ring_sizes,
2298  poly_rings,
2299  PROMOTE_POLYGON_TO_MULTIPOLYGON);
2300  } else {
2301  if (!Geospatial::GeoTypesFactory::getGeoColumns(
2302  import_geometry,
2303  import_ti,
2304  coords,
2305  bounds,
2306  ring_sizes,
2307  poly_rings,
2308  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2309  std::string msg = "Failed to extract valid geometry from feature " +
2310  std::to_string(firstFeature + iFeature + 1) +
2311  " for column " + cd->columnName;
2312  throw std::runtime_error(msg);
2313  }
2314 
2315  // validate types
2316  if (col_type != import_ti.get_type()) {
2317  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2318  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2319  col_type == SQLTypes::kMULTIPOLYGON)) {
2320  throw std::runtime_error(
2321  "Imported geometry doesn't match the type of column " +
2322  cd->columnName);
2323  }
2324  }
2325  }
2326 
2327  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2328  if (ring_sizes.size()) {
2329  // get a suitable render group for these poly coords
2330  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2331  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2332  render_group = (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2333  } else {
2334  // empty poly
2335  render_group = -1;
2336  }
2337  }
2338 
2339  // create coords array value and add it to the physical column
2340  ++cd_it;
2341  auto cd_coords = *cd_it;
2342  std::vector<TDatum> td_coord_data;
2343  if (!is_null_geo) {
2344  std::vector<uint8_t> compressed_coords =
2345  Geospatial::compress_coords(coords, col_ti);
2346  for (auto cc : compressed_coords) {
2347  TDatum td_byte;
2348  td_byte.val.int_val = cc;
2349  td_coord_data.push_back(td_byte);
2350  }
2351  }
2352  TDatum tdd_coords;
2353  tdd_coords.val.arr_val = td_coord_data;
2354  tdd_coords.is_null = is_null_geo;
2355  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2356  ++col_idx;
2357 
2358  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2359  // Create ring_sizes array value and add it to the physical column
2360  ++cd_it;
2361  auto cd_ring_sizes = *cd_it;
2362  std::vector<TDatum> td_ring_sizes;
2363  if (!is_null_geo) {
2364  for (auto ring_size : ring_sizes) {
2365  TDatum td_ring_size;
2366  td_ring_size.val.int_val = ring_size;
2367  td_ring_sizes.push_back(td_ring_size);
2368  }
2369  }
2370  TDatum tdd_ring_sizes;
2371  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2372  tdd_ring_sizes.is_null = is_null_geo;
2373  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2374  ++col_idx;
2375  }
2376 
2377  if (col_type == kMULTIPOLYGON) {
2378  // Create poly_rings array value and add it to the physical column
2379  ++cd_it;
2380  auto cd_poly_rings = *cd_it;
2381  std::vector<TDatum> td_poly_rings;
2382  if (!is_null_geo) {
2383  for (auto num_rings : poly_rings) {
2384  TDatum td_num_rings;
2385  td_num_rings.val.int_val = num_rings;
2386  td_poly_rings.push_back(td_num_rings);
2387  }
2388  }
2389  TDatum tdd_poly_rings;
2390  tdd_poly_rings.val.arr_val = td_poly_rings;
2391  tdd_poly_rings.is_null = is_null_geo;
2392  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2393  ++col_idx;
2394  }
2395 
2396  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2397  col_type == kMULTIPOLYGON) {
2398  // Create bounds array value and add it to the physical column
2399  ++cd_it;
2400  auto cd_bounds = *cd_it;
2401  std::vector<TDatum> td_bounds_data;
2402  if (!is_null_geo) {
2403  for (auto b : bounds) {
2404  TDatum td_double;
2405  td_double.val.real_val = b;
2406  td_bounds_data.push_back(td_double);
2407  }
2408  }
2409  TDatum tdd_bounds;
2410  tdd_bounds.val.arr_val = td_bounds_data;
2411  tdd_bounds.is_null = is_null_geo;
2412  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2413  ++col_idx;
2414  }
2415 
2416  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2417  // Create render_group value and add it to the physical column
2418  ++cd_it;
2419  auto cd_render_group = *cd_it;
2420  TDatum td_render_group;
2421  td_render_group.val.int_val = render_group;
2422  td_render_group.is_null = is_null_geo;
2423  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2424  ++col_idx;
2425  }
2426  } else {
2427  // regular column
2428  // pull from GDAL metadata
2429  auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2430  CHECK(cit != columnNameToSourceNameMap.end());
2431  auto const& field_name = cit->second;
2432 
2433  auto const fit = fieldNameToIndexMap.find(field_name);
2434  CHECK(fit != fieldNameToIndexMap.end());
2435  auto const& field_index = fit->second;
2436  CHECK(field_index < fieldNameToIndexMap.size());
2437 
2438  auto const& feature = features[iFeature];
2439 
2440  auto field_defn = feature->GetFieldDefnRef(field_index);
2441  CHECK(field_defn);
2442 
2443  // OGRFeature::GetFieldAsString() can only return 80 characters
2444  // so for array columns, we are obliged to fetch the actual values
2445  // and construct the concatenated string ourselves
2446 
2447  std::string value_string;
2448  int array_index = 0, array_size = 0;
2449 
2450  auto stringify_numeric_list = [&](auto* values) {
2451  value_string = "{";
2452  while (array_index < array_size) {
2453  auto separator = (array_index > 0) ? "," : "";
2454  value_string += separator + std::to_string(values[array_index]);
2455  array_index++;
2456  }
2457  value_string += "}";
2458  };
2459 
2460  auto field_type = field_defn->GetType();
2461  switch (field_type) {
2462  case OFTInteger:
2463  case OFTInteger64:
2464  case OFTReal:
2465  case OFTString:
2466  case OFTBinary:
2467  case OFTDate:
2468  case OFTTime:
2469  case OFTDateTime: {
2470  value_string = feature->GetFieldAsString(field_index);
2471  } break;
2472  case OFTIntegerList: {
2473  auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2474  stringify_numeric_list(values);
2475  } break;
2476  case OFTInteger64List: {
2477  auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2478  stringify_numeric_list(values);
2479  } break;
2480  case OFTRealList: {
2481  auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2482  stringify_numeric_list(values);
2483  } break;
2484  case OFTStringList: {
2485  auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2486  value_string = "{";
2487  if (array_of_strings) {
2488  while (auto* this_string = array_of_strings[array_index]) {
2489  auto separator = (array_index > 0) ? "," : "";
2490  value_string += separator + std::string(this_string);
2491  array_index++;
2492  }
2493  }
2494  value_string += "}";
2495  } break;
2496  default:
2497  throw std::runtime_error("Unsupported geo file field type (" +
2498  std::to_string(static_cast<int>(field_type)) +
2499  ")");
2500  }
2501 
2502  static CopyParams default_copy_params;
2503  import_buffers[col_idx]->add_value(
2504  cd, value_string, false, default_copy_params);
2505  ++col_idx;
2506  }
2507  }
2508  thread_import_status.rows_completed++;
2509  } catch (QueryExecutionError& e) {
2510  if (e.getErrorCode() == Executor::ERR_INTERRUPTED) {
2511  throw e;
2512  }
2513  } catch (const std::exception& e) {
2514  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2515  import_buffers[col_idx_to_pop]->pop_value();
2516  }
2517  thread_import_status.rows_rejected++;
2518  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2519  }
2520  };
2521 
2522  if (pGeometry && copy_params.geo_explode_collections) {
2523  // explode and import
2524  auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2525  explode_collections_step1(col_descs);
2526  explode_collections_step2(pGeometry,
2527  collection_child_type,
2528  collection_col_name,
2529  firstFeature + iFeature + 1,
2530  execute_import_feature);
2531  } else {
2532  // import non-collection or null feature just once
2533  execute_import_feature(pGeometry);
2534  }
2535  } // end features
2536 
2537  float convert_ms =
2538  float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2539  convert_timer)) /
2540  1000.0f;
2541 
2542  float load_ms = 0.0f;
2543  if (thread_import_status.rows_completed > 0) {
2544  auto load_timer = timer_start();
2545  importer->load(import_buffers, thread_import_status.rows_completed, session_info);
2546  load_ms =
2547  float(
2548  timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2549  load_timer)) /
2550  1000.0f;
2551  }
2552 
2553  if (DEBUG_TIMING && thread_import_status.rows_completed > 0) {
2554  LOG(INFO) << "DEBUG: Process " << convert_ms << "ms";
2555  LOG(INFO) << "DEBUG: Load " << load_ms << "ms";
2556  }
2557 
2558  thread_import_status.thread_id = thread_id;
2559 
2560  if (DEBUG_TIMING) {
2561  LOG(INFO) << "DEBUG: Total "
2562  << float(timer_stop<std::chrono::steady_clock::time_point,
2563  std::chrono::microseconds>(convert_timer)) /
2564  1000.0f
2565  << "ms";
2566  }
2567 
2568  return thread_import_status;
2569 }
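/*
 * Aside: the list-field stringification above in isolation. Since OGR's
 * GetFieldAsString() truncates long values (the comment above cites an
 * 80-character limit), list fields are rebuilt as a brace-delimited string by
 * hand; a standalone sketch of the same idea:
 *
 *   #include <sstream>
 *   #include <string>
 *   #include <vector>
 *
 *   template <typename T>
 *   std::string stringify_numeric_list(const std::vector<T>& values) {
 *     std::ostringstream oss;
 *     oss << "{";
 *     for (size_t i = 0; i < values.size(); ++i) {
 *       oss << (i ? "," : "") << values[i];
 *     }
 *     oss << "}";
 *     return oss.str();
 *   }
 *
 *   // stringify_numeric_list(std::vector<int>{1, 2, 3}) == "{1,2,3}"
 */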
2570 
2571 bool Loader::loadNoCheckpoint(
2572  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2573  size_t row_count,
2574  const Catalog_Namespace::SessionInfo* session_info) {
2575  return loadImpl(import_buffers, row_count, false, session_info);
2576 }
2577 
2578 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2579  size_t row_count,
2580  const Catalog_Namespace::SessionInfo* session_info) {
2581  return loadImpl(import_buffers, row_count, true, session_info);
2582 }
2583 
2584 namespace {
2585 
2586 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2587  const auto& ti = import_buffer.getTypeInfo();
2588  const int8_t* values_buffer{nullptr};
2589  if (ti.is_string()) {
2590  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2591  values_buffer = import_buffer.getStringDictBuffer();
2592  } else {
2593  values_buffer = import_buffer.getAsBytes();
2594  }
2595  CHECK(values_buffer);
2596  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2597  switch (logical_size) {
2598  case 1: {
2599  return values_buffer[index];
2600  }
2601  case 2: {
2602  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2603  }
2604  case 4: {
2605  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2606  }
2607  case 8: {
2608  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2609  }
2610  default:
2611  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2612  }
2613  UNREACHABLE();
2614  return 0;
2615 }
2616 
2617 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2618  const auto& ti = import_buffer.getTypeInfo();
2619  CHECK_EQ(kFLOAT, ti.get_type());
2620  const auto values_buffer = import_buffer.getAsBytes();
2621  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2622 }
2623 
2624 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2625  const auto& ti = import_buffer.getTypeInfo();
2626  CHECK_EQ(kDOUBLE, ti.get_type());
2627  const auto values_buffer = import_buffer.getAsBytes();
2628  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2629 }
2630 
2631 } // namespace
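/*
 * Aside: the size-dispatched reads above, sketched standalone. Given a raw
 * byte buffer of fixed-width integers, index it at the logical size; memcpy is
 * used here instead of reinterpret_cast to sidestep the strict-aliasing
 * concern that the float/double readers above handle with may_alias_ptr:
 *
 *   #include <cstdint>
 *   #include <cstring>
 *
 *   int64_t int_at(const int8_t* bytes, int logical_size, size_t index) {
 *     switch (logical_size) {
 *       case 1: return bytes[index];
 *       case 2: { int16_t v; std::memcpy(&v, bytes + index * 2, 2); return v; }
 *       case 4: { int32_t v; std::memcpy(&v, bytes + index * 4, 4); return v; }
 *       case 8: { int64_t v; std::memcpy(&v, bytes + index * 8, 8); return v; }
 *       default: return 0;
 *     }
 *   }
 */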
2632 
2633 void Loader::fillShardRow(const size_t row_index,
2634  OneShardBuffers& shard_output_buffers,
2635  const OneShardBuffers& import_buffers) {
2636  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2637  const auto& input_buffer = import_buffers[col_idx];
2638  const auto& col_ti = input_buffer->getTypeInfo();
2639  const auto type =
2640  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2641 
2642  switch (type) {
2643  case kBOOLEAN:
2644  shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, row_index));
2645  break;
2646  case kTINYINT:
2647  shard_output_buffers[col_idx]->addTinyint(int_value_at(*input_buffer, row_index));
2648  break;
2649  case kSMALLINT:
2650  shard_output_buffers[col_idx]->addSmallint(
2651  int_value_at(*input_buffer, row_index));
2652  break;
2653  case kINT:
2654  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2655  break;
2656  case kBIGINT:
2657  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2658  break;
2659  case kFLOAT:
2660  shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, row_index));
2661  break;
2662  case kDOUBLE:
2663  shard_output_buffers[col_idx]->addDouble(
2664  double_value_at(*input_buffer, row_index));
2665  break;
2666  case kTEXT:
2667  case kVARCHAR:
2668  case kCHAR: {
2669  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2670  shard_output_buffers[col_idx]->addString(
2671  (*input_buffer->getStringBuffer())[row_index]);
2672  break;
2673  }
2674  case kTIME:
2675  case kTIMESTAMP:
2676  case kDATE:
2677  shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, row_index));
2678  break;
2679  case kARRAY:
2680  if (IS_STRING(col_ti.get_subtype())) {
2681  CHECK(input_buffer->getStringArrayBuffer());
2682  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2683  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2684  shard_output_buffers[col_idx]->addStringArray(input_arr);
2685  } else {
2686  shard_output_buffers[col_idx]->addArray(
2687  (*input_buffer->getArrayBuffer())[row_index]);
2688  }
2689  break;
2690  case kPOINT:
2691  case kLINESTRING:
2692  case kPOLYGON:
2693  case kMULTIPOLYGON: {
2694  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2695  shard_output_buffers[col_idx]->addGeoString(
2696  (*input_buffer->getGeoStringBuffer())[row_index]);
2697  break;
2698  }
2699  default:
2700  CHECK(false);
2701  }
2702  }
2703 }
2704 
2705 void Loader::distributeToShardsExistingColumns(
2706  std::vector<OneShardBuffers>& all_shard_import_buffers,
2707  std::vector<size_t>& all_shard_row_counts,
2708  const OneShardBuffers& import_buffers,
2709  const size_t row_count,
2710  const size_t shard_count,
2711  const Catalog_Namespace::SessionInfo* session_info) {
2712  int col_idx{0};
2713  const ColumnDescriptor* shard_col_desc{nullptr};
2714  for (const auto col_desc : column_descs_) {
2715  ++col_idx;
2716  if (col_idx == table_desc_->shardedColumnId) {
2717  shard_col_desc = col_desc;
2718  break;
2719  }
2720  }
2721  CHECK(shard_col_desc);
2722  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2723  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2724  const auto& shard_col_ti = shard_col_desc->columnType;
2725  CHECK(shard_col_ti.is_integer() ||
2726  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2727  shard_col_ti.is_time());
2728  if (shard_col_ti.is_string()) {
2729  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2730  CHECK(payloads_ptr);
2731  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2732  }
2733 
2734  for (size_t i = 0; i < row_count; ++i) {
2735  const size_t shard =
2736  SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2737  auto& shard_output_buffers = all_shard_import_buffers[shard];
2738  fillShardRow(i, shard_output_buffers, import_buffers);
2739  ++all_shard_row_counts[shard];
2740  }
2741 }
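/*
 * Aside: a hedged sketch of the row routing above. Assuming SHARD_FOR_KEY
 * (Shared/shard_key.h) reduces the non-negative key value modulo the shard
 * count, six rows with keys {0,1,2,3,4,5} over three shards land as
 * {0,3}, {1,4}, {2,5}; dictionary-encoded string keys are first translated
 * to their integer IDs, as done above:
 *
 *   #include <cstdint>
 *   #include <cstdlib>
 *
 *   size_t shard_for_key(int64_t key, size_t shard_count) {
 *     return static_cast<size_t>(std::llabs(key)) % shard_count;  // assumed scheme
 *   }
 */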
2742 
2743 void Loader::distributeToShardsNewColumns(
2744  std::vector<OneShardBuffers>& all_shard_import_buffers,
2745  std::vector<size_t>& all_shard_row_counts,
2746  const OneShardBuffers& import_buffers,
2747  const size_t row_count,
2748  const size_t shard_count,
2749  const Catalog_Namespace::SessionInfo* session_info) {
2750  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2751  CHECK(shard_tds.size() == shard_count);
2752 
2753  for (size_t shard = 0; shard < shard_count; ++shard) {
2754  auto& shard_output_buffers = all_shard_import_buffers[shard];
2755  if (row_count != 0) {
2756  fillShardRow(0, shard_output_buffers, import_buffers);
2757  }
2758  // when replicating a column, the row count of a shard equals the replicate
2759  // count of the column on that shard
2760  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2761  }
2762 }
2763 
2764 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2765  std::vector<size_t>& all_shard_row_counts,
2766  const OneShardBuffers& import_buffers,
2767  const size_t row_count,
2768  const size_t shard_count,
2769  const Catalog_Namespace::SessionInfo* session_info) {
2770  all_shard_row_counts.resize(shard_count);
2771  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2772  all_shard_import_buffers.emplace_back();
2773  for (const auto& typed_import_buffer : import_buffers) {
2774  all_shard_import_buffers.back().emplace_back(
2775  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2776  typed_import_buffer->getStringDictionary()));
2777  }
2778  }
2780  if (isAddingColumns()) {
2781  distributeToShardsNewColumns(all_shard_import_buffers,
2782  all_shard_row_counts,
2783  import_buffers,
2784  row_count,
2785  shard_count,
2786  session_info);
2787  } else {
2788  distributeToShardsExistingColumns(all_shard_import_buffers,
2789  all_shard_row_counts,
2790  import_buffers,
2791  row_count,
2792  shard_count,
2793  session_info);
2794  }
2795 }
2796 
2797 bool Loader::loadImpl(
2798  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2799  size_t row_count,
2800  bool checkpoint,
2801  const Catalog_Namespace::SessionInfo* session_info) {
2802  if (load_callback_) {
2803  auto data_blocks = TypedImportBuffer::get_data_block_pointers(import_buffers);
2804  return load_callback_(import_buffers, data_blocks, row_count);
2805  }
2806  if (table_desc_->nShards) {
2807  std::vector<OneShardBuffers> all_shard_import_buffers;
2808  std::vector<size_t> all_shard_row_counts;
2809  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2810  distributeToShards(all_shard_import_buffers,
2811  all_shard_row_counts,
2812  import_buffers,
2813  row_count,
2814  shard_tables.size(),
2815  session_info);
2816  bool success = true;
2817  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2818  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2819  all_shard_row_counts[shard_idx],
2820  shard_tables[shard_idx],
2821  checkpoint,
2822  session_info);
2823  }
2824  return success;
2825  }
2826  return loadToShard(import_buffers, row_count, table_desc_, checkpoint, session_info);
2827 }
2828 
2829 std::vector<DataBlockPtr> TypedImportBuffer::get_data_block_pointers(
2830  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2831  std::vector<DataBlockPtr> result(import_buffers.size());
2832  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2833  encoded_data_block_ptrs_futures;
2834  // make all async calls to string dictionary here and then continue execution
2835  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2836  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2837  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2838  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2839  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2840 
2841  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2842  buf_idx,
2843  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2844  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2845  return import_buffers[buf_idx]->getStringDictBuffer();
2846  })));
2847  }
2848  }
2849 
2850  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2851  DataBlockPtr p;
2852  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2853  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2854  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2855  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2856  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2857  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2858  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2859  p.stringsPtr = string_payload_ptr;
2860  } else {
2861  // This column is a dictionary-encoded string. We already made an async
2862  // request above to obtain the encoded integer values, so we skip this
2863  // iteration and continue.
2864  continue;
2865  }
2866  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2867  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2868  p.stringsPtr = geo_payload_ptr;
2869  } else {
2870  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2871  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2872  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2873  import_buffers[buf_idx]->addDictEncodedStringArray(
2874  *import_buffers[buf_idx]->getStringArrayBuffer());
2875  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2876  } else {
2877  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2878  }
2879  }
2880  result[buf_idx] = p;
2881  }
2882 
2883  // wait for the async requests we made for string dictionary
2884  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2885  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2886  }
2887  return result;
2888 }
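/*
 * Aside: the async pattern above, standalone. Expensive per-column work is
 * launched up front with std::async, the cheap pointers are built inline, and
 * the futures are joined only at the end:
 *
 *   #include <future>
 *   #include <utility>
 *   #include <vector>
 *
 *   int main() {
 *     std::vector<std::vector<int>> encoded(3);
 *     std::vector<std::pair<size_t, std::future<int*>>> futures;
 *     for (size_t i = 0; i < encoded.size(); ++i) {
 *       futures.emplace_back(i, std::async(std::launch::async, [i, &encoded] {
 *                              encoded[i].assign(1000, static_cast<int>(i));
 *                              return encoded[i].data();
 *                            }));
 *     }
 *     std::vector<int*> result(encoded.size());
 *     for (auto& [idx, fut] : futures) {
 *       result[idx] = fut.get();  // block only where the pointer is consumed
 *     }
 *   }
 */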
2889 
2890 bool Loader::loadToShard(
2891  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2892  size_t row_count,
2893  const TableDescriptor* shard_table,
2894  bool checkpoint,
2895  const Catalog_Namespace::SessionInfo* session_info) {
2896  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2897  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2898  ins_data.numRows = row_count;
2899  bool success = false;
2900  try {
2901  ins_data.data = TypedImportBuffer::get_data_block_pointers(import_buffers);
2902  } catch (std::exception& e) {
2903  std::ostringstream oss;
2904  oss << "Exception when loading Table " << shard_table->tableName << ", issue was "
2905  << e.what();
2906 
2907  LOG(ERROR) << oss.str();
2908  error_msg_ = oss.str();
2909  return success;
2910  }
2911  if (isAddingColumns()) {
2912  // when Adding columns we omit any columns except the ones being added
2913  ins_data.columnIds.clear();
2914  ins_data.is_default.clear();
2915  for (auto& buffer : import_buffers) {
2916  ins_data.columnIds.push_back(buffer->getColumnDesc()->columnId);
2917  ins_data.is_default.push_back(true);
2918  }
2919  } else {
2920  ins_data.is_default.resize(ins_data.columnIds.size(), false);
2921  }
2922  // release loader_lock so that in InsertOrderFragmenter::insertData
2923  // we can have multiple threads sort/shuffle InsertData
2924  loader_lock.unlock();
2925  success = true;
2926  {
2927  try {
2928  if (checkpoint) {
2929  shard_table->fragmenter->insertData(ins_data);
2930  } else {
2931  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2932  }
2933  } catch (std::exception& e) {
2934  std::ostringstream oss;
2935  oss << "Fragmenter Insert Exception when processing Table "
2936  << shard_table->tableName << " issue was " << e.what();
2937 
2938  LOG(ERROR) << oss.str();
2939  loader_lock.lock();
2940  error_msg_ = oss.str();
2941  success = false;
2942  }
2943  }
2944  return success;
2945 }
2946 
2947 void Loader::dropColumns(const std::vector<int>& columnIds) {
2948  std::vector<const TableDescriptor*> table_descs(1, table_desc_);
2949  if (table_desc_->nShards) {
2950  table_descs = catalog_.getPhysicalTablesDescriptors(table_desc_);
2951  }
2952  for (auto table_desc : table_descs) {
2953  table_desc->fragmenter->dropColumns(columnIds);
2954  }
2955 }
2956 
2957 void Loader::init(const bool use_catalog_locks) {
2958  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2959  insert_data_.tableId = table_desc_->tableId;
2960  for (auto cd : column_descs_) {
2961  insert_data_.columnIds.push_back(cd->columnId);
2962  if (cd->columnType.get_compression() == kENCODING_DICT) {
2963  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2964  const auto dd = use_catalog_locks
2965  ? catalog_.getMetadataForDict(cd->columnType.get_comp_param())
2966  : catalog_.getMetadataForDictUnlocked(
2967  cd->columnType.get_comp_param(), true);
2968  CHECK(dd);
2969  dict_map_[cd->columnId] = dd->stringDict.get();
2970  }
2971  }
2972  insert_data_.numRows = 0;
2973 }
2974 
2975 void Detector::init() {
2976  detect_row_delimiter();
2977  split_raw_data();
2978  find_best_sqltypes_and_headers();
2979 }
2980 
2981 ImportStatus Detector::importDelimited(
2982  const std::string& file_path,
2983  const bool decompressed,
2984  const Catalog_Namespace::SessionInfo* session_info) {
2985  // we do not check interrupt status for this detection
2986  if (!p_file) {
2987  p_file = fopen(file_path.c_str(), "rb");
2988  }
2989  if (!p_file) {
2990  throw std::runtime_error("failed to open file '" + file_path +
2991  "': " + strerror(errno));
2992  }
2993 
2994  // somehow clang does not support ext/stdio_filebuf.h, so we
2995  // need a DIY readline honoring the customized copy_params.line_delim...
2996  std::string line;
2997  line.reserve(1 * 1024 * 1024);
2998  auto end_time = std::chrono::steady_clock::now() +
2999  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
3000  try {
3001  while (!feof(p_file)) {
3002  int c;
3003  size_t n = 0;
3004  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
3005  if (n++ >= line.capacity()) {
3006  break;
3007  }
3008  line += c;
3009  }
3010  if (0 == n) {
3011  break;
3012  }
3013  // remember the first line, which is possibly a header line, to
3014  // ignore identical header line(s) in 2nd+ files of an archive;
3015  // otherwise, a 2nd+ header may be mistaken for an all-string row
3016  // and so skew the final column types.
3017  if (line1.empty()) {
3018  line1 = line;
3019  } else if (line == line1) {
3020  line.clear();
3021  continue;
3022  }
3023 
3024  raw_data += line;
3025  raw_data += copy_params.line_delim;
3026  line.clear();
3027  ++import_status_.rows_completed;
3028  if (std::chrono::steady_clock::now() > end_time) {
3029  if (import_status_.rows_completed > 10000) {
3030  break;
3031  }
3032  }
3033  }
3034  } catch (std::exception& e) {
3035  }
3036 
3037  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3038  import_status_.load_failed = true;
3039 
3040  fclose(p_file);
3041  p_file = nullptr;
3042  return import_status_;
3043 }
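/*
 * Aside: the DIY readline above in isolation -- read bytes until a custom
 * line delimiter or EOF, with no dependence on ext/stdio_filebuf.h:
 *
 *   #include <cstdio>
 *   #include <string>
 *
 *   bool read_line(std::FILE* f, char line_delim, std::string& line) {
 *     line.clear();
 *     int c;
 *     while ((c = std::fgetc(f)) != EOF && c != line_delim) {
 *       line += static_cast<char>(c);
 *     }
 *     return c != EOF || !line.empty();
 *   }
 */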
3044 
3045 void Detector::read_file() {
3046  // this becomes analogous to Importer::import()
3047  (void)DataStreamSink::archivePlumber(nullptr);
3048 }
3049 
3050 void Detector::detect_row_delimiter() {
3051  if (copy_params.delimiter == '\0') {
3052  copy_params.delimiter = ',';
3053  if (boost::filesystem::extension(file_path) == ".tsv") {
3054  copy_params.delimiter = '\t';
3055  }
3056  }
3057 }
3058 
3059 void Detector::split_raw_data() {
3060  const char* buf = raw_data.c_str();
3061  const char* buf_end = buf + raw_data.size();
3062  bool try_single_thread = false;
3063  for (const char* p = buf; p < buf_end; p++) {
3064  std::vector<std::string> row;
3065  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3066  p = import_export::delimited_parser::get_row(p,
3067  buf_end,
3068  buf_end,
3069  copy_params,
3070  nullptr,
3071  row,
3072  tmp_buffers,
3073  try_single_thread,
3074  true);
3075  raw_rows.push_back(row);
3076  if (try_single_thread) {
3077  break;
3078  }
3079  }
3080  if (try_single_thread) {
3081  copy_params.threads = 1;
3082  raw_rows.clear();
3083  for (const char* p = buf; p < buf_end; p++) {
3084  std::vector<std::string> row;
3085  std::vector<std::unique_ptr<char[]>> tmp_buffers;
3086  p = import_export::delimited_parser::get_row(p,
3087  buf_end,
3088  buf_end,
3089  copy_params,
3090  nullptr,
3091  row,
3092  tmp_buffers,
3093  try_single_thread,
3094  true);
3095  raw_rows.push_back(row);
3096  }
3097  }
3098 }
3099 
3100 template <class T>
3101 bool try_cast(const std::string& str) {
3102  try {
3103  boost::lexical_cast<T>(str);
3104  } catch (const boost::bad_lexical_cast& e) {
3105  return false;
3106  }
3107  return true;
3108 }
3109 
3110 SQLTypes Detector::detect_sqltype(const std::string& str) {
3111  SQLTypes type = kTEXT;
3112  if (try_cast<double>(str)) {
3113  type = kDOUBLE;
3114  /*if (try_cast<bool>(str)) {
3115  type = kBOOLEAN;
3116  }*/
3117  if (try_cast<int16_t>(str)) {
3118  type = kSMALLINT;
3119  } else if (try_cast<int32_t>(str)) {
3120  type = kINT;
3121  } else if (try_cast<int64_t>(str)) {
3122  type = kBIGINT;
3123  } else if (try_cast<float>(str)) {
3124  type = kFLOAT;
3125  }
3126  }
3127 
3128  // check for geo types
3129  if (type == kTEXT) {
3130  // convert to upper case
3131  std::string str_upper_case = str;
3132  std::transform(
3133  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3134 
3135  // then test for leading words
3136  if (str_upper_case.find("POINT") == 0) {
3137  type = kPOINT;
3138  } else if (str_upper_case.find("LINESTRING") == 0) {
3139  type = kLINESTRING;
3140  } else if (str_upper_case.find("POLYGON") == 0) {
3141  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
3142  type = kMULTIPOLYGON;
3143  } else {
3144  type = kPOLYGON;
3145  }
3146  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
3147  type = kMULTIPOLYGON;
3148  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
3149  std::string::npos &&
3150  (str_upper_case.size() % 2) == 0) {
3151  // simple hex blob (two characters per byte, not uu-encode or base64)
3152  if (str_upper_case.size() >= 10) {
3153  // match WKB blobs for supported geometry types
3154  // the first byte specifies if the data is big-endian or little-endian
3155  // the next four bytes are the geometry type (1 = POINT etc.)
3156  // @TODO support eWKB, which has extra bits set in the geometry type
3157  auto first_five_bytes = str_upper_case.substr(0, 10);
3158  if (first_five_bytes == "0000000001" || first_five_bytes == "0101000000") {
3159  type = kPOINT;
3160  } else if (first_five_bytes == "0000000002" || first_five_bytes == "0102000000") {
3161  type = kLINESTRING;
3162  } else if (first_five_bytes == "0000000003" || first_five_bytes == "0103000000") {
3163  type = kPOLYGON;
3164  } else if (first_five_bytes == "0000000006" || first_five_bytes == "0106000000") {
3165  type = kMULTIPOLYGON;
3166  } else {
3167  // unsupported WKB type
3168  return type;
3169  }
3170  } else {
3171  // too short to be WKB
3172  return type;
3173  }
3174  }
3175  }
3176 
3177  // check for time types
3178  if (type == kTEXT) {
3179  // This won't match unix timestamp, since floats and ints were checked above.
3180  if (dateTimeParseOptional<kTIME>(str, 0)) {
3181  type = kTIME;
3182  } else if (dateTimeParseOptional<kTIMESTAMP>(str, 0)) {
3183  type = kTIMESTAMP;
3184  } else if (dateTimeParseOptional<kDATE>(str, 0)) {
3185  type = kDATE;
3186  }
3187  }
3188 
3189  return type;
3190 }
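/*
 * Worked examples of the detection above: "123" -> kSMALLINT; "70000" -> kINT;
 * "9999999999" -> kBIGINT; "1.5" -> kFLOAT (it parses as a double first, then
 * the narrower float also fits); "1e308" -> kDOUBLE (overflows float);
 * "POINT(1 2)" -> kPOINT; "2021-01-31" -> kDATE; "15:30:00" -> kTIME; anything
 * else -> kTEXT.
 */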
3191 
3192 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
3193  std::vector<SQLTypes> types(row.size());
3194  for (size_t i = 0; i < row.size(); i++) {
3195  types[i] = detect_sqltype(row[i]);
3196  }
3197  return types;
3198 }
3199 
3200 bool Detector::more_restrictive_sqltype(const SQLTypes a, const SQLTypes b) {
3201  static std::array<int, kSQLTYPE_LAST> typeorder;
3202  typeorder[kCHAR] = 0;
3203  typeorder[kBOOLEAN] = 2;
3204  typeorder[kSMALLINT] = 3;
3205  typeorder[kINT] = 4;
3206  typeorder[kBIGINT] = 5;
3207  typeorder[kFLOAT] = 6;
3208  typeorder[kDOUBLE] = 7;
3209  typeorder[kTIMESTAMP] = 8;
3210  typeorder[kTIME] = 9;
3211  typeorder[kDATE] = 10;
3212  typeorder[kPOINT] = 11;
3213  typeorder[kLINESTRING] = 11;
3214  typeorder[kPOLYGON] = 11;
3215  typeorder[kMULTIPOLYGON] = 11;
3216  typeorder[kTEXT] = 12;
3217 
3218  // note: b < a instead of a < b because the map is ordered most to least restrictive
3219  return typeorder[b] < typeorder[a];
3220 }
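/*
 * Example of the ordering above: over the values {"12", "70000", "3.14"} the
 * per-value detections are kSMALLINT (order 3), kINT (4) and kFLOAT (6), so
 * the column settles on the least restrictive type seen, kFLOAT. A single
 * stray non-numeric, non-null value would push it all the way to kTEXT (12).
 */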
3221 
3222 void Detector::find_best_sqltypes_and_headers() {
3223  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
3224  best_encodings =
3225  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
3226  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
3227  switch (copy_params.has_header) {
3228  case ImportHeaderRow::AUTODETECT:
3229  has_headers = detect_headers(head_types, best_sqltypes);
3230  if (has_headers) {
3231  copy_params.has_header = ImportHeaderRow::HAS_HEADER;
3232  } else {
3233  copy_params.has_header = ImportHeaderRow::NO_HEADER;
3234  }
3235  break;
3236  case ImportHeaderRow::NO_HEADER:
3237  has_headers = false;
3238  break;
3239  case ImportHeaderRow::HAS_HEADER:
3240  has_headers = true;
3241  break;
3242  }
3243 }
3244 
3245 void Detector::find_best_sqltypes() {
3246  best_sqltypes = find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3247 }
3248 
3249 std::vector<SQLTypes> Detector::find_best_sqltypes(
3250  const std::vector<std::vector<std::string>>& raw_rows,
3251  const CopyParams& copy_params) {
3252  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
3253 }
3254 
3255 std::vector<SQLTypes> Detector::find_best_sqltypes(
3256  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3257  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3258  const CopyParams& copy_params) {
3259  if (raw_rows.size() < 1) {
3260  throw std::runtime_error("No rows found in: " +
3261  boost::filesystem::basename(file_path));
3262  }
3263  auto end_time = std::chrono::steady_clock::now() + timeout;
3264  size_t num_cols = raw_rows.front().size();
3265  std::vector<SQLTypes> best_types(num_cols, kCHAR);
3266  std::vector<size_t> non_null_col_counts(num_cols, 0);
3267  for (auto row = row_begin; row != row_end; row++) {
3268  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3269  best_types.push_back(kCHAR);
3270  non_null_col_counts.push_back(0);
3271  }
3272  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3273  // do not count nulls
3274  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
3275  continue;
3276  }
3277  SQLTypes t = detect_sqltype(row->at(col_idx));
3278  non_null_col_counts[col_idx]++;
3279  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
3280  best_types[col_idx] = t;
3281  }
3282  }
3283  if (std::chrono::steady_clock::now() > end_time) {
3284  break;
3285  }
3286  }
3287  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3288  // if we don't have any non-null values for this column, make it text to be
3289  // safe, because that is the least restrictive type
3290  if (non_null_col_counts[col_idx] == 0) {
3291  best_types[col_idx] = kTEXT;
3292  }
3293  }
3294 
3295  return best_types;
3296 }
3297 
3298 std::vector<EncodingType> Detector::find_best_encodings(
3299  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3300  const std::vector<std::vector<std::string>>::const_iterator& row_end,
3301  const std::vector<SQLTypes>& best_types) {
3302  if (raw_rows.size() < 1) {
3303  throw std::runtime_error("No rows found in: " +
3304  boost::filesystem::basename(file_path));
3305  }
3306  size_t num_cols = best_types.size();
3307  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
3308  std::vector<size_t> num_rows_per_col(num_cols, 1);
3309  std::vector<std::unordered_set<std::string>> count_set(num_cols);
3310  for (auto row = row_begin; row != row_end; row++) {
3311  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3312  if (IS_STRING(best_types[col_idx])) {
3313  count_set[col_idx].insert(row->at(col_idx));
3314  num_rows_per_col[col_idx]++;
3315  }
3316  }
3317  }
3318  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3319  if (IS_STRING(best_types[col_idx])) {
3320  float uniqueRatio =
3321  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3322  if (uniqueRatio < 0.75) {
3323  best_encodes[col_idx] = kENCODING_DICT;
3324  }
3325  }
3326  }
3327  return best_encodes;
3328 }
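/*
 * Example of the heuristic above: a 1,000-row TEXT column with 120 distinct
 * values has a uniqueRatio of roughly 0.12 < 0.75 and is marked
 * kENCODING_DICT, while a nearly unique column (say UUIDs, ratio near 1.0)
 * keeps kENCODING_NONE.
 */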
3329 
3330 // detect_headers returns true if:
3331 // - all elements of the first argument are kTEXT
3332 // - there is at least one instance where tail_types is more restrictive than head_types
3333 // (ie, not kTEXT)
3334 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
3335  const std::vector<SQLTypes>& tail_types) {
3336  if (head_types.size() != tail_types.size()) {
3337  return false;
3338  }
3339  bool has_headers = false;
3340  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3341  if (head_types[col_idx] != kTEXT) {
3342  return false;
3343  }
3344  has_headers = has_headers || tail_types[col_idx] != kTEXT;
3345  }
3346  return has_headers;
3347 }
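/*
 * Example: head_types {kTEXT, kTEXT} with tail_types {kTEXT, kINT} returns
 * true (an all-text first row over a partly numeric body); head_types
 * {kINT, kTEXT} returns false immediately, since a numeric cell cannot be a
 * header; and an all-kTEXT tail also returns false, as nothing then
 * distinguishes a header row from data.
 */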
3348 
3349 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3350  n = std::min(n, raw_rows.size());
3351  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3352  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3353  raw_rows.begin() + n);
3354  return sample_rows;
3355 }
3356 
3357 std::vector<std::string> Detector::get_headers() {
3358  std::vector<std::string> headers(best_sqltypes.size());
3359  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3360  if (has_headers && i < raw_rows[0].size()) {
3361  headers[i] = raw_rows[0][i];
3362  } else {
3363  headers[i] = "column_" + std::to_string(i + 1);
3364  }
3365  }
3366  return headers;
3367 }
3368 
3369 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3370  size_t row_count,
3371  const Catalog_Namespace::SessionInfo* session_info) {
3372  if (!loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3373  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3374  import_status_.load_failed = true;
3375  import_status_.load_msg = loader->getErrorMessage();
3376  }
3377 }
3378 
3379 void Importer::checkpoint(
3380  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3381  if (loader->getTableDesc()->storageType != StorageType::FOREIGN_TABLE) {
3382  mapd_lock_guard<mapd_shared_mutex> read_lock(import_mutex_);
3383  if (import_status_.load_failed) {
3384  // rollback to starting epoch - undo all the added records
3385  loader->setTableEpochs(table_epochs);
3386  } else {
3387  loader->checkpoint();
3388  }
3389  }
3390 
3391  if (loader->getTableDesc()->persistenceLevel ==
3392  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
3393  // tables
3394  auto ms = measure<>::execution([&]() {
3395  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3396  if (!import_status_.load_failed) {
3397  for (auto& p : import_buffers_vec[0]) {
3398  if (!p->stringDictCheckpoint()) {
3399  LOG(ERROR) << "Checkpointing Dictionary for Column "
3400  << p->getColumnDesc()->columnName << " failed.";
3401  import_status_.load_failed = true;
3402  import_status_.load_msg = "Dictionary checkpoint failed";
3403  break;
3404  }
3405  }
3406  }
3407  });
3408  if (DEBUG_TIMING) {
3409  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3410  << std::endl;
3411  }
3412  }
3413 }
3414 
3415 ImportStatus DataStreamSink::archivePlumber(
3416  const Catalog_Namespace::SessionInfo* session_info) {
3417  // in generalized importing scheme, reaching here file_path may
3418  // contain a file path, a url or a wildcard of file paths.
3419  // see CopyTableStmt::execute.
3420  auto file_paths = omnisci::glob(file_path);
3421  if (file_paths.size() == 0) {
3422  file_paths.push_back(file_path);
3423  }
3424 
3425  // sum up sizes of all local files -- only for local files. if
3426  // file_path is a s3 url, sizes will be obtained via S3Archive.
3427  for (const auto& file_path : file_paths) {
3428  total_file_size += get_filesize(file_path);
3429  }
3430 
3431 #ifdef ENABLE_IMPORT_PARQUET
3432  // s3 parquet goes different route because the files do not use libarchive
3433  // but parquet api, and they need to landed like .7z files.
3434  //
3435  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3436  // because for example spark sql users may specify a output url w/o file
3437  // extension like this:
3438  // df.write
3439  // .mode("overwrite")
3440  // .parquet("s3://bucket/folder/parquet/mydata")
3441  // without the parameter, it means plain or compressed csv files.
3442  // note: .ORC and AVRO files should follow a similar path to Parquet?
3443  if (copy_params.file_type == FileType::PARQUET) {
3444  import_parquet(file_paths, session_info);
3445  } else
3446 #endif
3447  {
3448  import_compressed(file_paths, session_info);
3449  }
3450 
3451  return import_status_;
3452 }
3453 
3454 #ifdef ENABLE_IMPORT_PARQUET
3455 inline auto open_parquet_table(const std::string& file_path,
3456  std::shared_ptr<arrow::io::ReadableFile>& infile,
3457  std::unique_ptr<parquet::arrow::FileReader>& reader,
3458  std::shared_ptr<arrow::Table>& table) {
3459  using namespace parquet::arrow;
3460  auto file_result = arrow::io::ReadableFile::Open(file_path);
3461  PARQUET_THROW_NOT_OK(file_result.status());
3462  infile = file_result.ValueOrDie();
3463 
3464  PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3465  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3466  const auto num_row_groups = reader->num_row_groups();
3467  const auto num_columns = table->num_columns();
3468  const auto num_rows = table->num_rows();
3469  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3470  << " columns in " << num_row_groups << " groups.";
3471  return std::make_tuple(num_row_groups, num_columns, num_rows);
3472 }
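/*
 * Hedged usage sketch of the helper above (the path is hypothetical); the
 * out-parameters keep the Arrow file, reader and table alive for the
 * per-row-group reads that follow:
 *
 *   std::shared_ptr<arrow::io::ReadableFile> infile;
 *   std::unique_ptr<parquet::arrow::FileReader> reader;
 *   std::shared_ptr<arrow::Table> table;
 *   auto [num_groups, num_cols, num_rows] =
 *       open_parquet_table("/tmp/mydata.parquet", infile, reader, table);
 */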
3473 
3474 void Detector::import_local_parquet(const std::string& file_path,
3475  const Catalog_Namespace::SessionInfo* session_info) {
3476  /*Skip interrupt checking in detector*/
3477  std::shared_ptr<arrow::io::ReadableFile> infile;
3478  std::unique_ptr<parquet::arrow::FileReader> reader;
3479  std::shared_ptr<arrow::Table> table;
3480  int num_row_groups, num_columns;
3481  int64_t num_rows;
3482  std::tie(num_row_groups, num_columns, num_rows) =
3483  open_parquet_table(file_path, infile, reader, table);
3484  // make up header line if not yet
3485  if (0 == raw_data.size()) {
3486  copy_params.has_header = ImportHeaderRow::NO_HEADER;
3487  copy_params.line_delim = '\n';
3488  copy_params.delimiter = ',';
3489  // must quote values to skip any embedded delimiter
3490  copy_params.quoted = true;
3491  copy_params.quote = '"';
3492  copy_params.escape = '"';
3493  for (int c = 0; c < num_columns; ++c) {
3494  if (c) {
3495  raw_data += copy_params.delimiter;
3496  }
3497  raw_data += table->ColumnNames().at(c);
3498  }
3499  raw_data += copy_params.line_delim;
3500  }
3501  // make up raw data... rowwize...
3502  const ColumnDescriptor cd;
3503  for (int g = 0; g < num_row_groups; ++g) {
3504  // data is columnwise
3505  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3506  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3507  arrays.resize(num_columns);
3508  for (int c = 0; c < num_columns; ++c) {
3509  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3510  for (auto chunk : arrays[c]->chunks()) {
3511  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3512  }
3513  }
3514  for (int r = 0; r < num_rows; ++r) {
3515  for (int c = 0; c < num_columns; ++c) {
3516  std::vector<std::string> buffer;
3517  for (auto chunk : arrays[c]->chunks()) {
3518  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3519  if (c) {
3520  raw_data += copy_params.delimiter;
3521  }
3522  if (!chunk->IsNull(r)) {
3523  raw_data += copy_params.quote;
3524  raw_data += boost::replace_all_copy(
3525  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3526  raw_data += copy_params.quote;
3527  }
3528  }
3529  }
3531  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3532  if (++import_status_.rows_completed >= 10000) {
3533  // as if load truncated
3534  import_status_.load_failed = true;
3535  import_status_.load_msg = "Detector processed 10000 records";
3536  return;
3537  }
3538  }
3539  }
3540 }
3541 
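// --- illustrative example, not part of the original source ---
// The detector above escapes embedded quotes by doubling them, the standard
// csv convention selected by its quote/escape settings ('"' for both). The
// same rule as a standalone helper (quote_csv_field is a hypothetical name):
static std::string quote_csv_field(const std::string& value) {
  // say "hi"  ->  "say ""hi"""  (field quoted, embedded quotes doubled)
  return '"' + boost::replace_all_copy(value, "\"", "\"\"") + '"';
}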
3542 template <typename DATA_TYPE>
3543 auto TypedImportBuffer::del_values(
3544  std::vector<DATA_TYPE>& buffer,
3545  import_export::BadRowsTracker* const bad_rows_tracker) {
3546  const auto old_size = buffer.size();
3547  // erase backward to minimize memory movement overhead
3548  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3549  ++rit) {
3550  buffer.erase(buffer.begin() + *rit);
3551  }
3552  return std::make_tuple(old_size, buffer.size());
3553 }
3554 
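// --- illustrative example, not part of the original source ---
// The reverse iteration in del_values() above is the standard idiom for
// erasing a sorted collection of indices from a vector: removing the largest
// index first keeps the remaining (smaller) indices valid and moves fewer
// elements. A self-contained demo (names are hypothetical; assumes <set>):
static void erase_flagged_rows(std::vector<int>& buffer,
                               const std::set<size_t>& bad_rows) {
  for (auto rit = bad_rows.crbegin(); rit != bad_rows.crend(); ++rit) {
    buffer.erase(buffer.begin() + *rit);  // only smaller indices remain valid
  }
}
// e.g. buffer {10, 20, 30, 40} with bad_rows {1, 3} leaves {10, 30}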
3555 auto TypedImportBuffer::del_values(const SQLTypes type,
3556  BadRowsTracker* const bad_rows_tracker) {
3557  switch (type) {
3558  case kBOOLEAN:
3559  return del_values(*bool_buffer_, bad_rows_tracker);
3560  case kTINYINT:
3561  return del_values(*tinyint_buffer_, bad_rows_tracker);
3562  case kSMALLINT:
3563  return del_values(*smallint_buffer_, bad_rows_tracker);
3564  case kINT:
3565  return del_values(*int_buffer_, bad_rows_tracker);
3566  case kBIGINT:
3567  case kNUMERIC:
3568  case kDECIMAL:
3569  case kTIME:
3570  case kTIMESTAMP:
3571  case kDATE:
3572  return del_values(*bigint_buffer_, bad_rows_tracker);
3573  case kFLOAT:
3574  return del_values(*float_buffer_, bad_rows_tracker);
3575  case kDOUBLE:
3576  return del_values(*double_buffer_, bad_rows_tracker);
3577  case kTEXT:
3578  case kVARCHAR:
3579  case kCHAR:
3580  return del_values(*string_buffer_, bad_rows_tracker);
3581  case kPOINT:
3582  case kLINESTRING:
3583  case kPOLYGON:
3584  case kMULTIPOLYGON:
3585  return del_values(*geo_string_buffer_, bad_rows_tracker);
3586  case kARRAY:
3587  return del_values(*array_buffer_, bad_rows_tracker);
3588  default:
3589  throw std::runtime_error("Invalid Type");
3590  }
3591 }
3592 
3593 void Importer::import_local_parquet(const std::string& file_path,
3594  const Catalog_Namespace::SessionInfo* session_info) {
3595  std::shared_ptr<arrow::io::ReadableFile> infile;
3596  std::unique_ptr<parquet::arrow::FileReader> reader;
3597  std::shared_ptr<arrow::Table> table;
3598  int num_row_groups, num_columns;
3599  int64_t nrow_in_file;
3600  std::tie(num_row_groups, num_columns, nrow_in_file) =
3601  open_parquet_table(file_path, infile, reader, table);
3602  // column_list has no $deleted
3603  const auto& column_list = get_column_descs();
3604  // for now geo columns expect a wkt or wkb hex string
3605  std::vector<const ColumnDescriptor*> cds;
3606  Executor* executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID).get();
3607  int num_physical_cols = 0;
3608  for (auto& cd : column_list) {
3609  cds.push_back(cd);
3610  num_physical_cols += cd->columnType.get_physical_cols();
3611  }
3612  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3613  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3614  std::to_string(num_columns) + " columns in file vs " +
3615  std::to_string(column_list.size() - num_physical_cols) +
3616  " columns in table.");
3617  // slice each row group to import slower columns faster, e.g. geo or string
3618  max_threads = copy_params.threads
3619  ? static_cast<size_t>(copy_params.threads)
3620  : std::min(static_cast<size_t>(cpu_threads()), g_max_import_threads);
3621  VLOG(1) << "Parquet import # threads: " << max_threads;
3622  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3623  // init row estimate for this file
3624  const auto filesize = get_filesize(file_path);
3625  size_t nrow_completed{0};
3626  file_offsets.push_back(0);
3627  // map logic column index to physical column index
3628  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3629  int physical_col_idx = 0;
3630  for (int i = 0; i < logic_col_idx; ++i) {
3631  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3632  }
3633  return physical_col_idx;
3634  };
3635  // load a file = nested iteration of row groups, row slices and logical columns
3636  auto query_session = session_info ? session_info->get_session_id() : "";
3637  auto ms_load_a_file = measure<>::execution([&]() {
3638  for (int row_group = 0; row_group < num_row_groups; ++row_group) {
3639  {
3640  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
3641  if (import_status_.load_failed) {
3642  break;
3643  }
3644  }
3645  // a sliced row group will be handled like a (logical) parquet file, with
3646  // an entirely clean set of bad_rows_tracker, import_buffers_vec, etc.
3647  if (UNLIKELY(checkInterrupt(query_session, executor))) {
3648  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3649  import_status_.load_failed = true;
3650  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3651  }
3652  import_buffers_vec.resize(num_slices);
3653  for (int slice = 0; slice < num_slices; slice++) {
3654  import_buffers_vec[slice].clear();
3655  for (const auto cd : cds) {
3656  import_buffers_vec[slice].emplace_back(
3657  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3658  }
3659  }
3660  /*
3661  * A caveat here: Parquet files and arrow data are imported column-wise.
3662  * Unlike row-wise csv imports, an error on any row of any column forces
3663  * us to give up the entire row group of all columns, unless we have a
3664  * sophisticated method to trace erroneous rows in individual columns so
3665  * that we can union the bad rows and drop them from the corresponding
3666  * import_buffers_vec; otherwise, we could easily exceed the maximum
3667  * number of truncated rows even with very sparse errors in the files.
3668  */
3669  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3670  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3671  auto& bad_rows_tracker = bad_rows_trackers[slice];
3672  bad_rows_tracker.file_name = file_path;
3673  bad_rows_tracker.row_group = slice;
3674  bad_rows_tracker.importer = this;
3675  }
3676  // process arrow arrays to import buffers
3677  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3678  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3679  const auto cd = cds[physical_col_idx];
3680  std::shared_ptr<arrow::ChunkedArray> array;
3681  PARQUET_THROW_NOT_OK(
3682  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3683  const size_t array_size = array->length();
3684  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3685  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3686  for (int slice = 0; slice < num_slices; ++slice) {
3687  if (UNLIKELY(checkInterrupt(query_session, executor))) {
3688  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3689  import_status_.load_failed = true;
3690  import_status_.load_msg = "Table load was cancelled via Query Interrupt";
3691  }
3692  thread_controller.startThread([&, slice] {
3693  const auto slice_offset = slice % num_slices;
3694  ArraySliceRange slice_range(
3695  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3696  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3697  auto& bad_rows_tracker = bad_rows_trackers[slice];
3698  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3699  import_buffer->import_buffers = &import_buffers_vec[slice];
3700  import_buffer->col_idx = physical_col_idx + 1;
3701  for (auto chunk : array->chunks()) {
3702  import_buffer->add_arrow_values(
3703  cd, *chunk, false, slice_range, &bad_rows_tracker);
3704  }
3705  });
3706  }
3707  thread_controller.finish();
3708  }
3709  std::vector<size_t> nrow_in_slice_raw(num_slices);
3710  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3711  // trim bad rows from import buffers
3712  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3713  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3714  const auto cd = cds[physical_col_idx];
3715  for (int slice = 0; slice < num_slices; ++slice) {
3716  auto& bad_rows_tracker = bad_rows_trackers[slice];
3717  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3718  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3719  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3720  }
3721  }
3722  // flush slices of this row group to chunks
3723  for (int slice = 0; slice < num_slices; ++slice) {
3724  load(import_buffers_vec[slice],
3725  nrow_in_slice_successfully_loaded[slice],
3726  session_info);
3727  }
3728  // update import stats
3729  const auto nrow_original =
3730  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3731  const auto nrow_imported =
3732  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3733  nrow_in_slice_successfully_loaded.end(),
3734  0);
3735  const auto nrow_dropped = nrow_original - nrow_imported;
3736  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3737  << " rows, drop " << nrow_dropped << " rows.";
3738  {
3739  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3740  import_status_.rows_completed += nrow_imported;
3741  import_status_.rows_rejected += nrow_dropped;
3742  if (import_status_.rows_rejected > copy_params.max_reject) {
3743  import_status_.load_failed = true;
3744  import_status_.load_msg = "Maximum (" + std::to_string(copy_params.max_reject) +
3745  ") rows rejected exceeded. Halting load.";
3746  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3747  << ") rows rejected exceeded. Halting load.";
3748  }
3749  }
3750  // row estimate
3751  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3752  nrow_completed += nrow_imported;
3753  file_offsets.back() =
3754  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3755  // sum up current total file offsets
3756  const auto total_file_offset =
3757  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
3758  // estimate number of rows per current total file offset
3759  if (total_file_offset) {
3760  import_status_.rows_estimated =
3761  (float)total_file_size / total_file_offset * import_status_.rows_completed;
3762  VLOG(3) << "rows_completed " << import_status_.rows_completed
3763  << ", rows_estimated " << import_status_.rows_estimated
3764  << ", total_file_size " << total_file_size << ", total_file_offset "
3765  << total_file_offset;
3766  }
3767  }
3768  });
3769  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
3770  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
3771 }
3772 
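// --- illustrative example, not part of the original source ---
// The slice ranges computed above are a ceiling division clamped to the
// array length, so the last slice simply absorbs the remainder. The same
// arithmetic in isolation (make_slice_ranges is a hypothetical name):
static std::vector<std::pair<size_t, size_t>> make_slice_ranges(
    const size_t array_size,
    const size_t num_slices) {
  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
  std::vector<std::pair<size_t, size_t>> ranges;
  for (size_t s = 0; s < num_slices; ++s) {
    ranges.emplace_back(std::min(s * slice_size, array_size),
                        std::min((s + 1) * slice_size, array_size));
  }
  return ranges;
}
// e.g. array_size 10, num_slices 4 -> [0,3) [3,6) [6,9) [9,10)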
3773 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
3774  const Catalog_Namespace::SessionInfo* session_info) {
3775  auto importer = dynamic_cast<Importer*>(this);
3776  auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
3777  : std::vector<Catalog_Namespace::TableEpochInfo>{};
3778  try {
3779  std::exception_ptr teptr;
3780  // file_paths may contain one local file path, a list of local file paths,
3781  // or an s3/hdfs/... url that may translate to one or more remote object keys.
3782  for (auto const& file_path : file_paths) {
3783  std::map<int, std::string> url_parts;
3784  Archive::parse_url(file_path, url_parts);
3785 
3786  // for an s3 url we need to know the object keys that it comprises
3787  std::vector<std::string> objkeys;
3788  std::unique_ptr<S3ParquetArchive> us3arch;
3789  if ("s3" == url_parts[2]) {
3790 #ifdef HAVE_AWS_S3
3791  us3arch.reset(new S3ParquetArchive(file_path,
3792  copy_params.s3_access_key,
3793  copy_params.s3_secret_key,
3794  copy_params.s3_session_token,
3795  copy_params.s3_region,
3796  copy_params.s3_endpoint,
3797  copy_params.plain_text));
3798  us3arch->init_for_read();
3799  total_file_size += us3arch->get_total_file_size();
3800  objkeys = us3arch->get_objkeys();
3801 #else
3802  throw std::runtime_error("AWS S3 support not available");
3803 #endif // HAVE_AWS_S3
3804  } else {
3805  objkeys.emplace_back(file_path);
3806  }
3807 
3808  // each obj key of an s3 url must be landed before
3809  // importing it, just as we do with a 'local file'.
3810  for (auto const& objkey : objkeys) {
3811  try {
3812  auto file_path =
3813  us3arch
3814  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
3815  : objkey;
3816  import_local_parquet(file_path, session_info);
3817  if (us3arch) {
3818  us3arch->vacuum(objkey);
3819  }
3820  } catch (...) {
3821  if (us3arch) {
3822  us3arch->vacuum(objkey);
3823  }
3824  throw;
3825  }
3826  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
3827  if (import_status_.load_failed) {
3828  break;
3829  }
3830  }
3831  }
3832  // rethrow any exception that happened before this point
3833  if (teptr) {
3834  std::rethrow_exception(teptr);
3835  }
3836  } catch (const std::exception& e) {
3837  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
3838  import_status_.load_failed = true;
3839  import_status_.load_msg = e.what();
3840  }
3841 
3842  if (importer) {
3843  importer->checkpoint(table_epochs);
3844  }
3845 }
3846 #endif // ENABLE_IMPORT_PARQUET
3847 
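// --- illustrative example, not part of the original source ---
// import_parquet() above branches on the url scheme that Archive::parse_url
// extracts into url_parts[2] (judging by its uses in this file). A generic,
// dependency-free scheme extraction looks like this (a hypothetical helper,
// not the actual parse_url implementation):
static std::string url_scheme(const std::string& url) {
  const auto pos = url.find("://");
  return pos == std::string::npos ? std::string{} : url.substr(0, pos);
}
// url_scheme("s3://bucket/key") == "s3"; a plain local path yields "".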
3848 void DataStreamSink::import_compressed(
3849  std::vector<std::string>& file_paths,
3850  const Catalog_Namespace::SessionInfo* session_info) {
3851  // a new requirement is to have one single input stream into
3852  // Importer::importDelimited, so the pipe-related code needs to
3853  // move to the outermost block.
3854  int fd[2];
3855 #ifdef _WIN32
3856  // For some reason, when folly is used to create the pipe, the reader
3857  // can read nothing.
3858  auto pipe_res =
3859  _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
3860 #else
3861  auto pipe_res = pipe(fd);
3862 #endif
3863  if (pipe_res < 0) {
3864  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
3865  }
3866 #ifndef _WIN32
3867  signal(SIGPIPE, SIG_IGN);
3868 #endif
3869 
3870  std::exception_ptr teptr;
3871  // create a thread to read uncompressed byte stream out of pipe and
3872  // feed into importDelimited()
3873  ImportStatus ret1;
3874  auto th_pipe_reader = std::thread([&]() {
3875  try {
3876  // importDelimited will read from FILE* p_file
3877  if (0 == (p_file = fdopen(fd[0], "r"))) {
3878  throw std::runtime_error(std::string("failed to open a pipe: ") +
3879  strerror(errno));
3880  }
3881 
3882  // in the future, depending on the data types in this uncompressed stream,
3883  // it could be fed into other functions such as importParquet, etc.
3884  ret1 = importDelimited(file_path, true, session_info);
3885 
3886  } catch (...) {
3887  if (!teptr) { // no replace
3888  teptr = std::current_exception();
3889  }
3890  }
3891 
3892  if (p_file) {
3893  fclose(p_file);
3894  }
3895  p_file = 0;
3896  });
3897 
3898  // create a thread to iterate all files (in all archives) and
3899  // forward the uncompressed byte stream to fd[1] which is
3900  // then fed into importDelimited, importParquet, etc.
3901  auto th_pipe_writer = std::thread([&]() {
3902  std::unique_ptr<S3Archive> us3arch;
3903  bool stop = false;
3904  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
3905  try {
3906  auto file_path = file_paths[fi];
3907  std::unique_ptr<Archive> uarch;
3908  std::map<int, std::string> url_parts;
3909  Archive::parse_url(file_path, url_parts);
3910  const std::string S3_objkey_url_scheme = "s3ok";
3911  if ("file" == url_parts[2] || "" == url_parts[2]) {
3912  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3913  } else if ("s3" == url_parts[2]) {
3914 #ifdef HAVE_AWS_S3
3915  // create an S3Archive with a shared s3client.
3916  // should be safe b/c there is no wildcard in an s3 url
3917  us3arch.reset(new S3Archive(file_path,
3918  copy_params.s3_access_key,
3919  copy_params.s3_secret_key,
3920  copy_params.s3_session_token,
3921  copy_params.s3_region,
3922  copy_params.s3_endpoint,
3923  copy_params.plain_text));
3924  us3arch->init_for_read();
3925  total_file_size += us3arch->get_total_file_size();
3926  // don't land all files here; land them one by one in the following iterations
3927  for (const auto& objkey : us3arch->get_objkeys()) {
3928  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
3929  }
3930  continue;
3931 #else
3932  throw std::runtime_error("AWS S3 support not available");
3933 #endif // HAVE_AWS_S3
3934  } else if (S3_objkey_url_scheme == url_parts[2]) {
3935 #ifdef HAVE_AWS_S3
3936  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
3937  auto file_path =
3938  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
3939  if (0 == file_path.size()) {
3940  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
3941  }
3942  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3943  // file not removed until file closed
3944  us3arch->vacuum(objkey);
3945 #else
3946  throw std::runtime_error("AWS S3 support not available");
3947 #endif // HAVE_AWS_S3
3948  }
3949 #if 0 // TODO(ppan): implement and enable any other archive class
3950  else
3951  if ("hdfs" == url_parts[2])
3952  uarch.reset(new HdfsArchive(file_path));
3953 #endif
3954  else {
3955  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
3956  }
3957 
3958  // init the archive for read
3959  auto& arch = *uarch;
3960 
3961  // at this point, the archive at the url should be ready to be read, unarchived,
3962  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
3963  const void* buf;
3964  size_t size;
3965  bool just_saw_archive_header;
3966  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
3967  bool first_text_header_skipped = false;
3968  // start reading uncompressed bytes of this archive from libarchive
3969  // note! this archive may contain more than one file!
3970  file_offsets.push_back(0);
3971  size_t num_block_read = 0;
3972  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
3973  bool insert_line_delim_after_this_file = false;
3974  while (!stop) {
3975  int64_t offset{-1};
3976  auto ok = arch.read_data_block(&buf, &size, &offset);
3977  // can't use (uncompressed) size, so track (max) file offset.
3978  // also we want to capture offset even on e.o.f.
3979  if (offset > 0) {
3980  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3981  file_offsets.back() = offset;
3982  }
3983  if (!ok) {
3984  break;
3985  }
3986  // one subtle point: we concatenate all files into a single
3987  // FILE stream and call importDelimited only once, which would
3988  // lead it to assume this 'single' stream has only one header
3989  // line, while we may actually have one header line per file.
3990  // so we need to skip header lines here instead of in
3991  // importDelimited.
3992  const char* buf2 = (const char*)buf;
3993  int size2 = size;
3994  if (copy_params.has_header != import_export::ImportHeaderRow::NO_HEADER &&
3995  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
3996  while (size2-- > 0) {
3997  if (*buf2++ == copy_params.line_delim) {
3998  break;
3999  }
4000  }
4001  if (size2 <= 0) {
4002  LOG(WARNING) << "No line delimiter in block." << std::endl;
4003  } else {
4004  just_saw_archive_header = false;
4005  first_text_header_skipped = true;
4006  }
4007  }
4008  // On very rare occasions the write pipe somehow operates in a mode similar
4009  // to non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which
4010  // means blocking mode. With such unreliable blocking behavior, a possible
4011  // fix is to loop writing until no bytes are left; otherwise we get the
4012  // annoying `failed to write pipe: Success`...
4013  if (size2 > 0) {
4014  int nremaining = size2;
4015  while (nremaining > 0) {
4016  // try to write the entire remainder of the buffer to the pipe
4017  int nwritten = write(fd[1], buf2, nremaining);
4018  // how did we do?
4019  if (nwritten < 0) {
4020  // something bad happened
4021  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4022  // ignore these, assume nothing written, try again
4023  nwritten = 0;
4024  } else {
4025  // a real error
4026  throw std::runtime_error(
4027  std::string("failed or interrupted write to pipe: ") +
4028  strerror(errno));
4029  }
4030  } else if (nwritten == nremaining) {
4031  // we wrote everything; we're done
4032  break;
4033  }
4034  // only wrote some (or nothing), try again
4035  nremaining -= nwritten;
4036  buf2 += nwritten;
4037  // no exception when too many rejected
4038  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4039  if (import_status_.load_failed) {
4040  stop = true;
4041  break;
4042  }
4043  }
4044  // check that this file (buf for size) ended with a line delim
4045  if (size > 0) {
4046  const char* plast = static_cast<const char*>(buf) + (size - 1);
4047  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4048  }
4049  }
4050  ++num_block_read;
4051  }
4052 
4053  // if that file didn't end with a line delim, we insert one here to terminate
4054  // that file's stream; use a loop for the same reason as above
4055  if (insert_line_delim_after_this_file) {
4056  while (true) {
4057  // write the delim char to the pipe
4058  int nwritten = write(fd[1], &copy_params.line_delim, 1);
4059  // how did we do?
4060  if (nwritten < 0) {
4061  // something bad happened
4062  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4063  // ignore these, assume nothing written, try again
4064  nwritten = 0;
4065  } else {
4066  // a real error
4067  throw std::runtime_error(
4068  std::string("failed or interrupted write to pipe: ") +
4069  strerror(errno));
4070  }
4071  } else if (nwritten == 1) {
4072  // we wrote it; we're done
4073  break;
4074  }
4075  }
4076  }
4077  }
4078  } catch (...) {
4079  // when the import is aborted because of too many data errors or because a
4080  // detection has finished, any exception thrown by the s3 sdk or libarchive
4081  // is okay and should be suppressed.
4082  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4083  if (import_status_.load_failed) {
4084  break;
4085  }
4086  if (import_status_.rows_completed > 0) {
4087  if (nullptr != dynamic_cast<Detector*>(this)) {
4088  break;
4089  }
4090  }
4091  if (!teptr) { // no replace
4092  teptr = std::current_exception();
4093  }
4094  break;
4095  }
4096  }
4097  // close writer end
4098  close(fd[1]);
4099  });
4100 
4101  th_pipe_reader.join();
4102  th_pipe_writer.join();
4103 
4104  // rethrow any exception that happened before this point
4105  if (teptr) {
4106  std::rethrow_exception(teptr);
4107  }
4108 }
4109 
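// --- illustrative example, not part of the original source ---
// The retry logic from the writer thread above, extracted: a POSIX write()
// can be interrupted (EINTR), transiently unable to accept bytes
// (EAGAIN/EWOULDBLOCK), or partially successful, so a robust sender loops
// until the whole buffer is consumed (write_all is a hypothetical name;
// assumes <unistd.h> and <cerrno>):
static bool write_all(const int fd, const char* buf, size_t nremaining) {
  while (nremaining > 0) {
    const auto nwritten = write(fd, buf, nremaining);
    if (nwritten < 0) {
      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
        continue;  // transient: nothing was consumed, try the same bytes again
      }
      return false;  // real error; the caller can consult errno
    }
    buf += nwritten;  // partial write: advance past what was accepted
    nremaining -= static_cast<size_t>(nwritten);
  }
  return true;
}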
4110 ImportStatus Importer::import(const Catalog_Namespace::SessionInfo* session_info) {
4111  return DataStreamSink::archivePlumber(session_info);
4112 }
4113 
4114 ImportStatus Importer::importDelimited(
4115  const std::string& file_path,
4116  const bool decompressed,
4117  const Catalog_Namespace::SessionInfo* session_info) {
4118  set_import_status(import_id, import_status_);
4119  auto query_session = session_info ? session_info->get_session_id() : "";
4120 
4121  if (!p_file) {
4122  p_file = fopen(file_path.c_str(), "rb");
4123  }
4124  if (!p_file) {
4125  throw std::runtime_error("failed to open file '" + file_path +
4126  "': " + strerror(errno));
4127  }
4128 
4129  if (!decompressed) {
4130  (void)fseek(p_file, 0, SEEK_END);
4131  file_size = ftell(p_file);
4132  }
4133 
4134  if (copy_params.threads == 0) {
4135  max_threads = std::min(static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF)),
4136  g_max_import_threads);
4137  } else {
4138  max_threads = static_cast<size_t>(copy_params.threads);
4139  }
4140  VLOG(1) << "Delimited import # threads: " << max_threads;
4141 
4142  // deal with small files
4143  size_t alloc_size = copy_params.buffer_size;
4144  if (!decompressed && file_size < alloc_size) {
4145  alloc_size = file_size;
4146  }
4147 
4148  for (size_t i = 0; i < max_threads; i++) {
4149  import_buffers_vec.emplace_back();
4150  for (const auto cd : loader->get_column_descs()) {
4151  import_buffers_vec[i].emplace_back(
4152  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
4153  }
4154  }
4155 
4156  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4157  size_t current_pos = 0;
4158  size_t end_pos;
4159  size_t begin_pos = 0;
4160 
4161  (void)fseek(p_file, current_pos, SEEK_SET);
4162  size_t size =
4163  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
4164 
4165  // make render group analyzers for each poly column
4166  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4167  if (copy_params.geo_assign_render_groups) {
4168  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4169  loader->getTableDesc()->tableId, false, false, false);
4170  for (auto cd : columnDescriptors) {
4171  SQLTypes ct = cd->columnType.get_type();
4172  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4173  auto rga = std::make_shared<RenderGroupAnalyzer>();
4174  rga->seedFromExistingTableContents(loader, cd->columnName);
4175  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4176  }
4177  }
4178  }
4179 
4180  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
4181  loader->getTableDesc()->tableId};
4182  auto table_epochs = loader->getTableEpochs();
4183  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
4184  {
4185  std::list<std::future<ImportStatus>> threads;
4186 
4187  // use a stack to track thread_ids which must not overlap among threads
4188  // because thread_id is used to index import_buffers_vec[]
4189  std::stack<size_t> stack_thread_ids;
4190  for (size_t i = 0; i < max_threads; i++) {
4191  stack_thread_ids.push(i);
4192  }
4193  // added for true row index on error
4194  size_t first_row_index_this_buffer = 0;
4195 
4196  while (size > 0) {
4197  unsigned int num_rows_this_buffer = 0;
4198  CHECK(scratch_buffer);
4199  end_pos = delimited_parser::find_row_end_pos(alloc_size,
4200  scratch_buffer,
4201  size,
4202  copy_params,
4203  first_row_index_this_buffer,
4204  num_rows_this_buffer,
4205  p_file);
4206 
4207  // unput residual
4208  int nresidual = size - end_pos;
4209  std::unique_ptr<char[]> unbuf;
4210  if (nresidual > 0) {
4211  unbuf = std::make_unique<char[]>(nresidual);
4212  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4213  }
4214 
4215  // get a thread_id not in use
4216  auto thread_id = stack_thread_ids.top();
4217  stack_thread_ids.pop();
4218  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
4219 
4220  threads.push_back(std::async(std::launch::async,
4221  import_thread_delimited,
4222  thread_id,
4223  this,
4224  std::move(scratch_buffer),
4225  begin_pos,
4226  end_pos,
4227  end_pos,
4228  columnIdToRenderGroupAnalyzerMap,
4229  first_row_index_this_buffer,
4230  session_info,
4231  executor));
4232 
4233  first_row_index_this_buffer += num_rows_this_buffer;
4234 
4235  current_pos += end_pos;
4236  scratch_buffer = std::make_unique<char[]>(alloc_size);
4237  CHECK(scratch_buffer);
4238  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4239  size = nresidual +
4240  fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
4241 
4242  begin_pos = 0;
4243  while (threads.size() > 0) {
4244  int nready = 0;
4245  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4246  it != threads.end();) {
4247  auto& p = *it;
4248  std::chrono::milliseconds span(0);
4249  if (p.wait_for(span) == std::future_status::ready) {
4250  auto ret_import_status = p.get();
4251  {
4252  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
4253  import_status_ += ret_import_status;
4254  if (ret_import_status.load_failed) {
4255  set_import_status(import_id, import_status_);
4256  }
4257  }
4258  // sum up current total file offsets
4259  size_t total_file_offset{0};
4260  if (decompressed) {
4261  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4262  for (const auto file_offset : file_offsets) {
4263  total_file_offset += file_offset;
4264  }
4265  }
4266  // estimate number of rows per current total file offset
4267  if (decompressed ? total_file_offset : current_pos) {
4268  import_status_.rows_estimated =
4269  (decompressed ? (float)total_file_size / total_file_offset
4270  : (float)file_size / current_pos) *
4271  import_status_.rows_completed;
4272  }
4273  VLOG(3) << "rows_completed " << import_status_.rows_completed
4274  << ", rows_estimated " << import_status_.rows_estimated
4275  << ", total_file_size " << total_file_size << ", total_file_offset "
4276  << total_file_offset;
4277  set_import_status(import_id, import_status_);
4278  // recall thread_id for reuse
4279  stack_thread_ids.push(ret_import_status.thread_id);
4280  threads.erase(it++);
4281  ++nready;
4282  } else {
4283  ++it;
4284  }
4285  }
4286 
4287  if (nready == 0) {
4288  std::this_thread::yield();
4289  }
4290 
4291  // on eof, wait all threads to finish
4292  if (0 == size) {
4293  continue;
4294  }
4295 
4296  // keep reading if there is any free thread slot
4297  // this is one of the major differences from the old threading model!!
4298  if (threads.size() < max_threads) {
4299  break;
4300  }
4301  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4302  if (import_status_.load_failed) {
4303  break;
4304  }
4305  }
4306  mapd_unique_lock<mapd_shared_mutex> write_lock(import_mutex_);
4307  if (import_status_.rows_rejected > copy_params.max_reject) {
4308  import_status_.load_failed = true;
4309  // todo use better message
4310  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
4311  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4312  break;
4313  }
4314  if (import_status_.load_failed) {
4315  LOG(ERROR) << "Load failed, the issue was: " + import_status_.load_msg;
4316  break;
4317  }
4318  }
4319 
4320  // join dangling threads in case of LOG(ERROR) above
4321  for (auto& p : threads) {
4322  p.wait();
4323  }
4324  }
4325 
4326  checkpoint(table_epochs);
4327 
4328  fclose(p_file);
4329  p_file = nullptr;
4330  return import_status_;
4331 }
4332 
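// --- illustrative example, not part of the original source ---
// The double-buffering scheme above in miniature: read a fixed-size block,
// hand everything up to the last complete row to a worker, and carry the
// residual tail to the front of the next block so no row is ever split
// across parse jobs. find_last_line_end is a hypothetical stand-in for
// delimited_parser::find_row_end_pos (assumes <cstring> for memmove):
static size_t find_last_line_end(const char* buf, const size_t size) {
  for (size_t i = size; i > 0; --i) {
    if (buf[i - 1] == '\n') {
      return i;
    }
  }
  return size;  // no delimiter found: consume the whole block
}

static void read_in_row_aligned_blocks(FILE* fp, const size_t alloc_size) {
  std::vector<char> buf(alloc_size);
  size_t size = fread(buf.data(), 1, alloc_size, fp);
  while (size > 0) {
    const size_t end_pos = find_last_line_end(buf.data(), size);
    // ... dispatch buf[0, end_pos) to a parser thread here ...
    const size_t nresidual = size - end_pos;
    memmove(buf.data(), buf.data() + end_pos, nresidual);  // keep the tail
    size = nresidual + fread(buf.data() + nresidual, 1, alloc_size - nresidual, fp);
  }
}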
4333 void Loader::checkpoint() {
4334  if (getTableDesc()->persistenceLevel ==
4335  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident
4336  // tables
4337  getCatalog().checkpointWithAutoRollback(getTableDesc()->tableId);
4338  }
4339 }
4340 
4341 std::vector<Catalog_Namespace::TableEpochInfo> Loader::getTableEpochs() const {
4342  return getCatalog().getTableEpochs(getCatalog().getCurrentDB().dbId,
4343  getTableDesc()->tableId);
4344 }
4345 
4346 void Loader::setTableEpochs(
4347  const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4348  getCatalog().setTableEpochs(getCatalog().getCurrentDB().dbId, table_epochs);
4349 }
4350 
4351 /* static */
4352 void Importer::setGDALAuthorizationTokens(const CopyParams& copy_params) {
4353  // for now we only support S3
4354  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4355  // only set if non-empty to allow GDAL defaults to persist
4356  // explicitly clear if empty to revert to default and not reuse a previous session's
4357  // keys
4358  if (copy_params.s3_region.size()) {
4359 #if DEBUG_AWS_AUTHENTICATION
4360  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4361 #endif
4362  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4363  } else {
4364 #if DEBUG_AWS_AUTHENTICATION
4365  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4366 #endif
4367  CPLSetConfigOption("AWS_REGION", nullptr);
4368  }
4369  if (copy_params.s3_endpoint.size()) {
4370 #if DEBUG_AWS_AUTHENTICATION
4371  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4372 #endif
4373  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4374  } else {
4375 #if DEBUG_AWS_AUTHENTICATION
4376  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4377 #endif
4378  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4379  }
4380  if (copy_params.s3_access_key.size()) {
4381 #if DEBUG_AWS_AUTHENTICATION
4382  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4383  << "'";
4384 #endif
4385  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4386  } else {
4387 #if DEBUG_AWS_AUTHENTICATION
4388  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4389 #endif
4390  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4391  }
4392  if (copy_params.s3_secret_key.size()) {
4393 #if DEBUG_AWS_AUTHENTICATION
4394  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4395  << "'";
4396 #endif
4397  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4398  } else {
4399 #if DEBUG_AWS_AUTHENTICATION
4400  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4401 #endif
4402  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4403  }
4404 
4405 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4406  // if we haven't set keys, we need to disable signed access
4407  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4408 #if DEBUG_AWS_AUTHENTICATION
4409  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4410 #endif
4411  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4412  } else {
4413 #if DEBUG_AWS_AUTHENTICATION
4414  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4415 #endif
4416  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4417  }
4418 #endif
4419 }
4420 
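// --- illustrative example, not part of the original source ---
// CPLSetConfigOption() (GDAL, cpl_conv.h) sets a process-wide key/value, and
// passing nullptr restores the default, which is why the function above
// explicitly clears every key it does not set: otherwise a previous
// session's credentials could leak into this one. Hypothetical usage against
// a local MinIO endpoint with anonymous access:
//   CPLSetConfigOption("AWS_S3_ENDPOINT", "localhost:9000");
//   CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
//   ... open "/vsis3/bucket/file.geojson" ...
//   CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);  // back to the default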
4421 /* static */
4422 OGRDataSource* Importer::openGDALDataset(const std::string& file_name,
4423  const CopyParams& copy_params) {
4424  // lazy init GDAL
4425  Geospatial::GDAL::init();
4426 
4427  // set authorization tokens
4428  setGDALAuthorizationTokens(copy_params);
4429 
4430  // open the file
4431  OGRDataSource* poDS;
4432 #if GDAL_VERSION_MAJOR == 1
4433  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4434 #else
4435  poDS = (OGRDataSource*)GDALOpenEx(
4436  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4437  if (poDS == nullptr) {
4438  poDS = (OGRDataSource*)GDALOpenEx(
4439  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4440  if (poDS) {
4441  LOG(INFO) << "openGDALDataset had to open as read-only";
4442  }
4443  }
4444 #endif
4445  if (poDS == nullptr) {
4446  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4447  }
4448  // NOTE(adb): If extending this function, refactor to ensure any errors will not
4449  // result in a memory leak if GDAL successfully opened the input dataset.
4450  return poDS;
4451 }
4452 
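// --- illustrative example, not part of the original source ---
// The raw pointer returned above must eventually reach GDALClose(); a
// caller-side RAII wrapper makes that automatic. A sketch assuming GDAL 2+
// headers (open_vector_readonly is a hypothetical name):
static std::unique_ptr<GDALDataset, decltype(&GDALClose)> open_vector_readonly(
    const std::string& file_name) {
  auto* ds = static_cast<GDALDataset*>(GDALOpenEx(
      file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr));
  return {ds, &GDALClose};  // unique_ptr never invokes the deleter on nullptr
}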
4453 namespace {
4454 
4455 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4456  const OGRDataSourceUqPtr& poDS,
4457  const std::string& file_name) {
4458  // get layer with specified name, or default to first layer
4459  OGRLayer* poLayer = nullptr;
4460  if (geo_layer_name.size()) {
4461  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4462  if (poLayer == nullptr) {
4463  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4464  file_name);
4465  }
4466  } else {
4467  poLayer = poDS->GetLayer(0);
4468  if (poLayer == nullptr) {
4469  throw std::runtime_error("No layers found in " + file_name);
4470  }
4471  }
4472  return *poLayer;
4473 }
4474 
4475 } // namespace
4476 
4477 /* static */
4478 void Importer::readMetadataSampleGDAL(
4479  const std::string& file_name,
4480  const std::string& geo_column_name,
4481  std::map<std::string, std::vector<std::string>>& metadata,
4482  int rowLimit,
4483  const CopyParams& copy_params) {
4484  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4485  if (poDS == nullptr) {
4486  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4487  file_name);
4488  }
4489 
4490  OGRLayer& layer =
4491  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4492 
4493  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4494  CHECK(poFDefn);
4495 
4496  // the return type of GetFeatureCount() differs between GDAL 1.x (int32_t) and 2.x (int64_t)
4497  auto nFeats = layer.GetFeatureCount();
4498  size_t numFeatures =
4499  std::max(static_cast<decltype(nFeats)>(0),
4500  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4501  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4502  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4503  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4504  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4505  }
4506  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4507  layer.ResetReading();
4508  size_t iFeature = 0;
4509  while (iFeature < numFeatures) {
4510  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4511  if (!poFeature) {
4512  break;
4513  }
4514 
4515  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4516  if (poGeometry != nullptr) {
4517  // validate geom type (again?)
4518  switch (wkbFlatten(poGeometry->getGeometryType())) {
4519  case wkbPoint:
4520  case wkbLineString:
4521  case wkbPolygon:
4522  case wkbMultiPolygon:
4523  break;
4524  case wkbMultiPoint:
4525  case wkbMultiLineString:
4526  // supported if geo_explode_collections is specified
4527  if (!copy_params.geo_explode_collections) {
4528  throw std::runtime_error("Unsupported geometry type: " +
4529  std::string(poGeometry->getGeometryName()));
4530  }
4531  break;
4532  default:
4533  throw std::runtime_error("Unsupported geometry type: " +
4534  std::string(poGeometry->getGeometryName()));
4535  }
4536 
4537  // populate metadata for regular fields
4538  for (auto i : metadata) {
4539  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4540  if (iField >= 0) { // geom is -1
4541  metadata[i.first].at(iFeature) =
4542  std::string(poFeature->GetFieldAsString(iField));
4543  }
4544  }
4545 
4546  // populate metadata for geo column with WKT string
4547  char* wkts = nullptr;
4548  poGeometry->exportToWkt(&wkts);
4549  CHECK(wkts);
4550  metadata[geo_column_name].at(iFeature) = wkts;
4551  CPLFree(wkts);
4552  }
4553  iFeature++;
4554  }
4555 }
4556 
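// --- illustrative example, not part of the original source ---
// The exportToWkt()/CPLFree() pairing above, wrapped so ownership of the
// GDAL-allocated string cannot leak (to_wkt is a hypothetical helper):
static std::string to_wkt(const OGRGeometry& geometry) {
  char* wkt = nullptr;
  geometry.exportToWkt(&wkt);  // GDAL allocates the buffer
  std::string result(wkt ? wkt : "");
  CPLFree(wkt);  // CPLFree(nullptr) is a no-op
  return result;
}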
4557 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4558  switch (ogr_type) {
4559  case OFTInteger:
4560  return std::make_pair(kINT, false);
4561  case OFTIntegerList:
4562  return std::make_pair(kINT, true);
4563 #if GDAL_VERSION_MAJOR > 1
4564  case OFTInteger64:
4565  return std::make_pair(kBIGINT, false);
4566  case OFTInteger64List:
4567  return std::make_pair(kBIGINT, true);
4568 #endif
4569  case OFTReal:
4570  return std::make_pair(kDOUBLE, false);
4571  case OFTRealList:
4572  return std::make_pair(kDOUBLE, true);
4573  case OFTString:
4574  return std::make_pair(kTEXT, false);
4575  case OFTStringList:
4576  return std::make_pair(kTEXT, true);
4577  case OFTDate:
4578  return std::make_pair(kDATE, false);
4579  case OFTTime:
4580  return std::make_pair(kTIME, false);
4581  case OFTDateTime:
4582  return std::make_pair(kTIMESTAMP, false);
4583  case OFTBinary:
4584  // Interpret binary blobs as byte arrays here
4585  // but actual import will store NULL as GDAL will not
4586  // extract the blob (OGRFeature::GetFieldAsString will
4587  // result in the import buffers having an empty string)
4588  return std::make_pair(kTINYINT, true);
4589  default:
4590  break;
4591  }
4592  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4593 }
4594 
4595 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4596  switch (ogr_type) {
4597  case wkbPoint:
4598  return kPOINT;
4599  case wkbLineString:
4600  return kLINESTRING;
4601  case wkbPolygon:
4602  return kPOLYGON;
4603  case wkbMultiPolygon:
4604  return kMULTIPOLYGON;
4605  default:
4606  break;
4607  }
4608  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4609 }
4610 
4611 /* static */
4612 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4613  const std::string& file_name,
4614  const std::string& geo_column_name,
4615  const CopyParams& copy_params) {
4616  std::list<ColumnDescriptor> cds;
4617 
4618  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4619  if (poDS == nullptr) {
4620  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4621  file_name);
4622  }
4623 
4624  OGRLayer& layer =
4625  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4626 
4627  layer.ResetReading();
4628  // TODO(andrewseidl): support multiple features
4629  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4630  if (poFeature == nullptr) {
4631  throw std::runtime_error("No features found in " + file_name);
4632  }
4633  // get fields as regular columns
4634  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4635  CHECK(poFDefn);
4636  int iField;
4637  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4638  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4639  auto typePair = ogr_to_type(poFieldDefn->GetType());
4640  ColumnDescriptor cd;
4641  cd.columnName = poFieldDefn->GetNameRef();
4642  cd.sourceName = poFieldDefn->GetNameRef();
4643  SQLTypeInfo ti;
4644  if (typePair.second) {
4645  ti.set_type(kARRAY);
4646  ti.set_subtype(typePair.first);
4647  } else {
4648  ti.set_type(typePair.first);
4649  }
4650  if (typePair.first == kTEXT) {
4651  ti.set_compression(kENCODING_DICT);
4652  ti.set_comp_param(32);
4653  }
4654  ti.set_fixed_size();
4655  cd.columnType = ti;
4656  cds.push_back(cd);
4657  }
4658  // get geo column, if any
4659  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4660  if (poGeometry) {
4661  ColumnDescriptor cd;
4662  cd.columnName = geo_column_name;
4663  cd.sourceName = geo_column_name;
4664 
4665  // get GDAL type
4666  auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
4667 
4668  // if exploding, override any collection type to child type
4669  if (copy_params.geo_explode_collections) {
4670  if (ogr_type == wkbMultiPolygon) {
4671  ogr_type = wkbPolygon;
4672  } else if (ogr_type == wkbMultiLineString) {
4673  ogr_type = wkbLineString;
4674  } else if (ogr_type == wkbMultiPoint) {
4675  ogr_type = wkbPoint;
4676  }
4677  }
4678 
4679  // convert to internal type
4680  SQLTypes geoType = ogr_to_type(ogr_type);
4681 
4682  // for now, we promote POLYGON to MULTIPOLYGON (unless exploding)
4683  if (!copy_params.geo_explode_collections) {
4684  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4685  }
4686 
4687  // build full internal type
4688  SQLTypeInfo ti;
4689  ti.set_type(geoType);
4690  ti.set_subtype(copy_params.geo_coords_type);
4691  ti.set_input_srid(copy_params.geo_coords_srid);
4692  ti.set_output_srid(copy_params.geo_coords_srid);
4693  ti.set_compression(copy_params.geo_coords_encoding);
4694  ti.set_comp_param(copy_params.geo_coords_comp_param);
4695  cd.columnType = ti;
4696 
4697  cds.push_back(cd);
4698  }
4699  return cds;
4700 }
4701 
4702 bool Importer::gdalStatInternal(const std::string& path,
4703  const CopyParams& copy_params,
4704  bool also_dir) {
4705  // lazy init GDAL
4706  Geospatial::GDAL::init();
4707 
4708  // set authorization tokens
4709  setGDALAuthorizationTokens(copy_params);
4710 
4711 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4712  // clear GDAL stat cache
4713  // without this, file existence will be cached, even if authentication changes
4714  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4715  VSICurlClearCache();
4716 #endif
4717 
4718  // stat path
4719  VSIStatBufL sb;
4720  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4721  if (result < 0) {
4722  return false;
4723  }
4724 
4725  // exists?
4726  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4727  return true;
4728  } else if (VSI_ISREG(sb.st_mode)) {
4729  return true;
4730  }
4731  return false;
4732 }
4733 
4734 /* static */
4735 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
4736  return gdalStatInternal(path, copy_params, false);
4737 }
4738 
4739 /* static */
4740 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
4741  const CopyParams& copy_params) {
4742  return gdalStatInternal(path, copy_params, true);
4743 }
4744 
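// --- illustrative example, not part of the original source ---
// VSIStatExL() accepts any GDAL VSI path, so the two wrappers above work
// unchanged for local files and for remote objects once credentials are
// configured; the paths below are hypothetical:
//   CopyParams copy_params;  // s3_access_key etc. filled in as needed
//   bool local_ok = Importer::gdalFileExists("/tmp/cities.geojson", copy_params);
//   bool s3_ok = Importer::gdalFileExists("/vsis3/my-bucket/cities.geojson",
//                                         copy_params);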
4745 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
4746  std::vector<std::string>& files) {
4747  // prepare to gather subdirectories
4748  std::vector<std::string> subdirectories;
4749 
4750  // get entries
4751  char** entries = VSIReadDir(archive_path.c_str());
4752  if (!entries) {
4753  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
4754  return;
4755  }
4756 
4757  // force scope
4758  {
4759  // request clean-up
4760  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
4761 
4762  // check all the entries
4763  int index = 0;
4764  while (true) {
4765  // get next entry, or drop out if there isn't one
4766  char* entry_c = entries[index++];
4767  if (!entry_c) {
4768  break;
4769  }
4770  std::string entry(entry_c);
4771 
4772  // ignore '.' and '..'
4773  if (entry == "." || entry == "..") {
4774  continue;
4775  }
4776 
4777  // build the full path
4778  std::string entry_path = archive_path + std::string("/") + entry;
4779 
4780  // is it a file or a sub-folder
4781  VSIStatBufL sb;
4782  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
4783  if (result < 0) {
4784  break;
4785  }
4786 
4787  if (VSI_ISDIR(sb.st_mode)) {
4788  // a directory that ends with .gdb could be a Geodatabase bundle
4789  // arguably dangerous to decide this purely by name, but any further
4790  // validation would be very complex especially at this scope
4791  if (boost::iends_with(entry_path, ".gdb")) {
4792  // add the directory as if it was a file and don't recurse into it
4793  files.push_back(entry_path);
4794  } else {
4795  // add subdirectory to be recursed into
4796  subdirectories.push_back(entry_path);
4797  }
4798  } else {
4799  // add this file
4800  files.push_back(entry_path);
4801  }
4802  }
4803  }
4804 
4805  // recurse into each subdirectory we found
4806  for (const auto& subdirectory : subdirectories) {
4807  gdalGatherFilesInArchiveRecursive(subdirectory, files);
4808  }
4809 }
4810 
4811 /* static */
4812 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
4813  const std::string& archive_path,
4814  const CopyParams& copy_params) {
4815  // lazy init GDAL
4816  Geospatial::GDAL::init();
4817 
4818  // set authorization tokens
4819  setGDALAuthorizationTokens(copy_params);
4820 
4821  // prepare to gather files
4822  std::vector<std::string> files;
4823 
4824  // gather the files recursively
4825  gdalGatherFilesInArchiveRecursive(archive_path, files);
4826 
4827  // convert to relative paths inside archive
4828  for (auto& file : files) {
4829  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4830  }
4831 
4832  // done
4833  return files;
4834 }
4835 
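// --- illustrative example, not part of the original source ---
// Since the traversal above goes through the VSI layer, an archive can be
// listed by prefixing it with the matching VSI handler; for a hypothetical
// local zip bundle:
//   const auto files = Importer::gdalGetAllFilesInArchive(
//       "/vsizip/geo_bundle.zip", copy_params);
//   for (const auto& relative_path : files) {
//     LOG(INFO) << "archive member: " << relative_path;
//   }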
4836 /* static */
4837 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
4838  const std::string& file_name,
4839  const CopyParams& copy_params) {
4840  // lazy init GDAL
4841  Geospatial::GDAL::init();
4842 
4843  // set authorization tokens
4844  setGDALAuthorizationTokens(copy_params);
4845 
4846  // prepare to gather layer info
4847  std::vector<GeoFileLayerInfo> layer_info;
4848 
4849  // open the data set
4850  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4851  if (poDS == nullptr) {
4852  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4853  file_name);
4854  }
4855 
4856  // enumerate the layers
4857  for (auto&& poLayer : poDS->GetLayers()) {
4858  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
4859  // prepare to read this layer
4860  poLayer->ResetReading();
4861  // skip layer if empty
4862  if (poLayer->GetFeatureCount() > 0) {
4863  // get first feature
4864  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4865  CHECK(first_feature);
4866  // check feature for geometry
4867  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4868  if (!geometry) {
4869  // layer has no geometry
4870  contents = GeoFileLayerContents::NON_GEO;
4871  } else {
4872  // check the geometry type
4873  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4874  switch (wkbFlatten(geometry_type)) {
4875  case wkbPoint:
4876  case wkbLineString:
4877  case wkbPolygon:
4878  case wkbMultiPolygon:
4879  // layer has supported geo
4880  contents = GeoFileLayerContents::GEO;
4881  break;
4882  case wkbMultiPoint:
4883  case wkbMultiLineString:
4884  // supported if geo_explode_collections is specified
4885  contents = copy_params.geo_explode_collections
4886  ? GeoFileLayerContents::GEO
4887  : GeoFileLayerContents::UNSUPPORTED_GEO;
4888  break;
4889  default:
4890  // layer has unsupported geometry
4891  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
4892  break;
4893  }
4894  }
4895  }
4896  // store info for this layer
4897  layer_info.emplace_back(poLayer->GetName(), contents);
4898  }
4899 
4900  // done
4901  return layer_info;
4902 }
4903 
4904 ImportStatus Importer::importGDAL(ColumnNameToSourceNameMapType columnNameToSourceNameMap,
4905  const Catalog_Namespace::SessionInfo* session_info) {
4906  // initial status
4907  set_import_status(import_id, import_status_);
4908  OGRDataSourceUqPtr poDS(openGDALDataset(file_path, copy_params));
4909  if (poDS == nullptr) {
4910  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4911  file_path);
4912  }
4913 
4914  OGRLayer& layer =
4915  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
4916 
4917  // get the number of features in this layer
4918  size_t numFeatures = layer.GetFeatureCount();
4919 
4920  // build map of metadata field (additional columns) name to index
4921  // use shared_ptr since we need to pass it to the worker
4922  FieldNameToIndexMapType fieldNameToIndexMap;
4923  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4924  CHECK(poFDefn);
4925  size_t numFields = poFDefn->GetFieldCount();
4926  for (size_t iField = 0; iField < numFields; iField++) {
4927  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4928  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4929  }
4930 
4931  // the geographic spatial reference we want to put everything in
4932  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4933  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4934 
4935 #if GDAL_VERSION_MAJOR >= 3
4936  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order;
4937  // this results in X and Y being transposed for angle-based
4938  // coordinate systems. This restores the previous behavior.
4939  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4940 #endif
4941 
4942 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4943  // just one "thread"
4944  max_threads = 1;
4945 #else
4946  // how many threads to use
4947  if (copy_params.threads == 0) {
4948  max_threads = std::min(static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF)),
4950  } else {
4951  max_threads = copy_params.threads;
4952  }
4953 #endif
4954 
4955  VLOG(1) << "GDAL import # threads: " << max_threads;
4956 
4957  // import geo table is handled specially in both DBHandler and QueryRunner,
4958  // on a separate path from normal SQL execution,
4959  // so we explicitly enroll the import session here to allow interruption
4960  // while importing a geo table
4961  auto query_session = session_info ? session_info->get_session_id() : "";
4962  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
4963  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
4964  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
4965  executor->enrollQuerySession(query_session,
4966  "Import Geo Table",
4967  query_submitted_time,
4968  Executor::UNITARY_EXECUTOR_ID,
4969  QuerySessionStatus::QueryStatus::RUNNING);
4970  }
4971 
4972  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
4973  // reset the runtime query interrupt status
4974  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
4975  executor->clearQuerySessionStatus(query_session, query_submitted_time, false);
4976  }
4977  };
4978 
4979  // make an import buffer for each thread
4980  CHECK_EQ(import_buffers_vec.size(), 0u);
4981  import_buffers_vec.resize(max_threads);
4982  for (size_t i = 0; i < max_threads; i++) {
4983  for (const auto cd : loader->get_column_descs()) {
4984  import_buffers_vec[i].emplace_back(
4985  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4986  }
4987  }
4988 
4989  // make render group analyzers for each poly column
4990  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4992  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4993  loader->getTableDesc()->tableId, false, false, false);
4994  for (auto cd : columnDescriptors) {
4995  SQLTypes ct = cd->columnType.get_type();
4996  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4997  auto rga = std::make_shared<RenderGroupAnalyzer>();
4998  rga->seedFromExistingTableContents(loader, cd->columnName);
4999  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
5000  }
5001  }
5002  }
5003 
5004 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5005  // threads
5006  std::list<std::future<ImportStatus>> threads;
5007 
5008  // use a stack to track thread_ids which must not overlap among threads
5009  // because thread_id is used to index import_buffers_vec[]
5010  std::stack<size_t> stack_thread_ids;
5011  for (size_t i = 0; i < max_threads; i++) {
5012  stack_thread_ids.push(i);
5013  }
5014 #endif
5015 
5016  // checkpoint the table
5017  auto table_epochs = loader->getTableEpochs();
5018 
5019  // reset the layer
5020  layer.ResetReading();
5021 
5022  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5023 
5024  // make a features buffer for each thread
5025  std::vector<FeaturePtrVector> features(max_threads);
5026 
5027  // for each feature...
5028  size_t firstFeatureThisChunk = 0;
5029  while (firstFeatureThisChunk < numFeatures) {
5030  // how many features this chunk
5031  size_t numFeaturesThisChunk =
5032  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5033 
5034 // get a thread_id not in use
5035 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5036  size_t thread_id = 0;
5037 #else
5038  auto thread_id = stack_thread_ids.top();
5039  stack_thread_ids.pop();
5040  CHECK(thread_id < max_threads);
5041 #endif
5042 
5043  // fill features buffer for new thread
5044  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
5045  features[thread_id].emplace_back(layer.GetNextFeature());
5046  }
5047 
5048 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5049  // call worker function directly
5050  auto ret_import_status = import_thread_shapefile(0,
5051  this,
5052  poGeographicSR.get(),
5053  std::move(features[thread_id]),
5054  firstFeatureThisChunk,
5055  numFeaturesThisChunk,
5056  fieldNameToIndexMap,
5057  columnNameToSourceNameMap,
5058  columnIdToRenderGroupAnalyzerMap,
5059  session_info,
5060  executor.get());
5061  import_status += ret_import_status;
5062  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
5063  import_status.rows_completed;
5064  set_import_status(import_id, import_status);
5065 #else
5066  // fire up that thread to import this geometry
5067  threads.push_back(std::async(std::launch::async,
5069  thread_id,
5070  this,
5071  poGeographicSR.get(),
5072  std::move(features[thread_id]),
5073  firstFeatureThisChunk,
5074  numFeaturesThisChunk,
5075  fieldNameToIndexMap,
5076  columnNameToSourceNameMap,
5077  columnIdToRenderGroupAnalyzerMap,
5078  session_info,
5079  executor.get()));
5080 
5081  // let the threads run
5082  while (threads.size() > 0) {
5083  int nready = 0;
5084  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
5085  it != threads.end();) {
5086  auto& p = *it;
5087  std::chrono::milliseconds span(
5088  0); //(std::distance(it, threads.end()) == 1? 1: 0);
5089  if (p.wait_for(span) == std::future_status::ready) {
5090  auto ret_import_status = p.get();
5091  {
5092  mapd_lock_guard<mapd_shared_mutex> write_lock(import_mutex_);
5093  import_status_ += ret_import_status;
5094  import_status_.rows_estimated =
5095  ((float)firstFeatureThisChunk / (float)numFeatures) *
5096  import_status_.rows_completed;
5097  set_import_status(import_id, import_status_);
5098  if (import_status_.load_failed) {
5099  break;
5100  }
5101  }
5102  // recall thread_id for reuse
5103  stack_thread_ids.push(ret_import_status.thread_id);
5104 
5105  threads.erase(it++);
5106  ++nready;
5107  } else {
5108  ++it;
5109  }
5110  }
5111 
5112  if (nready == 0) {
5113  std::this_thread::yield();
5114  }
5115 
5116  // keep reading if any free thread slot
5117  // this is one of the major difference from old threading model !!
5118  if (threads.size() < max_threads) {
5119  break;
5120  }
5121  }
5122 #endif
5123 
5124  // out of rows?
5125 
5126  mapd_unique_lock<mapd_shared_mutex> write_lock(import_mutex_);
5127  if (import_status_.rows_rejected > copy_params.max_reject) {
5128  import_status_.load_failed = true;
5129  // todo use better message
5130  import_status_.load_msg = "Maximum rows rejected exceeded. Halting load";
5131  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
5132  break;
5133  }
5134  if (import_status_.load_failed) {
5135  LOG(ERROR) << "A call to the Loader failed in GDAL, please review the logs for "
5136  "more details";
5137  break;
5138  }
5139 
5140  firstFeatureThisChunk += numFeaturesThisChunk;
5141  }
5142 
5143 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5144  // wait for any remaining threads
5145  if (threads.size()) {
5146  for (auto& p : threads) {
5147  // wait for the thread
5148  p.wait();
5149  // get the result and update the final import status
5150  auto ret_import_status = p.get();
5151  import_status_ += ret_import_status;
5152  import_status_.rows_estimated = import_status_.rows_completed;
5153  set_import_status(import_id, import_status_);
5154  }
5155  }
5156 #endif
5157 
5158  checkpoint(table_epochs);
5159 
5160  return import_status_;
5161 }
5162 
5163 //
5164 // class RenderGroupAnalyzer
5165 //
5166 
5167 void RenderGroupAnalyzer::seedFromExistingTableContents(
5168  const std::unique_ptr<Loader>& loader,
5169  const std::string& geoColumnBaseName) {
5170  // start timer
5171  auto seedTimer = timer_start();
5172 
5173  // start with a fresh tree
5174  _rtree = nullptr;
5175  _numRenderGroups = 0;
5176 
5177  // get the table descriptor
5178  const auto& cat = loader->getCatalog();
5179  if (loader->getTableDesc()->storageType == StorageType::FOREIGN_TABLE) {
5180  if (DEBUG_RENDER_GROUP_ANALYZER) {
5181  LOG(INFO) << "DEBUG: Table is a foreign table";
5182  }
5183  _rtree = std::make_unique<RTree>();
5184  CHECK(_rtree);
5185  return;
5186  }
5187 
5188  const std::string& tableName = loader->getTableDesc()->tableName;
5189  const auto td = cat.getMetadataForTable(tableName);
5190  CHECK(td);
5191  CHECK(td->fragmenter);
5192 
5193  // if the table is empty, just make an empty tree
5194  if (td->fragmenter->getFragmentsForQuery().getPhysicalNumTuples() == 0) {
5195  if (DEBUG_RENDER_GROUP_ANALYZER) {
5196  LOG(INFO) << "DEBUG: Table is empty!";
5197  }
5198  _rtree = std::make_unique<RTree>();
5199  CHECK(_rtree);
5200  return;
5201  }
5202 
5203  // no seeding possible without these two columns
5204  const auto cd_bounds =
5205  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_bounds");
5206  const auto cd_render_group =
5207  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_render_group");
5208  if (!cd_bounds || !cd_render_group) {
5209  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5210  " doesn't have bounds or render_group columns!");
5211  }
5212 
5213  // and validate their types
5214  if (cd_bounds->columnType.get_type() != kARRAY ||
5215  cd_bounds->columnType.get_subtype() != kDOUBLE) {
5216  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5217  " bounds column is wrong type!");
5218  }
5219  if (cd_render_group->columnType.get_type() != kINT) {
5220  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
5221  " render_group column is wrong type!");
5222  }
5223 
5224  // get chunk accessor table
5225  auto chunkAccessorTable = getChunkAccessorTable(
5226  cat, td, {geoColumnBaseName + "_bounds", geoColumnBaseName + "_render_group"});
5227  const auto table_count = std::get<0>(chunkAccessorTable.back());
5228 
5230  LOG(INFO) << "DEBUG: Scanning existing table geo column set '" << geoColumnBaseName
5231  << "'";
5232  }
5233 
5234  std::vector<Node> nodes;
5235  try {
5236  nodes.resize(table_count);
5237  } catch (const std::exception& e) {
5238  throw std::runtime_error("RenderGroupAnalyzer failed to reserve memory for " +
5239  std::to_string(table_count) + " rows");
5240  }
5241 
5242  for (size_t row = 0; row < table_count; row++) {
5243  ArrayDatum ad;
5244  VarlenDatum vd;
5245  bool is_end;
5246 
5247  // get ChunkIters and fragment row offset
5248  size_t rowOffset = 0;
5249  auto& chunkIters = getChunkItersAndRowOffset(chunkAccessorTable, row, rowOffset);
5250  auto& boundsChunkIter = chunkIters[0];
5251  auto& renderGroupChunkIter = chunkIters[1];
5252 
5253  // get bounds values
5254  ChunkIter_get_nth(&boundsChunkIter, row - rowOffset, &ad, &is_end);
5255  CHECK(!is_end);
5256  CHECK(ad.pointer);
5257  int numBounds = (int)(ad.length / sizeof(double));
5258  CHECK(numBounds == 4);
5259 
5260  // convert to bounding box
5261  double* bounds = reinterpret_cast<double*>(ad.pointer);
5262  BoundingBox bounding_box;
5263  boost::geometry::assign_inverse(bounding_box);
5264  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
5265  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5266 
5267  // get render group
5268  ChunkIter_get_nth(&renderGroupChunkIter, row - rowOffset, false, &vd, &is_end);
5269  CHECK(!is_end);
5270  CHECK(vd.pointer);
5271  int renderGroup = *reinterpret_cast<int32_t*>(vd.pointer);
5272 
5273  // skip rows with invalid render groups (e.g. EMPTY geometry)
5274  if (renderGroup < 0) {
5275  continue;
5276  }
5277 
5278  // store
5279  nodes[row] = std::make_pair(bounding_box, renderGroup);
5280 
5281  // how many render groups do we have now?
5282  if (renderGroup >= _numRenderGroups) {
5283  _numRenderGroups = renderGroup + 1;
5284  }
5285 
5287  LOG(INFO) << "DEBUG: Existing row " << row << " has Render Group " << renderGroup;
5288  }
5289  }
5290 
5291  // bulk-load the tree
5292  auto bulk_load_timer = timer_start();
5293  _rtree = std::make_unique<RTree>(nodes);
5294  CHECK(_rtree);
5295  LOG(INFO) << "Scanning render groups of poly column '" << geoColumnBaseName
5296  << "' of table '" << tableName << "' took " << timer_stop(seedTimer) << "ms ("
5297  << timer_stop(bulk_load_timer) << " ms for tree)";
5298 
5300  LOG(INFO) << "DEBUG: Done! Now have " << _numRenderGroups << " Render Groups";
5301  }
5302 }
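
// seedFromExistingTableContents() leans on two boost::geometry idioms: a
// bounding box built with assign_inverse() + expand(), and an R-tree
// bulk-loaded through its range constructor (the packing algorithm), which is
// considerably faster than inserting nodes one at a time. A minimal sketch
// follows; the Pt/Box/Node aliases are illustrative stand-ins for this file's
// Point/BoundingBox/Node typedefs, whose template parameters may differ.
#if 0
#include <boost/geometry.hpp>
#include <boost/geometry/index/rtree.hpp>
#include <iostream>
#include <iterator>
#include <utility>
#include <vector>

namespace bg = boost::geometry;
namespace bgi = boost::geometry::index;
using Pt = bg::model::point<double, 2, bg::cs::cartesian>;
using Box = bg::model::box<Pt>;
using Node = std::pair<Box, int>;  // bounding box + render group

int main() {
  // assign_inverse() starts the box "inside out" (min corner at +inf, max
  // corner at -inf) so expanding by the corner points yields a tight fit
  Box b;
  bg::assign_inverse(b);
  bg::expand(b, Pt(0.0, 0.0));
  bg::expand(b, Pt(1.0, 1.0));

  // the range constructor bulk-loads the tree in one pass
  std::vector<Node> nodes = {
      {b, 0}, {Box(Pt(0.5, 0.5), Pt(2, 2)), 1}, {Box(Pt(3, 3), Pt(4, 4)), 0}};
  bgi::rtree<Node, bgi::quadratic<16>> rtree(nodes);

  // same query form as insertBoundsAndReturnRenderGroup() below
  std::vector<Node> hits;
  rtree.query(bgi::intersects(Box(Pt(0.9, 0.9), Pt(1.5, 1.5))),
              std::back_inserter(hits));
  std::cout << hits.size() << " intersecting nodes\n";  // prints "2"
}
#endif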
5303 
5304 int RenderGroupAnalyzer::insertBoundsAndReturnRenderGroup(
5305  const std::vector<double>& bounds) {
5306  // validate
5307  CHECK(bounds.size() == 4);
5308 
5309  // get bounds
5310  BoundingBox bounding_box;
5311  boost::geometry::assign_inverse(bounding_box);
5312  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
5313  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
5314 
5315  // remainder under mutex to allow this to be multi-threaded
5316  std::lock_guard<std::mutex> guard(_rtreeMutex);
5317 
5318  // get the intersecting nodes
5319  std::vector<Node> intersects;
5320  _rtree->query(boost::geometry::index::intersects(bounding_box),
5321  std::back_inserter(intersects));
5322 
5323  // build bitset of render groups of the intersecting rectangles
5324  // clear bit means available, allows use of find_first()
5325  boost::dynamic_bitset<> bits(_numRenderGroups);
5326  bits.set();
5327  for (const auto& intersection : intersects) {
5328  CHECK(intersection.second < _numRenderGroups);
5329  bits.reset(intersection.second);
5330  }
5331 
5332  // find first available group
5333  int firstAvailableRenderGroup = 0;
5334  size_t firstSetBit = bits.find_first();
5335  if (firstSetBit == boost::dynamic_bitset<>::npos) {
5336  // all known groups represented, add a new one
5337  firstAvailableRenderGroup = _numRenderGroups;
5338  _numRenderGroups++;
5339  } else {
5340  firstAvailableRenderGroup = (int)firstSetBit;
5341  }
5342 
5343  // insert new node
5344  _rtree->insert(std::make_pair(bounding_box, firstAvailableRenderGroup));
5345 
5346  // return it
5347  return firstAvailableRenderGroup;
5348 }
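
// The assignment above is a greedy coloring: gather the render groups of all
// polygons whose bounding boxes intersect the new one, then take the lowest
// group not in use, growing the group count only when every existing group is
// taken. The bitset trick in isolation (first_free_group() is a hypothetical
// helper written for this sketch, not part of this file):
#if 0
#include <boost/dynamic_bitset.hpp>
#include <iostream>
#include <vector>

int first_free_group(const std::vector<int>& used_groups, int num_groups) {
  boost::dynamic_bitset<> bits(num_groups);
  bits.set();  // every group tentatively available
  for (const int g : used_groups) {
    bits.reset(g);  // clear groups taken by intersecting polygons
  }
  const auto first_set_bit = bits.find_first();
  if (first_set_bit == boost::dynamic_bitset<>::npos) {
    return num_groups;  // all groups taken: allocate a brand-new one
  }
  return static_cast<int>(first_set_bit);
}

int main() {
  std::cout << first_free_group({0, 1, 3}, 4) << '\n';     // prints "2"
  std::cout << first_free_group({0, 1, 2, 3}, 4) << '\n';  // prints "4"
}
#endif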
5349 
5350 std::vector<std::unique_ptr<TypedImportBuffer>> setup_column_loaders(
5351  const TableDescriptor* td,
5352  Loader* loader) {
5353  CHECK(td);
5354  auto col_descs = loader->get_column_descs();
5355 
5356  std::vector<std::unique_ptr<TypedImportBuffer>> import_buffers;
5357  for (auto cd : col_descs) {
5358  import_buffers.emplace_back(
5359  std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
5360  }
5361 
5362  return import_buffers;
5363 }
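
// A typical call site, sketched under the assumption that td and loader were
// obtained from the catalog beforehand; the buffer-filling step is elided and
// the exact Loader::load() parameter list varies by version:
//
//   auto import_buffers = setup_column_loaders(td, loader.get());
//   // ... append one value per column per input row to each buffer ...
//   loader->load(import_buffers, row_count, session_info);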
5364 
5365 [[nodiscard]] std::vector<std::unique_ptr<TypedImportBuffer>> fill_missing_columns(
5366  const Catalog_Namespace::Catalog* cat,
5367  Fragmenter_Namespace::InsertData& insert_data) {
5368  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> defaults_buffers;
5369  if (insert_data.is_default.size() == 0) {
5370  insert_data.is_default.resize(insert_data.columnIds.size(), false);
5371  }
5372  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
5373  auto cds = cat->getAllColumnMetadataForTable(insert_data.tableId, false, false, true);
5374  if (cds.size() == insert_data.columnIds.size()) {
5375  // all columns specified
5376  return defaults_buffers;
5377  }
5378  for (auto cd : cds) {
5379  if (std::find(insert_data.columnIds.begin(),
5380  insert_data.columnIds.end(),
5381  cd->columnId) == insert_data.columnIds.end()) {
5382  StringDictionary* dict = nullptr;
5383  if (cd->columnType.get_type() == kARRAY &&
5384  IS_STRING(cd->columnType.get_subtype())) {
5385  throw std::runtime_error("Cannot omit column \"" + cd->columnName +
5386  "\": omitting TEXT arrays is not supported yet");