25 #include <arrow/api.h>
26 #include <arrow/filesystem/localfs.h>
27 #include <arrow/io/api.h>
29 #include <ogrsf_frmts.h>
30 #include <boost/algorithm/string.hpp>
31 #include <boost/dynamic_bitset.hpp>
32 #include <boost/filesystem.hpp>
33 #include <boost/geometry.hpp>
34 #include <boost/variant.hpp>
49 #include <unordered_map>
50 #include <unordered_set>
58 #ifdef ENABLE_IMPORT_PARQUET
61 #if defined(ENABLE_IMPORT_PARQUET)
65 #ifdef ENABLE_IMPORT_PARQUET
94 #include "gen-cpp/Heavy.h"
101 #define TIMER_STOP(t) \
102 (float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>( \
117 boost::filesystem::path boost_file_path{file_path};
118 boost::system::error_code ec;
120 return ec ? 0 : filesize;
128 executor->getSessionLock());
129 return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
138 formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
140 for (
size_t i = 0; i < row.size(); ++i) {
141 out << (i ?
", " :
"") << row[i];
149 namespace import_export {
154 std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
157 #define DEBUG_TIMING false
158 #define DEBUG_RENDER_GROUP_ANALYZER 0
159 #define DEBUG_AWS_AUTHENTICATION 0
161 #define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0
173 const std::string&
f,
187 auto is_array = std::unique_ptr<bool[]>(
new bool[
loader->get_column_descs().size()]);
189 bool has_array =
false;
191 int skip_physical_cols = 0;
192 for (
auto& p :
loader->get_column_descs()) {
194 if (skip_physical_cols-- > 0) {
199 if (p->isVirtualCol || p->isDeletedCol) {
202 skip_physical_cols = p->columnType.get_physical_cols();
203 if (p->columnType.get_type() ==
kARRAY) {
204 is_array.get()[i] =
true;
207 is_array.get()[i] =
false;
212 is_array_a = std::unique_ptr<bool[]>(is_array.release());
214 is_array_a = std::unique_ptr<bool[]>(
nullptr);
222 if (
buffer[0] !=
nullptr) {
225 if (
buffer[1] !=
nullptr) {
234 throw std::runtime_error(
"Import status not found for id: " + import_id);
241 is.
end = std::chrono::steady_clock::now();
242 is.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.
end - is.
start);
249 while (i < j && (field[i] ==
' ' || field[i] ==
'\r')) {
252 while (i < j && (field[j - 1] ==
' ' || field[j - 1] ==
'\r')) {
255 return std::string(field + i, j - i);
308 throw std::runtime_error(
"Internal error: geometry type in NullArrayDatum.");
310 throw std::runtime_error(
"Internal error: invalid type in NullArrayDatum.");
319 if (s == copy_params.
null_str || s ==
"NULL" || s.empty()) {
326 std::vector<std::string> elem_strs;
328 for (
size_t i = s.find(copy_params.
array_delim, 1); i != std::string::npos;
330 elem_strs.push_back(s.substr(last, i - last));
333 if (last + 1 <= s.size()) {
334 elem_strs.push_back(s.substr(last, s.size() - 1 - last));
336 if (elem_strs.size() == 1) {
337 auto str = elem_strs.front();
338 auto str_trimmed =
trim_space(str.c_str(), str.length());
339 if (str_trimmed ==
"") {
344 size_t len = elem_strs.size() * elem_ti.
get_size();
345 std::unique_ptr<int8_t, FreeDeleter> buf(
347 int8_t* p = buf.get();
348 for (
auto& es : elem_strs) {
355 if (!isdigit(e[0]) && e[0] !=
'-') {
383 while ((p - buf) < len) {
387 CHECK((p - buf) == len);
404 const size_t len = compressed_null_coords.size();
406 memcpy(buf, compressed_null_coords.data(), len);
409 auto modified_ti = coords_ti;
415 const auto& arr = datum.val.arr_val;
416 for (
const auto& elem_datum : arr) {
417 string_vec.push_back(elem_datum.val.str_val);
461 throw std::runtime_error(
"Internal error: geometry type in TDatumToDatum.");
463 throw std::runtime_error(
"Internal error: invalid type in TDatumToDatum.");
477 size_t len = datum.val.arr_val.size() * elem_ti.
get_size();
480 for (
auto& e : datum.val.arr_val) {
490 std::vector<std::string_view> string_view_vec;
491 string_view_vec.reserve(string_vec.size());
492 for (
const auto& str : string_vec) {
494 std::ostringstream oss;
496 <<
" a string was detected too long for encoding, string length = "
497 << str.size() <<
", first 100 characters are '" << str.substr(0, 100) <<
"'";
498 throw std::runtime_error(oss.str());
500 string_view_vec.push_back(str);
519 }
catch (std::exception& e) {
520 std::ostringstream oss;
522 <<
" : " << e.what();
524 throw std::runtime_error(oss.str());
529 const std::string_view val,
532 const bool check_not_null) {
538 throw std::runtime_error(
"NULL for column " + cd->
columnName);
549 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
555 throw std::runtime_error(
"NULL for column " + cd->
columnName);
562 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
568 throw std::runtime_error(
"NULL for column " + cd->
columnName);
575 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
581 throw std::runtime_error(
"NULL for column " + cd->
columnName);
588 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
594 throw std::runtime_error(
"NULL for column " + cd->
columnName);
610 throw std::runtime_error(
"NULL for column " + cd->
columnName);
617 if (!is_null && (val[0] ==
'.' || isdigit(val[0]) || val[0] ==
'-')) {
618 addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
621 throw std::runtime_error(
"NULL for column " + cd->
columnName);
627 if (!is_null && (val[0] ==
'.' || isdigit(val[0]) || val[0] ==
'-')) {
628 addDouble(std::atof(std::string(val).c_str()));
631 throw std::runtime_error(
"NULL for column " + cd->
columnName);
642 throw std::runtime_error(
"NULL for column " + cd->
columnName);
647 throw std::runtime_error(
"String too long for column " + cd->
columnName +
658 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
664 throw std::runtime_error(
"NULL for column " + cd->
columnName);
671 throw std::runtime_error(
"NULL for column " + cd->
columnName);
675 std::vector<std::string> string_vec;
678 std::string(val), copy_params, string_vec);
682 size_t expected_size = ti.
get_size() / sti.get_size();
683 size_t actual_size = string_vec.size();
684 if (actual_size != expected_size) {
685 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
687 " values, received " +
702 throw std::runtime_error(
"Fixed length array for column " + cd->
columnName +
703 " has incorrect length: " + std::string(val));
722 CHECK(
false) <<
"TypedImportBuffer::add_value() does not support type " <<
type;
778 CHECK(
false) <<
"TypedImportBuffer::pop_value() does not support type " <<
type;
783 using std::runtime_error::runtime_error;
787 template <
typename DATA_TYPE>
791 std::vector<DATA_TYPE>& buffer,
795 std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
796 auto f_value_getter =
value_getter(array, cd, bad_rows_tracker);
797 std::function<void(const int64_t)> f_add_geo_phy_cols = [&](
const int64_t row) {};
798 if (bad_rows_tracker && cd->columnType.is_geometry()) {
799 f_add_geo_phy_cols = [&](
const int64_t row) {
801 std::vector<double> coords, bounds;
802 std::vector<int> ring_sizes, poly_rings;
803 int render_group = 0;
810 if (array.IsNull(row)) {
812 import_ti, coords, bounds, ring_sizes, poly_rings,
false);
814 arrow_throw_if<GeoImportException>(
823 arrow_throw_if<GeoImportException>(
824 cd->columnType.get_type() != ti.
get_type(),
825 error_context(cd, bad_rows_tracker) +
"Geometry type mismatch");
827 auto col_idx_workpad =
col_idx;
840 }
catch (std::runtime_error& e) {
842 }
catch (
const std::exception& e) {
849 auto f_mark_a_bad_row = [&](
const auto row) {
850 std::unique_lock<std::mutex> lck(bad_rows_tracker->
mutex);
851 bad_rows_tracker->
rows.insert(row - slice_range.first);
853 buffer.reserve(slice_range.second - slice_range.first);
854 for (
size_t row = slice_range.first; row < slice_range.second; ++row) {
856 *data << (array.IsNull(row) ?
nullptr : f_value_getter(array, row));
857 f_add_geo_phy_cols(row);
859 f_mark_a_bad_row(row);
862 if (bad_rows_tracker) {
864 f_mark_a_bad_row(row);
870 return buffer.size();
875 const bool exact_type_match,
886 if (exact_type_match) {
887 arrow_throw_if(col.type_id() != Type::BOOL,
"Expected boolean type");
892 if (exact_type_match) {
893 arrow_throw_if(col.type_id() != Type::INT8,
"Expected int8 type");
898 if (exact_type_match) {
899 arrow_throw_if(col.type_id() != Type::INT16,
"Expected int16 type");
904 if (exact_type_match) {
905 arrow_throw_if(col.type_id() != Type::INT32,
"Expected int32 type");
908 cd, col, *
int_buffer_, slice_range, bad_rows_tracker);
912 if (exact_type_match) {
913 arrow_throw_if(col.type_id() != Type::INT64,
"Expected int64 type");
918 if (exact_type_match) {
919 arrow_throw_if(col.type_id() != Type::FLOAT,
"Expected float type");
924 if (exact_type_match) {
925 arrow_throw_if(col.type_id() != Type::DOUBLE,
"Expected double type");
932 if (exact_type_match) {
934 "Expected string type");
939 if (exact_type_match) {
940 arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
941 "Expected time32 or time64 type");
946 if (exact_type_match) {
947 arrow_throw_if(col.type_id() != Type::TIMESTAMP,
"Expected timestamp type");
952 if (exact_type_match) {
953 arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
954 "Expected date32 or date64 type");
965 "Expected string type");
969 throw std::runtime_error(
"Arrow array appends not yet supported");
971 throw std::runtime_error(
"Invalid Type");
980 if (
std::any_of(col.nulls.begin(), col.nulls.end(), [](
int i) {
return i != 0; })) {
981 throw std::runtime_error(
"NULL for column " + cd->
columnName);
987 dataSize = col.data.int_col.size();
989 for (
size_t i = 0; i < dataSize; i++) {
999 dataSize = col.data.int_col.size();
1001 for (
size_t i = 0; i < dataSize; i++) {
1011 dataSize = col.data.int_col.size();
1013 for (
size_t i = 0; i < dataSize; i++) {
1023 dataSize = col.data.int_col.size();
1025 for (
size_t i = 0; i < dataSize; i++) {
1029 int_buffer_->push_back((int32_t)col.data.int_col[i]);
1037 dataSize = col.data.int_col.size();
1039 for (
size_t i = 0; i < dataSize; i++) {
1049 dataSize = col.data.real_col.size();
1051 for (
size_t i = 0; i < dataSize; i++) {
1061 dataSize = col.data.real_col.size();
1063 for (
size_t i = 0; i < dataSize; i++) {
1076 dataSize = col.data.str_col.size();
1078 for (
size_t i = 0; i < dataSize; i++) {
1090 dataSize = col.data.int_col.size();
1092 for (
size_t i = 0; i < dataSize; i++) {
1096 bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
1107 dataSize = col.data.str_col.size();
1109 for (
size_t i = 0; i < dataSize; i++) {
1120 dataSize = col.data.arr_col.size();
1122 for (
size_t i = 0; i < dataSize; i++) {
1124 if (!col.nulls[i]) {
1125 size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
1126 for (
size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
1127 string_vec->push_back(col.data.arr_col[i].data.str_col[str_idx]);
1135 for (
size_t i = 0; i < dataSize; i++) {
1139 size_t len = col.data.arr_col[i].data.int_col.size();
1140 size_t byteSize = len *
sizeof(int8_t);
1143 for (
size_t j = 0; j < len; ++j) {
1147 if (col.data.arr_col[i].nulls[j]) {
1148 *p =
static_cast<int8_t
>(
1151 *(
bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
1161 for (
size_t i = 0; i < dataSize; i++) {
1165 size_t len = col.data.arr_col[i].data.int_col.size();
1166 size_t byteSize = len *
sizeof(int8_t);
1169 for (
size_t j = 0; j < len; ++j) {
1170 *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
1171 p +=
sizeof(int8_t);
1179 for (
size_t i = 0; i < dataSize; i++) {
1183 size_t len = col.data.arr_col[i].data.int_col.size();
1184 size_t byteSize = len *
sizeof(int16_t);
1187 for (
size_t j = 0; j < len; ++j) {
1189 static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
1190 p +=
sizeof(int16_t);
1198 for (
size_t i = 0; i < dataSize; i++) {
1202 size_t len = col.data.arr_col[i].data.int_col.size();
1203 size_t byteSize = len *
sizeof(int32_t);
1206 for (
size_t j = 0; j < len; ++j) {
1208 static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
1209 p +=
sizeof(int32_t);
1219 for (
size_t i = 0; i < dataSize; i++) {
1223 size_t len = col.data.arr_col[i].data.int_col.size();
1224 size_t byteSize = len *
sizeof(int64_t);
1227 for (
size_t j = 0; j < len; ++j) {
1229 static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1230 p +=
sizeof(int64_t);
1238 for (
size_t i = 0; i < dataSize; i++) {
1242 size_t len = col.data.arr_col[i].data.real_col.size();
1243 size_t byteSize = len *
sizeof(float);
1246 for (
size_t j = 0; j < len; ++j) {
1247 *(
float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
1256 for (
size_t i = 0; i < dataSize; i++) {
1260 size_t len = col.data.arr_col[i].data.real_col.size();
1261 size_t byteSize = len *
sizeof(double);
1264 for (
size_t j = 0; j < len; ++j) {
1265 *(
double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
1266 p +=
sizeof(double);
1276 for (
size_t i = 0; i < dataSize; i++) {
1280 size_t len = col.data.arr_col[i].data.int_col.size();
1281 size_t byteWidth =
sizeof(int64_t);
1282 size_t byteSize = len * byteWidth;
1285 for (
size_t j = 0; j < len; ++j) {
1286 *
reinterpret_cast<int64_t*
>(p) =
1287 static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1288 p +=
sizeof(int64_t);
1296 throw std::runtime_error(
"Invalid Array Type");
1302 throw std::runtime_error(
"Invalid Type");
1308 const TDatum& datum,
1316 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1329 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1339 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1346 addInt((int32_t)datum.val.int_val);
1349 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1359 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1366 addFloat((
float)datum.val.real_val);
1369 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1379 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1390 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1405 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1413 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1434 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1442 CHECK(
false) <<
"TypedImportBuffer::add_value() does not support type " <<
type;
1509 static_cast<float>(std::atof(std::string(val).c_str())));
1516 double_buffer_->resize(num_rows, std::atof(std::string(val).c_str()));
1528 throw std::runtime_error(
"String too long for column " + cd->
columnName +
1547 std::vector<std::string> string_vec;
1550 std::string(val), cp, string_vec);
1553 if (ti.get_size() > 0) {
1554 auto sti = ti.get_elem_type();
1555 size_t expected_size = ti.get_size() / sti.get_size();
1556 size_t actual_size = string_vec.size();
1557 if (actual_size != expected_size) {
1558 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
1560 " values, received " +
1566 if (ti.get_size() > 0) {
1568 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
1569 " currently cannot accept NULL arrays");
1581 if (ti.get_size() > 0 &&
static_cast<size_t>(ti.get_size()) != d.length) {
1582 throw std::runtime_error(
"Fixed length array for column " + cd->
columnName +
1583 " has incorrect length: " + std::string(val));
1602 CHECK(
false) <<
"TypedImportBuffer::addDefaultValues() does not support type "
1609 std::vector<double>& coords,
1611 if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
1616 if (!pt.transform(ti)) {
1622 coords.push_back(lon);
1623 coords.push_back(lat);
1630 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1632 std::vector<double>& coords,
1633 std::vector<double>& bounds,
1634 std::vector<int>& ring_sizes,
1635 std::vector<int>& poly_rings,
1637 const bool force_null) {
1639 const auto col_type = col_ti.
get_type();
1642 bool is_null_geo =
false;
1644 if (!col_ti.get_notnull()) {
1647 is_null_point =
true;
1650 is_null_geo = coords.empty();
1651 if (is_null_point) {
1656 is_null_geo =
false;
1668 tdd_coords.val.arr_val.reserve(compressed_coords.size());
1669 for (
auto cc : compressed_coords) {
1670 tdd_coords.val.arr_val.emplace_back();
1671 tdd_coords.val.arr_val.back().val.int_val = cc;
1674 tdd_coords.is_null = is_null_geo;
1675 import_buffers[col_idx++]->add_value(cd_coords, tdd_coords,
false);
1680 TDatum tdd_ring_sizes;
1681 tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1683 for (
auto ring_size : ring_sizes) {
1684 tdd_ring_sizes.val.arr_val.emplace_back();
1685 tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1688 tdd_ring_sizes.is_null = is_null_geo;
1689 import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
1695 TDatum tdd_poly_rings;
1696 tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1698 for (
auto num_rings : poly_rings) {
1699 tdd_poly_rings.val.arr_val.emplace_back();
1700 tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1703 tdd_poly_rings.is_null = is_null_geo;
1704 import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings,
false);
1711 tdd_bounds.val.arr_val.reserve(bounds.size());
1713 for (
auto b : bounds) {
1714 tdd_bounds.val.arr_val.emplace_back();
1715 tdd_bounds.val.arr_val.back().val.real_val = b;
1718 tdd_bounds.is_null = is_null_geo;
1719 import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds,
false);
1725 TDatum td_render_group;
1726 td_render_group.val.int_val = render_group;
1727 td_render_group.is_null = is_null_geo;
1728 import_buffers[col_idx++]->add_value(cd_render_group, td_render_group,
false);
1735 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1737 std::vector<std::vector<double>>& coords_column,
1738 std::vector<std::vector<double>>& bounds_column,
1739 std::vector<std::vector<int>>& ring_sizes_column,
1740 std::vector<std::vector<int>>& poly_rings_column,
1741 std::vector<int>& render_groups_column) {
1743 const auto col_type = col_ti.
get_type();
1746 auto coords_row_count = coords_column.size();
1748 for (
auto& coords : coords_column) {
1749 bool is_null_geo =
false;
1751 if (!col_ti.get_notnull()) {
1754 is_null_point =
true;
1757 is_null_geo = coords.empty();
1758 if (is_null_point) {
1763 is_null_geo =
false;
1766 std::vector<TDatum> td_coords_data;
1768 std::vector<uint8_t> compressed_coords =
1770 for (
auto const& cc : compressed_coords) {
1772 td_byte.val.int_val = cc;
1773 td_coords_data.push_back(td_byte);
1777 tdd_coords.val.arr_val = td_coords_data;
1778 tdd_coords.is_null = is_null_geo;
1779 import_buffers[col_idx]->add_value(cd_coords, tdd_coords,
false);
1784 if (ring_sizes_column.size() != coords_row_count) {
1785 CHECK(
false) <<
"Geometry import columnar: ring sizes column size mismatch";
1789 for (
auto const& ring_sizes : ring_sizes_column) {
1790 bool is_null_geo =
false;
1791 if (!col_ti.get_notnull()) {
1793 is_null_geo = ring_sizes.empty();
1795 std::vector<TDatum> td_ring_sizes;
1796 for (
auto const& ring_size : ring_sizes) {
1797 TDatum td_ring_size;
1798 td_ring_size.val.int_val = ring_size;
1799 td_ring_sizes.push_back(td_ring_size);
1801 TDatum tdd_ring_sizes;
1802 tdd_ring_sizes.val.arr_val = td_ring_sizes;
1803 tdd_ring_sizes.is_null = is_null_geo;
1804 import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
1810 if (poly_rings_column.size() != coords_row_count) {
1811 CHECK(
false) <<
"Geometry import columnar: poly rings column size mismatch";
1815 for (
auto const& poly_rings : poly_rings_column) {
1816 bool is_null_geo =
false;
1817 if (!col_ti.get_notnull()) {
1819 is_null_geo = poly_rings.empty();
1821 std::vector<TDatum> td_poly_rings;
1822 for (
auto const& num_rings : poly_rings) {
1823 TDatum td_num_rings;
1824 td_num_rings.val.int_val = num_rings;
1825 td_poly_rings.push_back(td_num_rings);
1827 TDatum tdd_poly_rings;
1828 tdd_poly_rings.val.arr_val = td_poly_rings;
1829 tdd_poly_rings.is_null = is_null_geo;
1830 import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings,
false);
1837 if (bounds_column.size() != coords_row_count) {
1838 CHECK(
false) <<
"Geometry import columnar: bounds column size mismatch";
1841 for (
auto const& bounds : bounds_column) {
1842 bool is_null_geo =
false;
1843 if (!col_ti.get_notnull()) {
1847 std::vector<TDatum> td_bounds_data;
1848 for (
auto const& b : bounds) {
1850 td_double.val.real_val = b;
1851 td_bounds_data.push_back(td_double);
1854 tdd_bounds.val.arr_val = td_bounds_data;
1855 tdd_bounds.is_null = is_null_geo;
1856 import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds,
false);
1864 for (
auto const& render_group : render_groups_column) {
1865 TDatum td_render_group;
1866 td_render_group.val.int_val = render_group;
1867 td_render_group.is_null =
false;
1868 import_buffers[col_idx]->add_value(cd_render_group, td_render_group,
false);
1877 const std::list<const ColumnDescriptor*>& col_descs) {
1881 int collection_col_idx = -1;
1883 std::string collection_col_name;
1885 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1886 auto const& cd = *cd_it;
1887 auto const col_type = cd->columnType.get_type();
1889 if (collection_col_idx >= 0) {
1890 throw std::runtime_error(
1891 "Explode Collections: Found more than one destination column");
1893 collection_col_idx = col_idx;
1894 collection_child_type = col_type;
1895 collection_col_name = cd->columnName;
1897 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1902 if (collection_col_idx < 0) {
1903 throw std::runtime_error(
1904 "Explode Collections: Failed to find a supported column type to explode "
1907 return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
1911 OGRGeometry* ogr_geometry,
1913 const std::string& collection_col_name,
1914 size_t row_or_feature_idx,
1915 std::function<
void(OGRGeometry*)> execute_import_lambda) {
1916 auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1917 bool is_collection =
false;
1918 switch (collection_child_type) {
1920 switch (ogr_geometry_type) {
1922 is_collection =
true;
1927 throw std::runtime_error(
1928 "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1932 switch (ogr_geometry_type) {
1933 case wkbMultiLineString:
1934 is_collection =
true;
1939 throw std::runtime_error(
1940 "Explode Collections: Source geo type must be MULTILINESTRING or "
1945 switch (ogr_geometry_type) {
1946 case wkbMultiPolygon:
1947 is_collection =
true;
1952 throw std::runtime_error(
1953 "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1957 CHECK(
false) <<
"Unsupported geo child type " << collection_child_type;
1963 if (is_collection) {
1965 OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1966 CHECK(collection_geometry);
1968 #if LOG_EXPLODE_COLLECTIONS
1970 LOG(
INFO) <<
"Exploding row/feature " << row_or_feature_idx <<
" for column '"
1971 << explode_col_name <<
"' into " << collection_geometry->getNumGeometries()
1976 uint32_t child_geometry_count = 0;
1977 auto child_geometry_it = collection_geometry->begin();
1978 while (child_geometry_it != collection_geometry->end()) {
1980 OGRGeometry* import_geometry = *child_geometry_it;
1982 [&] { execute_import_lambda(import_geometry); });
1985 child_geometry_it++;
1986 child_geometry_count++;
1991 [&] { execute_import_lambda(ogr_geometry); });
2003 std::unique_ptr<
char[]> scratch_buffer,
2008 size_t first_row_index_this_buffer,
2010 Executor* executor) {
2012 int64_t total_get_row_time_us = 0;
2013 int64_t total_str_to_val_time_us = 0;
2014 auto query_session = session_info ? session_info->
get_session_id() :
"";
2015 CHECK(scratch_buffer);
2016 auto buffer = scratch_buffer.get();
2023 const std::list<const ColumnDescriptor*>& col_descs = importer->
get_column_descs();
2026 const char* thread_buf = buffer + begin_pos + begin;
2027 const char* thread_buf_end = buffer + end_pos;
2028 const char* buf_end = buffer + total_size;
2029 bool try_single_thread =
false;
2030 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2035 for (
const auto cd : col_descs) {
2036 const auto& col_ti = cd->columnType;
2037 phys_cols += col_ti.get_physical_cols();
2038 if (cd->columnType.get_type() ==
kPOINT) {
2042 auto num_cols = col_descs.size() - phys_cols;
2043 for (
const auto& p : import_buffers) {
2046 std::vector<std::string_view> row;
2047 size_t row_index_plus_one = 0;
2048 for (
const char* p = thread_buf; p < thread_buf_end; p++) {
2050 std::vector<std::unique_ptr<char[]>>
2064 total_get_row_time_us += us;
2076 row_index_plus_one++;
2078 if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
2080 LOG(
ERROR) <<
"Incorrect Row (expected " << num_cols <<
" columns, has "
2092 auto execute_import_row = [&](OGRGeometry* import_geometry) {
2093 size_t import_idx = 0;
2096 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2098 const auto& col_ti = cd->columnType;
2111 if (!cd->columnType.is_string() && row[import_idx].empty()) {
2114 if (!cd->columnType.is_string() && !copy_params.
trim_spaces) {
2116 row[import_idx] =
sv_strip(row[import_idx]);
2119 if (col_ti.get_physical_cols() == 0) {
2122 import_buffers[col_idx]->add_value(
2123 cd, row[import_idx], is_null, copy_params);
2132 import_buffers[col_idx]->add_value(
2133 cd, copy_params.
null_str,
true, copy_params);
2136 auto const& geo_string = row[import_idx];
2142 SQLTypes col_type = col_ti.get_type();
2145 std::vector<double> coords;
2146 std::vector<double> bounds;
2147 std::vector<int> ring_sizes;
2148 std::vector<int> poly_rings;
2149 int render_group = 0;
2154 if (col_type ==
kPOINT && !is_null && geo_string.size() > 0 &&
2155 (geo_string[0] ==
'.' || isdigit(geo_string[0]) ||
2156 geo_string[0] ==
'-') &&
2157 geo_string.find_first_of(
"ABCDEFabcdef") == std::string::npos) {
2158 double lon = std::atof(std::string(geo_string).c_str());
2160 auto lat_str = row[import_idx];
2162 if (lat_str.size() > 0 &&
2163 (lat_str[0] ==
'.' || isdigit(lat_str[0]) || lat_str[0] ==
'-')) {
2164 lat = std::atof(std::string(lat_str).c_str());
2167 if (!copy_params.
lonlat) {
2178 import_ti.get_output_srid() == 4326) {
2182 import_ti.set_input_srid(srid0);
2186 throw std::runtime_error(
2187 "Cannot read lon/lat to insert into POINT column " +
2195 import_ti.get_output_srid() == 4326) {
2199 import_ti.set_input_srid(srid0);
2203 if (col_ti.get_notnull()) {
2204 throw std::runtime_error(
"NULL geo for column " + cd->columnName);
2214 if (import_geometry) {
2225 "Failed to extract valid geometry from exploded row " +
2227 row_index_plus_one) +
2228 " for column " + cd->columnName;
2229 throw std::runtime_error(msg);
2234 std::string(geo_string),
2241 std::string msg =
"Failed to extract valid geometry from row " +
2243 row_index_plus_one) +
2244 " for column " + cd->columnName;
2245 throw std::runtime_error(msg);
2250 if (col_type != import_ti.get_type()) {
2254 throw std::runtime_error(
2255 "Imported geometry doesn't match the type of column " +
2262 if (columnIdToRenderGroupAnalyzerMap.size()) {
2264 if (ring_sizes.size()) {
2266 auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2267 CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2269 (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2290 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2299 "Table load was cancelled via Query Interrupt";
2303 }
catch (
const std::exception& e) {
2304 for (
size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2305 import_buffers[col_idx_to_pop]->pop_value();
2308 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
2311 LOG(
ERROR) <<
"Load was cancelled due to max reject rows being reached";
2314 "Load was cancelled due to max reject rows being reached";
2321 auto const [collection_col_idx, collection_child_type, collection_col_name] =
2324 CHECK_LT(collection_col_idx, (
int)row.size()) <<
"column index out of range";
2325 auto const& collection_geo_string = row[collection_col_idx];
2327 OGRGeometry* ogr_geometry =
nullptr;
2330 OGRGeometryFactory::destroyGeometry(ogr_geometry);
2334 std::string(collection_geo_string));
2337 collection_child_type,
2338 collection_col_name,
2339 first_row_index_this_buffer + row_index_plus_one,
2340 execute_import_row);
2344 [&] { execute_import_row(
nullptr); });
2351 total_str_to_val_time_us += us;
2361 LOG(
INFO) <<
"Thread" << std::this_thread::get_id() <<
":"
2363 << (double)ms / 1000.0 <<
"sec, Insert Time: " << (
double)load_ms / 1000.0
2364 <<
"sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2365 <<
"sec, str_to_val: " << (
double)total_str_to_val_time_us / 1000000.0
2366 <<
"sec" << std::endl;
2369 return thread_import_status;
2375 : std::runtime_error(
"Column '" + column_name +
"' is not a geo column") {}
2381 OGRCoordinateTransformation* coordinate_transformation,
2383 size_t firstFeature,
2393 const std::list<const ColumnDescriptor*>& col_descs = importer->
get_column_descs();
2394 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2396 auto query_session = session_info ? session_info->
get_session_id() :
"";
2397 for (
const auto& p : import_buffers) {
2404 for (
size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2406 if (!features[iFeature]) {
2413 OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2414 if (pGeometry && coordinate_transformation) {
2415 pGeometry->transform(coordinate_transformation);
2422 auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2428 thread_import_status.
load_msg =
"Table load was cancelled via Query Interrupt";
2432 uint32_t field_column_count{0u};
2433 uint32_t metadata_column_count{0u};
2435 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2439 const auto& col_ti = cd->columnType;
2440 if (col_ti.is_geometry()) {
2445 SQLTypes col_type = col_ti.get_type();
2448 import_buffers[col_idx]->add_value(
2449 cd, copy_params.
null_str,
true, copy_params);
2453 std::vector<double> coords;
2454 std::vector<double> bounds;
2455 std::vector<int> ring_sizes;
2456 std::vector<int> poly_rings;
2457 int render_group = 0;
2461 bool is_null_geo = !import_geometry;
2463 if (col_ti.get_notnull()) {
2464 throw std::runtime_error(
"NULL geo for column " + cd->columnName);
2482 std::string msg =
"Failed to extract valid geometry from feature " +
2484 " for column " + cd->columnName;
2485 throw std::runtime_error(msg);
2489 if (col_type != import_ti.get_type()) {
2493 throw std::runtime_error(
2494 "Imported geometry doesn't match the type of column " +
2500 if (columnIdToRenderGroupAnalyzerMap.size()) {
2502 if (ring_sizes.size()) {
2504 auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2505 CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2507 (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2517 auto cd_coords = *cd_it;
2518 std::vector<TDatum> td_coord_data;
2520 std::vector<uint8_t> compressed_coords =
2522 for (
auto cc : compressed_coords) {
2524 td_byte.val.int_val = cc;
2525 td_coord_data.push_back(td_byte);
2529 tdd_coords.val.arr_val = td_coord_data;
2530 tdd_coords.is_null = is_null_geo;
2531 import_buffers[col_idx]->add_value(cd_coords, tdd_coords,
false);
2538 auto cd_ring_sizes = *cd_it;
2539 std::vector<TDatum> td_ring_sizes;
2541 for (
auto ring_size : ring_sizes) {
2542 TDatum td_ring_size;
2543 td_ring_size.val.int_val = ring_size;
2544 td_ring_sizes.push_back(td_ring_size);
2547 TDatum tdd_ring_sizes;
2548 tdd_ring_sizes.val.arr_val = td_ring_sizes;
2549 tdd_ring_sizes.is_null = is_null_geo;
2550 import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
2557 auto cd_poly_rings = *cd_it;
2558 std::vector<TDatum> td_poly_rings;
2560 for (
auto num_rings : poly_rings) {
2561 TDatum td_num_rings;
2562 td_num_rings.val.int_val = num_rings;
2563 td_poly_rings.push_back(td_num_rings);
2566 TDatum tdd_poly_rings;
2567 tdd_poly_rings.val.arr_val = td_poly_rings;
2568 tdd_poly_rings.is_null = is_null_geo;
2569 import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings,
false);
2578 auto cd_bounds = *cd_it;
2579 std::vector<TDatum> td_bounds_data;
2581 for (
auto b : bounds) {
2583 td_double.val.real_val = b;
2584 td_bounds_data.push_back(td_double);
2588 tdd_bounds.val.arr_val = td_bounds_data;
2589 tdd_bounds.is_null = is_null_geo;
2590 import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds,
false);
2597 auto cd_render_group = *cd_it;
2598 TDatum td_render_group;
2599 td_render_group.val.int_val = render_group;
2600 td_render_group.is_null = is_null_geo;
2601 import_buffers[col_idx]->add_value(cd_render_group, td_render_group,
false);
2604 }
else if (field_column_count < fieldNameToIndexMap.size()) {
2608 auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2609 CHECK(cit != columnNameToSourceNameMap.end());
2610 auto const& field_name = cit->second;
2612 auto const fit = fieldNameToIndexMap.find(field_name);
2613 if (fit == fieldNameToIndexMap.end()) {
2617 auto const& field_index = fit->second;
2618 CHECK(field_index < fieldNameToIndexMap.size());
2620 auto const& feature = features[iFeature];
2622 auto field_defn = feature->GetFieldDefnRef(field_index);
2629 std::string value_string;
2630 int array_index = 0, array_size = 0;
2632 auto stringify_numeric_list = [&](
auto* values) {
2634 while (array_index < array_size) {
2635 auto separator = (array_index > 0) ?
"," :
"";
2639 value_string +=
"}";
2642 auto field_type = field_defn->GetType();
2643 switch (field_type) {
2652 value_string = feature->GetFieldAsString(field_index);
2654 case OFTIntegerList: {
2655 auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2656 stringify_numeric_list(values);
2658 case OFTInteger64List: {
2659 auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2660 stringify_numeric_list(values);
2663 auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2664 stringify_numeric_list(values);
2666 case OFTStringList: {
2667 auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2669 if (array_of_strings) {
2670 while (
auto* this_string = array_of_strings[array_index]) {
2671 auto separator = (array_index > 0) ?
"," :
"";
2672 value_string +=
separator + std::string(this_string);
2676 value_string +=
"}";
2679 throw std::runtime_error(
"Unsupported geo file field type (" +
2684 import_buffers[col_idx]->add_value(cd, value_string,
false, copy_params);
2686 field_column_count++;
2687 }
else if (metadata_column_count < metadata_column_infos.size()) {
2691 auto const& mci = metadata_column_infos[metadata_column_count];
2692 if (mci.column_descriptor.columnName != cd->columnName) {
2693 throw std::runtime_error(
"Metadata column name mismatch");
2695 import_buffers[col_idx]->add_value(cd, mci.value,
false, copy_params);
2697 metadata_column_count++;
2699 throw std::runtime_error(
"Column count mismatch");
2708 LOG(
ERROR) <<
"Input exception thrown: " << e.what() <<
". Aborting import.";
2709 throw std::runtime_error(e.what());
2710 }
catch (
const std::exception& e) {
2711 for (
size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2712 import_buffers[col_idx_to_pop]->pop_value();
2715 LOG(
ERROR) <<
"Input exception thrown: " << e.what() <<
". Row discarded.";
2721 auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2724 collection_child_type,
2725 collection_col_name,
2726 firstFeature + iFeature + 1,
2727 execute_import_feature);
2730 execute_import_feature(pGeometry);
2736 float load_s = 0.0f;
2744 LOG(
INFO) <<
"DEBUG: Process " << convert_s <<
"s";
2745 LOG(
INFO) <<
"DEBUG: Load " << load_s <<
"s";
2746 LOG(
INFO) <<
"DEBUG: Total " << (convert_s + load_s) <<
"s";
2751 return thread_import_status;
// [review] Fragment of Loader::loadNoCheckpoint — the opening signature line
// was dropped by this extraction (stale source line numbers are embedded).
// Delegates to loadImpl with the checkpoint flag set to false.
2755 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2758 return loadImpl(import_buffers, row_count,
// third argument `false`: do not checkpoint after this load
false, session_info);
// [review] Loader::load — checkpointing counterpart of loadNoCheckpoint.
// Interior lines (remaining parameters, closing brace) were dropped by the
// extraction; code left byte-identical.
2761 bool Loader::load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2764 return loadImpl(import_buffers, row_count,
// third argument `true`: checkpoint after this load
true, session_info);
// [review] Fragment of a helper that reads one integer value at `index` from a
// TypedImportBuffer's raw byte buffer, switching on the value's byte width.
// The `case 1/2/4/8:` labels and the string/non-string buffer selection were
// dropped by the extraction (embedded line numbers jump 2772 -> 2778).
2771 const int8_t* values_buffer{
nullptr};
2772 if (ti.is_string()) {
2778 CHECK(values_buffer);
// Dict-encoded strings store the dictionary id, whose width is get_size();
// other types use the logical size of the SQL type.
2779 const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2780 switch (logical_size) {
2782 return values_buffer[index];
2785 return reinterpret_cast<const int16_t*
>(values_buffer)[index];
2788 return reinterpret_cast<const int32_t*
>(values_buffer)[index];
2791 return reinterpret_cast<const int64_t*
>(values_buffer)[index];
// Any other width is a programming error — log message below says this value
// is used as a shard key.
2794 LOG(
FATAL) <<
"Unexpected size for shard key: " << logical_size;
// [review] Fragment: reads one float at `index` from the buffer's raw bytes;
// may_alias_ptr suppresses strict-aliasing issues on the reinterpret_cast.
2803 const auto values_buffer = import_buffer.
getAsBytes();
2804 return reinterpret_cast<const float*
>(may_alias_ptr(values_buffer))[index];
// [review] Fragment: double-precision counterpart of the float reader above.
2810 const auto values_buffer = import_buffer.
getAsBytes();
2811 return reinterpret_cast<const double*
>(may_alias_ptr(values_buffer))[index];
// [review] Fragment of the per-row shard copy: for every column, append the
// value at `row_index` from the input import buffer into the destination
// shard's output buffer, dispatching on the column's SQL type. The switch
// statement and its case labels were dropped by the extraction — only the
// per-type add* calls remain visible.
2819 for (
size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2820 const auto& input_buffer = import_buffers[col_idx];
2821 const auto& col_ti = input_buffer->getTypeInfo();
2827 shard_output_buffers[col_idx]->addBoolean(
int_value_at(*input_buffer, row_index));
2830 shard_output_buffers[col_idx]->addTinyint(
int_value_at(*input_buffer, row_index));
2833 shard_output_buffers[col_idx]->addSmallint(
2837 shard_output_buffers[col_idx]->addInt(
int_value_at(*input_buffer, row_index));
2840 shard_output_buffers[col_idx]->addBigint(
int_value_at(*input_buffer, row_index));
2843 shard_output_buffers[col_idx]->addFloat(
float_value_at(*input_buffer, row_index));
2846 shard_output_buffers[col_idx]->addDouble(
// Strings are copied by value from the string buffer.
2852 CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2853 shard_output_buffers[col_idx]->addString(
2854 (*input_buffer->getStringBuffer())[row_index]);
// presumably time/date types stored as 64-bit — TODO confirm (case label dropped)
2860 shard_output_buffers[col_idx]->addBigint(
int_value_at(*input_buffer, row_index));
2864 CHECK(input_buffer->getStringArrayBuffer());
2865 CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2866 const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2867 shard_output_buffers[col_idx]->addStringArray(input_arr);
2869 shard_output_buffers[col_idx]->addArray(
2870 (*input_buffer->getArrayBuffer())[row_index]);
2879 CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2880 shard_output_buffers[col_idx]->addGeoString(
2881 (*input_buffer->getGeoStringBuffer())[row_index]);
// [review] Fragment of shard distribution keyed on the shard column: locates
// the shard column descriptor, validates its type (integer, dict-encoded
// string, or time), materializes dictionary ids for string shard keys, then
// routes each row to a shard bucket. Several interior lines (shard index
// computation, the per-row copy call) were dropped by the extraction.
2891 std::vector<OneShardBuffers>& all_shard_import_buffers,
2892 std::vector<size_t>& all_shard_row_counts,
2894 const size_t row_count,
2895 const size_t shard_count,
2902 shard_col_desc = col_desc;
2906 CHECK(shard_col_desc);
2909 const auto& shard_col_ti = shard_col_desc->columnType;
// Shard keys must hash deterministically: integers, dict-encoded strings,
// or time types only.
2910 CHECK(shard_col_ti.is_integer() ||
2911 (shard_col_ti.is_string() && shard_col_ti.get_compression() ==
kENCODING_DICT) ||
2912 shard_col_ti.is_time());
2913 if (shard_col_ti.is_string()) {
2914 const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2915 CHECK(payloads_ptr);
// Encode the string payloads so the shard key can be read as integers.
2916 shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2919 for (
size_t i = 0; i < row_count; ++i) {
2920 const size_t shard =
2922 auto& shard_output_buffers = all_shard_import_buffers[shard];
2924 ++all_shard_row_counts[shard];
// [review] Fragment of shard distribution for newly added columns: every
// shard receives data, and each shard's row count is taken from its
// fragmenter's existing row count rather than from a shard key.
2929 std::vector<OneShardBuffers>& all_shard_import_buffers,
2930 std::vector<size_t>& all_shard_row_counts,
2932 const size_t row_count,
2933 const size_t shard_count,
2936 CHECK(shard_tds.size() == shard_count);
2938 for (
size_t shard = 0; shard < shard_count; ++shard) {
2939 auto& shard_output_buffers = all_shard_import_buffers[shard];
2940 if (row_count != 0) {
2945 all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
// [review] Fragment of the distributeToShards dispatcher: sizes the per-shard
// row-count vector, builds one TypedImportBuffer set per shard (sharing each
// source buffer's string dictionary), then forwards to one of the two
// distribution strategies above. The branch condition and call heads were
// dropped by the extraction — only the trailing argument lines remain.
2950 std::vector<size_t>& all_shard_row_counts,
2952 const size_t row_count,
2953 const size_t shard_count,
2955 all_shard_row_counts.resize(shard_count);
2956 for (
size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2957 all_shard_import_buffers.emplace_back();
2958 for (
const auto& typed_import_buffer : import_buffers) {
2959 all_shard_import_buffers.back().emplace_back(
// New per-shard buffers reuse the source buffer's string dictionary.
2961 typed_import_buffer->getStringDictionary()));
2967 all_shard_row_counts,
2974 all_shard_row_counts,
// [review] Fragment of Loader::loadImpl (sharded-table path): distributes the
// import buffers across shards, then loads each shard in turn, AND-ing the
// per-shard results into `success`.
2983 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2992 std::vector<OneShardBuffers> all_shard_import_buffers;
2993 std::vector<size_t> all_shard_row_counts;
2996 all_shard_row_counts,
2999 shard_tables.size(),
3001 bool success =
true;
3002 for (
size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
// Note: short-circuit && means later shards are skipped once one fails.
3003 success = success &&
loadToShard(all_shard_import_buffers[shard_idx],
3004 all_shard_row_counts[shard_idx],
3005 shard_tables[shard_idx],
// [review] Fragment of the DataBlockPtrs builder: first pass launches/collects
// futures that dictionary-encode string columns (visible via the futures
// vector and addDictEncodedString), second pass wires up string and geo-string
// payload pointers, final loop stores each future's encoded block pointer
// into the result. Many interior lines were dropped by the extraction.
3015 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
3017 std::vector<std::pair<const size_t, std::future<int8_t*>>>
3018 encoded_data_block_ptrs_futures;
3020 for (
size_t buf_idx = 0; buf_idx <
import_buffers.size(); buf_idx++) {
3023 auto string_payload_ptr =
import_buffers[buf_idx]->getStringBuffer();
// Encoding happens asynchronously; the future yields the encoded block ptr.
3026 encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
3029 import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
3035 for (
size_t buf_idx = 0; buf_idx <
import_buffers.size(); buf_idx++) {
3042 auto string_payload_ptr =
import_buffers[buf_idx]->getStringBuffer();
3052 auto geo_payload_ptr =
import_buffers[buf_idx]->getGeoStringBuffer();
// Block until each async encode finishes and record its pointer.
3069 for (
auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
3070 result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
// [review] Fragment of Loader::loadToShard: assembles InsertData (column ids
// from each buffer's descriptor), releases the loader lock before the insert,
// and calls the fragmenter — checkpointing variant chosen by a dropped
// condition. Both try-bodies' heads were lost; only the catch blocks that
// format error messages survive.
3076 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3084 bool success =
false;
3087 }
catch (std::exception& e) {
3088 std::ostringstream oss;
3089 oss <<
"Exception when loading Table " << shard_table->
tableName <<
", issue was "
3100 for (
auto& buffer : import_buffers) {
3101 ins_data.
columnIds.push_back(buffer->getColumnDesc()->columnId);
// Unlock before the fragmenter call so the insert does not hold the loader lock.
3109 loader_lock.unlock();
3114 shard_table->
fragmenter->insertData(ins_data);
3116 shard_table->
fragmenter->insertDataNoCheckpoint(ins_data);
3118 }
catch (std::exception& e) {
3119 std::ostringstream oss;
3120 oss <<
"Fragmenter Insert Exception when processing Table "
3121 << shard_table->
tableName <<
" issue was " << e.what();
// [review] Fragment of Loader::dropColumns: drops the given column ids from
// each table's fragmenter (here seeded with the single loader table).
3133 std::vector<const TableDescriptor*> table_descs(1,
table_desc_);
3137 for (
auto table_desc : table_descs) {
3138 table_desc->fragmenter->dropColumns(columnIds);
// [review] Fragment of loader initialization: caches the string dictionary
// for each (string or string-array) column by column id.
3148 CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3151 dict_map_[cd->columnId] = dd->stringDict.get();
// [review] Fragment of a sample-reading routine: opens `file_path` (throwing
// with strerror(errno) on failure), reserves a 1MB line buffer, and reads
// until a deadline — tripled for s3:// paths, presumably to allow for network
// latency (TODO confirm). The duplicate-line comparison (line == line1) and
// the surrounding loop structure were dropped by the extraction.
3164 const std::string& file_path,
3165 const bool decompressed,
3172 throw std::runtime_error(
"failed to open file '" + file_path +
3173 "': " + strerror(errno));
3179 line.reserve(1 * 1024 * 1024);
3180 auto end_time = std::chrono::steady_clock::now() +
3181 timeout * (boost::istarts_with(file_path,
"s3://") ? 3 : 1);
// Stop growing the line once it exceeds the reserved capacity.
3187 if (n++ >= line.capacity()) {
3199 if (
line1.empty()) {
3201 }
else if (line ==
line1) {
// Bail out when the deadline passes.
3210 if (std::chrono::steady_clock::now() >
end_time) {
3217 }
catch (std::exception& e) {
// [review] Fragment of raw-sample tokenization: .tsv extension triggers a
// (dropped) delimiter override, then the buffer is scanned char-by-char into
// rows; if multi-threaded parsing trips `try_single_thread`, the whole buffer
// is re-scanned single-threaded. Interior parsing calls were dropped.
3236 if (boost::filesystem::extension(
file_path) ==
".tsv") {
3243 const char* buf =
raw_data.c_str();
3244 const char* buf_end = buf +
raw_data.size();
3245 bool try_single_thread =
false;
3246 for (
const char* p = buf; p < buf_end; p++) {
3247 std::vector<std::string> row;
3248 std::vector<std::unique_ptr<char[]>> tmp_buffers;
3259 if (try_single_thread) {
// Fallback: redo the scan without threading.
3263 if (try_single_thread) {
3266 for (
const char* p = buf; p < buf_end; p++) {
3267 std::vector<std::string> row;
3268 std::vector<std::unique_ptr<char[]>> tmp_buffers;
// [review] Fragment of a try_cast<T> helper: attempts boost::lexical_cast<T>
// on the string and reports failure via the bad_lexical_cast catch (the
// template header and return statements were dropped by the extraction).
3286 boost::lexical_cast<
T>(str);
3287 }
catch (
const boost::bad_lexical_cast& e) {
// [review] Fragment of per-value SQL type detection. Visible logic:
//  1. numeric probing via try_cast<double/int16/int32/int64/float>,
//  2. for values still typed kTEXT, upper-case the string and probe for WKT
//     geometry prefixes (POINT/MULTIPOINT/LINESTRING/.../MULTIPOLYGON),
//  3. failing that, treat an even-length pure-hex string as hex-encoded WKB
//     and inspect the first 5 bytes (10 hex chars): each pair of constants is
//     the big-endian ("00000000NN") and little-endian ("01NN000000") encoding
//     of the same WKB geometry-type code (1=point, 2=linestring, 3=polygon,
//     4=multipoint, 5=multilinestring, 6=multipolygon).
// The assigned result types on each branch were dropped by the extraction.
3295 if (try_cast<double>(str)) {
3300 if (try_cast<int16_t>(str)) {
3302 }
else if (try_cast<int32_t>(str)) {
3304 }
else if (try_cast<int64_t>(str)) {
3306 }
else if (try_cast<float>(str)) {
3312 if (type ==
kTEXT) {
3314 std::string str_upper_case = str;
3316 str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3319 if (str_upper_case.find(
"POINT") == 0) {
3321 }
else if (str_upper_case.find(
"MULTIPOINT") == 0) {
3323 }
else if (str_upper_case.find(
"LINESTRING") == 0) {
3325 }
else if (str_upper_case.find(
"MULTILINESTRING") == 0) {
3327 }
else if (str_upper_case.find(
"POLYGON") == 0) {
3333 }
else if (str_upper_case.find(
"MULTIPOLYGON") == 0) {
3335 }
else if (str_upper_case.find_first_not_of(
"0123456789ABCDEF") ==
3336 std::string::npos &&
3337 (str_upper_case.size() % 2) == 0) {
// Need at least 5 bytes (10 hex chars) to read the WKB type code.
3339 if (str_upper_case.size() >= 10) {
3344 auto first_five_bytes = str_upper_case.substr(0, 10);
3345 if (first_five_bytes ==
"0000000001" || first_five_bytes ==
"0101000000") {
3347 }
else if (first_five_bytes ==
"0000000004" || first_five_bytes ==
"0104000000") {
3349 }
else if (first_five_bytes ==
"0000000002" || first_five_bytes ==
"0102000000") {
3351 }
else if (first_five_bytes ==
"0000000005" || first_five_bytes ==
"0105000000") {
3353 }
else if (first_five_bytes ==
"0000000003" || first_five_bytes ==
"0103000000") {
3355 }
else if (first_five_bytes ==
"0000000006" || first_five_bytes ==
"0106000000") {
// Second kTEXT check — presumably time/date parsing follows; lines dropped.
3369 if (type ==
kTEXT) {
// [review] Fragment: detects a SQLTypes value for each field of one row
// (loop body dropped by the extraction).
3384 std::vector<SQLTypes> types(row.size());
3385 for (
size_t i = 0; i < row.size(); i++) {
// [review] Fragment of a "more restrictive type" comparator: a static rank
// table maps each SQLTypes to a restrictiveness order (kCHAR most restrictive
// at 0 ... kTEXT least at 12; intermediate entries dropped by the extraction)
// and the comparator returns whether `b` ranks below `a`.
// NOTE(review): `static` local initialization is re-run assignment on each
// call here, but the assigned values are constants so this is benign.
3392 static std::array<int, kSQLTYPE_LAST> typeorder;
3393 typeorder[
kCHAR] = 0;
3396 typeorder[
kINT] = 4;
3401 typeorder[
kTIME] = 9;
3402 typeorder[
kDATE] = 10;
3409 typeorder[
kTEXT] = 12;
3412 return typeorder[b] < typeorder[
a];
// [review] Fragment of Detector::find_best_sqltypes: throws if the sample is
// empty, starts every column at kCHAR (most restrictive), widens per cell for
// non-null values, counts non-nulls, honors a deadline, and forces columns
// that were entirely null to kTEXT.
3443 const std::vector<std::vector<std::string>>& raw_rows,
3449 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3450 const std::vector<std::vector<std::string>>::const_iterator& row_end,
3453 throw std::runtime_error(
"No rows found in: " +
3454 boost::filesystem::basename(
file_path));
3457 size_t num_cols =
raw_rows.front().size();
3458 std::vector<SQLTypes> best_types(num_cols,
kCHAR);
3459 std::vector<size_t> non_null_col_counts(num_cols, 0);
3460 for (
auto row = row_begin; row != row_end; row++) {
// Ragged rows: grow the tracking vectors if this row has extra columns.
3461 while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3462 best_types.push_back(
kCHAR);
3463 non_null_col_counts.push_back(0);
3465 for (
size_t col_idx = 0; col_idx < row->size(); col_idx++) {
// Empty string or the configured null marker counts as NULL and is skipped.
3467 if (row->at(col_idx) ==
"" || !row->at(col_idx).compare(copy_params.
null_str)) {
3471 non_null_col_counts[col_idx]++;
3473 best_types[col_idx] = t;
// Detection is time-bounded; stop scanning past the deadline.
3476 if (std::chrono::steady_clock::now() >
end_time) {
3480 for (
size_t col_idx = 0; col_idx < num_cols; col_idx++) {
// An all-null column carries no type signal — default it to TEXT.
3483 if (non_null_col_counts[col_idx] == 0) {
3484 best_types[col_idx] =
kTEXT;
// [review] Fragment of Detector::find_best_encodings: counts distinct values
// per column over the sample; a uniqueness ratio below 0.75 selects
// dictionary encoding (assignment line dropped), otherwise kENCODING_NONE.
3492 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3493 const std::vector<std::vector<std::string>>::const_iterator& row_end,
3494 const std::vector<SQLTypes>& best_types) {
3496 throw std::runtime_error(
"No rows found in: " +
3497 boost::filesystem::basename(
file_path));
3499 size_t num_cols = best_types.size();
3500 std::vector<EncodingType> best_encodes(num_cols,
kENCODING_NONE);
// Starts at 1 rather than 0, making the ratio slightly conservative.
3501 std::vector<size_t> num_rows_per_col(num_cols, 1);
3502 std::vector<std::unordered_set<std::string>> count_set(num_cols);
3503 for (
auto row = row_begin; row != row_end; row++) {
3504 for (
size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3506 count_set[col_idx].insert(row->at(col_idx));
3507 num_rows_per_col[col_idx]++;
3511 for (
size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3514 static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3515 if (uniqueRatio < 0.75) {
3520 return best_encodes;
// [review] Fragment of header detection: a header row is inferred when the
// first row's columns are all kTEXT while the corresponding sampled-body
// column is non-TEXT (mismatched sizes mean no headers).
3528 const std::vector<SQLTypes>& tail_types) {
3529 if (head_types.size() != tail_types.size()) {
3533 for (
size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3534 if (head_types[col_idx] !=
kTEXT) {
3537 has_headers = has_headers || tail_types[col_idx] !=
kTEXT;
// [review] Fragment of a sample-rows getter: Parquet builds return the cached
// data preview when present; otherwise a slice of raw_rows (offset presumably
// skips the header row — TODO confirm) is copied out.
3543 #if defined(ENABLE_IMPORT_PARQUET)
3544 if (data_preview_.has_value()) {
3545 return data_preview_.value().sample_rows;
3551 std::vector<std::vector<std::string>> sample_rows(
raw_rows.begin() + offset,
// [review] Fragment of a column-names getter: Parquet preview shortcut only;
// the delimited-file fallback was dropped by the extraction.
3558 #if defined(ENABLE_IMPORT_PARQUET)
3559 if (data_preview_.has_value()) {
3560 return data_preview_.value().column_names;
// [review] Fragment of a column-types getter: Parquet preview shortcut, else
// a SQLTypeInfo vector is built (construction loop dropped).
3577 #if defined(ENABLE_IMPORT_PARQUET)
3578 if (data_preview_.has_value()) {
3579 return data_preview_.value().column_types;
3583 std::vector<SQLTypeInfo> types;
// [review] Fragment of Importer::load: forwards to the loader's
// no-checkpoint load and (in dropped lines) reacts to failure.
3592 void Importer::load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3595 if (!
loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
// [review] Fragment of Importer::checkpoint: restores the recorded table
// epochs, then — for the persistence level checked in the (dropped) condition
// — checkpoints each column's string dictionary, logging per-column failures
// and the total checkpoint duration.
3603 const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3608 loader->setTableEpochs(table_epochs);
3614 if (
loader->getTableDesc()->persistenceLevel ==
3621 if (!p->stringDictCheckpoint()) {
3622 LOG(
ERROR) <<
"Checkpointing Dictionary for Column "
3623 << p->getColumnDesc()->columnName <<
" failed.";
3632 LOG(
INFO) <<
"Dictionary Checkpointing took " << (double)ms / 1000.0 <<
" Seconds."
// [review] Fragment of the import dispatcher: expands file paths, then routes
// Parquet sources to import_parquet when built with ENABLE_IMPORT_PARQUET and
// otherwise throws.
3644 std::vector<std::string> file_paths;
3658 for (
const auto&
file_path : file_paths) {
3674 #ifdef ENABLE_IMPORT_PARQUET
3675 import_parquet(file_paths, session_info);
3677 throw std::runtime_error(
"Parquet not supported!");
// [review] Fragment of Parquet detection plumbing: builds an S3 detect
// filesystem configuration, then (in a separate function whose signature was
// partially dropped) creates mock foreign server/table objects, picks a local
// or S3 Arrow filesystem based on the server options, and instantiates a
// ParquetDataWrapper over them.
3687 #ifdef ENABLE_IMPORT_PARQUET
3690 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3693 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3711 const CopyParams& copy_params) {
3716 auto [foreign_server, user_mapping, foreign_table] =
3719 std::shared_ptr<arrow::fs::FileSystem> file_system;
3720 auto& server_options = foreign_server->options;
3725 file_system = std::make_shared<arrow::fs::LocalFileSystem>();
3727 }
else if (server_options
3732 create_parquet_s3_detect_filesystem_config(foreign_server.get(), copy_params));
3738 auto parquet_data_wrapper = std::make_unique<foreign_storage::ParquetDataWrapper>(
3739 foreign_table.get(), file_system);
// [review] Fragment: when legacy Parquet import is disabled, detection is
// served from a Parquet data preview instead.
3750 #ifdef ENABLE_IMPORT_PARQUET
3752 !g_enable_legacy_parquet_import) {
3753 data_preview_ = get_parquet_data_preview(fp.string(), cp);
// [review] Fragment of open_parquet_table: opens the file with Arrow, builds
// a parquet::arrow::FileReader, reads the whole table, logs its shape, and
// returns (num_row_groups, num_columns, num_rows). The function signature
// line was dropped by the extraction.
3762 #ifdef ENABLE_IMPORT_PARQUET
3764 std::shared_ptr<arrow::io::ReadableFile>& infile,
3765 std::unique_ptr<parquet::arrow::FileReader>& reader,
3766 std::shared_ptr<arrow::Table>& table) {
3767 using namespace parquet::arrow;
3768 auto file_result = arrow::io::ReadableFile::Open(file_path);
// PARQUET_THROW_NOT_OK converts a failed arrow::Status into an exception.
3769 PARQUET_THROW_NOT_OK(file_result.status());
3770 infile = file_result.ValueOrDie();
3772 PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3773 PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3774 const auto num_row_groups = reader->num_row_groups();
3775 const auto num_columns = table->num_columns();
3776 const auto num_rows = table->num_rows();
3777 LOG(
INFO) <<
"File " << file_path <<
" has " << num_rows <<
" rows and " << num_columns
3778 <<
" columns in " << num_row_groups <<
" groups.";
3779 return std::make_tuple(num_row_groups, num_columns, num_rows);
// [review] Fragment of Detector::import_local_parquet: opens the Parquet
// table, forces CSV-compatible copy params (newline delimiter, comma
// separator, double-quote quoting with "" escape), emits a header line of
// column names, then walks row groups / columns / rows converting each cell
// to text via per-chunk getter functions, escaping embedded quotes by
// doubling them. Loop-termination and row-limit lines were dropped.
3782 void Detector::import_local_parquet(
const std::string& file_path,
3785 std::shared_ptr<arrow::io::ReadableFile> infile;
3786 std::unique_ptr<parquet::arrow::FileReader> reader;
3787 std::shared_ptr<arrow::Table> table;
3788 int num_row_groups, num_columns;
3790 std::tie(num_row_groups, num_columns, num_rows) =
// Detection output mimics a CSV stream, so normalize the copy params.
3795 copy_params.line_delim =
'\n';
3796 copy_params.delimiter =
',';
3798 copy_params.quoted =
true;
3799 copy_params.quote =
'"';
3800 copy_params.escape =
'"';
3801 for (
int c = 0; c < num_columns; ++c) {
3805 raw_data += table->ColumnNames().at(c);
3807 raw_data += copy_params.line_delim;
3811 for (
int g = 0; g < num_row_groups; ++g) {
3813 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3814 std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3815 arrays.resize(num_columns);
3816 for (
int c = 0; c < num_columns; ++c) {
3817 PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3818 for (
auto chunk : arrays[c]->chunks()) {
3819 getters.push_back(
value_getter(*chunk,
nullptr,
nullptr));
3822 for (
int r = 0; r < num_rows; ++r) {
3823 for (
int c = 0; c < num_columns; ++c) {
3824 std::vector<std::string> buffer;
3825 for (
auto chunk : arrays[c]->chunks()) {
3826 DataBuffer<std::string> data(&cd, *chunk, buffer,
nullptr);
3830 if (!chunk->IsNull(r)) {
// CSV-style quote escaping: " becomes "".
3832 raw_data += boost::replace_all_copy(
3833 (data << getters[c](*chunk, r)).buffer.front(),
"\"",
"\"\"");
3838 raw_data += copy_params.line_delim;
// [review] Fragment of a templated bad-row eraser: walks the tracker's row
// set in reverse (so earlier indices stay valid while erasing) and removes
// those rows from the buffer, returning (old_size, new_size).
3850 template <
typename DATA_TYPE>
3852 std::vector<DATA_TYPE>& buffer,
3854 const auto old_size = buffer.size();
// Reverse iteration keeps not-yet-erased indices stable.
3856 for (
auto rit = bad_rows_tracker->
rows.crbegin(); rit != bad_rows_tracker->
rows.crend();
3858 buffer.erase(buffer.begin() + *rit);
3860 return std::make_tuple(old_size, buffer.size());
// [review] Fragment of a type-dispatched cleanup routine: unhandled column
// types fall through to a hard "Invalid Type" error.
3864 BadRowsTracker*
const bad_rows_tracker) {
3899 throw std::runtime_error(
"Invalid Type");
// [review] Fragment of Importer::import_local_parquet. Visible flow:
//  - open the Parquet table; verify the file's column count matches the
//    table's logical (non-physical) column count;
//  - slice each row group across up to max(max_threads, num_columns) worker
//    slices; a lambda maps logical to physical column index by skipping
//    physical (geo companion) columns;
//  - per row group: build per-slice TypedImportBuffers and BadRowsTrackers,
//    read each column chunk, fan out add_arrow_values over slice ranges on a
//    thread controller, then trim rows rejected by the trackers;
//  - accumulate raw vs successfully-loaded row counts, log per-row-group
//    add/drop totals, stop when max_reject is exceeded, and log total time.
// Many interior lines were dropped by the extraction; code is byte-identical.
3903 void Importer::import_local_parquet(
const std::string& file_path,
3905 std::shared_ptr<arrow::io::ReadableFile> infile;
3906 std::unique_ptr<parquet::arrow::FileReader> reader;
3907 std::shared_ptr<arrow::Table> table;
3908 int num_row_groups, num_columns;
3909 int64_t nrow_in_file;
3910 std::tie(num_row_groups, num_columns, nrow_in_file) =
3915 std::vector<const ColumnDescriptor*> cds;
3917 int num_physical_cols = 0;
3918 for (
auto& cd : column_list) {
// Parquet has only logical columns; physical geo columns are excluded.
3922 arrow_throw_if(num_columns != (
int)(column_list.size() - num_physical_cols),
3923 "Unmatched numbers of columns in parquet file " + file_path +
": " +
3926 " columns in table.");
3930 const int num_slices = std::max<decltype(max_threads)>(
max_threads, num_columns);
3933 size_t nrow_completed{0};
// Maps a logical column index to its physical index by skipping each
// column's physical companion columns.
3936 auto get_physical_col_idx = [&cds](
const int logic_col_idx) ->
auto {
3937 int physical_col_idx = 0;
3938 for (
int i = 0; i < logic_col_idx; ++i) {
3939 physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3941 return physical_col_idx;
3944 auto query_session = session_info ? session_info->
get_session_id() :
"";
3946 for (
int row_group = 0; row_group < num_row_groups; ++row_group) {
3961 for (
int slice = 0; slice < num_slices; slice++) {
3963 for (
const auto cd : cds) {
3965 new TypedImportBuffer(cd,
loader->getStringDict(cd)));
3977 std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3978 for (
size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3979 auto& bad_rows_tracker = bad_rows_trackers[slice];
3981 bad_rows_tracker.row_group = slice;
3982 bad_rows_tracker.importer =
this;
3985 for (
int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3986 const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3987 const auto cd = cds[physical_col_idx];
3988 std::shared_ptr<arrow::ChunkedArray> array;
3989 PARQUET_THROW_NOT_OK(
3990 reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3991 const size_t array_size = array->length();
// Ceiling division: spread the array evenly across slices.
3992 const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3994 for (
int slice = 0; slice < num_slices; ++slice) {
4000 thread_controller.startThread([&, slice] {
4001 const auto slice_offset = slice % num_slices;
4003 std::min<size_t>((slice_offset + 0) * slice_size, array_size),
4004 std::min<size_t>((slice_offset + 1) * slice_size, array_size));
4005 auto& bad_rows_tracker = bad_rows_trackers[slice];
// col_idx is 1-based here (physical index + 1).
4008 import_buffer->col_idx = physical_col_idx + 1;
4009 for (
auto chunk : array->chunks()) {
4010 import_buffer->add_arrow_values(
4011 cd, *chunk,
false, slice_range, &bad_rows_tracker);
4015 thread_controller.finish();
4017 std::vector<size_t> nrow_in_slice_raw(num_slices);
4018 std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
4020 for (
int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
4021 const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
4022 const auto cd = cds[physical_col_idx];
4023 for (
int slice = 0; slice < num_slices; ++slice) {
4024 auto& bad_rows_tracker = bad_rows_trackers[slice];
4026 std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
4031 for (
int slice = 0; slice < num_slices; ++slice) {
4033 nrow_in_slice_successfully_loaded[slice],
4037 const auto nrow_original =
4038 std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
4039 const auto nrow_imported =
4041 nrow_in_slice_successfully_loaded.end(),
4043 const auto nrow_dropped = nrow_original - nrow_imported;
4044 LOG(
INFO) <<
"row group " << row_group <<
": add " << nrow_imported
4045 <<
" rows, drop " << nrow_dropped <<
" rows.";
4053 ") rows rejected exceeded. Halting load.";
4054 LOG(
ERROR) <<
"Maximum (" << copy_params.max_reject
4055 <<
") rows rejected exceeded. Halting load.";
4060 nrow_completed += nrow_imported;
// Progress estimate proportional to completed rows over total rows.
4062 nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
4064 const auto total_file_offset =
4067 if (total_file_offset) {
4073 << total_file_offset;
4077 LOG(
INFO) <<
"Import " << nrow_in_file <<
" rows of parquet file " << file_path
4078 <<
" took " << (double)ms_load_a_file / 1000.0 <<
" secs";
// [review] Fragment of DataStreamSink::import_parquet: captures table epochs
// when running as an Importer, then for each input path resolves either S3
// object keys (via an S3ParquetArchive configured from the copy params) or a
// single local path; each object is landed (downloaded), imported via
// import_local_parquet, and vacuumed. Errors captured in `teptr` are
// rethrown; on the success path the importer checkpoints the saved epochs.
// Interior lines (URL parsing, condition heads) were dropped.
4081 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
4083 auto importer =
dynamic_cast<Importer*
>(
this);
// Detector has no loader; only a real Importer records epochs.
4084 auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
4085 : std::vector<Catalog_Namespace::TableEpochInfo>{};
4087 std::exception_ptr teptr;
4090 for (
auto const& file_path : file_paths) {
4091 std::map<int, std::string> url_parts;
4095 std::vector<std::string> objkeys;
4096 std::unique_ptr<S3ParquetArchive> us3arch;
4097 if (
"s3" == url_parts[2]) {
4100 copy_params.s3_access_key,
4101 copy_params.s3_secret_key,
4102 copy_params.s3_session_token,
4103 copy_params.s3_region,
4104 copy_params.s3_endpoint,
4105 copy_params.plain_text,
4106 copy_params.regex_path_filter,
4107 copy_params.file_sort_order_by,
4108 copy_params.file_sort_regex));
4109 us3arch->init_for_read();
4111 objkeys = us3arch->get_objkeys();
4113 throw std::runtime_error(
"AWS S3 support not available");
4114 #endif // HAVE_AWS_S3
// Local path: the path itself is the only "object key".
4116 objkeys.emplace_back(file_path);
4121 for (
auto const& objkey : objkeys) {
4125 ? us3arch->land(objkey, teptr,
nullptr != dynamic_cast<Detector*>(
this))
4127 import_local_parquet(file_path, session_info);
4129 us3arch->vacuum(objkey);
4133 us3arch->vacuum(objkey);
4145 std::rethrow_exception(teptr);
4152 }
catch (
const std::exception& e) {
4159 importer->checkpoint(table_epochs);
4162 #endif // ENABLE_IMPORT_PARQUET
// [review] Fragment of DataStreamSink::import_compressed: creates a pipe
// (_pipe on Windows, pipe(2) otherwise, with SIGPIPE ignored), then runs a
// reader thread that fdopen()s the read end and a writer thread that walks
// each input (local file, S3 bucket expanded to per-object "s3ok://" keys, or
// landed S3 object), streams decompressed archive blocks into the pipe,
// skips repeated text headers for multi-file loads, retries short/interrupted
// writes, and appends a trailing line delimiter when a file does not end with
// one. Exceptions from either thread are captured in `teptr` and rethrown
// after both threads join. Interior lines were dropped by the extraction.
4165 std::vector<std::string>& file_paths,
4175 _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
4177 auto pipe_res = pipe(fd);
4180 throw std::runtime_error(std::string(
"failed to create a pipe: ") + strerror(errno));
// Ignore SIGPIPE so a closed read end surfaces as EPIPE from write() instead
// of killing the process.
4183 signal(SIGPIPE, SIG_IGN);
4186 std::exception_ptr teptr;
4190 auto th_pipe_reader = std::thread([&]() {
4193 if (0 == (
p_file = fdopen(fd[0],
"r"))) {
4194 throw std::runtime_error(std::string(
"failed to open a pipe: ") +
// Reader-side failures are parked for rethrow on the main thread.
4204 teptr = std::current_exception();
4217 auto th_pipe_writer = std::thread([&]() {
4218 std::unique_ptr<S3Archive> us3arch;
4220 for (
size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
4222 auto file_path = file_paths[fi];
4223 std::unique_ptr<Archive> uarch;
4224 std::map<int, std::string> url_parts;
4226 const std::string S3_objkey_url_scheme =
"s3ok";
4227 if (
"file" == url_parts[2] ||
"" == url_parts[2]) {
4229 }
else if (
"s3" == url_parts[2]) {
4234 copy_params.s3_access_key,
4235 copy_params.s3_secret_key,
4236 copy_params.s3_session_token,
4237 copy_params.s3_region,
4238 copy_params.s3_endpoint,
4239 copy_params.plain_text,
4240 copy_params.regex_path_filter,
4241 copy_params.file_sort_order_by,
4242 copy_params.file_sort_regex));
4243 us3arch->init_for_read();
// Expand the bucket into per-object pseudo-URLs processed later in this loop.
4246 for (
const auto& objkey : us3arch->get_objkeys()) {
4247 file_paths.emplace_back(std::string(S3_objkey_url_scheme) +
"://" + objkey);
4251 throw std::runtime_error(
"AWS S3 support not available");
4252 #endif // HAVE_AWS_S3
4253 }
else if (S3_objkey_url_scheme == url_parts[2]) {
4255 auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
4257 us3arch->land(objkey, teptr,
nullptr != dynamic_cast<Detector*>(
this));
4258 if (0 == file_path.size()) {
4259 throw std::runtime_error(std::string(
"failed to land s3 object: ") + objkey);
4263 us3arch->vacuum(objkey);
4265 throw std::runtime_error(
"AWS S3 support not available");
4266 #endif // HAVE_AWS_S3
4268 #if 0 // TODO(ppan): implement and enable any other archive class
4270 if (
"hdfs" == url_parts[2])
4271 uarch.reset(
new HdfsArchive(file_path));
4274 throw std::runtime_error(std::string(
"unsupported archive url: ") + file_path);
4278 auto& arch = *uarch;
4284 bool just_saw_archive_header;
4285 bool is_detecting =
nullptr !=
dynamic_cast<Detector*
>(
this);
4286 bool first_text_header_skipped =
false;
4290 size_t num_block_read = 0;
4291 while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
4292 bool insert_line_delim_after_this_file =
false;
4295 auto ok = arch.read_data_block(&buf, &size, &offset);
4311 const char* buf2 = (
const char*)buf;
// Skip the header line of every file after the first (keep it once when
// detecting, so column names survive).
4314 just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
4315 while (size2-- > 0) {
4316 if (*buf2++ == copy_params.line_delim) {
4321 LOG(
WARNING) <<
"No line delimiter in block." << std::endl;
4323 just_saw_archive_header =
false;
4324 first_text_header_skipped =
true;
// Partial-write loop: retry on EINTR/EAGAIN/EWOULDBLOCK, treat EPIPE
// specially, advance on short writes.
4333 int nremaining = size2;
4334 while (nremaining > 0) {
4336 int nwritten =
write(fd[1], buf2, nremaining);
4340 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4343 }
else if (errno == EPIPE &&
4350 throw std::runtime_error(
4351 std::string(
"failed or interrupted write to pipe: ") +
4354 }
else if (nwritten == nremaining) {
4359 nremaining -= nwritten;
4370 const char* plast =
static_cast<const char*
>(buf) + (size - 1);
4371 insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4379 if (insert_line_delim_after_this_file) {
// NOTE(review): "©_params" below is a mojibake of "&copy_params" (the
// address-of plus identifier was corrupted through the HTML entity
// "&copy;"). Left byte-identical per doc-only review; must be restored
// to `&copy_params.line_delim` in the real source.
4382 int nwritten =
write(fd[1], ©_params.line_delim, 1);
4386 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4389 }
else if (errno == EPIPE &&
4396 throw std::runtime_error(
4397 std::string(
"failed or interrupted write to pipe: ") +
4400 }
else if (nwritten == 1) {
4416 if (
nullptr != dynamic_cast<Detector*>(
this)) {
4421 teptr = std::current_exception();
4430 th_pipe_reader.join();
4431 th_pipe_writer.join();
4435 std::rethrow_exception(teptr);
4444 const std::string& file_path,
4445 const bool decompressed,
4448 auto query_session = session_info ? session_info->
get_session_id() :
"";
4454 throw std::runtime_error(
"failed to open file '" + file_path +
4455 "': " + strerror(errno));
4458 if (!decompressed) {
4459 (void)fseek(
p_file, 0, SEEK_END);
4467 size_t alloc_size = copy_params.buffer_size;
4468 if (!decompressed &&
file_size < alloc_size) {
4474 for (
const auto cd :
loader->get_column_descs()) {
4476 std::make_unique<TypedImportBuffer>(cd,
loader->getStringDict(cd)));
4480 auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4481 size_t current_pos = 0;
4483 size_t begin_pos = 0;
4485 (void)fseek(
p_file, current_pos, SEEK_SET);
4487 fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size,
p_file);
4491 if (copy_params.geo_assign_render_groups) {
4494 auto* td =
loader->getTableDesc();
4496 auto column_descriptors =
4497 cat.getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
4498 for (
auto const& cd : column_descriptors) {
4500 auto rga = std::make_shared<RenderGroupAnalyzer>();
4502 columnIdToRenderGroupAnalyzerMap[cd->
columnId] = rga;
4507 throw std::runtime_error(
4508 "Render Group Assignment requested in CopyParams but disabled in Server "
4509 "Config. Set enable_assign_render_groups=true in Server Config to override.");
4514 loader->getTableDesc()->tableId};
4515 auto table_epochs =
loader->getTableEpochs();
4518 std::list<std::future<ImportStatus>> threads;
4522 std::stack<size_t> stack_thread_ids;
4524 stack_thread_ids.push(i);
4527 size_t first_row_index_this_buffer = 0;
4530 unsigned int num_rows_this_buffer = 0;
4531 CHECK(scratch_buffer);
4536 first_row_index_this_buffer,
4537 num_rows_this_buffer,
4541 int nresidual = size - end_pos;
4542 std::unique_ptr<char[]> unbuf;
4543 if (nresidual > 0) {
4544 unbuf = std::make_unique<char[]>(nresidual);
4545 memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
4549 auto thread_id = stack_thread_ids.top();
4550 stack_thread_ids.pop();
4557 std::move(scratch_buffer),
4561 columnIdToRenderGroupAnalyzerMap,
4562 first_row_index_this_buffer,
4566 first_row_index_this_buffer += num_rows_this_buffer;
4568 current_pos += end_pos;
4569 scratch_buffer = std::make_unique<char[]>(alloc_size);
4570 CHECK(scratch_buffer);
4571 memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4573 fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual,
p_file);
4576 while (threads.size() > 0) {
4578 for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4579 it != threads.end();) {
4581 std::chrono::milliseconds span(0);
4582 if (p.wait_for(span) == std::future_status::ready) {
4583 auto ret_import_status = p.get();
4587 if (ret_import_status.load_failed) {
4592 size_t total_file_offset{0};
4596 total_file_offset += file_offset;
4600 if (decompressed ? total_file_offset : current_pos) {
4609 << total_file_offset;
4612 stack_thread_ids.push(ret_import_status.thread_id);
4613 threads.erase(it++);
4621 std::this_thread::yield();
4644 LOG(
ERROR) <<
"Maximum rows rejected exceeded. Halting load";
4654 for (
auto& p : threads) {
4680 const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4686 const std::string& file_name,
4695 throw std::runtime_error(
"Unexpected CopyParams.source_type (" +
4706 const std::string& file_name) {
4708 OGRLayer* poLayer =
nullptr;
4709 if (geo_layer_name.size()) {
4710 poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4711 if (poLayer ==
nullptr) {
4712 throw std::runtime_error(
"Layer '" + geo_layer_name +
"' not found in " +
4716 poLayer = poDS->GetLayer(0);
4717 if (poLayer ==
nullptr) {
4718 throw std::runtime_error(
"No layers found in " + file_name);
4728 const std::string& file_name,
4729 const std::string& geo_column_name,
4730 std::map<std::string, std::vector<std::string>>& sample_data,
4735 if (datasource ==
nullptr) {
4736 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
4743 auto const* feature_defn = layer.GetLayerDefn();
4744 CHECK(feature_defn);
4747 auto const metadata_column_infos =
4752 auto const feature_count =
static_cast<uint64_t
>(layer.GetFeatureCount());
4753 auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
4756 for (
int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4757 auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4759 sample_data[column_name] = {};
4761 sample_data[geo_column_name] = {};
4762 for (
auto const& mci : metadata_column_infos) {
4763 sample_data[mci.column_descriptor.columnName] = {};
4767 layer.ResetReading();
4770 uint64_t feature_index{0u};
4771 while (feature_index < num_features) {
4779 auto const* geometry = feature->GetGeometryRef();
4780 if (geometry ==
nullptr) {
4785 switch (wkbFlatten(geometry->getGeometryType())) {
4789 case wkbMultiLineString:
4791 case wkbMultiPolygon:
4794 throw std::runtime_error(
"Unsupported geometry type: " +
4795 std::string(geometry->getGeometryName()));
4799 for (
int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4800 auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4801 sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4805 for (
auto const& mci : metadata_column_infos) {
4806 sample_data[mci.column_descriptor.columnName].push_back(mci.value);
4810 char* wkts =
nullptr;
4811 geometry->exportToWkt(&wkts);
4813 sample_data[geo_column_name].push_back(wkts);
4826 return std::make_pair(
kINT,
false);
4827 case OFTIntegerList:
4828 return std::make_pair(
kINT,
true);
4829 #if GDAL_VERSION_MAJOR > 1
4831 return std::make_pair(
kBIGINT,
false);
4832 case OFTInteger64List:
4833 return std::make_pair(
kBIGINT,
true);
4836 return std::make_pair(
kDOUBLE,
false);
4838 return std::make_pair(
kDOUBLE,
true);
4840 return std::make_pair(
kTEXT,
false);
4842 return std::make_pair(
kTEXT,
true);
4844 return std::make_pair(
kDATE,
false);
4846 return std::make_pair(
kTIME,
false);
4854 return std::make_pair(
kTINYINT,
true);
4858 throw std::runtime_error(
"Unknown OGR field type: " +
std::to_string(ogr_type));
4869 case wkbMultiLineString:
4873 case wkbMultiPolygon:
4878 throw std::runtime_error(
"Unknown OGR geom type: " +
std::to_string(ogr_type));
4883 switch (raster_point_type) {
4905 switch (raster_point_transform) {
4923 const std::string& file_name,
4924 const bool is_raster,
4925 const std::string& geo_column_name,
4935 const std::string& file_name,
4936 const std::string& geo_column_name,
4947 auto metadata_column_infos =
4960 metadata_column_infos);
4963 std::list<ColumnDescriptor> cds;
4969 for (
auto const& [col_name, sql_type] : point_names_and_sql_types) {
4974 if (sql_type ==
kPOINT) {
4988 for (
auto const& [band_name, sql_type] : band_names_and_types) {
4997 for (
auto& mci : metadata_column_infos) {
4998 cds.push_back(std::move(mci.column_descriptor));
5007 const std::string& file_name,
5008 const std::string& geo_column_name,
5010 std::list<ColumnDescriptor> cds;
5013 if (poDS ==
nullptr) {
5014 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
5017 if (poDS->GetLayerCount() == 0) {
5018 throw std::runtime_error(
"gdalToColumnDescriptors Error: Geo file " + file_name +
5025 layer.ResetReading();
5028 if (poFeature ==
nullptr) {
5029 throw std::runtime_error(
"No features found in " + file_name);
5032 OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5035 for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
5036 OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5037 auto typePair =
ogr_to_type(poFieldDefn->GetType());
5040 cd.sourceName = poFieldDefn->GetNameRef();
5042 if (typePair.second) {
5048 if (typePair.first ==
kTEXT) {
5057 auto ogr_type = wkbFlatten(layer.GetGeomType());
5058 if (ogr_type == wkbUnknown) {
5061 CHECK(first_feature);
5062 auto const* ogr_geometry = first_feature->GetGeometryRef();
5064 ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
5070 if (ogr_type != wkbNone) {
5077 if (ogr_type == wkbMultiPolygon) {
5078 ogr_type = wkbPolygon;
5079 }
else if (ogr_type == wkbMultiLineString) {
5080 ogr_type = wkbLineString;
5081 }
else if (ogr_type == wkbMultiPoint) {
5082 ogr_type = wkbPoint;
5109 auto metadata_column_infos =
5111 for (
auto& mci : metadata_column_infos) {
5112 cds.push_back(std::move(mci.column_descriptor));
5129 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5133 VSICurlClearCache();
5138 int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5144 if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5146 }
else if (VSI_ISREG(sb.st_mode)) {
5164 std::vector<std::string>& files) {
5166 std::vector<std::string> subdirectories;
5169 char** entries = VSIReadDir(archive_path.c_str());
5171 LOG(
WARNING) <<
"Failed to get file listing at archive: " << archive_path;
5178 ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
5184 char* entry_c = entries[index++];
5188 std::string entry(entry_c);
5191 if (entry ==
"." || entry ==
"..") {
5196 std::string entry_path = archive_path + std::string(
"/") + entry;
5200 int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
5205 if (VSI_ISDIR(sb.st_mode)) {
5209 if (boost::iends_with(entry_path,
".gdb")) {
5211 files.push_back(entry_path);
5214 subdirectories.push_back(entry_path);
5218 files.push_back(entry_path);
5224 for (
const auto& subdirectory : subdirectories) {
5231 const std::string& archive_path,
5242 std::vector<std::string> files;
5248 for (
auto& file : files) {
5249 file.erase(0, archive_path.size() + 1);
5258 const std::string& file_name,
5269 std::vector<GeoFileLayerInfo> layer_info;
5273 if (poDS ==
nullptr) {
5274 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
5279 for (
auto&& poLayer : poDS->GetLayers()) {
5282 poLayer->ResetReading();
5284 if (poLayer->GetFeatureCount() > 0) {
5286 auto ogr_type = wkbFlatten(poLayer->GetGeomType());
5287 if (ogr_type == wkbUnknown) {
5290 CHECK(first_feature);
5291 auto const* ogr_geometry = first_feature->GetGeometryRef();
5293 ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
5306 case wkbMultiLineString:
5308 case wkbMultiPolygon:
5319 layer_info.emplace_back(poLayer->GetName(), contents);
5329 const bool is_raster) {
5333 return importGDALGeo(columnNameToSourceNameMap, session_info);
5342 if (poDS ==
nullptr) {
5343 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
5351 size_t numFeatures = layer.GetFeatureCount();
5356 OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5358 size_t numFields = poFDefn->GetFieldCount();
5359 for (
size_t iField = 0; iField < numFields; iField++) {
5360 OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5361 fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5366 poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5368 #if GDAL_VERSION_MAJOR >= 3
5372 poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5375 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5385 auto const metadata_column_infos =
5392 auto query_session = session_info ? session_info->
get_session_id() :
"";
5393 auto query_submitted_time =
::toString(std::chrono::system_clock::now());
5395 auto is_session_already_registered =
false;
5398 executor->getSessionLock());
5399 is_session_already_registered =
5400 executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5403 !is_session_already_registered) {
5404 executor->enrollQuerySession(query_session,
5406 query_submitted_time,
5408 QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5410 ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5413 executor->clearQuerySessionStatus(query_session, query_submitted_time);
5421 for (
const auto cd :
loader->get_column_descs()) {
5429 if (copy_params.geo_assign_render_groups) {
5432 auto* td =
loader->getTableDesc();
5434 auto column_descriptors =
5435 cat.getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
5436 for (
auto const& cd : column_descriptors) {
5438 auto rga = std::make_shared<RenderGroupAnalyzer>();
5440 columnIdToRenderGroupAnalyzerMap[cd->
columnId] = rga;
5444 throw std::runtime_error(
5445 "Render Group Assignment requested in CopyParams but disabled in Server "
5446 "Config. Set enable_assign_render_groups=true in Server Config to override.");
5450 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5452 std::list<std::future<ImportStatus>> threads;
5456 std::stack<size_t> stack_thread_ids;
5458 stack_thread_ids.push(i);
5463 auto table_epochs =
loader->getTableEpochs();
5466 layer.ResetReading();
5468 static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5471 std::vector<FeaturePtrVector> features(max_threads);
5474 std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(