OmniSciDB  04ee39c94c
Importer.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * @file Importer.cpp
 * @author Wei Hong <wei@mapd.com>
 * @brief Functions for Importer class
 */

#include <gdal.h>
#include <ogrsf_frmts.h>
#include <unistd.h>
#include <boost/algorithm/string.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/filesystem.hpp>
#include <boost/variant.hpp>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <future>
#include <iomanip>
#include <list>
#include <memory>
#include <mutex>
#include <numeric>
#include <stack>
#include <stdexcept>
#include <thread>
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "../QueryEngine/TypePunning.h"
#include "../Shared/geo_compression.h"
#include "../Shared/geo_types.h"
#include "../Shared/geosupport.h"
#include "../Shared/import_helpers.h"
#include "../Shared/mapd_glob.h"
#include "../Shared/mapdpath.h"
#include "../Shared/measure.h"
#include "../Shared/scope.h"
#include "../Shared/shard_key.h"
#include "../Shared/thread_count.h"
#include "Shared/Logger.h"
#include "Shared/SqlTypesLayout.h"

#include "Importer.h"
#include "gen-cpp/MapD.h"

#include <arrow/api.h>
#include <arrow/io/api.h>

#include "../Archive/PosixFileArchive.h"
#include "../Archive/S3Archive.h"
#include "ArrowImporter.h"

inline auto get_filesize(const std::string& file_path) {
  boost::filesystem::path boost_file_path{file_path};
  boost::system::error_code ec;
  const auto filesize = boost::filesystem::file_size(boost_file_path, ec);
  return ec ? 0 : filesize;
}

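// Note: the error_code overload of file_size() swallows filesystem errors, so a
// missing or unreadable file reports a size of 0 rather than throwing.
// Illustrative (hypothetical) use:
//   auto sz = get_filesize("/no/such/file.csv");  // sz == 0, no exception
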
namespace {

struct OGRDataSourceDeleter {
  void operator()(OGRDataSource* datasource) {
    if (datasource) {
      GDALClose(datasource);
    }
  }
};
using OGRDataSourceUqPtr = std::unique_ptr<OGRDataSource, OGRDataSourceDeleter>;

struct OGRFeatureDeleter {
  void operator()(OGRFeature* feature) {
    if (feature) {
      OGRFeature::DestroyFeature(feature);
    }
  }
};
using OGRFeatureUqPtr = std::unique_ptr<OGRFeature, OGRFeatureDeleter>;

struct OGRSpatialReferenceDeleter {
  void operator()(OGRSpatialReference* ref) {
    if (ref) {
      OGRSpatialReference::DestroySpatialReference(ref);
    }
  }
};
using OGRSpatialReferenceUqPtr =
    std::unique_ptr<OGRSpatialReference, OGRSpatialReferenceDeleter>;

}  // namespace

// For logging std::vector<std::string> row.
namespace boost {
namespace log {
formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
  out << '[';
  for (size_t i = 0; i < row.size(); ++i) {
    out << (i ? ", " : "") << row[i];
  }
  out << ']';
  return out;
}
}  // namespace log
}  // namespace boost

namespace Importer_NS {

using FieldNameToIndexMapType = std::map<std::string, size_t>;
using ColumnNameToSourceNameMapType = std::map<std::string, std::string>;
using ColumnIdToRenderGroupAnalyzerMapType =
    std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
using FeaturePtrVector = std::vector<OGRFeatureUqPtr>;

#define DEBUG_TIMING false
#define DEBUG_RENDER_GROUP_ANALYZER 0
#define DEBUG_AWS_AUTHENTICATION 0

#define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0

static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;

static mapd_shared_mutex status_mutex;
static std::map<std::string, ImportStatus> import_status_map;

Importer::Importer(Catalog_Namespace::Catalog& c,
                   const TableDescriptor* t,
                   const std::string& f,
                   const CopyParams& p)
    : Importer(new Loader(c, t), f, p) {}

Importer::Importer(Loader* providedLoader, const std::string& f, const CopyParams& p)
    : DataStreamSink(p, f), loader(providedLoader) {
  import_id = boost::filesystem::path(file_path).filename().string();
  file_size = 0;
  max_threads = 0;
  p_file = nullptr;
  buffer[0] = nullptr;
  buffer[1] = nullptr;
  // We may overallocate a little memory here due to dropping physical columns;
  // that is harmless because iteration is not supposed to go out of bounds.
  auto is_array = std::unique_ptr<bool[]>(new bool[loader->get_column_descs().size()]);
  int i = 0;
  bool has_array = false;
  // TODO: replace this ugly way of skipping physical columns once isPhyGeo is defined
  int skip_physical_cols = 0;
  for (auto& p : loader->get_column_descs()) {
    // physical geo columns can't be in the input file
    if (skip_physical_cols-- > 0) {
      continue;
    }
    // neither can rowid or $deleted$
    // note: columns can be added after rowid/$deleted$
    if (p->isVirtualCol || p->isDeletedCol) {
      continue;
    }
    skip_physical_cols = p->columnType.get_physical_cols();
    if (p->columnType.get_type() == kARRAY) {
      is_array.get()[i] = true;
      has_array = true;
    } else {
      is_array.get()[i] = false;
    }
    ++i;
  }
  if (has_array) {
    is_array_a = std::unique_ptr<bool[]>(is_array.release());
  } else {
    is_array_a = std::unique_ptr<bool[]>(nullptr);
  }
}

Importer::~Importer() {
  if (p_file != nullptr) {
    fclose(p_file);
  }
  if (buffer[0] != nullptr) {
    free(buffer[0]);
  }
  if (buffer[1] != nullptr) {
    free(buffer[1]);
  }
}

ImportStatus Importer::get_import_status(const std::string& import_id) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
  return import_status_map.at(import_id);
}

void Importer::set_import_status(const std::string& import_id, ImportStatus is) {
  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
  is.end = std::chrono::steady_clock::now();
  is.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.end - is.start);
  import_status_map[import_id] = is;
}

static const std::string trim_space(const char* field, const size_t len) {
  size_t i = 0;
  size_t j = len;
  while (i < j && (field[i] == ' ' || field[i] == '\r')) {
    i++;
  }
  while (i < j && (field[j - 1] == ' ' || field[j - 1] == '\r')) {
    j--;
  }
  return std::string(field + i, j - i);
}

static const bool is_eol(const char& p, const std::string& line_delims) {
  for (auto i : line_delims) {
    if (p == i) {
      return true;
    }
  }
  return false;
}

static const char* get_row(const char* buf,
                           const char* buf_end,
                           const char* entire_buf_end,
                           const CopyParams& copy_params,
                           bool is_begin,
                           const bool* is_array,
                           std::vector<std::string>& row,
                           bool& try_single_thread) {
  const char* field = buf;
  const char* p;
  bool in_quote = false;
  bool in_array = false;
  bool has_escape = false;
  bool strip_quotes = false;
  try_single_thread = false;
  std::string line_endings({copy_params.line_delim, '\r', '\n'});
  for (p = buf; p < entire_buf_end; p++) {
    if (*p == copy_params.escape && p < entire_buf_end - 1 &&
        *(p + 1) == copy_params.quote) {
      p++;
      has_escape = true;
    } else if (copy_params.quoted && *p == copy_params.quote) {
      in_quote = !in_quote;
      if (in_quote) {
        strip_quotes = true;
      }
    } else if (!in_quote && is_array != nullptr && *p == copy_params.array_begin &&
               is_array[row.size()]) {
      in_array = true;
    } else if (!in_quote && is_array != nullptr && *p == copy_params.array_end &&
               is_array[row.size()]) {
      in_array = false;
    } else if (*p == copy_params.delimiter || is_eol(*p, line_endings)) {
      if (!in_quote && !in_array) {
        if (!has_escape && !strip_quotes) {
          std::string s = trim_space(field, p - field);
          row.push_back(s);
        } else {
          auto field_buf = std::make_unique<char[]>(p - field + 1);
          int j = 0, i = 0;
          for (; i < p - field; i++, j++) {
            if (has_escape && field[i] == copy_params.escape &&
                field[i + 1] == copy_params.quote) {
              field_buf[j] = copy_params.quote;
              i++;
            } else {
              field_buf[j] = field[i];
            }
          }
          std::string s = trim_space(field_buf.get(), j);
          if (copy_params.quoted && s.size() > 0 && s.front() == copy_params.quote) {
            s.erase(0, 1);
          }
          if (copy_params.quoted && s.size() > 0 && s.back() == copy_params.quote) {
            s.pop_back();
          }
          row.push_back(s);
        }
        field = p + 1;
        has_escape = false;
        strip_quotes = false;
      }
      if (is_eol(*p, line_endings) &&
          ((!in_quote && !in_array) || copy_params.threads != 1)) {
        while (p + 1 < buf_end && is_eol(*(p + 1), line_endings)) {
          p++;
        }
        break;
      }
    }
  }
  /*
    @TODO(wei) do error handling
  */
  if (in_quote) {
    LOG(ERROR) << "Unmatched quote.";
    try_single_thread = true;
  }
  if (in_array) {
    LOG(ERROR) << "Unmatched array.";
    try_single_thread = true;
  }
  return p;
}

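// Illustrative sketch (not part of the original source): with the default
// CopyParams (delimiter ',', quote '"', array_begin '{', array_end '}'),
// a buffer line such as
//     1,"a,b",{10,20}
// is tokenized into row == {"1", "a,b", "{10,20}"} -- the comma inside the
// quotes and the commas inside the braces (when is_array[2] is set) do not
// split fields, and the surrounding quotes are stripped.
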
int8_t* appendDatum(int8_t* buf, Datum d, const SQLTypeInfo& ti) {
  switch (ti.get_type()) {
    case kBOOLEAN:
      *(bool*)buf = d.boolval;
      return buf + sizeof(bool);
    case kNUMERIC:
    case kDECIMAL:
    case kBIGINT:
      *(int64_t*)buf = d.bigintval;
      return buf + sizeof(int64_t);
    case kINT:
      *(int32_t*)buf = d.intval;
      return buf + sizeof(int32_t);
    case kSMALLINT:
      *(int16_t*)buf = d.smallintval;
      return buf + sizeof(int16_t);
    case kTINYINT:
      *(int8_t*)buf = d.tinyintval;
      return buf + sizeof(int8_t);
    case kFLOAT:
      *(float*)buf = d.floatval;
      return buf + sizeof(float);
    case kDOUBLE:
      *(double*)buf = d.doubleval;
      return buf + sizeof(double);
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      *reinterpret_cast<int64_t*>(buf) = d.bigintval;
      return buf + sizeof(int64_t);
    default:
      return NULL;
  }
  return NULL;
}

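// Illustrative sketch (hypothetical values): appendDatum packs scalars
// back-to-back at their native widths and returns the advanced cursor, e.g.
//   int8_t buf[sizeof(int32_t) + sizeof(int16_t)];
//   Datum a, b;
//   a.intval = 7;
//   b.smallintval = 3;
//   int8_t* p = appendDatum(buf, a, SQLTypeInfo(kINT, false));
//   p = appendDatum(p, b, SQLTypeInfo(kSMALLINT, false));  // p == buf + 6
// Unsupported types return NULL. This is how the fixed-width array payloads
// below are assembled.
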
Datum NullDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullDatum.");
  }
  return d;
}

Datum NullArrayDatum(SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kBIGINT:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kINT:
      d.intval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kSMALLINT:
      d.smallintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kTINYINT:
      d.tinyintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kFLOAT:
      d.floatval = NULL_ARRAY_FLOAT;
      break;
    case kDOUBLE:
      d.doubleval = NULL_ARRAY_DOUBLE;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval = inline_fixed_encoding_null_array_val(ti);
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in NullArrayDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in NullArrayDatum.");
  }
  return d;
}

ArrayDatum StringToArray(const std::string& s,
                         const SQLTypeInfo& ti,
                         const CopyParams& copy_params) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  if (s == copy_params.null_str || s == "NULL" || s.empty()) {
    return ArrayDatum(0, NULL, true);
  }
  if (s[0] != copy_params.array_begin || s[s.size() - 1] != copy_params.array_end) {
    LOG(WARNING) << "Malformed array: " << s;
    return ArrayDatum(0, NULL, true);
  }
  std::vector<std::string> elem_strs;
  size_t last = 1;
  for (size_t i = s.find(copy_params.array_delim, 1); i != std::string::npos;
       i = s.find(copy_params.array_delim, last)) {
    elem_strs.push_back(s.substr(last, i - last));
    last = i + 1;
  }
  if (last + 1 <= s.size()) {
    elem_strs.push_back(s.substr(last, s.size() - 1 - last));
  }
  if (elem_strs.size() == 1) {
    auto str = elem_strs.front();
    auto str_trimmed = trim_space(str.c_str(), str.length());
    if (str_trimmed == "") {
      elem_strs.clear();  // Empty array
    }
  }
  if (!elem_ti.is_string()) {
    size_t len = elem_strs.size() * elem_ti.get_size();
    int8_t* buf = (int8_t*)checked_malloc(len);
    int8_t* p = buf;
    for (auto& es : elem_strs) {
      auto e = trim_space(es.c_str(), es.length());
      bool is_null = (e == copy_params.null_str) || e == "NULL";
      if (!elem_ti.is_string() && e == "") {
        is_null = true;
      }
      if (elem_ti.is_number() || elem_ti.is_time()) {
        if (!isdigit(e[0]) && e[0] != '-') {
          is_null = true;
        }
      }
      Datum d = is_null ? NullDatum(elem_ti) : StringToDatum(e, elem_ti);
      p = appendDatum(p, d, elem_ti);
    }
    return ArrayDatum(len, buf, false);
  }
  // must not be called for array of strings
  CHECK(false);
  return ArrayDatum(0, NULL, true);
}

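// Illustrative sketch: with the default CopyParams ('{', '}' and ',' for
// arrays), StringToArray("{1,2,3}", int_array_ti, copy_params) -- where
// int_array_ti is a hypothetical ARRAY of INT type -- yields a 12-byte
// ArrayDatum holding three int32 values; "{}" yields a zero-length array,
// and the null string (or malformed input) yields a null ArrayDatum.
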
ArrayDatum NullArray(const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();
  auto len = ti.get_size();

  if (elem_ti.is_string()) {
    // must not be called for array of strings
    CHECK(false);
    return ArrayDatum(0, NULL, true);
  }

  if (len > 0) {
    // Compose a NULL fixlen array
    int8_t* buf = (int8_t*)checked_malloc(len);
    // First scalar is a NULL_ARRAY sentinel
    Datum d = NullArrayDatum(elem_ti);
    int8_t* p = appendDatum(buf, d, elem_ti);
    // Rest is filled with normal NULL sentinels
    Datum d0 = NullDatum(elem_ti);
    while ((p - buf) < len) {
      p = appendDatum(p, d0, elem_ti);
    }
    CHECK((p - buf) == len);
    return ArrayDatum(len, buf, true);
  }
  // NULL varlen array
  return ArrayDatum(0, NULL, true);
}

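// Layout note: a NULL fixed-length array is fully materialized -- the first
// slot holds the NULL_ARRAY sentinel value and the remaining slots hold
// ordinary NULL sentinels, so e.g. a NULL INT[4] still occupies 16 bytes.
// A NULL variable-length array, by contrast, is an empty ArrayDatum with its
// is_null flag set.
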
ArrayDatum ImporterUtils::composeNullArray(const SQLTypeInfo& ti) {
  return NullArray(ti);
}

void addBinaryStringArray(const TDatum& datum, std::vector<std::string>& string_vec) {
  const auto& arr = datum.val.arr_val;
  for (const auto& elem_datum : arr) {
    string_vec.push_back(elem_datum.val.str_val);
  }
}

Datum TDatumToDatum(const TDatum& datum, SQLTypeInfo& ti) {
  Datum d;
  const auto type = ti.is_decimal() ? decimal_to_int_type(ti) : ti.get_type();
  switch (type) {
    case kBOOLEAN:
      d.boolval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kBIGINT:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kINT:
      d.intval = datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kSMALLINT:
      d.smallintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kTINYINT:
      d.tinyintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kFLOAT:
      d.floatval = datum.is_null ? NULL_FLOAT : datum.val.real_val;
      break;
    case kDOUBLE:
      d.doubleval = datum.is_null ? NULL_DOUBLE : datum.val.real_val;
      break;
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      d.bigintval =
          datum.is_null ? inline_fixed_encoding_null_val(ti) : datum.val.int_val;
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      throw std::runtime_error("Internal error: geometry type in TDatumToDatum.");
    default:
      throw std::runtime_error("Internal error: invalid type in TDatumToDatum.");
  }
  return d;
}

ArrayDatum TDatumToArrayDatum(const TDatum& datum, const SQLTypeInfo& ti) {
  SQLTypeInfo elem_ti = ti.get_elem_type();

  CHECK(!elem_ti.is_string());

  if (datum.is_null) {
    return NullArray(ti);
  }

  size_t len = datum.val.arr_val.size() * elem_ti.get_size();
  int8_t* buf = (int8_t*)checked_malloc(len);
  int8_t* p = buf;
  for (auto& e : datum.val.arr_val) {
    p = appendDatum(p, TDatumToDatum(e, elem_ti), elem_ti);
  }

  return ArrayDatum(len, buf, false);
}

static size_t find_beginning(const char* buffer,
                             size_t begin,
                             size_t end,
                             const CopyParams& copy_params) {
  // @TODO(wei) line_delim inside quotes is not supported
  if (begin == 0 || (begin > 0 && buffer[begin - 1] == copy_params.line_delim)) {
    return 0;
  }
  size_t i;
  const char* buf = buffer + begin;
  for (i = 0; i < end - begin; i++) {
    if (buf[i] == copy_params.line_delim) {
      return i + 1;
    }
  }
  return i;
}

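// Illustrative sketch: each worker thread is handed a byte range of the file,
// and find_beginning skips any partial first line so no row is parsed twice.
// E.g. with line_delim '\n', buffer == "aa\nbb\ncc\n" and begin == 4 (mid
// "bb"), the function returns 2, so this thread starts parsing at "cc\n"
// while the previous thread's get_row() pass finishes the "bb" line.
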
void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const std::string& val,
                                  const bool is_null,
                                  const CopyParams& copy_params,
                                  const int64_t replicate_count) {
  set_replicate_count(replicate_count);
  const auto type = cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBoolean((int8_t)d.boolval);
      }
      break;
    }
    case kTINYINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addTinyint(d.tinyintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kSMALLINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addSmallint(d.smallintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addInt(d.intval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kBIGINT: {
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kDECIMAL:
    case kNUMERIC: {
      if (!is_null) {
        SQLTypeInfo ti(kNUMERIC, 0, 0, false);
        Datum d = StringToDatum(val, ti);
        const auto converted_decimal_value =
            convert_decimal_value_to_scale(d.bigintval, ti, cd->columnType);
        addBigint(converted_decimal_value);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kFLOAT:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addFloat((float)std::atof(val.c_str()));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null && (val[0] == '.' || isdigit(val[0]) || val[0] == '-')) {
        addDouble(std::atof(val.c_str()));
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        if (val.length() > StringDictionary::MAX_STRLEN) {
          throw std::runtime_error("String too long for column " + cd->columnName +
                                   " was " + std::to_string(val.length()) + " max is " +
                                   std::to_string(StringDictionary::MAX_STRLEN));
        }
        addString(val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE:
      if (!is_null && (isdigit(val[0]) || val[0] == '-')) {
        SQLTypeInfo ti = cd->columnType;
        Datum d = StringToDatum(val, ti);
        addBigint(d.bigintval);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kARRAY: {
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      SQLTypeInfo ti = cd->columnType;
      if (IS_STRING(ti.get_subtype())) {
        std::vector<std::string> string_vec;
        // Just parse string array, don't push it to buffer yet as we might throw
        ImporterUtils::parseStringArray(val, copy_params, string_vec);
        if (!is_null) {
          // TODO: add support for NULL string arrays
          if (ti.get_size() > 0) {
            auto sti = ti.get_elem_type();
            size_t expected_size = ti.get_size() / sti.get_size();
            size_t actual_size = string_vec.size();
            if (actual_size != expected_size) {
              throw std::runtime_error("Fixed length array column " + cd->columnName +
                                       " expects " + std::to_string(expected_size) +
                                       " values, received " +
                                       std::to_string(actual_size));
            }
          }
          addStringArray(string_vec);
        } else {
          if (ti.get_size() > 0) {
            // TODO: remove once NULL fixlen arrays are allowed
            throw std::runtime_error("Fixed length array column " + cd->columnName +
                                     " currently cannot accept NULL arrays");
          }
          // TODO: add support for NULL string arrays, replace with addStringArray(),
          // for now add whatever parseStringArray() outputs for NULLs ("NULL")
          addStringArray(string_vec);
        }
      } else {
        if (!is_null) {
          ArrayDatum d = StringToArray(val, ti, copy_params);
          if (d.is_null) {  // val could be "NULL"
            addArray(NullArray(ti));
          } else {
            if (ti.get_size() > 0 && static_cast<size_t>(ti.get_size()) != d.length) {
              throw std::runtime_error("Fixed length array for column " + cd->columnName +
                                       " has incorrect length: " + val);
            }
            addArray(d);
          }
        } else {
          addArray(NullArray(ti));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      addGeoString(val);
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

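// Behavioral note: NULL detection here is two-layered. The caller flags matches
// of copy_params.null_str, and for numeric/time columns add_value additionally
// treats any value not starting with a digit, '-' (or '.' for floats) as NULL.
// So for a nullable INT column, "\N", "NULL", "" and even "abc" all end up
// stored as inline_fixed_encoding_null_val(columnType).
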
void TypedImportBuffer::pop_value() {
  const auto type = column_desc_->columnType.is_decimal()
                        ? decimal_to_int_type(column_desc_->columnType)
                        : column_desc_->columnType.get_type();
  switch (type) {
    case kBOOLEAN:
      bool_buffer_->pop_back();
      break;
    case kTINYINT:
      tinyint_buffer_->pop_back();
      break;
    case kSMALLINT:
      smallint_buffer_->pop_back();
      break;
    case kINT:
      int_buffer_->pop_back();
      break;
    case kBIGINT:
      bigint_buffer_->pop_back();
      break;
    case kFLOAT:
      float_buffer_->pop_back();
      break;
    case kDOUBLE:
      double_buffer_->pop_back();
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      string_buffer_->pop_back();
      break;
    case kDATE:
    case kTIME:
    case kTIMESTAMP:
      bigint_buffer_->pop_back();
      break;
    case kARRAY:
      if (IS_STRING(column_desc_->columnType.get_subtype())) {
        string_array_buffer_->pop_back();
      } else {
        array_buffer_->pop_back();
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      geo_string_buffer_->pop_back();
      break;
    default:
      CHECK(false) << "TypedImportBuffer::pop_value() does not support type " << type;
  }
}

struct GeoImportException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// appends (streams) a slice of Arrow array of values (RHS) to TypedImportBuffer (LHS)
template <typename DATA_TYPE>
size_t TypedImportBuffer::convert_arrow_val_to_import_buffer(
    const ColumnDescriptor* cd,
    const Array& array,
    std::vector<DATA_TYPE>& buffer,
    const ArraySliceRange& slice_range,
    Importer_NS::BadRowsTracker* const bad_rows_tracker) {
  auto data =
      std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
  auto f_value_getter = value_getter(array, cd, bad_rows_tracker);
  std::function<void(const int64_t)> f_add_geo_phy_cols = [&](const int64_t row) {};
  if (bad_rows_tracker && cd->columnType.is_geometry()) {
    f_add_geo_phy_cols = [&](const int64_t row) {
      // Populate physical columns (ref. MapDHandler::load_table)
      std::vector<double> coords, bounds;
      std::vector<int> ring_sizes, poly_rings;
      int render_group = 0;
      SQLTypeInfo ti;
      // replace any unexpected exception from getGeoColumns or other code
      // on this path with a GeoImportException so that we won't over-push
      // a null to the logical column...
      try {
        arrow_throw_if<GeoImportException>(
            array.IsNull(row) ||
                !Geo_namespace::GeoTypesFactory::getGeoColumns(geo_string_buffer_->back(),
                                                               ti,
                                                               coords,
                                                               bounds,
                                                               ring_sizes,
                                                               poly_rings,
                                                               false),
            error_context(cd, bad_rows_tracker) + "Invalid geometry");
        arrow_throw_if<GeoImportException>(
            cd->columnType.get_type() != ti.get_type(),
            error_context(cd, bad_rows_tracker) + "Geometry type mismatch");
        auto col_idx_workpad = col_idx;  // what a pitfall!!
        Importer::set_geo_physical_import_buffer(
            bad_rows_tracker->importer->getCatalog(),
            cd,
            *import_buffers,
            col_idx_workpad,
            coords,
            bounds,
            ring_sizes,
            poly_rings,
            render_group);
      } catch (GeoImportException&) {
        throw;
      } catch (std::runtime_error& e) {
        throw GeoImportException(e.what());
      } catch (const std::exception& e) {
        throw GeoImportException(e.what());
      } catch (...) {
        throw GeoImportException("unknown exception");
      }
    };
  }
  auto f_mark_a_bad_row = [&](const auto row) {
    std::unique_lock<std::mutex> lck(bad_rows_tracker->mutex);
    bad_rows_tracker->rows.insert(row - slice_range.first);
  };
  buffer.reserve(slice_range.second - slice_range.first);
  for (size_t row = slice_range.first; row < slice_range.second; ++row) {
    try {
      *data << (array.IsNull(row) ? nullptr : f_value_getter(array, row));
      f_add_geo_phy_cols(row);
    } catch (GeoImportException&) {
      f_mark_a_bad_row(row);
    } catch (ArrowImporterException&) {
      // trace bad rows of each column; otherwise rethrow.
      if (bad_rows_tracker) {
        *data << nullptr;
        f_mark_a_bad_row(row);
      } else {
        throw;
      }
    }
  }
  return buffer.size();
}

size_t TypedImportBuffer::add_arrow_values(const ColumnDescriptor* cd,
                                           const Array& col,
                                           const bool exact_type_match,
                                           const ArraySliceRange& slice_range,
                                           BadRowsTracker* const bad_rows_tracker) {
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    arrow_throw_if(col.null_count() > 0, "NULL not allowed for column " + cd->columnName);
  }

  switch (type) {
    case kBOOLEAN:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BOOL, "Expected boolean type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bool_buffer_, slice_range, bad_rows_tracker);
    case kTINYINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT8, "Expected int8 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *tinyint_buffer_, slice_range, bad_rows_tracker);
    case kSMALLINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT16, "Expected int16 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *smallint_buffer_, slice_range, bad_rows_tracker);
    case kINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT32, "Expected int32 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *int_buffer_, slice_range, bad_rows_tracker);
    case kBIGINT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::INT64, "Expected int64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kFLOAT:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::FLOAT, "Expected float type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *float_buffer_, slice_range, bad_rows_tracker);
    case kDOUBLE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DOUBLE, "Expected double type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *double_buffer_, slice_range, bad_rows_tracker);
    case kTEXT:
    case kVARCHAR:
    case kCHAR:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                       "Expected string type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *string_buffer_, slice_range, bad_rows_tracker);
    case kTIME:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
                       "Expected time32 or time64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kTIMESTAMP:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::TIMESTAMP, "Expected timestamp type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kDATE:
      if (exact_type_match) {
        arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
                       "Expected date32 or date64 type");
      }
      return convert_arrow_val_to_import_buffer(
          cd, col, *bigint_buffer_, slice_range, bad_rows_tracker);
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      arrow_throw_if(col.type_id() != Type::BINARY && col.type_id() != Type::STRING,
                     "Expected string type");
      return convert_arrow_val_to_import_buffer(
          cd, col, *geo_string_buffer_, slice_range, bad_rows_tracker);
    case kARRAY:
      throw std::runtime_error("Arrow array appends not yet supported");
    default:
      throw std::runtime_error("Invalid Type");
  }
}

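// Usage note: Arrow appends are slice-based so several threads can convert one
// Arrow column concurrently, e.g. slice_range {0, N/2} on one thread and
// {N/2, N} on another. Each call returns the number of values buffered; when a
// BadRowsTracker is supplied, per-row conversion failures are recorded
// (offset by slice_range.first) instead of aborting the whole import.
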
// this is exclusively used by load_table_binary_columnar
size_t TypedImportBuffer::add_values(const ColumnDescriptor* cd, const TColumn& col) {
  size_t dataSize = 0;
  if (cd->columnType.get_notnull()) {
    // We can't have any null values for this column; to have them is an error
    if (std::any_of(col.nulls.begin(), col.nulls.end(), [](int i) { return i != 0; })) {
      throw std::runtime_error("NULL for column " + cd->columnName);
    }
  }

  switch (cd->columnType.get_type()) {
    case kBOOLEAN: {
      dataSize = col.data.int_col.size();
      bool_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bool_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bool_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kTINYINT: {
      dataSize = col.data.int_col.size();
      tinyint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          tinyint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          tinyint_buffer_->push_back((int8_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kSMALLINT: {
      dataSize = col.data.int_col.size();
      smallint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          smallint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          smallint_buffer_->push_back((int16_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kINT: {
      dataSize = col.data.int_col.size();
      int_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          int_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          int_buffer_->push_back((int32_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kBIGINT:
    case kNUMERIC:
    case kDECIMAL: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back((int64_t)col.data.int_col[i]);
        }
      }
      break;
    }
    case kFLOAT: {
      dataSize = col.data.real_col.size();
      float_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          float_buffer_->push_back(NULL_FLOAT);
        } else {
          float_buffer_->push_back((float)col.data.real_col[i]);
        }
      }
      break;
    }
    case kDOUBLE: {
      dataSize = col.data.real_col.size();
      double_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          double_buffer_->push_back(NULL_DOUBLE);
        } else {
          double_buffer_->push_back((double)col.data.real_col[i]);
        }
      }
      break;
    }
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // TODO: for now, use empty string for nulls
      dataSize = col.data.str_col.size();
      string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          string_buffer_->push_back(std::string());
        } else {
          string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      dataSize = col.data.int_col.size();
      bigint_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          bigint_buffer_->push_back(inline_fixed_encoding_null_val(cd->columnType));
        } else {
          bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
        }
      }
      break;
    }
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON: {
      dataSize = col.data.str_col.size();
      geo_string_buffer_->reserve(dataSize);
      for (size_t i = 0; i < dataSize; i++) {
        if (col.nulls[i]) {
          // TODO: add support for NULL geo
          geo_string_buffer_->push_back(std::string());
        } else {
          geo_string_buffer_->push_back(col.data.str_col[i]);
        }
      }
      break;
    }
    case kARRAY: {
      dataSize = col.data.arr_col.size();
      if (IS_STRING(cd->columnType.get_subtype())) {
        for (size_t i = 0; i < dataSize; i++) {
          std::vector<std::string>& string_vec = addStringArray();
          if (!col.nulls[i]) {
            size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
            for (size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
              string_vec.push_back(col.data.arr_col[i].data.str_col[str_idx]);
            }
          }
        }
      } else {
        auto elem_ti = cd->columnType.get_subtype();
        switch (elem_ti) {
          case kBOOLEAN: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(bool);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTINYINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int8_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int8_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kSMALLINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int16_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int16_t*)p =
                      static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int16_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kINT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int32_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int32_t*)p =
                      static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int32_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kBIGINT:
          case kNUMERIC:
          case kDECIMAL: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteSize = len * sizeof(int64_t);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(int64_t*)p =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kFLOAT: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(float);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(float);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kDOUBLE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.real_col.size();
                size_t byteSize = len * sizeof(double);
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *(double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
                  p += sizeof(double);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          case kTIME:
          case kTIMESTAMP:
          case kDATE: {
            for (size_t i = 0; i < dataSize; i++) {
              if (col.nulls[i]) {
                addArray(NullArray(cd->columnType));
              } else {
                size_t len = col.data.arr_col[i].data.int_col.size();
                size_t byteWidth = sizeof(int64_t);
                size_t byteSize = len * byteWidth;
                int8_t* buf = (int8_t*)checked_malloc(len * byteSize);
                int8_t* p = buf;
                for (size_t j = 0; j < len; ++j) {
                  *reinterpret_cast<int64_t*>(p) =
                      static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
                  p += sizeof(int64_t);
                }
                addArray(ArrayDatum(byteSize, buf, false));
              }
            }
            break;
          }
          default:
            throw std::runtime_error("Invalid Array Type");
        }
      }
      break;
    }
    default:
      throw std::runtime_error("Invalid Type");
  }
  return dataSize;
}

void TypedImportBuffer::add_value(const ColumnDescriptor* cd,
                                  const TDatum& datum,
                                  const bool is_null,
                                  const int64_t replicate_count) {
  set_replicate_count(replicate_count);
  const auto type = cd->columnType.is_decimal() ? decimal_to_int_type(cd->columnType)
                                                : cd->columnType.get_type();
  switch (type) {
    case kBOOLEAN: {
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBoolean(inline_fixed_encoding_null_val(cd->columnType));
      } else {
        addBoolean((int8_t)datum.val.int_val);
      }
      break;
    }
    case kTINYINT:
      if (!is_null) {
        addTinyint((int8_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addTinyint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kSMALLINT:
      if (!is_null) {
        addSmallint((int16_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addSmallint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kINT:
      if (!is_null) {
        addInt((int32_t)datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addInt(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kBIGINT:
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    case kFLOAT:
      if (!is_null) {
        addFloat((float)datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addFloat(NULL_FLOAT);
      }
      break;
    case kDOUBLE:
      if (!is_null) {
        addDouble(datum.val.real_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addDouble(NULL_DOUBLE);
      }
      break;
    case kTEXT:
    case kVARCHAR:
    case kCHAR: {
      // @TODO(wei) for now, use empty string for nulls
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addString(std::string());
      } else {
        addString(datum.val.str_val);
      }
      break;
    }
    case kTIME:
    case kTIMESTAMP:
    case kDATE: {
      if (!is_null) {
        addBigint(datum.val.int_val);
      } else {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addBigint(inline_fixed_encoding_null_val(cd->columnType));
      }
      break;
    }
    case kARRAY:
      if (is_null && cd->columnType.get_notnull()) {
        throw std::runtime_error("NULL for column " + cd->columnName);
      }
      if (IS_STRING(cd->columnType.get_subtype())) {
        std::vector<std::string>& string_vec = addStringArray();
        addBinaryStringArray(datum, string_vec);
      } else {
        if (!is_null) {
          addArray(TDatumToArrayDatum(datum, cd->columnType));
        } else {
          addArray(NullArray(cd->columnType));
        }
      }
      break;
    case kPOINT:
    case kLINESTRING:
    case kPOLYGON:
    case kMULTIPOLYGON:
      if (is_null) {
        if (cd->columnType.get_notnull()) {
          throw std::runtime_error("NULL for column " + cd->columnName);
        }
        addGeoString(std::string());
      } else {
        addGeoString(datum.val.str_val);
      }
      break;
    default:
      CHECK(false) << "TypedImportBuffer::add_value() does not support type " << type;
  }
}

bool importGeoFromLonLat(double lon, double lat, std::vector<double>& coords) {
  if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
    return false;
  }
  // Stack-allocated so the point is released when we return
  OGRPoint point(lon, lat);
  // NOTE(adb): Use OGRSpatialReferenceUqPtr to ensure proper deletion
  // auto poSR0 = new OGRSpatialReference();
  // poSR0->importFromEPSG(4326);
  // point.assignSpatialReference(poSR0);

  // auto poSR = new OGRSpatialReference();
  // poSR->importFromEPSG(3857);
  // point.transformTo(poSR);

  coords.push_back(point.getX());
  coords.push_back(point.getY());
  return true;
}

uint64_t compress_coord(double coord, const SQLTypeInfo& ti, bool x) {
  if (ti.get_compression() == kENCODING_GEOINT && ti.get_comp_param() == 32) {
    return x ? geospatial::compress_longitude_coord_geoint32(coord)
             : geospatial::compress_lattitude_coord_geoint32(coord);
  }
  return *reinterpret_cast<uint64_t*>(may_alias_ptr(&coord));
}

std::vector<uint8_t> compress_coords(std::vector<double>& coords, const SQLTypeInfo& ti) {
  std::vector<uint8_t> compressed_coords;
  bool x = true;
  for (auto coord : coords) {
    auto coord_data_ptr = reinterpret_cast<uint64_t*>(&coord);
    uint64_t coord_data = *coord_data_ptr;
    size_t coord_data_size = sizeof(double);

    if (ti.get_output_srid() == 4326) {
      if (x) {
        if (coord < -180.0 || coord > 180.0) {
          throw std::runtime_error("WGS84 longitude " + std::to_string(coord) +
                                   " is out of bounds");
        }
      } else {
        if (coord < -90.0 || coord > 90.0) {
          throw std::runtime_error("WGS84 latitude " + std::to_string(coord) +
                                   " is out of bounds");
        }
      }
      if (ti.get_compression() == kENCODING_GEOINT && ti.get_comp_param() == 32) {
        coord_data = compress_coord(coord, ti, x);
        coord_data_size = ti.get_comp_param() / 8;
      }
      x = !x;
    }

    for (size_t i = 0; i < coord_data_size; i++) {
      compressed_coords.push_back(coord_data & 0xFF);
      coord_data >>= 8;
    }
  }
  return compressed_coords;
}

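// Illustrative sketch: with GEOINT(32) compression on a WGS84 column, each
// 8-byte double is squeezed into 4 bytes and emitted LSB-first, e.g.
//   std::vector<double> point_coords{-122.4, 37.8};   // lon, lat
//   auto cc = compress_coords(point_coords, geo_ti);  // geo_ti: hypothetical
//                                                     // 4326 + GEOINT(32) type
//   // cc.size() == 8, vs 16 bytes uncompressed
// Out-of-range longitudes/latitudes throw a runtime_error.
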
void Importer::set_geo_physical_import_buffer(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<double>& coords,
    std::vector<double>& bounds,
    std::vector<int>& ring_sizes,
    std::vector<int>& poly_rings,
    int render_group,
    const int64_t replicate_count) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  std::vector<TDatum> td_coords_data;
  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
  for (auto cc : compressed_coords) {
    TDatum td_byte;
    td_byte.val.int_val = cc;
    td_coords_data.push_back(td_byte);
  }
  TDatum tdd_coords;
  tdd_coords.val.arr_val = td_coords_data;
  tdd_coords.is_null = false;
  import_buffers[col_idx++]->add_value(cd_coords, tdd_coords, false, replicate_count);

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    std::vector<TDatum> td_ring_sizes;
    for (auto ring_size : ring_sizes) {
      TDatum td_ring_size;
      td_ring_size.val.int_val = ring_size;
      td_ring_sizes.push_back(td_ring_size);
    }
    TDatum tdd_ring_sizes;
    tdd_ring_sizes.val.arr_val = td_ring_sizes;
    tdd_ring_sizes.is_null = false;
    import_buffers[col_idx++]->add_value(
        cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
  }

  if (col_type == kMULTIPOLYGON) {
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    std::vector<TDatum> td_poly_rings;
    for (auto num_rings : poly_rings) {
      TDatum td_num_rings;
      td_num_rings.val.int_val = num_rings;
      td_poly_rings.push_back(td_num_rings);
    }
    TDatum tdd_poly_rings;
    tdd_poly_rings.val.arr_val = td_poly_rings;
    tdd_poly_rings.is_null = false;
    import_buffers[col_idx++]->add_value(
        cd_poly_rings, tdd_poly_rings, false, replicate_count);
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    std::vector<TDatum> td_bounds_data;
    for (auto b : bounds) {
      TDatum td_double;
      td_double.val.real_val = b;
      td_bounds_data.push_back(td_double);
    }
    TDatum tdd_bounds;
    tdd_bounds.val.arr_val = td_bounds_data;
    tdd_bounds.is_null = false;
    import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = false;
    import_buffers[col_idx++]->add_value(
        cd_render_group, td_render_group, false, replicate_count);
  }
}

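// Physical-column order note: the ++columnId walk above mirrors the layout
// created for geo columns -- coords first, then ring_sizes (POLYGON /
// MULTIPOLYGON), poly_rings (MULTIPOLYGON only), bounds (LINESTRING and up),
// and finally render_group (POLYGON / MULTIPOLYGON) -- so col_idx advances by
// one slot per physical geo column consumed.
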
void Importer::set_geo_physical_import_buffer_columnar(
    const Catalog_Namespace::Catalog& catalog,
    const ColumnDescriptor* cd,
    std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
    size_t& col_idx,
    std::vector<std::vector<double>>& coords_column,
    std::vector<std::vector<double>>& bounds_column,
    std::vector<std::vector<int>>& ring_sizes_column,
    std::vector<std::vector<int>>& poly_rings_column,
    int render_group,
    const int64_t replicate_count) {
  const auto col_ti = cd->columnType;
  const auto col_type = col_ti.get_type();
  auto columnId = cd->columnId;

  auto coords_row_count = coords_column.size();
  auto cd_coords = catalog.getMetadataForColumn(cd->tableId, ++columnId);
  for (auto coords : coords_column) {
    std::vector<TDatum> td_coords_data;
    std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
    for (auto cc : compressed_coords) {
      TDatum td_byte;
      td_byte.val.int_val = cc;
      td_coords_data.push_back(td_byte);
    }
    TDatum tdd_coords;
    tdd_coords.val.arr_val = td_coords_data;
    tdd_coords.is_null = false;
    import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false, replicate_count);
  }
  col_idx++;

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (ring_sizes_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: ring sizes column size mismatch";
    }
    // Create ring_sizes array value and add it to the physical column
    auto cd_ring_sizes = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto ring_sizes : ring_sizes_column) {
      std::vector<TDatum> td_ring_sizes;
      for (auto ring_size : ring_sizes) {
        TDatum td_ring_size;
        td_ring_size.val.int_val = ring_size;
        td_ring_sizes.push_back(td_ring_size);
      }
      TDatum tdd_ring_sizes;
      tdd_ring_sizes.val.arr_val = td_ring_sizes;
      tdd_ring_sizes.is_null = false;
      import_buffers[col_idx]->add_value(
          cd_ring_sizes, tdd_ring_sizes, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kMULTIPOLYGON) {
    if (poly_rings_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: poly rings column size mismatch";
    }
    // Create poly_rings array value and add it to the physical column
    auto cd_poly_rings = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto poly_rings : poly_rings_column) {
      std::vector<TDatum> td_poly_rings;
      for (auto num_rings : poly_rings) {
        TDatum td_num_rings;
        td_num_rings.val.int_val = num_rings;
        td_poly_rings.push_back(td_num_rings);
      }
      TDatum tdd_poly_rings;
      tdd_poly_rings.val.arr_val = td_poly_rings;
      tdd_poly_rings.is_null = false;
      import_buffers[col_idx]->add_value(
          cd_poly_rings, tdd_poly_rings, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kLINESTRING || col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    if (bounds_column.size() != coords_row_count) {
      CHECK(false) << "Geometry import columnar: bounds column size mismatch";
    }
    auto cd_bounds = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    for (auto bounds : bounds_column) {
      std::vector<TDatum> td_bounds_data;
      for (auto b : bounds) {
        TDatum td_double;
        td_double.val.real_val = b;
        td_bounds_data.push_back(td_double);
      }
      TDatum tdd_bounds;
      tdd_bounds.val.arr_val = td_bounds_data;
      tdd_bounds.is_null = false;
      import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false, replicate_count);
    }
    col_idx++;
  }

  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
    // Create render_group value and add it to the physical column
    auto cd_render_group = catalog.getMetadataForColumn(cd->tableId, ++columnId);
    TDatum td_render_group;
    td_render_group.val.int_val = render_group;
    td_render_group.is_null = false;
    for (decltype(coords_row_count) i = 0; i < coords_row_count; i++) {
      import_buffers[col_idx]->add_value(
          cd_render_group, td_render_group, false, replicate_count);
    }
    col_idx++;
  }
}

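// Design note: unlike the row-wise variant above, this columnar variant fills
// each physical column for all rows before moving on, which is why col_idx is
// incremented once per column here rather than once per row.
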
1776  int thread_id,
1777  Importer* importer,
1778  std::unique_ptr<char[]> scratch_buffer,
1779  size_t begin_pos,
1780  size_t end_pos,
1781  size_t total_size,
1782  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap,
1783  size_t first_row_index_this_buffer) {
1785  int64_t total_get_row_time_us = 0;
1786  int64_t total_str_to_val_time_us = 0;
1787  CHECK(scratch_buffer);
1788  auto buffer = scratch_buffer.get();
1789  auto load_ms = measure<>::execution([]() {});
1790  auto ms = measure<>::execution([&]() {
1791  const CopyParams& copy_params = importer->get_copy_params();
1792  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
1793  size_t begin = find_beginning(buffer, begin_pos, end_pos, copy_params);
1794  const char* thread_buf = buffer + begin_pos + begin;
1795  const char* thread_buf_end = buffer + end_pos;
1796  const char* buf_end = buffer + total_size;
1797  bool try_single_thread = false;
1798  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
1799  importer->get_import_buffers(thread_id);
1801  int phys_cols = 0;
1802  int point_cols = 0;
1803  for (const auto cd : col_descs) {
1804  const auto& col_ti = cd->columnType;
1805  phys_cols += col_ti.get_physical_cols();
1806  if (cd->columnType.get_type() == kPOINT) {
1807  point_cols++;
1808  }
1809  }
1810  auto num_cols = col_descs.size() - phys_cols;
1811  for (const auto& p : import_buffers) {
1812  p->clear();
1813  }
1814  std::vector<std::string> row;
1815  size_t row_index_plus_one = 0;
1816  for (const char* p = thread_buf; p < thread_buf_end; p++) {
1817  row.clear();
1818  if (DEBUG_TIMING) {
1820  p = get_row(p,
1821  thread_buf_end,
1822  buf_end,
1823  copy_params,
1824  p == thread_buf,
1825  importer->get_is_array(),
1826  row,
1827  try_single_thread);
1828  });
1829  total_get_row_time_us += us;
1830  } else {
1831  p = get_row(p,
1832  thread_buf_end,
1833  buf_end,
1834  copy_params,
1835  p == thread_buf,
1836  importer->get_is_array(),
1837  row,
1838  try_single_thread);
1839  }
1840  row_index_plus_one++;
1841  // Each POINT could consume two separate coords instead of a single WKT
1842  if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
1843  import_status.rows_rejected++;
1844  LOG(ERROR) << "Incorrect Row (expected " << num_cols << " columns, has "
1845  << row.size() << "): " << row;
1846  if (import_status.rows_rejected > copy_params.max_reject) {
1847  break;
1848  }
1849  continue;
1850  }
1852  size_t import_idx = 0;
1853  size_t col_idx = 0;
1854  try {
1855  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1856  auto cd = *cd_it;
1857  const auto& col_ti = cd->columnType;
1858  if (col_ti.get_physical_cols() == 0) {
1859  // not geo
1860 
1861  // store the string (possibly null)
1862  bool is_null =
1863  (row[import_idx] == copy_params.null_str || row[import_idx] == "NULL");
1864  // Note: the default copy_params.null_str is "\N", but "NULL" is widely
1865  // used in practice. If only null_str were checked, "NULL" would not be
1866  // flagged as null here, yet add_value might still decide it is one, e.g.
1867  // a kINT that doesn't start with a digit or a '-' is considered NULL.
1868  // So "NULL" would not be recognized as NULL up front, but it is also not
1869  // a valid kINT, so it ends up NULL after all.
1870  // We check for "NULL" here too, as a widely accepted notation for NULL.
1871  if (!cd->columnType.is_string() && row[import_idx].empty()) {
1872  is_null = true;
1873  }
1874  import_buffers[col_idx]->add_value(
1875  cd, row[import_idx], is_null, copy_params);
1876 
1877  // next
1878  ++import_idx;
1879  ++col_idx;
1880  } else {
1881  // geo
1882 
1883  // store null string in the base column
1884  import_buffers[col_idx]->add_value(
1885  cd, copy_params.null_str, true, copy_params);
1886 
1887  // WKT from string we're not storing
1888  std::string wkt{row[import_idx]};
1889 
1890  // next
1891  ++import_idx;
1892  ++col_idx;
1893 
1894  SQLTypes col_type = col_ti.get_type();
1895  CHECK(IS_GEO(col_type));
1896 
1897  std::vector<double> coords;
1898  std::vector<double> bounds;
1899  std::vector<int> ring_sizes;
1900  std::vector<int> poly_rings;
1901  int render_group = 0;
1902 
1903  if (col_type == kPOINT && wkt.size() > 0 &&
1904  (wkt[0] == '.' || isdigit(wkt[0]) || wkt[0] == '-')) {
1905  // Invalid WKT, looks more like a scalar.
1906  // Try custom POINT import: from two separate scalars rather than WKT
1907  // string
1908  double lon = std::atof(wkt.c_str());
1909  double lat = NAN;
1910  std::string lat_str{row[import_idx]};
1911  ++import_idx;
1912  if (lat_str.size() > 0 &&
1913  (lat_str[0] == '.' || isdigit(lat_str[0]) || lat_str[0] == '-')) {
1914  lat = std::atof(lat_str.c_str());
1915  }
1916  // Swap coordinates if this table uses a reverse order: lat/lon
1917  if (!copy_params.lonlat) {
1918  std::swap(lat, lon);
1919  }
1920  // TODO: should check if POINT column should have been declared with SRID
1921  // WGS 84, EPSG 4326 ? if (col_ti.get_dimension() != 4326) {
1922  // throw std::runtime_error("POINT column " + cd->columnName + " is not
1923  // WGS84, cannot insert lon/lat");
1924  // }
1925  if (!importGeoFromLonLat(lon, lat, coords)) {
1926  throw std::runtime_error(
1927  "Cannot read lon/lat to insert into POINT column " +
1928  cd->columnName);
1929  }
1930  } else {
1931  // import it
1932  SQLTypeInfo import_ti;
1934  wkt,
1935  import_ti,
1936  coords,
1937  bounds,
1938  ring_sizes,
1939  poly_rings,
1940  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
1941  std::string msg =
1942  "Failed to extract valid geometry from row " +
1943  std::to_string(first_row_index_this_buffer + row_index_plus_one) +
1944  " for column " + cd->columnName;
1945  throw std::runtime_error(msg);
1946  }
1947 
1948  // validate types
1949  if (col_type != import_ti.get_type()) {
1950  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
1951  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
1952  col_type == SQLTypes::kMULTIPOLYGON)) {
1953  throw std::runtime_error(
1954  "Imported geometry doesn't match the type of column " +
1955  cd->columnName);
1956  }
1957  }
1958 
1959  if (columnIdToRenderGroupAnalyzerMap.size()) {
1960  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
1961  if (ring_sizes.size()) {
1962  // get a suitable render group for these poly coords
1963  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
1964  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
1965  render_group =
1966  (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
1967  } else {
1968  // empty poly
1969  render_group = -1;
1970  }
1971  }
1972  }
1973  }
1974 
1976  cd,
1977  import_buffers,
1978  col_idx,
1979  coords,
1980  bounds,
1981  ring_sizes,
1982  poly_rings,
1983  render_group);
1984  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1985  ++cd_it;
1986  }
1987  }
1988  }
1989  import_status.rows_completed++;
1990  } catch (const std::exception& e) {
1991  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
1992  import_buffers[col_idx_to_pop]->pop_value();
1993  }
1994  import_status.rows_rejected++;
1995  LOG(ERROR) << "Input exception thrown: " << e.what()
1996  << ". Row discarded. Data: " << row;
1997  }
1998  });
1999  total_str_to_val_time_us += us;
2000  }
2001  if (import_status.rows_completed > 0) {
2002  load_ms = measure<>::execution(
2003  [&]() { importer->load(import_buffers, import_status.rows_completed); });
2004  }
2005  });
2006  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2007  LOG(INFO) << "Thread" << std::this_thread::get_id() << ":"
2008  << import_status.rows_completed << " rows inserted in "
2009  << (double)ms / 1000.0 << "sec, Insert Time: " << (double)load_ms / 1000.0
2010  << "sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2011  << "sec, str_to_val: " << (double)total_str_to_val_time_us / 1000000.0
2012  << "sec" << std::endl;
2013  }
2014 
2015  import_status.thread_id = thread_id;
2016  // LOG(INFO) << " return " << import_status.thread_id << std::endl;
2017 
2018  return import_status;
2019 }
2020 
2022  int thread_id,
2023  Importer* importer,
2024  OGRSpatialReference* poGeographicSR,
2025  const FeaturePtrVector& features,
2026  size_t firstFeature,
2027  size_t numFeatures,
2028  const FieldNameToIndexMapType& fieldNameToIndexMap,
2029  const ColumnNameToSourceNameMapType& columnNameToSourceNameMap,
2030  const ColumnIdToRenderGroupAnalyzerMapType& columnIdToRenderGroupAnalyzerMap) {
2032  const CopyParams& copy_params = importer->get_copy_params();
2033  const std::list<const ColumnDescriptor*>& col_descs = importer->get_column_descs();
2034  std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2035  importer->get_import_buffers(thread_id);
2036 
2037  for (const auto& p : import_buffers) {
2038  p->clear();
2039  }
2040 
2041  auto convert_timer = timer_start();
2042 
2043  for (size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2044  if (!features[iFeature]) {
2045  continue;
2046  }
2047 
2048  // get this feature's geometry
2049  OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2050  if (pGeometry) {
2051  // for geodatabase, we need to consider features with no geometry
2052  // as we still want to create a table, even if it has no geo column
2053 
2054  // transform it
2055  // avoid GDAL error if not transformable
2056  if (pGeometry->getSpatialReference()) {
2057  pGeometry->transformTo(poGeographicSR);
2058  }
2059  }
2060 
2061  size_t col_idx = 0;
2062  try {
2063  for (auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2064  auto cd = *cd_it;
2065 
2066  // is this a geo column?
2067  const auto& col_ti = cd->columnType;
2068  if (col_ti.is_geometry()) {
2069  // some Shapefiles get us here, but the OGRGeometryRef is null
2070  if (!pGeometry) {
2071  std::string msg = "Geometry feature " +
2072  std::to_string(firstFeature + iFeature + 1) +
2073  " has null GeometryRef";
2074  throw std::runtime_error(msg);
2075  }
2076 
2077  // Note that this assumes there is one and only one geo column in the table.
2078  // Currently, the importer only supports reading a single geospatial feature
2079  // from an input shapefile / geojson file, but this code will need to be
2080  // modified if that changes
2081  SQLTypes col_type = col_ti.get_type();
2082 
2083  // store null string in the base column
2084  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
2085  ++col_idx;
2086 
2087  // the data we now need to extract for the other columns
2088  std::vector<double> coords;
2089  std::vector<double> bounds;
2090  std::vector<int> ring_sizes;
2091  std::vector<int> poly_rings;
2092  int render_group = 0;
2093 
2094  // extract it
2095  SQLTypeInfo import_ti;
2096 
2098  pGeometry,
2099  import_ti,
2100  coords,
2101  bounds,
2102  ring_sizes,
2103  poly_rings,
2104  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
2105  std::string msg = "Failed to extract valid geometry from feature " +
2106  std::to_string(firstFeature + iFeature + 1) +
2107  " for column " + cd->columnName;
2108  throw std::runtime_error(msg);
2109  }
2110 
2111  // validate types
2112  if (col_type != import_ti.get_type()) {
2113  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
2114  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
2115  col_type == SQLTypes::kMULTIPOLYGON)) {
2116  throw std::runtime_error(
2117  "Imported geometry doesn't match the type of column " + cd->columnName);
2118  }
2119  }
2120 
2121  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2122  if (ring_sizes.size()) {
2123  // get a suitable render group for these poly coords
2124  auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2125  CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2126  render_group = (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2127  } else {
2128  // empty poly
2129  render_group = -1;
2130  }
2131  }
2132 
2133  // create coords array value and add it to the physical column
2134  ++cd_it;
2135  auto cd_coords = *cd_it;
2136  std::vector<TDatum> td_coord_data;
2137  std::vector<uint8_t> compressed_coords = compress_coords(coords, col_ti);
2138  for (auto cc : compressed_coords) {
2139  TDatum td_byte;
2140  td_byte.val.int_val = cc;
2141  td_coord_data.push_back(td_byte);
2142  }
2143  TDatum tdd_coords;
2144  tdd_coords.val.arr_val = td_coord_data;
2145  tdd_coords.is_null = false;
2146  import_buffers[col_idx]->add_value(cd_coords, tdd_coords, false);
2147  ++col_idx;
2148 
2149  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2150  // Create ring_sizes array value and add it to the physical column
2151  ++cd_it;
2152  auto cd_ring_sizes = *cd_it;
2153  std::vector<TDatum> td_ring_sizes;
2154  for (auto ring_size : ring_sizes) {
2155  TDatum td_ring_size;
2156  td_ring_size.val.int_val = ring_size;
2157  td_ring_sizes.push_back(td_ring_size);
2158  }
2159  TDatum tdd_ring_sizes;
2160  tdd_ring_sizes.val.arr_val = td_ring_sizes;
2161  tdd_ring_sizes.is_null = false;
2162  import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes, false);
2163  ++col_idx;
2164  }
2165 
2166  if (col_type == kMULTIPOLYGON) {
2167  // Create poly_rings array value and add it to the physical column
2168  ++cd_it;
2169  auto cd_poly_rings = *cd_it;
2170  std::vector<TDatum> td_poly_rings;
2171  for (auto num_rings : poly_rings) {
2172  TDatum td_num_rings;
2173  td_num_rings.val.int_val = num_rings;
2174  td_poly_rings.push_back(td_num_rings);
2175  }
2176  TDatum tdd_poly_rings;
2177  tdd_poly_rings.val.arr_val = td_poly_rings;
2178  tdd_poly_rings.is_null = false;
2179  import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings, false);
2180  ++col_idx;
2181  }
2182 
2183  if (col_type == kLINESTRING || col_type == kPOLYGON ||
2184  col_type == kMULTIPOLYGON) {
2185  // Create bounds array value and add it to the physical column
2186  ++cd_it;
2187  auto cd_bounds = *cd_it;
2188  std::vector<TDatum> td_bounds_data;
2189  for (auto b : bounds) {
2190  TDatum td_double;
2191  td_double.val.real_val = b;
2192  td_bounds_data.push_back(td_double);
2193  }
2194  TDatum tdd_bounds;
2195  tdd_bounds.val.arr_val = td_bounds_data;
2196  tdd_bounds.is_null = false;
2197  import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds, false);
2198  ++col_idx;
2199  }
2200 
2201  if (col_type == kPOLYGON || col_type == kMULTIPOLYGON) {
2202  // Create render_group value and add it to the physical column
2203  ++cd_it;
2204  auto cd_render_group = *cd_it;
2205  TDatum td_render_group;
2206  td_render_group.val.int_val = render_group;
2207  td_render_group.is_null = false;
2208  import_buffers[col_idx]->add_value(cd_render_group, td_render_group, false);
2209  ++col_idx;
2210  }
2211  } else {
2212  // regular column
2213  // pull from GDAL metadata
2214  const auto cit = columnNameToSourceNameMap.find(cd->columnName);
2215  CHECK(cit != columnNameToSourceNameMap.end());
2216  const std::string& fieldName = cit->second;
2217  const auto fit = fieldNameToIndexMap.find(fieldName);
2218  CHECK(fit != fieldNameToIndexMap.end());
2219  size_t iField = fit->second;
2220  CHECK(iField < fieldNameToIndexMap.size());
2221  std::string fieldContents = features[iFeature]->GetFieldAsString(iField);
2222  import_buffers[col_idx]->add_value(cd, fieldContents, false, copy_params);
2223  ++col_idx;
2224  }
2225  }
2226  import_status.rows_completed++;
2227  } catch (const std::exception& e) {
2228  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2229  import_buffers[col_idx_to_pop]->pop_value();
2230  }
2231  import_status.rows_rejected++;
2232  LOG(ERROR) << "Input exception thrown: " << e.what() << ". Row discarded.";
2233  }
2234  }
2235  float convert_ms =
2236  float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2237  convert_timer)) /
2238  1000.0f;
2239 
2240  float load_ms = 0.0f;
2241  if (import_status.rows_completed > 0) {
2242  auto load_timer = timer_start();
2243  importer->load(import_buffers, import_status.rows_completed);
2244  load_ms =
2245  float(
2246  timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>(
2247  load_timer)) /
2248  1000.0f;
2249  }
2250 
2251  if (DEBUG_TIMING && import_status.rows_completed > 0) {
2252  LOG(INFO) << "DEBUG: Process " << convert_ms << "ms";
2253  LOG(INFO) << "DEBUG: Load " << load_ms << "ms";
2254  }
2255 
2256  import_status.thread_id = thread_id;
2257 
2258  if (DEBUG_TIMING) {
2259  LOG(INFO) << "DEBUG: Total "
2260  << float(timer_stop<std::chrono::steady_clock::time_point,
2261  std::chrono::microseconds>(convert_timer)) /
2262  1000.0f
2263  << "ms";
2264  }
2265 
2266  return import_status;
2267 }
2268 
2269 static size_t find_end(const char* buffer, size_t size, const CopyParams& copy_params) {
2270  int i;
2271  // @TODO(wei) line_delim inside quotes is not supported
2272  for (i = size - 1; i >= 0 && buffer[i] != copy_params.line_delim; i--) {
2273  ;
2274  }
2275 
2276  if (i < 0) {
2277  int slen = size < 50 ? size : 50;
2278  std::string showMsgStr(buffer, buffer + slen);
2279  LOG(ERROR) << "No line delimiter in block. Block was of size " << size
2280  << " bytes, first few characters " << showMsgStr;
2281  return size;
2282  }
2283  return i + 1;
2284 }
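// A minimal usage sketch of find_end (hypothetical buffer, assuming the
// default copy_params.line_delim of '\n'):
//
//   const char buf[] = "a,b\nc,d\ne,f";  // trailing record unterminated
//   size_t end = find_end(buf, sizeof(buf) - 1, copy_params);
//   // end == 8: bytes [0, 8) hold the complete rows "a,b\nc,d\n";
//   // the trailing "e,f" is carried over into the next buffer.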
2285 
2287  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2288  size_t row_count) {
2289  return loadImpl(import_buffers, row_count, false);
2290 }
2291 
2292 bool Loader::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2293  size_t row_count) {
2294  return loadImpl(import_buffers, row_count, true);
2295 }
2296 
2297 namespace {
2298 
2299 int64_t int_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2300  const auto& ti = import_buffer.getTypeInfo();
2301  const int8_t* values_buffer{nullptr};
2302  if (ti.is_string()) {
2303  CHECK_EQ(kENCODING_DICT, ti.get_compression());
2304  values_buffer = import_buffer.getStringDictBuffer();
2305  } else {
2306  values_buffer = import_buffer.getAsBytes();
2307  }
2308  CHECK(values_buffer);
2309  const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2310  switch (logical_size) {
2311  case 1: {
2312  return values_buffer[index];
2313  }
2314  case 2: {
2315  return reinterpret_cast<const int16_t*>(values_buffer)[index];
2316  }
2317  case 4: {
2318  return reinterpret_cast<const int32_t*>(values_buffer)[index];
2319  }
2320  case 8: {
2321  return reinterpret_cast<const int64_t*>(values_buffer)[index];
2322  }
2323  default:
2324  LOG(FATAL) << "Unexpected size for shard key: " << logical_size;
2325  }
2326  UNREACHABLE();
2327  return 0;
2328 }
2329 
2330 float float_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2331  const auto& ti = import_buffer.getTypeInfo();
2332  CHECK_EQ(kFLOAT, ti.get_type());
2333  const auto values_buffer = import_buffer.getAsBytes();
2334  return reinterpret_cast<const float*>(may_alias_ptr(values_buffer))[index];
2335 }
2336 
2337 double double_value_at(const TypedImportBuffer& import_buffer, const size_t index) {
2338  const auto& ti = import_buffer.getTypeInfo();
2339  CHECK_EQ(kDOUBLE, ti.get_type());
2340  const auto values_buffer = import_buffer.getAsBytes();
2341  return reinterpret_cast<const double*>(may_alias_ptr(values_buffer))[index];
2342 }
2343 
2344 } // namespace
2345 
2346 void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_buffers,
2347  std::vector<size_t>& all_shard_row_counts,
2348  const OneShardBuffers& import_buffers,
2349  const size_t row_count,
2350  const size_t shard_count) {
2351  all_shard_row_counts.resize(shard_count);
2352  for (size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2353  all_shard_import_buffers.emplace_back();
2354  for (const auto& typed_import_buffer : import_buffers) {
2355  all_shard_import_buffers.back().emplace_back(
2356  new TypedImportBuffer(typed_import_buffer->getColumnDesc(),
2357  typed_import_buffer->getStringDictionary()));
2358  }
2359  }
2360  CHECK_GT(table_desc_->shardedColumnId, 0);
2361  int col_idx{0};
2362  const ColumnDescriptor* shard_col_desc{nullptr};
2363  for (const auto col_desc : column_descs_) {
2364  ++col_idx;
2365  if (col_idx == table_desc_->shardedColumnId) {
2366  shard_col_desc = col_desc;
2367  break;
2368  }
2369  }
2370  CHECK(shard_col_desc);
2371  CHECK_LE(static_cast<size_t>(table_desc_->shardedColumnId), import_buffers.size());
2372  auto& shard_column_input_buffer = import_buffers[table_desc_->shardedColumnId - 1];
2373  const auto& shard_col_ti = shard_col_desc->columnType;
2374  CHECK(shard_col_ti.is_integer() ||
2375  (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT) ||
2376  shard_col_ti.is_time());
2377  if (shard_col_ti.is_string()) {
2378  const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2379  CHECK(payloads_ptr);
2380  shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2381  }
2382 
2383  // for each replicated (ALTER ADD) column, the number of rows in a shard is
2384  // inferred from that of the sharding column, not simply evenly distributed.
2385  const auto shard_tds = catalog_.getPhysicalTablesDescriptors(table_desc_);
2386  // Here the loop count is overloaded. For normal imports, we loop through all
2387  // input values (rows), so the loop count is the number of input rows.
2388  // For ALTER ADD COLUMN, we replicate one default value to existing rows in
2389  // all shards, so the loop count is the number of shards.
2390  const auto loop_count = getReplicating() ? table_desc_->nShards : row_count;
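  // Hedged illustration of the non-replicating path below (assuming
  // SHARD_FOR_KEY reduces the key modulo shard_count): with shard_count == 4,
  // a row whose shard-key column holds 42 maps to shard 42 % 4 == 2, so it is
  // appended to all_shard_import_buffers[2].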
2391  for (size_t i = 0; i < loop_count; ++i) {
2392  const size_t shard =
2393  getReplicating()
2394  ? i
2395  : SHARD_FOR_KEY(int_value_at(*shard_column_input_buffer, i), shard_count);
2396  auto& shard_output_buffers = all_shard_import_buffers[shard];
2397 
2398  // when replicating a column, populate 'rows' to all shards only once;
2399  // the value is fetched from the first (and only) row
2400  const auto row_index = getReplicating() ? 0 : i;
2401 
2402  for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2403  const auto& input_buffer = import_buffers[col_idx];
2404  const auto& col_ti = input_buffer->getTypeInfo();
2405  const auto type =
2406  col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
2407 
2408  // for a replicated (added) column, set the per-shard replicate count from
2409  // the shard's existing row count; bypass non-replicated columns.
2410  if (getReplicating()) {
2411  if (input_buffer->get_replicate_count() > 0) {
2412  shard_output_buffers[col_idx]->set_replicate_count(
2413  shard_tds[shard]->fragmenter->getNumRows());
2414  } else {
2415  continue;
2416  }
2417  }
2418 
2419  switch (type) {
2420  case kBOOLEAN:
2421  shard_output_buffers[col_idx]->addBoolean(
2422  int_value_at(*input_buffer, row_index));
2423  break;
2424  case kTINYINT:
2425  shard_output_buffers[col_idx]->addTinyint(
2426  int_value_at(*input_buffer, row_index));
2427  break;
2428  case kSMALLINT:
2429  shard_output_buffers[col_idx]->addSmallint(
2430  int_value_at(*input_buffer, row_index));
2431  break;
2432  case kINT:
2433  shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, row_index));
2434  break;
2435  case kBIGINT:
2436  shard_output_buffers[col_idx]->addBigint(
2437  int_value_at(*input_buffer, row_index));
2438  break;
2439  case kFLOAT:
2440  shard_output_buffers[col_idx]->addFloat(
2441  float_value_at(*input_buffer, row_index));
2442  break;
2443  case kDOUBLE:
2444  shard_output_buffers[col_idx]->addDouble(
2445  double_value_at(*input_buffer, row_index));
2446  break;
2447  case kTEXT:
2448  case kVARCHAR:
2449  case kCHAR: {
2450  CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2451  shard_output_buffers[col_idx]->addString(
2452  (*input_buffer->getStringBuffer())[row_index]);
2453  break;
2454  }
2455  case kTIME:
2456  case kTIMESTAMP:
2457  case kDATE:
2458  shard_output_buffers[col_idx]->addBigint(
2459  int_value_at(*input_buffer, row_index));
2460  break;
2461  case kARRAY:
2462  if (IS_STRING(col_ti.get_subtype())) {
2463  CHECK(input_buffer->getStringArrayBuffer());
2464  CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2465  const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2466  shard_output_buffers[col_idx]->addStringArray(input_arr);
2467  } else {
2468  shard_output_buffers[col_idx]->addArray(
2469  (*input_buffer->getArrayBuffer())[row_index]);
2470  }
2471  break;
2472  case kPOINT:
2473  case kLINESTRING:
2474  case kPOLYGON:
2475  case kMULTIPOLYGON: {
2476  CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2477  shard_output_buffers[col_idx]->addGeoString(
2478  (*input_buffer->getGeoStringBuffer())[row_index]);
2479  break;
2480  }
2481  default:
2482  CHECK(false);
2483  }
2484  }
2485  ++all_shard_row_counts[shard];
2486  // when replicating a column, the row count of a shard equals the column's
2487  // replicate count on that shard
2488  if (getReplicating()) {
2489  all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2490  }
2491  }
2492 }
2493 
2495  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2496  size_t row_count,
2497  bool checkpoint) {
2498  if (table_desc_->nShards) {
2499  std::vector<OneShardBuffers> all_shard_import_buffers;
2500  std::vector<size_t> all_shard_row_counts;
2501  const auto shard_tables = catalog_.getPhysicalTablesDescriptors(table_desc_);
2502  distributeToShards(all_shard_import_buffers,
2503  all_shard_row_counts,
2504  import_buffers,
2505  row_count,
2506  shard_tables.size());
2507  bool success = true;
2508  for (size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2509  if (!all_shard_row_counts[shard_idx]) {
2510  continue;
2511  }
2512  success = success && loadToShard(all_shard_import_buffers[shard_idx],
2513  all_shard_row_counts[shard_idx],
2514  shard_tables[shard_idx],
2515  checkpoint);
2516  }
2517  return success;
2518  }
2519  return loadToShard(import_buffers, row_count, table_desc_, checkpoint);
2520 }
2521 
2522 std::vector<DataBlockPtr> Loader::get_data_block_pointers(
2523  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2524  std::vector<DataBlockPtr> result(import_buffers.size());
2525  std::vector<std::pair<const size_t, std::future<int8_t*>>>
2526  encoded_data_block_ptrs_futures;
2527  // make all async calls to string dictionary here and then continue execution
2528  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2529  if (import_buffers[buf_idx]->getTypeInfo().is_string() &&
2530  import_buffers[buf_idx]->getTypeInfo().get_compression() != kENCODING_NONE) {
2531  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2532  CHECK_EQ(kENCODING_DICT, import_buffers[buf_idx]->getTypeInfo().get_compression());
2533 
2534  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2535  buf_idx,
2536  std::async(std::launch::async, [buf_idx, &import_buffers, string_payload_ptr] {
2537  import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2538  return import_buffers[buf_idx]->getStringDictBuffer();
2539  })));
2540  }
2541  }
2542 
2543  for (size_t buf_idx = 0; buf_idx < import_buffers.size(); buf_idx++) {
2544  DataBlockPtr p;
2545  if (import_buffers[buf_idx]->getTypeInfo().is_number() ||
2546  import_buffers[buf_idx]->getTypeInfo().is_time() ||
2547  import_buffers[buf_idx]->getTypeInfo().get_type() == kBOOLEAN) {
2548  p.numbersPtr = import_buffers[buf_idx]->getAsBytes();
2549  } else if (import_buffers[buf_idx]->getTypeInfo().is_string()) {
2550  auto string_payload_ptr = import_buffers[buf_idx]->getStringBuffer();
2551  if (import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_NONE) {
2552  p.stringsPtr = string_payload_ptr;
2553  } else {
2554  // This branch means the column is a dictionary-encoded string. We already
2555  // made an async request above to obtain the encoded integer values, so we
2556  // skip this iteration and continue.
2557  continue;
2558  }
2559  } else if (import_buffers[buf_idx]->getTypeInfo().is_geometry()) {
2560  auto geo_payload_ptr = import_buffers[buf_idx]->getGeoStringBuffer();
2561  p.stringsPtr = geo_payload_ptr;
2562  } else {
2563  CHECK(import_buffers[buf_idx]->getTypeInfo().get_type() == kARRAY);
2564  if (IS_STRING(import_buffers[buf_idx]->getTypeInfo().get_subtype())) {
2565  CHECK(import_buffers[buf_idx]->getTypeInfo().get_compression() == kENCODING_DICT);
2566  import_buffers[buf_idx]->addDictEncodedStringArray(
2567  *import_buffers[buf_idx]->getStringArrayBuffer());
2568  p.arraysPtr = import_buffers[buf_idx]->getStringArrayDictBuffer();
2569  } else {
2570  p.arraysPtr = import_buffers[buf_idx]->getArrayBuffer();
2571  }
2572  }
2573  result[buf_idx] = p;
2574  }
2575 
2576  // wait for the async requests we made for string dictionary
2577  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2578  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2579  }
2580  return result;
2581 }
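// The function above follows a fan-out/fan-in pattern: kick off the expensive
// dictionary translations asynchronously, fill in the cheap pointers while
// they run, then block on the futures. A self-contained sketch of the idea
// (illustrative only; needs_dict_encoding and encode are hypothetical):
//
//   std::vector<std::pair<size_t, std::future<int8_t*>>> futures;
//   for (size_t i = 0; i < buffers.size(); ++i) {
//     if (needs_dict_encoding(buffers[i])) {
//       futures.emplace_back(i, std::async(std::launch::async, [&, i] {
//         return encode(buffers[i]);  // slow; runs concurrently
//       }));
//     }
//   }
//   // ... assign the non-encoded block pointers synchronously here ...
//   for (auto& f : futures) {
//     result[f.first].numbersPtr = f.second.get();  // fan-in: wait per future
//   }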
2582 
2584  const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2585  size_t row_count,
2586  const TableDescriptor* shard_table,
2587  bool checkpoint) {
2588  std::unique_lock<std::mutex> loader_lock(loader_mutex_);
2589  // patch insert_data with new column
2590  if (this->getReplicating()) {
2591  for (const auto& import_buff : import_buffers) {
2592  insert_data_.replicate_count = import_buff->get_replicate_count();
2593  insert_data_.columnDescriptors[import_buff->getColumnDesc()->columnId] =
2594  import_buff->getColumnDesc();
2595  }
2596  }
2597 
2598  Fragmenter_Namespace::InsertData ins_data(insert_data_);
2599  ins_data.numRows = row_count;
2600  bool success = true;
2601 
2602  ins_data.data = get_data_block_pointers(import_buffers);
2603 
2604  for (const auto& import_buffer : import_buffers) {
2605  ins_data.bypass.push_back(0 == import_buffer->get_replicate_count());
2606  }
2607 
2608  // release loader_lock so that in InsertOrderFragmenter::insertData
2609  // we can have multiple threads sort/shuffle InsertData
2610  loader_lock.unlock();
2611 
2612  {
2613  try {
2614  if (checkpoint) {
2615  shard_table->fragmenter->insertData(ins_data);
2616  } else {
2617  shard_table->fragmenter->insertDataNoCheckpoint(ins_data);
2618  }
2619  } catch (std::exception& e) {
2620  LOG(ERROR) << "Fragmenter Insert Exception: " << e.what();
2621  success = false;
2622  }
2623  }
2624  return success;
2625 }
2626 
2628  insert_data_.databaseId = catalog_.getCurrentDB().dbId;
2629  insert_data_.tableId = table_desc_->tableId;
2630  for (auto cd : column_descs_) {
2631  insert_data_.columnIds.push_back(cd->columnId);
2632  if (cd->columnType.get_compression() == kENCODING_DICT) {
2633  CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
2634  const auto dd = catalog_.getMetadataForDict(cd->columnType.get_comp_param());
2635  CHECK(dd);
2636  dict_map_[cd->columnId] = dd->stringDict.get();
2637  }
2638  }
2639  insert_data_.numRows = 0;
2640 }
2641 
2643  detect_row_delimiter();
2644  split_raw_data();
2645  find_best_sqltypes_and_headers();
2646 }
2647 
2649  const bool decompressed) {
2650  if (!p_file) {
2651  p_file = fopen(file_path.c_str(), "rb");
2652  }
2653  if (!p_file) {
2654  throw std::runtime_error("failed to open file '" + file_path +
2655  "': " + strerror(errno));
2656  }
2657 
2658  // clang does not support ext/stdio_filebuf.h, so we roll our own
2659  // readline loop keyed on the customized copy_params.line_delim...
2660  char line[1 << 20];
2661  auto end_time = std::chrono::steady_clock::now() +
2662  timeout * (boost::istarts_with(file_path, "s3://") ? 3 : 1);
2663  try {
2664  while (!feof(p_file)) {
2665  int c;
2666  size_t n = 0;
2667  while (EOF != (c = fgetc(p_file)) && copy_params.line_delim != c) {
2668  line[n++] = c;
2669  if (n >= sizeof(line) - 1) {
2670  break;
2671  }
2672  }
2673  if (0 == n) {
2674  break;
2675  }
2676  line[n] = 0;
2677  // remember the first line, which is possibly a header line, so we
2678  // can ignore identical header line(s) in 2nd+ files of an archive;
2679  // otherwise a 2nd+ header may be mistaken for an all-string row
2680  // and skew the final column types.
2681  if (line1.empty()) {
2682  line1 = line;
2683  } else if (line == line1) {
2684  continue;
2685  }
2686 
2687  raw_data += std::string(line, n);
2688  raw_data += copy_params.line_delim;
2690  if (std::chrono::steady_clock::now() > end_time) {
2691  if (import_status.rows_completed > 10000) {
2692  break;
2693  }
2694  }
2695  }
2696  } catch (std::exception& e) {
2697  }
2698 
2699  // treat as if the load was truncated
2701  load_failed = true;
2702 
2703  fclose(p_file);
2704  p_file = nullptr;
2705  return import_status;
2706 }
2707 
2709  // this becomes analogous to Importer::import()
2711 }
2712 
2714  if (copy_params.delimiter == '\0') {
2715  copy_params.delimiter = ',';
2716  if (boost::filesystem::extension(file_path) == ".tsv") {
2717  copy_params.delimiter = '\t';
2718  }
2719  }
2720 }
2721 
2723  const char* buf = raw_data.c_str();
2724  const char* buf_end = buf + raw_data.size();
2725  bool try_single_thread = false;
2726  for (const char* p = buf; p < buf_end; p++) {
2727  std::vector<std::string> row;
2728  p = get_row(p, buf_end, buf_end, copy_params, true, nullptr, row, try_single_thread);
2729  raw_rows.push_back(row);
2730  if (try_single_thread) {
2731  break;
2732  }
2733  }
2734  if (try_single_thread) {
2735  copy_params.threads = 1;
2736  raw_rows.clear();
2737  for (const char* p = buf; p < buf_end; p++) {
2738  std::vector<std::string> row;
2739  p = get_row(
2740  p, buf_end, buf_end, copy_params, true, nullptr, row, try_single_thread);
2741  raw_rows.push_back(row);
2742  }
2743  }
2744 }
2745 
2746 template <class T>
2747 bool try_cast(const std::string& str) {
2748  try {
2749  boost::lexical_cast<T>(str);
2750  } catch (const boost::bad_lexical_cast& e) {
2751  return false;
2752  }
2753  return true;
2754 }
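// Usage sketch: try_cast<int32_t>("42") returns true, while
// try_cast<int32_t>("4.2") and try_cast<double>("abc") return false, since
// boost::lexical_cast requires the entire string to parse as the target type.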
2755 
2756 inline char* try_strptimes(const char* str, const std::vector<std::string>& formats) {
2757  std::tm tm_struct;
2758  char* buf;
2759  for (auto format : formats) {
2760  buf = strptime(str, format.c_str(), &tm_struct);
2761  if (buf) {
2762  return buf;
2763  }
2764  }
2765  return nullptr;
2766 }
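// Usage sketch: try_strptimes("2017-01-31 10:00:00", {"%Y-%m-%d"}) matches the
// date prefix and returns a pointer to the remainder " 10:00:00"; it returns
// nullptr if no format matches. detect_sqltype below relies on that remainder
// to decide whether a detected DATE is actually a TIMESTAMP.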
2767 
2768 SQLTypes Detector::detect_sqltype(const std::string& str) {
2769  SQLTypes type = kTEXT;
2770  if (try_cast<double>(str)) {
2771  type = kDOUBLE;
2772  /*if (try_cast<bool>(str)) {
2773  type = kBOOLEAN;
2774  }*/
2775  if (try_cast<int16_t>(str)) {
2776  type = kSMALLINT;
2777  } else if (try_cast<int32_t>(str)) {
2778  type = kINT;
2779  } else if (try_cast<int64_t>(str)) {
2780  type = kBIGINT;
2781  } else if (try_cast<float>(str)) {
2782  type = kFLOAT;
2783  }
2784  }
2785 
2786  // check for geo types
2787  if (type == kTEXT) {
2788  // convert to upper case
2789  std::string str_upper_case = str;
2790  std::transform(
2791  str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
2792 
2793  // then test for leading words
2794  if (str_upper_case.find("POINT") == 0) {
2795  type = kPOINT;
2796  } else if (str_upper_case.find("LINESTRING") == 0) {
2797  type = kLINESTRING;
2798  } else if (str_upper_case.find("POLYGON") == 0) {
2799  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
2800  type = kMULTIPOLYGON;
2801  } else {
2802  type = kPOLYGON;
2803  }
2804  } else if (str_upper_case.find("MULTIPOLYGON") == 0) {
2805  type = kMULTIPOLYGON;
2806  } else if (str_upper_case.find_first_not_of("0123456789ABCDEF") ==
2807  std::string::npos &&
2808  (str_upper_case.size() % 2) == 0) {
2809  // could be a WKB hex blob
2810  // we can't handle these yet
2811  // leave as TEXT for now
2812  // deliberate return here, as otherwise this would get matched as TIME
2813  // @TODO
2814  // implement WKB import
2815  return type;
2816  }
2817  }
2818 
2819  // check for time types
2820  if (type == kTEXT) {
2821  // @TODO
2822  // make these tests more robust so they don't match stuff they should not
2823  char* buf;
2824  buf = try_strptimes(str.c_str(), {"%Y-%m-%d", "%m/%d/%Y", "%d-%b-%y", "%d/%b/%Y"});
2825  if (buf) {
2826  type = kDATE;
2827  if (*buf == 'T' || *buf == ' ' || *buf == ':') {
2828  buf++;
2829  }
2830  }
2831  buf = try_strptimes(buf == nullptr ? str.c_str() : buf,
2832  {"%T %z", "%T", "%H%M%S", "%R"});
2833  if (buf) {
2834  if (type == kDATE) {
2835  type = kTIMESTAMP;
2836  } else {
2837  type = kTIME;
2838  }
2839  }
2840  }
2841 
2842  return type;
2843 }
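// Worked examples of the detection ladder above (assuming default settings):
//   "123"                 -> kSMALLINT  (castable to double, fits in int16)
//   "3000000000"          -> kBIGINT    (overflows int32, fits in int64)
//   "1.5"                 -> kFLOAT     (no integer cast succeeds)
//   "POINT(1 2)"          -> kPOINT     (leading geo keyword)
//   "2017-01-31 10:00:00" -> kTIMESTAMP (date and time formats both match)
//   "hello"               -> kTEXT      (nothing else matches)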
2844 
2845 std::vector<SQLTypes> Detector::detect_column_types(const std::vector<std::string>& row) {
2846  std::vector<SQLTypes> types(row.size());
2847  for (size_t i = 0; i < row.size(); i++) {
2848  types[i] = detect_sqltype(row[i]);
2849  }
2850  return types;
2851 }
2852 
2854  static std::array<int, kSQLTYPE_LAST> typeorder;
2855  typeorder[kCHAR] = 0;
2856  typeorder[kBOOLEAN] = 2;
2857  typeorder[kSMALLINT] = 3;
2858  typeorder[kINT] = 4;
2859  typeorder[kBIGINT] = 5;
2860  typeorder[kFLOAT] = 6;
2861  typeorder[kDOUBLE] = 7;
2862  typeorder[kTIMESTAMP] = 8;
2863  typeorder[kTIME] = 9;
2864  typeorder[kDATE] = 10;
2865  typeorder[kPOINT] = 11;
2866  typeorder[kLINESTRING] = 11;
2867  typeorder[kPOLYGON] = 11;
2868  typeorder[kMULTIPOLYGON] = 11;
2869  typeorder[kTEXT] = 12;
2870 
2871  // note: b < a instead of a < b because typeorder is ordered most to least restrictive
2872  return typeorder[b] < typeorder[a];
2873 }
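// Worked example: more_restrictive_sqltype(kTEXT, kINT) is true because
// typeorder[kINT] (4) < typeorder[kTEXT] (12), i.e. an INT value is more
// restrictive than TEXT. find_best_sqltypes below keeps the least restrictive
// type seen in a column, so one "abc" among integers widens it to kTEXT.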
2874 
2876  best_sqltypes = find_best_sqltypes(raw_rows.begin() + 1, raw_rows.end(), copy_params);
2877  best_encodings =
2878  find_best_encodings(raw_rows.begin() + 1, raw_rows.end(), best_sqltypes);
2879  std::vector<SQLTypes> head_types = detect_column_types(raw_rows.at(0));
2880  switch (copy_params.has_header) {
2882  has_headers = detect_headers(head_types, best_sqltypes);
2883  if (has_headers) {
2885  } else {
2887  }
2888  break;
2890  has_headers = false;
2891  break;
2893  has_headers = true;
2894  break;
2895  }
2896 }
2897 
2899  best_sqltypes = find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
2900 }
2901 
2902 std::vector<SQLTypes> Detector::find_best_sqltypes(
2903  const std::vector<std::vector<std::string>>& raw_rows,
2904  const CopyParams& copy_params) {
2905  return find_best_sqltypes(raw_rows.begin(), raw_rows.end(), copy_params);
2906 }
2907 
2908 std::vector<SQLTypes> Detector::find_best_sqltypes(
2909  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
2910  const std::vector<std::vector<std::string>>::const_iterator& row_end,
2911  const CopyParams& copy_params) {
2912  if (raw_rows.size() < 1) {
2913  throw std::runtime_error("No rows found in: " +
2914  boost::filesystem::basename(file_path));
2915  }
2916  auto end_time = std::chrono::steady_clock::now() + timeout;
2917  size_t num_cols = raw_rows.front().size();
2918  std::vector<SQLTypes> best_types(num_cols, kCHAR);
2919  std::vector<size_t> non_null_col_counts(num_cols, 0);
2920  for (auto row = row_begin; row != row_end; row++) {
2921  while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
2922  best_types.push_back(kCHAR);
2923  non_null_col_counts.push_back(0);
2924  }
2925  for (size_t col_idx = 0; col_idx < row->size(); col_idx++) {
2926  // do not count nulls
2927  if (row->at(col_idx) == "" || !row->at(col_idx).compare(copy_params.null_str)) {
2928  continue;
2929  }
2930  SQLTypes t = detect_sqltype(row->at(col_idx));
2931  non_null_col_counts[col_idx]++;
2932  if (!more_restrictive_sqltype(best_types[col_idx], t)) {
2933  best_types[col_idx] = t;
2934  }
2935  }
2936  if (std::chrono::steady_clock::now() > end_time) {
2937  break;
2938  }
2939  }
2940  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
2941  // if we don't have any non-null values for this column, make it text to be
2942  // safe b/c that is the least restrictive type
2943  if (non_null_col_counts[col_idx] == 0) {
2944  best_types[col_idx] = kTEXT;
2945  }
2946  }
2947 
2948  return best_types;
2949 }
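// Example of the widening above: a column containing "1", "2.5", and "abc"
// starts at kCHAR and is promoted to kSMALLINT, then kFLOAT, then kTEXT;
// a single non-numeric value is enough to make the whole column text.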
2950 
2951 std::vector<EncodingType> Detector::find_best_encodings(
2952  const std::vector<std::vector<std::string>>::const_iterator& row_begin,
2953  const std::vector<std::vector<std::string>>::const_iterator& row_end,
2954  const std::vector<SQLTypes>& best_types) {
2955  if (raw_rows.size() < 1) {
2956  throw std::runtime_error("No rows found in: " +
2957  boost::filesystem::basename(file_path));
2958  }
2959  size_t num_cols = best_types.size();
2960  std::vector<EncodingType> best_encodes(num_cols, kENCODING_NONE);
2961  std::vector<size_t> num_rows_per_col(num_cols, 1);
2962  std::vector<std::unordered_set<std::string>> count_set(num_cols);
2963  for (auto row = row_begin; row != row_end; row++) {
2964  for (size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
2965  if (IS_STRING(best_types[col_idx])) {
2966  count_set[col_idx].insert(row->at(col_idx));
2967  num_rows_per_col[col_idx]++;
2968  }
2969  }
2970  }
2971  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
2972  if (IS_STRING(best_types[col_idx])) {
2973  float uniqueRatio =
2974  static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
2975  if (uniqueRatio < 0.75) {
2976  best_encodes[col_idx] = kENCODING_DICT;
2977  }
2978  }
2979  }
2980  return best_encodes;
2981 }
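// Example: a string column with 10 distinct values across 100 sampled rows
// gives uniqueRatio == 10 / 101 (the per-column row count starts at 1), well
// under the 0.75 threshold, so the column is flagged for dictionary encoding.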
2982 
2983 // detect_headers returns true if:
2984 // - all elements of the first argument are kTEXT
2985 // - there is at least one instance where tail_types is more restrictive than head_types
2986 // (i.e., not kTEXT)
2987 bool Detector::detect_headers(const std::vector<SQLTypes>& head_types,
2988  const std::vector<SQLTypes>& tail_types) {
2989  if (head_types.size() != tail_types.size()) {
2990  return false;
2991  }
2992  bool has_headers = false;
2993  for (size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
2994  if (head_types[col_idx] != kTEXT) {
2995  return false;
2996  }
2997  has_headers = has_headers || tail_types[col_idx] != kTEXT;
2998  }
2999  return has_headers;
3000 }
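// Example: for a sample whose first row is ["id", "name"] (both kTEXT) and
// whose remaining rows yield [kINT, kTEXT], detect_headers returns true:
// every head type is text and at least one tail type is more restrictive.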
3001 
3002 std::vector<std::vector<std::string>> Detector::get_sample_rows(size_t n) {
3003  n = std::min(n, raw_rows.size());
3004  size_t offset = (has_headers && raw_rows.size() > 1) ? 1 : 0;
3005  std::vector<std::vector<std::string>> sample_rows(raw_rows.begin() + offset,
3006  raw_rows.begin() + n);
3007  return sample_rows;
3008 }
3009 
3010 std::vector<std::string> Detector::get_headers() {
3011  std::vector<std::string> headers(best_sqltypes.size());
3012  for (size_t i = 0; i < best_sqltypes.size(); i++) {
3013  if (has_headers && i < raw_rows[0].size()) {
3014  headers[i] = raw_rows[0][i];
3015  } else {
3016  headers[i] = "column_" + std::to_string(i + 1);
3017  }
3018  }
3019  return headers;
3020 }
3021 
3022 void Importer::load(const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3023  size_t row_count) {
3024  if (!loader->loadNoCheckpoint(import_buffers, row_count)) {
3025  load_failed = true;
3026  }
3027 }
3028 
3029 void Importer::checkpoint(const int32_t start_epoch) {
3030  if (load_failed) {
3031  // rollback to starting epoch - undo all the added records
3032  loader->setTableEpoch(start_epoch);
3033  } else {
3034  loader->checkpoint();
3035  }
3036 
3037  if (loader->getTableDesc()->persistenceLevel ==
3038  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3039  auto ms = measure<>::execution([&]() {
3040  if (!load_failed) {
3041  for (auto& p : import_buffers_vec[0]) {
3042  if (!p->stringDictCheckpoint()) {
3043  LOG(ERROR) << "Checkpointing Dictionary for Column "
3044  << p->getColumnDesc()->columnName << " failed.";
3045  load_failed = true;
3046  break;
3047  }
3048  }
3049  }
3050  });
3051  if (DEBUG_TIMING) {
3052  LOG(INFO) << "Dictionary Checkpointing took " << (double)ms / 1000.0 << " Seconds."
3053  << std::endl;
3054  }
3055  }
3056 }
3057 
3059  // in the generalized importing scheme, by the time we get here
3060  // file_path may contain a single file path, a url, or a wildcard
3061  // of file paths. see CopyTableStmt::execute.
3062  auto file_paths = mapd_glob(file_path);
3063  if (file_paths.size() == 0) {
3064  file_paths.push_back(file_path);
3065  }
3066 
3067  // sum up the sizes of all local files -- local files only; if
3068  // file_path is an s3 url, sizes will be obtained via S3Archive.
3069  for (const auto& file_path : file_paths) {
3071  }
3072 
3073 #ifdef ENABLE_IMPORT_PARQUET
3074  // s3 parquet goes a different route because the files are read via the
3075  // parquet api rather than libarchive, and they need to be landed like .7z files.
3076  //
3077  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3078  // because, for example, spark sql users may specify an output url without a file
3079  // extension like this:
3080  // df.write
3081  // .mode("overwrite")
3082  // .parquet("s3://bucket/folder/parquet/mydata")
3083  // without the parameter, it means plain or compressed csv files.
3084  // note: .ORC and AVRO files should follow a similar path to Parquet?
3085  if (copy_params.file_type == FileType::PARQUET) {
3086  import_parquet(file_paths);
3087  } else
3088 #endif
3089  {
3090  import_compressed(file_paths);
3091  }
3092  return import_status;
3093 }
3094 
3095 #ifdef ENABLE_IMPORT_PARQUET
3096 inline auto open_parquet_table(const std::string& file_path,
3097  std::shared_ptr<arrow::io::ReadableFile>& infile,
3098  std::unique_ptr<parquet::arrow::FileReader>& reader,
3099  std::shared_ptr<arrow::Table>& table) {
3100  using namespace parquet::arrow;
3101  using ReadableFile = arrow::io::ReadableFile;
3102  auto mempool = arrow::default_memory_pool();
3103  PARQUET_THROW_NOT_OK(ReadableFile::Open(file_path, mempool, &infile));
3104  PARQUET_THROW_NOT_OK(OpenFile(infile, mempool, &reader));
3105  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3106  const auto num_row_groups = reader->num_row_groups();
3107  const auto num_columns = table->num_columns();
3108  const auto num_rows = table->num_rows();
3109  LOG(INFO) << "File " << file_path << " has " << num_rows << " rows and " << num_columns
3110  << " columns in " << num_row_groups << " groups.";
3111  return std::make_tuple(num_row_groups, num_columns, num_rows);
3112 }
3113 
3114 void Detector::import_local_parquet(const std::string& file_path) {
3115  std::shared_ptr<arrow::io::ReadableFile> infile;
3116  std::unique_ptr<parquet::arrow::FileReader> reader;
3117  std::shared_ptr<arrow::Table> table;
3118  int num_row_groups, num_columns;
3119  int64_t num_rows;
3120  std::tie(num_row_groups, num_columns, num_rows) =
3121  open_parquet_table(file_path, infile, reader, table);
3122  // make up a header line if we don't have one yet
3123  if (0 == raw_data.size()) {
3125  copy_params.line_delim = '\n';
3126  copy_params.delimiter = ',';
3127  // must quote values to skip any embedded delimiter
3128  copy_params.quoted = true;
3129  copy_params.quote = '"';
3130  copy_params.escape = '"';
3131  for (int c = 0; c < num_columns; ++c) {
3132  if (c) {
3133  raw_data += copy_params.delimiter;
3134  }
3135  raw_data += table->column(c)->name();
3136  }
3137  raw_data += copy_params.line_delim;
3138  }
3139  // make up raw data... row-wise...
3140  const ColumnDescriptor cd;
3141  for (int g = 0; g < num_row_groups; ++g) {
3142  // data is columnwise
3143  std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3144  std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3145  arrays.resize(num_columns);
3146  for (int c = 0; c < num_columns; ++c) {
3147  PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3148  for (auto chunk : arrays[c]->chunks()) {
3149  getters.push_back(value_getter(*chunk, nullptr, nullptr));
3150  }
3151  }
3152  for (int r = 0; r < num_rows; ++r) {
3153  for (int c = 0; c < num_columns; ++c) {
3154  std::vector<std::string> buffer;
3155  for (auto chunk : arrays[c]->chunks()) {
3156  DataBuffer<std::string> data(&cd, *chunk, buffer, nullptr);
3157  if (c) {
3158  raw_data += copy_params.delimiter;
3159  }
3160  if (!chunk->IsNull(r)) {
3161  raw_data += copy_params.quote;
3162  raw_data += boost::replace_all_copy(
3163  (data << getters[c](*chunk, r)).buffer.front(), "\"", "\"\"");
3164  raw_data += copy_params.quote;
3165  }
3166  }
3167  }
3168  raw_data += copy_params.line_delim;
3169  if (++import_status.rows_completed >= 10000) {
3170  // as if load truncated
3172  load_failed = true;
3173  return;
3174  }
3175  }
3176  }
3177 }
3178 
3179 template <typename DATA_TYPE>
3180 auto TypedImportBuffer::del_values(std::vector<DATA_TYPE>& buffer,
3181  Importer_NS::BadRowsTracker* const bad_rows_tracker) {
3182  const auto old_size = buffer.size();
3183  // erase backward to minimize memory movement overhead
3184  for (auto rit = bad_rows_tracker->rows.crbegin(); rit != bad_rows_tracker->rows.crend();
3185  ++rit) {
3186  buffer.erase(buffer.begin() + *rit);
3187  }
3188  return std::make_tuple(old_size, buffer.size());
3189 }
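// Example of the backward erase: with bad rows {1, 3} in a 5-element buffer
// [a, b, c, d, e], erasing index 3 first and then index 1 yields [a, c, e];
// erasing forward would shift later elements and invalidate the stored indices.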
3190 
3192  BadRowsTracker* const bad_rows_tracker) {
3193  switch (type) {
3194  case kBOOLEAN:
3195  return del_values(*bool_buffer_, bad_rows_tracker);
3196  case kTINYINT:
3197  return del_values(*tinyint_buffer_, bad_rows_tracker);
3198  case kSMALLINT:
3199  return del_values(*smallint_buffer_, bad_rows_tracker);
3200  case kINT:
3201  return del_values(*int_buffer_, bad_rows_tracker);
3202  case kBIGINT:
3203  case kNUMERIC:
3204  case kDECIMAL:
3205  case kTIME:
3206  case kTIMESTAMP:
3207  case kDATE:
3208  return del_values(*bigint_buffer_, bad_rows_tracker);
3209  case kFLOAT:
3210  return del_values(*float_buffer_, bad_rows_tracker);
3211  case kDOUBLE:
3212  return del_values(*double_buffer_, bad_rows_tracker);
3213  case kTEXT:
3214  case kVARCHAR:
3215  case kCHAR:
3216  return del_values(*string_buffer_, bad_rows_tracker);
3217  case kPOINT:
3218  case kLINESTRING:
3219  case kPOLYGON:
3220  case kMULTIPOLYGON:
3221  return del_values(*geo_string_buffer_, bad_rows_tracker);
3222  case kARRAY:
3223  return del_values(*array_buffer_, bad_rows_tracker);
3224  default:
3225  throw std::runtime_error("Invalid Type");
3226  }
3227 }
3228 
3229 void Importer::import_local_parquet(const std::string& file_path) {
3230  std::shared_ptr<arrow::io::ReadableFile> infile;
3231  std::unique_ptr<parquet::arrow::FileReader> reader;
3232  std::shared_ptr<arrow::Table> table;
3233  int num_row_groups, num_columns;
3234  int64_t nrow_in_file;
3235  std::tie(num_row_groups, num_columns, nrow_in_file) =
3236  open_parquet_table(file_path, infile, reader, table);
3237  // column_list has no $deleted
3238  const auto& column_list = get_column_descs();
3239  // for now geo columns expect a wkt string
3240  std::vector<const ColumnDescriptor*> cds;
3241  int num_physical_cols = 0;
3242  for (auto& cd : column_list) {
3243  cds.push_back(cd);
3244  num_physical_cols += cd->columnType.get_physical_cols();
3245  }
3246  arrow_throw_if(num_columns != (int)(column_list.size() - num_physical_cols),
3247  "Unmatched numbers of columns in parquet file " + file_path + ": " +
3248  std::to_string(num_columns) + " columns in file vs " +
3249  std::to_string(column_list.size() - num_physical_cols) +
3250  " columns in table.");
3251  // slice each group to import slower columns faster, e.g. geo or string
3253  const int num_slices = std::max<decltype(max_threads)>(max_threads, num_columns);
3254  // init row estimate for this file
3255  const auto filesize = get_filesize(file_path);
3256  size_t nrow_completed{0};
3257  file_offsets.push_back(0);
3258  // map logical column index to physical column index
3259  auto get_physical_col_idx = [&cds](const int logic_col_idx) -> auto {
3260  int physical_col_idx = 0;
3261  for (int i = 0; i < logic_col_idx; ++i) {
3262  physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3263  }
3264  return physical_col_idx;
3265  };
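  // Hedged example (assuming a POLYGON column carries 4 physical columns:
  // coords, ring_sizes, bounds, render_group): for columns [a INT,
  // poly POLYGON, b TEXT], logical indices 0, 1, 2 map to physical indices
  // 0, 1, and 1 + 1 + 4 == 6, skipping over the polygon's physical columns.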
3266  // load a file = nested iteration of row groups, row slices and logical columns
3267  auto ms_load_a_file = measure<>::execution([&]() {
3268  for (int row_group = 0; row_group < num_row_groups && !load_failed; ++row_group) {
3269  // a sliced row group will be handled like a (logical) parquet file, with
3270  // an entirely clean set of bad_rows_tracker, import_buffers_vec, etc.
3271  import_buffers_vec.resize(num_slices);
3272  for (int slice = 0; slice < num_slices; slice++) {
3273  import_buffers_vec[slice].clear();
3274  for (const auto cd : cds) {
3275  import_buffers_vec[slice].emplace_back(
3276  new TypedImportBuffer(cd, loader->getStringDict(cd)));
3277  }
3278  }
3279  /*
3280  * A caveat here: Parquet files and arrow data are imported column-wise.
3281  * Unlike importing row-wise csv files, an error on any row of any column
3282  * would force us to give up the entire row group across all columns, unless
3283  * we trace erroneous rows in individual columns so that we can union the
3284  * bad rows and drop them from the corresponding import_buffers_vec;
3285  * otherwise, we could easily exceed the maximum number of truncated rows
3286  * even with very sparse errors in the files.
3287  */
3288  std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3289  for (size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3290  auto& bad_rows_tracker = bad_rows_trackers[slice];
3291  bad_rows_tracker.file_name = file_path;
3292  bad_rows_tracker.row_group = slice;
3293  bad_rows_tracker.importer = this;
3294  }
3295  // process arrow arrays to import buffers
3296  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3297  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3298  const auto cd = cds[physical_col_idx];
3299  std::shared_ptr<arrow::ChunkedArray> array;
3300  PARQUET_THROW_NOT_OK(
3301  reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3302  const size_t array_size = array->length();
3303  const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3304  ThreadController_NS::SimpleThreadController<void> thread_controller(num_slices);
3305  for (int slice = 0; slice < num_slices; ++slice) {
3306  thread_controller.startThread([&, slice] {
3307  const auto slice_offset = slice % num_slices;
3308  ArraySliceRange slice_range(
3309  std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3310  std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3311  auto& bad_rows_tracker = bad_rows_trackers[slice];
3312  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3313  import_buffer->import_buffers = &import_buffers_vec[slice];
3314  import_buffer->col_idx = physical_col_idx + 1;
3315  for (auto chunk : array->chunks()) {
3316  import_buffer->add_arrow_values(
3317  cd, *chunk, false, slice_range, &bad_rows_tracker);
3318  }
3319  });
3320  }
3321  thread_controller.finish();
3322  }
3323  std::vector<size_t> nrow_in_slice_raw(num_slices);
3324  std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3325  // trim bad rows from import buffers
3326  for (int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3327  const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3328  const auto cd = cds[physical_col_idx];
3329  for (int slice = 0; slice < num_slices; ++slice) {
3330  auto& bad_rows_tracker = bad_rows_trackers[slice];
3331  auto& import_buffer = import_buffers_vec[slice][physical_col_idx];
3332  std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3333  import_buffer->del_values(cd->columnType.get_type(), &bad_rows_tracker);
3334  }
3335  }
3336  // flush slices of this row group to chunks
3337  for (int slice = 0; slice < num_slices; ++slice) {
3338  load(import_buffers_vec[slice], nrow_in_slice_successfully_loaded[slice]);
3339  }
3340  // update import stats
3341  const auto nrow_original =
3342  std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3343  const auto nrow_imported =
3344  std::accumulate(nrow_in_slice_successfully_loaded.begin(),
3345  nrow_in_slice_successfully_loaded.end(),
3346  0);
3347  const auto nrow_dropped = nrow_original - nrow_imported;
3348  LOG(INFO) << "row group " << row_group << ": add " << nrow_imported
3349  << " rows, drop " << nrow_dropped << " rows.";
3350  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3351  import_status.rows_completed += nrow_imported;
3352  import_status.rows_rejected += nrow_dropped;
3355  load_failed = true;
3356  LOG(ERROR) << "Maximum (" << copy_params.max_reject
3357  << ") rows rejected exceeded. Halting load.";
3358  }
3359  // row estimate
3360  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3361  nrow_completed += nrow_imported;
3362  file_offsets.back() =
3363  nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3364  // sum up current total file offsets
3365  const auto total_file_offset =
3366  std::accumulate(file_offsets.begin(), file_offsets.end(), 0);
3367  // estimate number of rows per current total file offset
3368  if (total_file_offset) {
3370  (float)total_file_size / total_file_offset * import_status.rows_completed;
3371  VLOG(3) << "rows_completed " << import_status.rows_completed
3372  << ", rows_estimated " << import_status.rows_estimated
3373  << ", total_file_size " << total_file_size << ", total_file_offset "
3374  << total_file_offset;
3375  }
3376  }
3377  });
3378  LOG(INFO) << "Import " << nrow_in_file << " rows of parquet file " << file_path
3379  << " took " << (double)ms_load_a_file / 1000.0 << " secs";
3380 }
3381 
3382 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths) {
3383  auto importer = dynamic_cast<Importer*>(this);
3384  auto start_epoch = importer ? importer->getLoader()->getTableEpoch() : 0;
3385  try {
3386  std::exception_ptr teptr;
3387  // file_paths may contain one local file path, a list of local file paths,
3388  // or an s3/hdfs/... url that may translate to one or more remote object keys.
3389  for (auto const& file_path : file_paths) {
3390  std::map<int, std::string> url_parts;
3391  Archive::parse_url(file_path, url_parts);
3392 
3393  // for an s3 url we need to know the object keys that it comprises
3394  std::vector<std::string> objkeys;
3395  std::unique_ptr<S3ParquetArchive> us3arch;
3396  if ("s3" == url_parts[2]) {
3397 #ifdef HAVE_AWS_S3
3398  us3arch.reset(new S3ParquetArchive(file_path,
3404  us3arch->init_for_read();
3405  total_file_size += us3arch->get_total_file_size();
3406  objkeys = us3arch->get_objkeys();
3407 #else
3408  throw std::runtime_error("AWS S3 support not available");
3409 #endif // HAVE_AWS_S3
3410  } else {
3411  objkeys.emplace_back(file_path);
3412  }
3413 
3414  // for each obj key of an s3 url we need to land (download) it before
3415  // importing it, just as we do with a 'local file'.
3416  for (auto const& objkey : objkeys) {
3417  try {
3418  auto file_path =
3419  us3arch
3420  ? us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this))
3421  : objkey;
3422  import_local_parquet(file_path);
3423  if (us3arch) {
3424  us3arch->vacuum(objkey);
3425  }
3426  } catch (...) {
3427  if (us3arch) {
3428  us3arch->vacuum(objkey);
3429  }
3430  throw;
3431  }
3432  if (import_status.load_truncated) {
3433  break;
3434  }
3435  }
3436  }
3437  // rethrow any exception that happened earlier
3438  if (teptr) {
3439  std::rethrow_exception(teptr);
3440  }
3441  } catch (...) {
3442  load_failed = true;
3443  if (!import_status.load_truncated) {
3444  throw;
3445  }
3446  }
3447 
3448  if (importer) {
3449  importer->checkpoint(start_epoch);
3450  }
3451 }
3452 #endif // ENABLE_IMPORT_PARQUET
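/*
 * Editor's sketch (not part of the original source): the land/import/vacuum
 * pattern used in import_parquet() above, reduced to its shape. The land,
 * import_file and vacuum callables stand in for S3ParquetArchive::land(),
 * import_local_parquet() and S3ParquetArchive::vacuum().
 */
template <typename LandFn, typename ImportFn, typename VacuumFn>
void land_import_vacuum_sketch(const std::string& objkey,
                               LandFn land,
                               ImportFn import_file,
                               VacuumFn vacuum) {
  // land the remote object as a local scratch file
  auto local_path = land(objkey);
  try {
    import_file(local_path);
    vacuum(objkey);  // remove the scratch copy on success...
  } catch (...) {
    vacuum(objkey);  // ...and on failure, then propagate the error
    throw;
  }
}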
3453 
3454 void DataStreamSink::import_compressed(std::vector<std::string>& file_paths) {
3455  // a new requirement is to have one single input stream into
3456  // Importer::importDelimited, so the pipe-related setup needs
3457  // to live in the outermost block.
3458  int fd[2];
3459  if (pipe(fd) < 0) {
3460  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
3461  }
3462  signal(SIGPIPE, SIG_IGN);
3463 
3464  std::exception_ptr teptr;
3465  // create a thread to read uncompressed byte stream out of pipe and
3466  // feed into importDelimited()
3467  ImportStatus ret;
3468  auto th_pipe_reader = std::thread([&]() {
3469  try {
3470  // importDelimited will read from FILE* p_file
3471  if (0 == (p_file = fdopen(fd[0], "r"))) {
3472  throw std::runtime_error(std::string("failed to open a pipe: ") +
3473  strerror(errno));
3474  }
3475 
3476  // in the future, depending on the data types of this uncompressed stream,
3477  // it could be fed into other functions such as importParquet, etc.
3478  ret = importDelimited(file_path, true);
3479  } catch (...) {
3480  if (!teptr) { // no replace
3481  teptr = std::current_exception();
3482  }
3483  }
3484 
3485  if (p_file) {
3486  fclose(p_file);
3487  }
3488  p_file = 0;
3489  });
3490 
3491  // create a thread to iterate all files (in all archives) and
3492  // forward the uncompressed byte stream to fd[1], which is
3493  // then fed into importDelimited, importParquet, etc.
3494  auto th_pipe_writer = std::thread([&]() {
3495  std::unique_ptr<S3Archive> us3arch;
3496  bool stop = false;
3497  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
3498  try {
3499  auto file_path = file_paths[fi];
3500  std::unique_ptr<Archive> uarch;
3501  std::map<int, std::string> url_parts;
3502  Archive::parse_url(file_path, url_parts);
3503  const std::string S3_objkey_url_scheme = "s3ok";
3504  if ("file" == url_parts[2] || "" == url_parts[2]) {
3505  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3506  } else if ("s3" == url_parts[2]) {
3507 #ifdef HAVE_AWS_S3
3508  // construct an S3Archive with a shared s3 client.
3509  // should be safe because there is no wildcard in an s3 url
3510  us3arch.reset(new S3Archive(file_path,
3511  copy_params.s3_access_key,
3512  copy_params.s3_secret_key,
3513  copy_params.s3_region,
3514  copy_params.s3_endpoint,
3515  copy_params.plain_text));
3516  us3arch->init_for_read();
3517  total_file_size += us3arch->get_total_file_size();
3518  // don't land all files here; land them one by one in the following iterations
3519  for (const auto& objkey : us3arch->get_objkeys()) {
3520  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
3521  }
3522  continue;
3523 #else
3524  throw std::runtime_error("AWS S3 support not available");
3525 #endif // HAVE_AWS_S3
3526  } else if (S3_objkey_url_scheme == url_parts[2]) {
3527 #ifdef HAVE_AWS_S3
3528  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
3529  auto file_path =
3530  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
3531  if (0 == file_path.size()) {
3532  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
3533  }
3534  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3535  // file not removed until file closed
3536  us3arch->vacuum(objkey);
3537 #else
3538  throw std::runtime_error("AWS S3 support not available");
3539 #endif // HAVE_AWS_S3
3540  }
3541 #if 0 // TODO(ppan): implement and enable any other archive class
3542  else
3543  if ("hdfs" == url_parts[2])
3544  uarch.reset(new HdfsArchive(file_path));
3545 #endif
3546  else {
3547  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
3548  }
3549 
3550  // init the archive for read
3551  auto& arch = *uarch;
3552 
3553  // at this point, the archive at the url should be ready to be read, unarchived,
3554  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
3555  const void* buf;
3556  size_t size;
3557  bool just_saw_archive_header;
3558  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
3559  bool first_text_header_skipped = false;
3560  // start reading uncompressed bytes of this archive from libarchive
3561  // note! this archive may contain more than one file!
3562  file_offsets.push_back(0);
3563  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
3564  bool insert_line_delim_after_this_file = false;
3565  while (!stop) {
3566  int64_t offset{-1};
3567  auto ok = arch.read_data_block(&buf, &size, &offset);
3568  // can't use (uncompressed) size, so track (max) file offset.
3569  // also we want to capture offset even on e.o.f.
3570  if (offset > 0) {
3571  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3572  file_offsets.back() = offset;
3573  }
3574  if (!ok) {
3575  break;
3576  }
3577  // one subtle point: we concatenate all files into a single
3578  // FILE stream, with which we call importDelimited only once.
3579  // importDelimited would then assume that this 'single' stream
3580  // has only one header line, while we may actually have one
3581  // header line for each of the files.
3582  // so we need to skip header lines here instead of in importDelimited.
3583  const char* buf2 = (const char*)buf;
3584  int size2 = size;
3585  if (copy_params.has_header &&
3586  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
3587  while (size2-- > 0) {
3588  if (*buf2++ == copy_params.line_delim) {
3589  break;
3590  }
3591  }
3592  if (size2 <= 0) {
3593  LOG(WARNING) << "No line delimiter in block." << std::endl;
3594  }
3595  just_saw_archive_header = false;
3596  first_text_header_skipped = true;
3597  }
3598  // On very rare occasions the write pipe somehow operates in a mode similar to
3599  // non-blocking, even though pipe(fds) should behave like pipe2(fds, 0), i.e.
3600  // blocking mode. With such unreliable blocking behavior, a possible fix is to
3601  // loop writing until no bytes are left; otherwise we get the annoying `failed
3602  // to write pipe: Success`...
3603  if (size2 > 0) {
3604  int nremaining = size2;
3605  while (nremaining > 0) {
3606  // try to write the entire remainder of the buffer to the pipe
3607  int nwritten = write(fd[1], buf2, nremaining);
3608  // how did we do?
3609  if (nwritten < 0) {
3610  // something bad happened
3611  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3612  // ignore these, assume nothing written, try again
3613  nwritten = 0;
3614  } else {
3615  // a real error
3616  throw std::runtime_error(
3617  std::string("failed or interrupted write to pipe: ") +
3618  strerror(errno));
3619  }
3620  } else if (nwritten == nremaining) {
3621  // we wrote everything; we're done
3622  break;
3623  }
3624  // only wrote some (or nothing), try again
3625  nremaining -= nwritten;
3626  buf2 += nwritten;
3627  // no exception when too many rejected
3628  // @simon.eves how would this get set? from the other thread? mutex
3629  // needed?
3630  if (import_status.load_truncated) {
3631  stop = true;
3632  break;
3633  }
3634  }
3635  // check that this file (buf for size) ended with a line delim
3636  if (size > 0) {
3637  const char* plast = static_cast<const char*>(buf) + (size - 1);
3638  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
3639  }
3640  }
3641  }
3642  // if that file didn't end with a line delim, we insert one here to terminate
3643  // that file's stream. use a loop for the same reason as above
3644  if (insert_line_delim_after_this_file) {
3645  while (true) {
3646  // write the delim char to the pipe
3647  int nwritten = write(fd[1], &copy_params.line_delim, 1);
3648  // how did we do?
3649  if (nwritten < 0) {
3650  // something bad happened
3651  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3652  // ignore these, assume nothing written, try again
3653  nwritten = 0;
3654  } else {
3655  // a real error
3656  throw std::runtime_error(
3657  std::string("failed or interrupted write to pipe: ") +
3658  strerror(errno));
3659  }
3660  } else if (nwritten == 1) {
3661  // we wrote it; we're done
3662  break;
3663  }
3664  }
3665  }
3666  }
3667  } catch (...) {
3668  // when the import is aborted because of too many data errors or because a
3669  // detection pass has ended, any exception thrown by the s3 sdk or libarchive
3670  // is expected and should be suppressed.
3671  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
3672  if (import_status.load_truncated) {
3673  break;
3674  }
3675  if (import_status.rows_completed > 0) {
3676  if (nullptr != dynamic_cast<Detector*>(this)) {
3677  break;
3678  }
3679  }
3680  if (!teptr) { // no replace
3681  teptr = std::current_exception();
3682  }
3683  break;
3684  }
3685  }
3686  // close writer end
3687  close(fd[1]);
3688  });
3689 
3690  th_pipe_reader.join();
3691  th_pipe_writer.join();
3692 
3693  // rethrow any exception that happened earlier
3694  if (teptr) {
3695  std::rethrow_exception(teptr);
3696  }
3697 }
3698 
3699 ImportStatus Importer::import() {
3700  return DataStreamSink::archivePlumber();
3701 }
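/*
 * Editor's sketch (not part of the original source): the partial-write retry
 * loops in import_compressed() above, isolated into one helper. write() may
 * accept fewer bytes than requested or fail with EINTR/EAGAIN; the loop
 * advances the buffer pointer until everything is written or a real error
 * occurs.
 */
inline void write_all_sketch(const int fd, const char* buf, size_t len) {
  while (len > 0) {
    const ssize_t nwritten = ::write(fd, buf, len);
    if (nwritten < 0) {
      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
        continue;  // transient condition; nothing consumed, just retry
      }
      throw std::runtime_error(std::string("failed to write pipe: ") +
                               strerror(errno));
    }
    buf += nwritten;  // advance past the bytes the kernel actually accepted
    len -= nwritten;
  }
}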
3702 
3703 ImportStatus Importer::importDelimited(const std::string& file_path,
3704  const bool decompressed) {
3705  bool load_truncated = false;
3706  set_import_status(import_id, import_status);
3707 
3708  if (!p_file) {
3709  p_file = fopen(file_path.c_str(), "rb");
3710  }
3711  if (!p_file) {
3712  throw std::runtime_error("failed to open file '" + file_path +
3713  "': " + strerror(errno));
3714  }
3715 
3716  if (!decompressed) {
3717  (void)fseek(p_file, 0, SEEK_END);
3718  file_size = ftell(p_file);
3719  }
3720 
3721  if (copy_params.threads == 0) {
3722  max_threads = static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
3723  } else {
3724  max_threads = static_cast<size_t>(copy_params.threads);
3725  }
3726 
3727  // deal with small files
3728  size_t alloc_size = copy_params.buffer_size;
3729  if (!decompressed && file_size < alloc_size) {
3730  alloc_size = file_size;
3731  }
3732 
3733  for (size_t i = 0; i < max_threads; i++) {
3734  import_buffers_vec.emplace_back();
3735  for (const auto cd : loader->get_column_descs()) {
3736  import_buffers_vec[i].push_back(std::unique_ptr<TypedImportBuffer>(
3737  new TypedImportBuffer(cd, loader->getStringDict(cd))));
3738  }
3739  }
3740 
3741  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
3742  size_t current_pos = 0;
3743  size_t end_pos;
3744  bool eof_reached = false;
3745  size_t begin_pos = 0;
3746 
3747  (void)fseek(p_file, current_pos, SEEK_SET);
3748  size_t size =
3749  fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);
3750 
3751  // make render group analyzers for each poly column
3752  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
3753  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
3754  loader->getTableDesc()->tableId, false, false, false);
3755  for (auto cd : columnDescriptors) {
3756  SQLTypes ct = cd->columnType.get_type();
3757  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
3758  auto rga = std::make_shared<RenderGroupAnalyzer>();
3759  rga->seedFromExistingTableContents(loader, cd->columnName);
3760  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
3761  }
3762  }
3763 
3764  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
3765  loader->getTableDesc()->tableId};
3766  auto start_epoch = loader->getTableEpoch();
3767  {
3768  std::list<std::future<ImportStatus>> threads;
3769 
3770  // use a stack to track thread_ids which must not overlap among threads
3771  // because thread_id is used to index import_buffers_vec[]
3772  std::stack<size_t> stack_thread_ids;
3773  for (size_t i = 0; i < max_threads; i++) {
3774  stack_thread_ids.push(i);
3775  }
3776 
3777  size_t first_row_index_this_buffer = 0;
3778 
3779  while (size > 0) {
3780  CHECK(scratch_buffer);
3781  if (eof_reached) {
3782  end_pos = size;
3783  } else {
3784  end_pos = find_end(scratch_buffer.get(), size, copy_params);
3785  }
3786  // unput residual
3787  int nresidual = size - end_pos;
3788  std::unique_ptr<char[]> unbuf;
3789  if (nresidual > 0) {
3790  unbuf = std::make_unique<char[]>(nresidual);
3791  memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
3792  }
3793 
3794  // added for true row index on error
3795  unsigned int num_rows_this_buffer = 0;
3796  {
3797  // we could multi-thread this, but it's not worth it:
3798  // the additional cost here is ~1.4ms per chunk, and it's
3799  // probably free anyway because this thread will spend
3800  // most of its time waiting for the child threads
3801  char* p = scratch_buffer.get() + begin_pos;
3802  char* pend = scratch_buffer.get() + end_pos;
3803  char d = copy_params.line_delim;
3804  while (p < pend) {
3805  if (*p++ == d) {
3806  num_rows_this_buffer++;
3807  }
3808  }
3809  }
3810 
3811  // get a thread_id not in use
3812  auto thread_id = stack_thread_ids.top();
3813  stack_thread_ids.pop();
3814  // LOG(INFO) << " stack_thread_ids.pop " << thread_id << std::endl;
3815 
3816  threads.push_back(std::async(std::launch::async,
3817  import_thread_delimited,
3818  thread_id,
3819  this,
3820  std::move(scratch_buffer),
3821  begin_pos,
3822  end_pos,
3823  end_pos,
3824  columnIdToRenderGroupAnalyzerMap,
3825  first_row_index_this_buffer));
3826 
3827  first_row_index_this_buffer += num_rows_this_buffer;
3828 
3829  current_pos += end_pos;
3830  scratch_buffer = std::make_unique<char[]>(alloc_size);
3831  CHECK(scratch_buffer);
3832  memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
3833  size = nresidual + fread(scratch_buffer.get() + nresidual,
3834  1,
3835  copy_params.buffer_size - nresidual,
3836  p_file);
3837  if (size < copy_params.buffer_size && feof(p_file)) {
3838  eof_reached = true;
3839  }
3840 
3841  begin_pos = 0;
3842 
3843  while (threads.size() > 0) {
3844  int nready = 0;
3845  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
3846  it != threads.end();) {
3847  auto& p = *it;
3848  std::chrono::milliseconds span(
3849  0); //(std::distance(it, threads.end()) == 1? 1: 0);
3850  if (p.wait_for(span) == std::future_status::ready) {
3851  auto ret_import_status = p.get();
3852  import_status += ret_import_status;
3853  // sum up current total file offsets
3854  size_t total_file_offset{0};
3855  if (decompressed) {
3856  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3857  for (const auto file_offset : file_offsets) {
3858  total_file_offset += file_offset;
3859  }
3860  }
3861  // estimate number of rows per current total file offset
3862  if (decompressed ? total_file_offset : current_pos) {
3863  import_status.rows_estimated =
3864  (decompressed ? (float)total_file_size / total_file_offset
3865  : (float)file_size / current_pos) *
3866  import_status.rows_completed;
3867  }
3868  VLOG(3) << "rows_completed " << import_status.rows_completed
3869  << ", rows_estimated " << import_status.rows_estimated
3870  << ", total_file_size " << total_file_size << ", total_file_offset "
3871  << total_file_offset;
3872  set_import_status(import_id, import_status);
3873  // recall thread_id for reuse
3874  stack_thread_ids.push(ret_import_status.thread_id);
3875  threads.erase(it++);
3876  ++nready;
3877  } else {
3878  ++it;
3879  }
3880  }
3881 
3882  if (nready == 0) {
3883  std::this_thread::yield();
3884  }
3885 
3886  // on eof, wait all threads to finish
3887  if (0 == size) {
3888  continue;
3889  }
3890 
3891  // keep reading if any free thread slot
3892  // this is one of the major differences from the old threading model!
3893  if (threads.size() < max_threads) {
3894  break;
3895  }
3896  }
3897 
3898  if (import_status.rows_rejected > copy_params.max_reject) {
3899  load_truncated = true;
3900  load_failed = true;
3901  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
3902  break;
3903  }
3904  if (load_failed) {
3905  load_truncated = true;
3906  LOG(ERROR) << "A call to the Loader::load failed, Please review the logs for "
3907  "more details";
3908  break;
3909  }
3910  }
3911 
3912  // join dangling threads in case of LOG(ERROR) above
3913  for (auto& p : threads) {
3914  p.wait();
3915  }
3916  }
3917 
3918  checkpoint(start_epoch);
3919 
3920  // must set import_status.load_truncated before closing this end of the pipe;
3921  // otherwise, the thread on the other end would throw an unwanted 'write()'
3922  // exception
3923  mapd_lock_guard<mapd_shared_mutex> write_lock(status_mutex);
3924  import_status.load_truncated = load_truncated;
3925 
3926  fclose(p_file);
3927  p_file = nullptr;
3928  return import_status;
3929 }
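/*
 * Editor's sketch (not part of the original source): the buffer-splitting idea
 * behind find_end() used above, in miniature. Each fread() may end mid-row, so
 * the caller treats bytes [0, n) as complete rows and carries the residual
 * tail into the front of the next buffer before the next fread().
 */
inline size_t last_complete_row_end_sketch(const char* buf,
                                           const size_t size,
                                           const char line_delim) {
  for (size_t i = size; i > 0; --i) {
    if (buf[i - 1] == line_delim) {
      return i;  // bytes [0, i) hold whole rows
    }
  }
  return 0;  // no complete row yet; caller must read more first
}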
3930 
3931 void Loader::checkpoint() {
3932  if (getTableDesc()->persistenceLevel ==
3933  Data_Namespace::MemoryLevel::DISK_LEVEL) { // only checkpoint disk-resident tables
3934  getCatalog().checkpoint(getTableDesc()->tableId);
3935  }
3936 }
3937 
3938 int32_t Loader::getTableEpoch() {
3939  return getCatalog().getTableEpoch(getCatalog().getCurrentDB().dbId,
3940  getTableDesc()->tableId);
3941 }
3942 
3943 void Loader::setTableEpoch(const int32_t start_epoch) {
3944  getCatalog().setTableEpoch(
3945  getCatalog().getCurrentDB().dbId, getTableDesc()->tableId, start_epoch);
3946 }
3947 
3948 void GDALErrorHandler(CPLErr eErrClass, int err_no, const char* msg) {
3949  CHECK(eErrClass >= CE_None && eErrClass <= CE_Fatal);
3950  static const char* errClassStrings[5] = {
3951  "Info",
3952  "Debug",
3953  "Warning",
3954  "Failure",
3955  "Fatal",
3956  };
3957  std::string log_msg = std::string("GDAL ") + errClassStrings[eErrClass] +
3958  std::string(": ") + msg + std::string(" (") +
3959  std::to_string(err_no) + std::string(")");
3960  if (eErrClass >= CE_Failure) {
3961  throw std::runtime_error(log_msg);
3962  } else {
3963  LOG(INFO) << log_msg;
3964  }
3965 }
3966 
3967 /* static */
3968 std::mutex Importer::init_gdal_mutex;
3969 
3970 /* static */
3971 void Importer::initGDAL() {
3972  // this should not be called from multiple threads, but...
3973  std::lock_guard<std::mutex> guard(Importer::init_gdal_mutex);
3974  // init under mutex
3975  static bool gdal_initialized = false;
3976  if (!gdal_initialized) {
3977  // FIXME(andrewseidl): investigate if CPLPushFinderLocation can be public
3978  setenv("GDAL_DATA",
3979  std::string(mapd_root_abs_path() + "/ThirdParty/gdal-data").c_str(),
3980  true);
3981 
3982  // configure SSL certificate path (per S3Archive::init_for_read)
3983  // in a production build, GDAL and Curl will have been built on
3984  // CentOS, so the baked-in system path will be wrong for Ubuntu
3985  // and other Linux distros. Unless the user is deliberately
3986  // overriding it by setting SSL_CERT_FILE explicitly in the server
3987  // environment, we set it to whichever CA bundle directory exists
3988  // on the machine we're running on
3989  std::list<std::string> v_known_ca_paths({
3990  "/etc/ssl/certs/ca-certificates.crt",
3991  "/etc/pki/tls/certs/ca-bundle.crt",
3992  "/usr/share/ssl/certs/ca-bundle.crt",
3993  "/usr/local/share/certs/ca-root.crt",
3994  "/etc/ssl/cert.pem",
3995  "/etc/ssl/ca-bundle.pem",
3996  });
3997  for (const auto& known_ca_path : v_known_ca_paths) {
3998  if (boost::filesystem::exists(known_ca_path)) {
3999  LOG(INFO) << "GDAL SSL Certificate path: " << known_ca_path;
4000  setenv("SSL_CERT_FILE", known_ca_path.c_str(), false); // no overwrite
4001  break;
4002  }
4003  }
4004 
4005  GDALAllRegister();
4006  OGRRegisterAll();
4007  CPLSetErrorHandler(*GDALErrorHandler);
4008  LOG(INFO) << "GDAL Initialized: " << GDALVersionInfo("--version");
4009  gdal_initialized = true;
4010  }
4011 }
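/*
 * Editor's note: the mutex + static flag above is the classic one-time-init
 * pattern; a minimal equivalent using std::call_once would look like this
 * (sketch only, not how this file actually does it):
 */
inline void init_gdal_once_sketch() {
  static std::once_flag once;
  std::call_once(once, [] {
    // one-time setup: GDALAllRegister(), OGRRegisterAll(),
    // CPLSetErrorHandler(*GDALErrorHandler), environment configuration, etc.
  });
}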
4012 
4013 bool Importer::hasGDALLibKML() {
4014  return GetGDALDriverManager()->GetDriverByName("libkml") != nullptr;
4015 }
4016 
4017 /* static */
4018 void Importer::setGDALAuthorizationTokens(const CopyParams& copy_params) {
4019  // for now we only support S3
4020  // @TODO generalize CopyParams to have a dictionary of GDAL tokens
4021  // only set if non-empty to allow GDAL defaults to persist
4022  // explicitly clear if empty to revert to default and not reuse a previous session's
4023  // keys
4024  if (copy_params.s3_region.size()) {
4025 #if DEBUG_AWS_AUTHENTICATION
4026  LOG(INFO) << "GDAL: Setting AWS_REGION to '" << copy_params.s3_region << "'";
4027 #endif
4028  CPLSetConfigOption("AWS_REGION", copy_params.s3_region.c_str());
4029  } else {
4030 #if DEBUG_AWS_AUTHENTICATION
4031  LOG(INFO) << "GDAL: Clearing AWS_REGION";
4032 #endif
4033  CPLSetConfigOption("AWS_REGION", nullptr);
4034  }
4035  if (copy_params.s3_endpoint.size()) {
4036 #if DEBUG_AWS_AUTHENTICATION
4037  LOG(INFO) << "GDAL: Setting AWS_S3_ENDPOINT to '" << copy_params.s3_endpoint << "'";
4038 #endif
4039  CPLSetConfigOption("AWS_S3_ENDPOINT", copy_params.s3_endpoint.c_str());
4040  } else {
4041 #if DEBUG_AWS_AUTHENTICATION
4042  LOG(INFO) << "GDAL: Clearing AWS_S3_ENDPOINT";
4043 #endif
4044  CPLSetConfigOption("AWS_S3_ENDPOINT", nullptr);
4045  }
4046  if (copy_params.s3_access_key.size()) {
4047 #if DEBUG_AWS_AUTHENTICATION
4048  LOG(INFO) << "GDAL: Setting AWS_ACCESS_KEY_ID to '" << copy_params.s3_access_key
4049  << "'";
4050 #endif
4051  CPLSetConfigOption("AWS_ACCESS_KEY_ID", copy_params.s3_access_key.c_str());
4052  } else {
4053 #if DEBUG_AWS_AUTHENTICATION
4054  LOG(INFO) << "GDAL: Clearing AWS_ACCESS_KEY_ID";
4055 #endif
4056  CPLSetConfigOption("AWS_ACCESS_KEY_ID", nullptr);
4057  }
4058  if (copy_params.s3_secret_key.size()) {
4059 #if DEBUG_AWS_AUTHENTICATION
4060  LOG(INFO) << "GDAL: Setting AWS_SECRET_ACCESS_KEY to '" << copy_params.s3_secret_key
4061  << "'";
4062 #endif
4063  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", copy_params.s3_secret_key.c_str());
4064  } else {
4065 #if DEBUG_AWS_AUTHENTICATION
4066  LOG(INFO) << "GDAL: Clearing AWS_SECRET_ACCESS_KEY";
4067 #endif
4068  CPLSetConfigOption("AWS_SECRET_ACCESS_KEY", nullptr);
4069  }
4070 
4071 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4072  // if we haven't set keys, we need to disable signed access
4073  if (copy_params.s3_access_key.size() || copy_params.s3_secret_key.size()) {
4074 #if DEBUG_AWS_AUTHENTICATION
4075  LOG(INFO) << "GDAL: Clearing AWS_NO_SIGN_REQUEST";
4076 #endif
4077  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", nullptr);
4078  } else {
4079 #if DEBUG_AWS_AUTHENTICATION
4080  LOG(INFO) << "GDAL: Setting AWS_NO_SIGN_REQUEST to 'YES'";
4081 #endif
4082  CPLSetConfigOption("AWS_NO_SIGN_REQUEST", "YES");
4083  }
4084 #endif
4085 }
4086 
4087 /* static */
4088 OGRDataSource* Importer::openGDALDataset(const std::string& file_name,
4089  const CopyParams& copy_params) {
4090  // lazy init GDAL
4091  initGDAL();
4092 
4093  // set authorization tokens
4094  setGDALAuthorizationTokens(copy_params);
4095 
4096  // open the file
4097  OGRDataSource* poDS;
4098 #if GDAL_VERSION_MAJOR == 1
4099  poDS = (OGRDataSource*)OGRSFDriverRegistrar::Open(file_name.c_str(), false);
4100 #else
4101  poDS = (OGRDataSource*)GDALOpenEx(
4102  file_name.c_str(), GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4103  if (poDS == nullptr) {
4104  poDS = (OGRDataSource*)GDALOpenEx(
4105  file_name.c_str(), GDAL_OF_READONLY | GDAL_OF_VECTOR, nullptr, nullptr, nullptr);
4106  if (poDS) {
4107  LOG(INFO) << "openGDALDataset had to open as read-only";
4108  }
4109  }
4110 #endif
4111  if (poDS == nullptr) {
4112  LOG(ERROR) << "openGDALDataset Error: " << CPLGetLastErrorMsg();
4113  }
4114  // NOTE(adb): If extending this function, refactor to ensure any errors will not result
4115  // in a memory leak if GDAL successfully opened the input dataset.
4116  return poDS;
4117 }
4118 
4119 namespace {
4120 
4121 OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
4122  const OGRDataSourceUqPtr& poDS,
4123  const std::string& file_name) {
4124  // get layer with specified name, or default to first layer
4125  OGRLayer* poLayer = nullptr;
4126  if (geo_layer_name.size()) {
4127  poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4128  if (poLayer == nullptr) {
4129  throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
4130  file_name);
4131  }
4132  } else {
4133  poLayer = poDS->GetLayer(0);
4134  if (poLayer == nullptr) {
4135  throw std::runtime_error("No layers found in " + file_name);
4136  }
4137  }
4138  return *poLayer;
4139 }
4140 
4141 } // namespace
4142 
4143 /* static */
4144 void Importer::readMetadataSampleGDAL(
4145  const std::string& file_name,
4146  const std::string& geo_column_name,
4147  std::map<std::string, std::vector<std::string>>& metadata,
4148  int rowLimit,
4149  const CopyParams& copy_params) {
4150  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4151  if (poDS == nullptr) {
4152  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4153  file_name);
4154  }
4155 
4156  OGRLayer& layer =
4157  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4158 
4159  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4160  CHECK(poFDefn);
4161 
4162  // typeof GetFeatureCount() is different between GDAL 1.x (int32_t) and 2.x (int64_t)
4163  auto nFeats = layer.GetFeatureCount();
4164  size_t numFeatures =
4165  std::max(static_cast<decltype(nFeats)>(0),
4166  std::min(static_cast<decltype(nFeats)>(rowLimit), nFeats));
4167  for (auto iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4168  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4169  // FIXME(andrewseidl): change this to the faster one used by readVerticesFromGDAL
4170  metadata.emplace(poFieldDefn->GetNameRef(), std::vector<std::string>(numFeatures));
4171  }
4172  metadata.emplace(geo_column_name, std::vector<std::string>(numFeatures));
4173  layer.ResetReading();
4174  size_t iFeature = 0;
4175  while (iFeature < numFeatures) {
4176  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4177  if (!poFeature) {
4178  break;
4179  }
4180 
4181  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4182  if (poGeometry != nullptr) {
4183  // validate geom type (again?)
4184  switch (wkbFlatten(poGeometry->getGeometryType())) {
4185  case wkbPoint:
4186  case wkbLineString:
4187  case wkbPolygon:
4188  case wkbMultiPolygon:
4189  break;
4190  default:
4191  throw std::runtime_error("Unsupported geometry type: " +
4192  std::string(poGeometry->getGeometryName()));
4193  }
4194 
4195  // populate metadata for regular fields
4196  for (auto i : metadata) {
4197  auto iField = poFeature->GetFieldIndex(i.first.c_str());
4198  if (iField >= 0) { // geom is -1
4199  metadata[i.first].at(iFeature) =
4200  std::string(poFeature->GetFieldAsString(iField));
4201  }
4202  }
4203 
4204  // populate metadata for geo column with WKT string
4205  char* wkts = nullptr;
4206  poGeometry->exportToWkt(&wkts);
4207  CHECK(wkts);
4208  metadata[geo_column_name].at(iFeature) = wkts;
4209  CPLFree(wkts);
4210  }
4211  iFeature++;
4212  }
4213 }
4214 
4215 std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
4216  switch (ogr_type) {
4217  case OFTInteger:
4218  return std::make_pair(kINT, false);
4219  case OFTIntegerList:
4220  return std::make_pair(kINT, true);
4221 #if GDAL_VERSION_MAJOR > 1
4222  case OFTInteger64:
4223  return std::make_pair(kBIGINT, false);
4224  case OFTInteger64List:
4225  return std::make_pair(kBIGINT, true);
4226 #endif
4227  case OFTReal:
4228  return std::make_pair(kDOUBLE, false);
4229  case OFTRealList:
4230  return std::make_pair(kDOUBLE, true);
4231  case OFTString:
4232  return std::make_pair(kTEXT, false);
4233  case OFTStringList:
4234  return std::make_pair(kTEXT, true);
4235  case OFTDate:
4236  return std::make_pair(kDATE, false);
4237  case OFTTime:
4238  return std::make_pair(kTIME, false);
4239  case OFTDateTime:
4240  return std::make_pair(kTIMESTAMP, false);
4241  case OFTBinary:
4242  default:
4243  break;
4244  }
4245  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
4246 }
4247 
4248 SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
4249  switch (ogr_type) {
4250  case wkbPoint:
4251  return kPOINT;
4252  case wkbLineString:
4253  return kLINESTRING;
4254  case wkbPolygon:
4255  return kPOLYGON;
4256  case wkbMultiPolygon:
4257  return kMULTIPOLYGON;
4258  default:
4259  break;
4260  }
4261  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
4262 }
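// Editor's note (illustrative only; follows directly from the two overloads
// above and the PROMOTE_POLYGON_TO_MULTIPOLYGON handling below):
//   ogr_to_type(OFTReal)       -> {kDOUBLE, /*is_list=*/false}
//   ogr_to_type(OFTStringList) -> {kTEXT,   /*is_list=*/true}
//   ogr_to_type(wkbPolygon)    -> kPOLYGON, later promoted to kMULTIPOLYGON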
4263 
4264 /* static */
4265 const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
4266  const std::string& file_name,
4267  const std::string& geo_column_name,
4268  const CopyParams& copy_params) {
4269  std::list<ColumnDescriptor> cds;
4270 
4271  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4272  if (poDS == nullptr) {
4273  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4274  file_name);
4275  }
4276 
4277  OGRLayer& layer =
4278  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);
4279 
4280  layer.ResetReading();
4281  // TODO(andrewseidl): support multiple features
4282  OGRFeatureUqPtr poFeature(layer.GetNextFeature());
4283  if (poFeature == nullptr) {
4284  throw std::runtime_error("No features found in " + file_name);
4285  }
4286  // get fields as regular columns
4287  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4288  CHECK(poFDefn);
4289  int iField;
4290  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
4291  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4292  auto typePair = ogr_to_type(poFieldDefn->GetType());
4293  ColumnDescriptor cd;
4294  cd.columnName = poFieldDefn->GetNameRef();
4295  cd.sourceName = poFieldDefn->GetNameRef();
4296  SQLTypeInfo ti;
4297  if (typePair.second) {
4298  ti.set_type(kARRAY);
4299  ti.set_subtype(typePair.first);
4300  } else {
4301  ti.set_type(typePair.first);
4302  }
4303  if (typePair.first == kTEXT) {
4304  ti.set_compression(kENCODING_DICT);
4305  ti.set_comp_param(32);
4306  }
4307  ti.set_fixed_size();
4308  cd.columnType = ti;
4309  cds.push_back(cd);
4310  }
4311  // get geo column, if any
4312  OGRGeometry* poGeometry = poFeature->GetGeometryRef();
4313  if (poGeometry) {
4314  ColumnDescriptor cd;
4315  cd.columnName = geo_column_name;
4316  cd.sourceName = geo_column_name;
4317  SQLTypes geoType = ogr_to_type(wkbFlatten(poGeometry->getGeometryType()));
4318  if (PROMOTE_POLYGON_TO_MULTIPOLYGON) {
4319  geoType = (geoType == kPOLYGON) ? kMULTIPOLYGON : geoType;
4320  }
4321  SQLTypeInfo ti;
4322  ti.set_type(geoType);
4323  ti.set_subtype(copy_params.geo_coords_type);
4324  ti.set_input_srid(copy_params.geo_coords_srid);
4325  ti.set_output_srid(copy_params.geo_coords_srid);
4326  ti.set_compression(copy_params.geo_coords_encoding);
4327  ti.set_comp_param(copy_params.geo_coords_comp_param);
4328  cd.columnType = ti;
4329  cds.push_back(cd);
4330  }
4331  return cds;
4332 }
4333 
4334 bool Importer::gdalStatInternal(const std::string& path,
4335  const CopyParams& copy_params,
4336  bool also_dir) {
4337  // lazy init GDAL
4338  initGDAL();
4339 
4340  // set authorization tokens
4341  setGDALAuthorizationTokens(copy_params);
4342 
4343 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
4344  // clear GDAL stat cache
4345  // without this, file existence will be cached, even if authentication changes
4346  // supposed to be available from GDAL 2.2.1 but our CentOS build disagrees
4347  VSICurlClearCache();
4348 #endif
4349 
4350  // stat path
4351  VSIStatBufL sb;
4352  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
4353  if (result < 0) {
4354  return false;
4355  }
4356 
4357  // exists?
4358  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
4359  return true;
4360  } else if (VSI_ISREG(sb.st_mode)) {
4361  return true;
4362  }
4363  return false;
4364 }
4365 
4366 /* static */
4367 bool Importer::gdalFileExists(const std::string& path, const CopyParams& copy_params) {
4368  return gdalStatInternal(path, copy_params, false);
4369 }
4370 
4371 /* static */
4372 bool Importer::gdalFileOrDirectoryExists(const std::string& path,
4373  const CopyParams& copy_params) {
4374  return gdalStatInternal(path, copy_params, true);
4375 }
4376 
4377 void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
4378  std::vector<std::string>& files) {
4379  // prepare to gather subdirectories
4380  std::vector<std::string> subdirectories;
4381 
4382  // get entries
4383  char** entries = VSIReadDir(archive_path.c_str());
4384  if (!entries) {
4385  LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
4386  return;
4387  }
4388 
4389  // force scope
4390  {
4391  // request clean-up
4392  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
4393 
4394  // check all the entries
4395  int index = 0;
4396  while (true) {
4397  // get next entry, or drop out if there isn't one
4398  char* entry_c = entries[index++];
4399  if (!entry_c) {
4400  break;
4401  }
4402  std::string entry(entry_c);
4403 
4404  // ignore '.' and '..'
4405  if (entry == "." || entry == "..") {
4406  continue;
4407  }
4408 
4409  // build the full path
4410  std::string entry_path = archive_path + std::string("/") + entry;
4411 
4412  // is it a file or a sub-folder
4413  VSIStatBufL sb;
4414  int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
4415  if (result < 0) {
4416  break;
4417  }
4418 
4419  if (VSI_ISDIR(sb.st_mode)) {
4420  // a directory that ends with .gdb could be a Geodatabase bundle.
4421  // arguably dangerous to decide this purely by name, but any further
4422  // validation would be very complex, especially at this scope
4423  if (boost::iends_with(entry_path, ".gdb")) {
4424  // add the directory as if it was a file and don't recurse into it
4425  files.push_back(entry_path);
4426  } else {
4427  // add subdirectory to be recursed into
4428  subdirectories.push_back(entry_path);
4429  }
4430  } else {
4431  // add this file
4432  files.push_back(entry_path);
4433  }
4434  }
4435  }
4436 
4437  // recurse into each subdirectory we found
4438  for (const auto& subdirectory : subdirectories) {
4439  gdalGatherFilesInArchiveRecursive(subdirectory, files);
4440  }
4441 }
4442 
4443 /* static */
4444 std::vector<std::string> Importer::gdalGetAllFilesInArchive(
4445  const std::string& archive_path,
4446  const CopyParams& copy_params) {
4447  // lazy init GDAL
4448  initGDAL();
4449 
4450  // set authorization tokens
4451  setGDALAuthorizationTokens(copy_params);
4452 
4453  // prepare to gather files
4454  std::vector<std::string> files;
4455 
4456  // gather the files recursively
4457  gdalGatherFilesInArchiveRecursive(archive_path, files);
4458 
4459  // convert to relative paths inside archive
4460  for (auto& file : files) {
4461  file.erase(0, archive_path.size() + 1); // remove archive_path and the slash
4462  }
4463 
4464  // done
4465  return files;
4466 }
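// Editor's sketch (hypothetical archive path): gdalGetAllFilesInArchive() is
// typically pointed at a GDAL VSI path, e.g. a zip archive:
//   const auto files = Importer_NS::Importer::gdalGetAllFilesInArchive(
//       "/vsizip/path/to/archive.zip", copy_params);
//   // 'files' now holds entry paths relative to the archive root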
4467 
4468 /* static */
4469 std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
4470  const std::string& file_name,
4471  const CopyParams& copy_params) {
4472  // lazy init GDAL
4473  initGDAL();
4474 
4475  // set authorization tokens
4476  setGDALAuthorizationTokens(copy_params);
4477 
4478  // prepare to gather layer info
4479  std::vector<GeoFileLayerInfo> layer_info;
4480 
4481  // open the data set
4482  OGRDataSourceUqPtr poDS(openGDALDataset(file_name, copy_params));
4483  if (poDS == nullptr) {
4484  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4485  file_name);
4486  }
4487 
4488  // enumerate the layers
4489  for (auto&& poLayer : poDS->GetLayers()) {
4490  GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
4491  // prepare to read this layer
4492  poLayer->ResetReading();
4493  // skip layer if empty
4494  if (poLayer->GetFeatureCount() > 0) {
4495  // get first feature
4496  OGRFeatureUqPtr first_feature(poLayer->GetNextFeature());
4497  CHECK(first_feature);
4498  // check feature for geometry
4499  const OGRGeometry* geometry = first_feature->GetGeometryRef();
4500  if (!geometry) {
4501  // layer has no geometry
4502  contents = GeoFileLayerContents::NON_GEO;
4503  } else {
4504  // check the geometry type
4505  const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
4506  switch (wkbFlatten(geometry_type)) {
4507  case wkbPoint:
4508  case wkbLineString:
4509  case wkbPolygon:
4510  case wkbMultiPolygon:
4511  // layer has supported geo
4512  contents = GeoFileLayerContents::GEO;
4513  break;
4514  default:
4515  // layer has unsupported geometry
4516  contents = GeoFileLayerContents::UNSUPPORTED_GEO;
4517  break;
4518  }
4519  }
4520  }
4521  // store info for this layer
4522  layer_info.emplace_back(poLayer->GetName(), contents);
4523  }
4524 
4525  // done
4526  return layer_info;
4527 }
4528 
4529 /* static */
4530 bool Importer::gdalSupportsNetworkFileAccess() {
4531 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 2)
4532  return true;
4533 #else
4534  return false;
4535 #endif
4536 }
4537 
4538 ImportStatus Importer::importGDAL(
4539  ColumnNameToSourceNameMapType columnNameToSourceNameMap) {
4540  // initial status
4541  bool load_truncated = false;
4542  set_import_status(import_id, import_status);
4543 
4544  OGRDataSourceUqPtr poDS(openGDALDataset(file_path, copy_params));
4545  if (poDS == nullptr) {
4546  throw std::runtime_error("openGDALDataset Error: Unable to open geo file " +
4547  file_path);
4548  }
4549 
4550  OGRLayer& layer =
4551  getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
4552 
4553  // get the number of features in this layer
4554  size_t numFeatures = layer.GetFeatureCount();
4555 
4556  // build map of metadata field (additional columns) name to index
4557  // use shared_ptr since we need to pass it to the worker
4558  FieldNameToIndexMapType fieldNameToIndexMap;
4559  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
4560  CHECK(poFDefn);
4561  size_t numFields = poFDefn->GetFieldCount();
4562  for (size_t iField = 0; iField < numFields; iField++) {
4563  OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
4564  fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
4565  }
4566 
4567  // the geographic spatial reference we want to put everything in
4568  OGRSpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
4569  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
4570 
4571 #if GDAL_VERSION_MAJOR >= 3
4572  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order;
4573  // this results in X and Y being transposed for angle-based
4574  // coordinate systems. This restores the previous behavior.
4575  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
4576 #endif
4577 
4578 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4579  // just one "thread"
4580  size_t max_threads = 1;
4581 #else
4582  // how many threads to use
4583  size_t max_threads = 0;
4584  if (copy_params.threads == 0) {
4585  max_threads = sysconf(_SC_NPROCESSORS_CONF);
4586  } else {
4587  max_threads = copy_params.threads;
4588  }
4589 #endif
4590 
4591  // make an import buffer for each thread
4592  CHECK_EQ(import_buffers_vec.size(), 0u);
4593  import_buffers_vec.resize(max_threads);
4594  for (size_t i = 0; i < max_threads; i++) {
4595  for (const auto cd : loader->get_column_descs()) {
4596  import_buffers_vec[i].emplace_back(
4597  new TypedImportBuffer(cd, loader->getStringDict(cd)));
4598  }
4599  }
4600 
4601  // make render group analyzers for each poly column
4602  ColumnIdToRenderGroupAnalyzerMapType columnIdToRenderGroupAnalyzerMap;
4603  auto columnDescriptors = loader->getCatalog().getAllColumnMetadataForTable(
4604  loader->getTableDesc()->tableId, false, false, false);
4605  for (auto cd : columnDescriptors) {
4606  SQLTypes ct = cd->columnType.get_type();
4607  if (ct == kPOLYGON || ct == kMULTIPOLYGON) {
4608  auto rga = std::make_shared<RenderGroupAnalyzer>();
4609  rga->seedFromExistingTableContents(loader, cd->columnName);
4610  columnIdToRenderGroupAnalyzerMap[cd->columnId] = rga;
4611  }
4612  }
4613 
4614 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4615  // threads
4616  std::list<std::future<ImportStatus>> threads;
4617 
4618  // use a stack to track thread_ids which must not overlap among threads
4619  // because thread_id is used to index import_buffers_vec[]
4620  std::stack<size_t> stack_thread_ids;
4621  for (size_t i = 0; i < max_threads; i++) {
4622  stack_thread_ids.push(i);
4623  }
4624 #endif
4625 
4626  // checkpoint the table
4627  auto start_epoch = loader->getTableEpoch();
4628 
4629  // reset the layer
4630  layer.ResetReading();
4631 
4632  static const size_t MAX_FEATURES_PER_CHUNK = 1000;
4633 
4634  // make a features buffer for each thread
4635  std::vector<FeaturePtrVector> features(max_threads);
4636 
4637  // for each feature...
4638  size_t firstFeatureThisChunk = 0;
4639  while (firstFeatureThisChunk < numFeatures) {
4640  // how many features this chunk
4641  size_t numFeaturesThisChunk =
4642  std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
4643 
4644 // get a thread_id not in use
4645 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4646  size_t thread_id = 0;
4647 #else
4648  auto thread_id = stack_thread_ids.top();
4649  stack_thread_ids.pop();
4650  CHECK(thread_id < max_threads);
4651 #endif
4652 
4653  // fill features buffer for new thread
4654  for (size_t i = 0; i < numFeaturesThisChunk; i++) {
4655  features[thread_id].emplace_back(layer.GetNextFeature());
4656  }
4657 
4658 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4659  // call worker function directly
4660  auto ret_import_status = import_thread_shapefile(0,
4661  this,
4662  poGeographicSR.get(),
4663  std::move(features[thread_id]),
4664  firstFeatureThisChunk,
4665  numFeaturesThisChunk,
4666  fieldNameToIndexMap,
4667  columnNameToSourceNameMap,
4668  columnIdToRenderGroupAnalyzerMap);
4669  import_status += ret_import_status;
4670  import_status.rows_estimated = ((float)firstFeatureThisChunk / (float)numFeatures) *
4671  import_status.rows_completed;
4672  set_import_status(import_id, import_status);
4673 #else
4674  // fire up that thread to import this geometry
4675  threads.push_back(std::async(std::launch::async,
4676  import_thread_shapefile,
4677  thread_id,
4678  this,
4679  poGeographicSR.get(),
4680  std::move(features[thread_id]),
4681  firstFeatureThisChunk,
4682  numFeaturesThisChunk,
4683  fieldNameToIndexMap,
4684  columnNameToSourceNameMap,
4685  columnIdToRenderGroupAnalyzerMap));
4686 
4687  // let the threads run
4688  while (threads.size() > 0) {
4689  int nready = 0;
4690  for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4691  it != threads.end();) {
4692  auto& p = *it;
4693  std::chrono::milliseconds span(
4694  0); //(std::distance(it, threads.end()) == 1? 1: 0);
4695  if (p.wait_for(span) == std::future_status::ready) {
4696  auto ret_import_status = p.get();
4697  import_status += ret_import_status;
4698  import_status.rows_estimated =
4699  ((float)firstFeatureThisChunk / (float)numFeatures) *
4700  import_status.rows_completed;
4701  set_import_status(import_id, import_status);
4702 
4703  // recall thread_id for reuse
4704  stack_thread_ids.push(ret_import_status.thread_id);
4705 
4706  threads.erase(it++);
4707  ++nready;
4708  } else {
4709  ++it;
4710  }
4711  }
4712 
4713  if (nready == 0) {
4714  std::this_thread::yield();
4715  }
4716 
4717  // keep reading if any free thread slot
4718  // this is one of the major differences from the old threading model!
4719  if (threads.size() < max_threads) {
4720  break;
4721  }
4722  }
4723 #endif
4724 
4725  // out of rows?
4726  if (import_status.rows_rejected > copy_params.max_reject) {
4727  load_truncated = true;
4728  load_failed = true;
4729  LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
4730  break;
4731  }
4732 
4733  // failed?
4734  if (load_failed) {
4735  load_truncated = true;
4736  LOG(ERROR)
4737  << "A call to the Loader::load failed, Please review the logs for more details";
4738  break;
4739  }
4740 
4741  firstFeatureThisChunk += numFeaturesThisChunk;
4742  }
4743 
4744 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
4745  // wait for any remaining threads
4746  if (threads.size()) {
4747  for (auto& p : threads) {
4748  // wait for the thread
4749  p.wait();
4750  // get the result and update the final import status
4751  auto ret_import_status = p.get();
4752  import_status += ret_import_status;
4753  import_status.rows_estimated = import_status.rows_completed;
4754  set_import_status(import_id, import_status);
4755  }
4756  }
4757 #endif
4758 
4759  checkpoint(start_epoch);
4760 
4761  // must set import_status.load_truncated before closing this end of the pipe;
4762  // otherwise, the thread on the other end would throw an unwanted 'write()'
4763  // exception
4764  import_status.load_truncated = load_truncated;
4765  return import_status;
4766 }
4767 
4768 //
4769 // class RenderGroupAnalyzer
4770 //
4771 
4772 void RenderGroupAnalyzer::seedFromExistingTableContents(
4773  const std::unique_ptr<Loader>& loader,
4774  const std::string& geoColumnBaseName) {
4775  // start timer
4776  auto seedTimer = timer_start();
4777 
4778  // get the table descriptor
4779  const auto& cat = loader->getCatalog();
4780  const std::string& tableName = loader->getTableDesc()->tableName;
4781  const auto td = cat.getMetadataForTable(tableName);
4782  CHECK(td);
4783  CHECK(td->fragmenter);
4784 
4785  // start with a fresh tree
4786  _rtree = nullptr;
4787  _numRenderGroups = 0;
4788 
4789  // if the table is empty, just make an empty tree
4790  if (td->fragmenter->getFragmentsForQuery().getPhysicalNumTuples() == 0) {
4791  if (DEBUG_RENDER_GROUP_ANALYZER) {
4792  LOG(INFO) << "DEBUG: Table is empty!";
4793  }
4794  _rtree = std::make_unique<RTree>();
4795  CHECK(_rtree);
4796  return;
4797  }
4798 
4799  // no seeding possible without these two columns
4800  const auto cd_bounds =
4801  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_bounds");
4802  const auto cd_render_group =
4803  cat.getMetadataForColumn(td->tableId, geoColumnBaseName + "_render_group");
4804  if (!cd_bounds || !cd_render_group) {
4805  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4806  " doesn't have bounds or render_group columns!");
4807  }
4808 
4809  // and validate their types
4810  if (cd_bounds->columnType.get_type() != kARRAY ||
4811  cd_bounds->columnType.get_subtype() != kDOUBLE) {
4812  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4813  " bounds column is wrong type!");
4814  }
4815  if (cd_render_group->columnType.get_type() != kINT) {
4816  throw std::runtime_error("RenderGroupAnalyzer: Table " + tableName +
4817  " render_group column is wrong type!");
4818  }
4819 
4820  // get chunk accessor table
4821  auto chunkAccessorTable = getChunkAccessorTable(
4822  cat, td, {geoColumnBaseName + "_bounds", geoColumnBaseName + "_render_group"});
4823  const auto table_count = std::get<0>(chunkAccessorTable.back());
4824 
4826  LOG(INFO) << "DEBUG: Scanning existing table geo column set '" << geoColumnBaseName
4827  << "'";
4828  }
4829 
4830  std::vector<Node> nodes;
4831  try {
4832  nodes.resize(table_count);
4833  } catch (const std::exception& e) {
4834  throw std::runtime_error("RenderGroupAnalyzer failed to reserve memory for " +
4835  std::to_string(table_count) + " rows");
4836  }
4837 
4838  for (size_t row = 0; row < table_count; row++) {
4839  ArrayDatum ad;
4840  VarlenDatum vd;
4841  bool is_end;
4842 
4843  // get ChunkIters and fragment row offset
4844  size_t rowOffset = 0;
4845  auto& chunkIters = getChunkItersAndRowOffset(chunkAccessorTable, row, rowOffset);
4846  auto& boundsChunkIter = chunkIters[0];
4847  auto& renderGroupChunkIter = chunkIters[1];
4848 
4849  // get bounds values
4850  ChunkIter_get_nth(&boundsChunkIter, row - rowOffset, &ad, &is_end);
4851  CHECK(!is_end);
4852  CHECK(ad.pointer);
4853  int numBounds = (int)(ad.length / sizeof(double));
4854  CHECK(numBounds == 4);
4855 
4856  // convert to bounding box
4857  double* bounds = reinterpret_cast<double*>(ad.pointer);
4858  BoundingBox bounding_box;
4859  boost::geometry::assign_inverse(bounding_box);
4860  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
4861  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
4862 
4863  // get render group
4864  ChunkIter_get_nth(&renderGroupChunkIter, row - rowOffset, false, &vd, &is_end);
4865  CHECK(!is_end);
4866  CHECK(vd.pointer);
4867  int renderGroup = *reinterpret_cast<int32_t*>(vd.pointer);
4868  CHECK_GE(renderGroup, 0);
4869 
4870  // store
4871  nodes[row] = std::make_pair(bounding_box, renderGroup);
4872 
4873  // how many render groups do we have now?
4874  if (renderGroup >= _numRenderGroups) {
4875  _numRenderGroups = renderGroup + 1;
4876  }
4877 
4878  if (DEBUG_RENDER_GROUP_ANALYZER) {
4879  LOG(INFO) << "DEBUG: Existing row " << row << " has Render Group " << renderGroup;
4880  }
4881  }
4882 
4883  // bulk-load the tree
4884  auto bulk_load_timer = timer_start();
4885  _rtree = std::make_unique<RTree>(nodes);
4886  CHECK(_rtree);
4887  LOG(INFO) << "Scanning render groups of poly column '" << geoColumnBaseName
4888  << "' of table '" << tableName << "' took " << timer_stop(seedTimer) << "ms ("
4889  << timer_stop(bulk_load_timer) << " ms for tree)";
4890 
4892  LOG(INFO) << "DEBUG: Done! Now have " << _numRenderGroups << " Render Groups";
4893  }
4894 }
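/*
 * Editor's sketch (not part of the original source): the bulk load above uses
 * boost::geometry's packing constructor, which is much faster than inserting
 * nodes one at a time. The aliases below mirror the Node/RTree types declared
 * in Importer.h (boost::geometry headers assumed available via this file's
 * includes).
 */
namespace render_group_sketch {
namespace bg = boost::geometry;
namespace bgi = boost::geometry::index;
using SketchPoint = bg::model::point<double, 2, bg::cs::cartesian>;
using SketchBox = bg::model::box<SketchPoint>;
using SketchNode = std::pair<SketchBox, int>;  // (bounds, render group)
using SketchRTree = bgi::rtree<SketchNode, bgi::quadratic<16>>;

inline SketchRTree bulk_load_sketch(const std::vector<SketchNode>& nodes) {
  return SketchRTree(nodes.begin(), nodes.end());  // packing constructor
}
}  // namespace render_group_sketch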
4895 
4896 int RenderGroupAnalyzer::insertBoundsAndReturnRenderGroup(
4897  const std::vector<double>& bounds) {
4898  // validate
4899  CHECK(bounds.size() == 4);
4900 
4901  // get bounds
4902  BoundingBox bounding_box;
4903  boost::geometry::assign_inverse(bounding_box);
4904  boost::geometry::expand(bounding_box, Point(bounds[0], bounds[1]));
4905  boost::geometry::expand(bounding_box, Point(bounds[2], bounds[3]));
4906 
4907  // remainder under mutex to allow this to be multi-threaded
4908  std::lock_guard<std::mutex> guard(_rtreeMutex);
4909 
4910  // get the intersecting nodes
4911  std::vector<Node> intersects;
4912  _rtree->query(boost::geometry::index::intersects(bounding_box),
4913  std::back_inserter(intersects));
4914 
4915  // build bitset of render groups of the intersecting rectangles
4916  // clear bit means available, allows use of find_first()
4917  boost::dynamic_bitset<> bits(_numRenderGroups);
4918  bits.set();
4919  for (const auto& intersection : intersects) {
4920  CHECK(intersection.second < _numRenderGroups);
4921  bits.reset(intersection.second);
4922  }
4923 
4924  // find first available group
4925  int firstAvailableRenderGroup = 0;
4926  size_t firstSetBit = bits.find_first();
4927  if (firstSetBit == boost::dynamic_bitset<>::npos) {
4928  // all known groups represented, add a new one
4929  firstAvailableRenderGroup = _numRenderGroups;
4930  _numRenderGroups++;
4931  } else {
4932  firstAvailableRenderGroup = (int)firstSetBit;
4933  }
4934 
4935  // insert new node
4936  _rtree->insert(std::make_pair(bounding_box, firstAvailableRenderGroup));
4937 
4938  // return it
4939  return firstAvailableRenderGroup;
4940 }
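/*
 * Editor's sketch (not part of the original source): the bitset trick above,
 * isolated. Set bits mean "render group available"; each group already used by
 * an intersecting polygon clears its bit, and find_first() yields the smallest
 * group that no neighbor uses.
 */
inline int first_free_render_group_sketch(const std::vector<int>& used_groups,
                                          const int num_groups) {
  boost::dynamic_bitset<> bits(num_groups);
  bits.set();  // everything available initially
  for (const int group : used_groups) {
    bits.reset(group);  // taken by an intersecting neighbor
  }
  const size_t first = bits.find_first();
  return first == boost::dynamic_bitset<>::npos
             ? num_groups  // all groups in use: allocate a brand-new one
             : static_cast<int>(first);
}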
4941 
4942 ImportDriver::ImportDriver(std::shared_ptr<Catalog_Namespace::Catalog> cat,
4943  const Catalog_Namespace::UserMetadata& user,
4944  const ExecutorDeviceType dt)
4945  : QueryRunner(std::make_unique<Catalog_Namespace::SessionInfo>(cat, user, dt, "")) {}
4946 
4947 void ImportDriver::importGeoTable(const std::string& file_path,
4948  const std::string& table_name,
4949  const bool compression,
4950  const bool create_table) {
4951  CHECK(session_info_);
4952  const std::string geo_column_name(OMNISCI_GEO_PREFIX);
4953 
4954  CopyParams copy_params;
4955  if (compression) {
4956  copy_params.geo_coords_encoding = EncodingType::kENCODING_GEOINT;
4957  copy_params.geo_coords_comp_param = 32;
4958  } else {
4959  copy_params.geo_coords_encoding = EncodingType::kENCODING_NONE;
4960  copy_params.geo_coords_comp_param = 0;
4961  }
4962 
4963  auto cds = Importer::gdalToColumnDescriptors(file_path, geo_column_name, copy_params);
4964  std::map<std::string, std::string> colname_to_src;
4965  for (auto& cd : cds) {
4966  const auto col_name_sanitized = ImportHelpers::sanitize_name(cd.columnName);
4967  const auto ret =
4968  colname_to_src.insert(std::make_pair(col_name_sanitized, cd.columnName));
4969  CHECK(ret.second);
4970  cd.columnName = col_name_sanitized;
4971  }
4972 
4973  auto& cat = session_info_->getCatalog();
4974 
4975  if (create_table) {
4976  const auto td = cat.getMetadataForTable(table_name);
4977  if (td != nullptr) {
4978  throw std::runtime_error("Error: Table " + table_name +
4979  " already exists. Possible failure to correctly re-create "
4980  "mapd_data directory.");
4981  }
4982  if (table_name != ImportHelpers::sanitize_name(table_name)) {
4983  throw std::runtime_error("Invalid characters in table name: " + table_name);
4984  }
4985 
4986  std::string stmt{"CREATE TABLE " + table_name};
4987  std::vector<std::string> col_stmts;
4988 
4989  for (auto& cd : cds) {
4990  if (cd.columnType.get_type() == SQLTypes::kINTERVAL_DAY_TIME ||
4991  cd.columnType.get_type() == SQLTypes::kINTERVAL_YEAR_MONTH) {
4992  throw std::runtime_error(
4993  "Unsupported type: INTERVAL_DAY_TIME or INTERVAL_YEAR_MONTH for col " +
4994  cd.columnName + " (table: " + table_name + ")");
4995  }
4996 
4997  if (cd.columnType.get_type() == SQLTypes::kDECIMAL) {
4998  if (cd.columnType.get_precision() == 0 && cd.columnType.get_scale() == 0) {
4999  cd.columnType.set_precision(14);
5000  cd.columnType.set_scale(7);
5001  }
5002  }
5003 
5004  std::string col_stmt;
5005  col_stmt.append(cd.columnName + " " + cd.columnType.get_type_name() + " ");
5006 
5007  if (cd.columnType.get_compression() != EncodingType::kENCODING_NONE) {
5008  col_stmt.append("ENCODING " + cd.columnType.get_compression_name() + " ");
5009  } else {
5010  if (cd.columnType.is_string()) {
5011  col_stmt.append("ENCODING NONE");
5012  } else if (cd.columnType.is_geometry()) {
5013  if (cd.columnType.get_output_srid() == 4326) {
5014  col_stmt.append("ENCODING NONE");
5015  }
5016  }
5017  }
5018  col_stmts.push_back(col_stmt);
5019  }
5020 
5021  stmt.append(" (" + boost::algorithm::join(col_stmts, ",") + ");");
5022  runDDLStatement(stmt);
5023 
5024  LOG(INFO) << "Created table: " << table_name;
5025  } else {
5026  LOG(INFO) << "Not creating table: " << table_name;
5027  }
5028 
5029  const auto td = cat.getMetadataForTable(table_name);
5030  if (td == nullptr) {
5031  throw std::runtime_error("Error: Failed to create table " + table_name);
5032  }
5033 
5034  Importer_NS::Importer importer(cat, td, file_path, copy_params);
5035  auto ms = measure<>::execution([&]() { importer.importGDAL(colname_to_src); });
5036  LOG(INFO) << "Import Time for " << table_name << ": " << (double)ms / 1000.0 << " s";
5037 }
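// Editor's note (hypothetical table and column names, DDL shown only roughly):
// for a polygon shapefile with one text attribute and compression enabled, the
// statement assembled above comes out approximately as:
//   CREATE TABLE my_geo_table (
//     name TEXT ENCODING DICT(32),
//     omnisci_geo GEOMETRY(MULTIPOLYGON, 4326) ENCODING COMPRESSED(32));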
5038 
5039 } // namespace Importer_NS
Definition: Importer.cpp:99
std::chrono::steady_clock::time_point start
Definition: Importer.h:709
virtual bool loadImpl(const std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t row_count, bool checkpoint)
Definition: Importer.cpp:2494
#define CHECK_GT(x, y)
Definition: Logger.h:199
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:331
void set_compression(EncodingType c)
Definition: sqltypes.h:423
DEVICE void ChunkIter_get_nth(ChunkIter *it, int n, bool uncompress, VarlenDatum *result, bool *is_end)
Definition: ChunkIter.cpp:181
int32_t intval
Definition: sqltypes.h:125
static const std::string trim_space(const char *field, const size_t len)
Definition: Importer.cpp:217
std::string sourceName
ArrayDatum StringToArray(const std::string &s, const SQLTypeInfo &ti, const CopyParams &copy_params)
Definition: Importer.cpp:441
std::string to_string(char const *&&v)
void set_output_srid(int s)
Definition: sqltypes.h:419
int8_t * pointer
Definition: sqltypes.h:72
EncodingType geo_coords_encoding
Definition: Importer.h:126
static bool gdalFileOrDirectoryExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4372
#define DEBUG_TIMING
Definition: Importer.cpp:135
static const std::list< ColumnDescriptor > gdalToColumnDescriptors(const std::string &fileName, const std::string &geoColumnName, const CopyParams &copy_params)
Definition: Importer.cpp:4265
static size_t find_end(const char *buffer, size_t size, const CopyParams &copy_params)
Definition: Importer.cpp:2269
char * try_strptimes(const char *str, const std::vector< std::string > &formats)
Definition: Importer.cpp:2756
Datum NullDatum(SQLTypeInfo &ti)
Definition: Importer.cpp:359
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
float floatval
Definition: sqltypes.h:127
virtual void setTableEpoch(const int32_t new_epoch)
Definition: Importer.cpp:3943
int8_t * appendDatum(int8_t *buf, Datum d, const SQLTypeInfo &ti)
Definition: Importer.cpp:323
std::chrono::duration< size_t, std::milli > elapsed
Definition: Importer.h:714
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
HOST DEVICE void set_subtype(SQLTypes st)
Definition: sqltypes.h:414
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:143
void checkpoint(const int logicalTableId) const
Definition: Catalog.cpp:2927
Catalog_Namespace::Catalog & getCatalog()
Definition: Importer.h:932
ChunkIterVector & getChunkItersAndRowOffset(ChunkAccessorTable &table, size_t rowid, size_t &rowOffset)
OGRLayer & getLayerWithSpecifiedName(const std::string &geo_layer_name, const OGRDataSourceUqPtr &poDS, const std::string &file_name)
Definition: Importer.cpp:4121
std::vector< std::vector< std::string > > get_sample_rows(size_t n)
Definition: Importer.cpp:3002
void seedFromExistingTableContents(const std::unique_ptr< Loader > &loader, const std::string &geoColumnBaseName)
Definition: Importer.cpp:4772
std::vector< std::vector< std::unique_ptr< TypedImportBuffer > > > import_buffers_vec
Definition: Importer.h:970
boost::geometry::model::box< Point > BoundingBox
Definition: Importer.h:865
std::string s3_access_key
Definition: Importer.h:115
static bool hasGDALLibKML()
Definition: Importer.cpp:4013
static bool gdalSupportsNetworkFileAccess()
Definition: Importer.cpp:4530
void importGeoTable(const std::string &file_path, const std::string &table_name, const bool compression=true, const bool create_table=true)
Definition: Importer.cpp:4947
DEVICE uint64_t compress_longitude_coord_geoint32(const double coord)
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:40
static const char * get_row(const char *buf, const char *buf_end, const char *entire_buf_end, const CopyParams &copy_params, bool is_begin, const bool *is_array, std::vector< std::string > &row, bool &try_single_thread)
Definition: Importer.cpp:238
std::string import_id
Definition: Importer.h:966
std::shared_timed_mutex mapd_shared_mutex
std::vector< bool > bypass
count to replicate values of column(s); used only for ALTER ADD column
Definition: Fragmenter.h:68
bool is_decimal() const
Definition: sqltypes.h:453
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:324
ImportStatus archivePlumber()
Definition: Importer.cpp:3058
int64_t bigintval
Definition: sqltypes.h:126
#define NULL_FLOAT
Definition: sqltypes.h:175
uint64_t compress_coord(double coord, const SQLTypeInfo &ti, bool x)
Definition: Importer.cpp:1538
virtual void runDDLStatement(const std::string &)
int16_t smallintval
Definition: sqltypes.h:124
std::string sanitize_name(const std::string &name)
virtual void insertDataNoCheckpoint(InsertData &insertDataStruct)=0
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
ImportHeaderRow has_header
Definition: Importer.h:101
specifies the content in-memory of a row in the column metadata table
SQLTypeInfoCore get_elem_type() const
Definition: sqltypes.h:632
static ImportStatus import_thread_shapefile(int thread_id, Importer *importer, OGRSpatialReference *poGeographicSR, const FeaturePtrVector &features, size_t firstFeature, size_t numFeatures, const FieldNameToIndexMapType &fieldNameToIndexMap, const ColumnNameToSourceNameMapType &columnNameToSourceNameMap, const ColumnIdToRenderGroupAnalyzerMapType &columnIdToRenderGroupAnalyzerMap)
Definition: Importer.cpp:2021
std::vector< std::string > get_headers()
Definition: Importer.cpp:3010
DEVICE uint64_t compress_lattitude_coord_geoint32(const double coord)
void add_value(const ColumnDescriptor *cd, const std::string &val, const bool is_null, const CopyParams &copy_params, const int64_t replicate_count=0)
Definition: Importer.cpp:615
#define DEBUG_RENDER_GROUP_ANALYZER
Definition: Importer.cpp:136
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
Definition: Datum.cpp:268
static void set_geo_physical_import_buffer_columnar(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< std::vector< double >> &coords_column, std::vector< std::vector< double >> &bounds_column, std::vector< std::vector< int >> &ring_sizes_column, std::vector< std::vector< int >> &poly_rings_column, int render_group, const int64_t replicate_count=0)
Definition: Importer.cpp:1665
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const int64_t replicate_count=0)
Definition: Importer.cpp:1581
std::map< int, std::shared_ptr< RenderGroupAnalyzer > > ColumnIdToRenderGroupAnalyzerMapType
Definition: Importer.cpp:132
virtual int32_t getTableEpoch()
Definition: Importer.cpp:3938
static OGRDataSource * openGDALDataset(const std::string &fileName, const CopyParams &copy_params)
Definition: Importer.cpp:4088
static bool parseStringArray(const std::string &s, const CopyParams &copy_params, std::vector< std::string > &string_vec)
Definition: Importer.h:814
std::string s3_endpoint
Definition: Importer.h:118
std::mutex file_offsets_mutex
Definition: Importer.h:759
std::vector< SQLTypes > detect_column_types(const std::vector< std::string > &row)
Definition: Importer.cpp:2845
bool importGeoFromLonLat(double lon, double lat, std::vector< double > &coords)
Definition: Importer.cpp:1519
#define CHECK_LT(x, y)
Definition: Logger.h:197
Definition: sqltypes.h:54
Definition: sqltypes.h:55
static bool gdalFileExists(const std::string &path, const CopyParams &copy_params)
Definition: Importer.cpp:4367
static bool more_restrictive_sqltype(const SQLTypes a, const SQLTypes b)
Definition: Importer.cpp:2853
HOST DEVICE int get_output_srid() const
Definition: sqltypes.h:329
float float_value_at(const TypedImportBuffer &import_buffer, const size_t index)
Definition: Importer.cpp:2330
#define CHECK_LE(x, y)
Definition: Logger.h:198
void startThread(FuncType &&func, Args &&... args)
bool is_null(const T &v, const SQLTypeInfo &t)
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:64
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:2153
int64_t convert_decimal_value_to_scale(const int64_t decimal_value, const SQLTypeInfo &type_info, const SQLTypeInfo &new_type_info)
Definition: Datum.cpp:284