25 #include <arrow/api.h>
26 #include <arrow/filesystem/localfs.h>
27 #include <arrow/io/api.h>
29 #include <ogrsf_frmts.h>
30 #include <boost/algorithm/string.hpp>
31 #include <boost/dynamic_bitset.hpp>
32 #include <boost/filesystem.hpp>
33 #include <boost/geometry.hpp>
34 #include <boost/variant.hpp>
49 #include <unordered_map>
50 #include <unordered_set>
58 #ifdef ENABLE_IMPORT_PARQUET
61 #if defined(ENABLE_IMPORT_PARQUET)
65 #ifdef ENABLE_IMPORT_PARQUET
93 #include "gen-cpp/Heavy.h"
100 #define TIMER_STOP(t) \
101 (float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>( \
116 boost::filesystem::path boost_file_path{file_path};
119 return ec ? 0 : filesize;
127 executor->getSessionLock());
128 return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
137 formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
139 for (
size_t i = 0; i < row.size(); ++i) {
140 out << (i ?
", " :
"") << row[i];
148 namespace import_export {
154 #define DEBUG_TIMING false
155 #define DEBUG_RENDER_GROUP_ANALYZER 0
156 #define DEBUG_AWS_AUTHENTICATION 0
158 #define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0
174 const std::string&
f,
188 auto is_array = std::unique_ptr<bool[]>(
new bool[
loader->get_column_descs().size()]);
190 bool has_array =
false;
192 int skip_physical_cols = 0;
193 for (
auto& p :
loader->get_column_descs()) {
195 if (skip_physical_cols-- > 0) {
200 if (p->isVirtualCol || p->isDeletedCol) {
203 skip_physical_cols = p->columnType.get_physical_cols();
204 if (p->columnType.get_type() ==
kARRAY) {
205 is_array.get()[i] =
true;
208 is_array.get()[i] =
false;
213 is_array_a = std::unique_ptr<bool[]>(is_array.release());
215 is_array_a = std::unique_ptr<bool[]>(
nullptr);
223 if (
buffer[0] !=
nullptr) {
226 if (
buffer[1] !=
nullptr) {
235 throw std::runtime_error(
"Import status not found for id: " + import_id);
242 is.
end = std::chrono::steady_clock::now();
243 is.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.
end - is.
start);
250 while (i < j && (field[i] ==
' ' || field[i] ==
'\r')) {
253 while (i < j && (field[j - 1] ==
' ' || field[j - 1] ==
'\r')) {
256 return std::string(field + i, j - i);
309 throw std::runtime_error(
"Internal error: geometry type in NullArrayDatum.");
311 throw std::runtime_error(
"Internal error: invalid type in NullArrayDatum.");
320 if (s == copy_params.
null_str || s ==
"NULL" || s.empty()) {
327 std::vector<std::string> elem_strs;
329 for (
size_t i = s.find(copy_params.
array_delim, 1); i != std::string::npos;
331 elem_strs.push_back(s.substr(last, i - last));
334 if (last + 1 <= s.size()) {
335 elem_strs.push_back(s.substr(last, s.size() - 1 - last));
337 if (elem_strs.size() == 1) {
338 auto str = elem_strs.front();
339 auto str_trimmed =
trim_space(str.c_str(), str.length());
340 if (str_trimmed ==
"") {
345 size_t len = elem_strs.size() * elem_ti.
get_size();
346 std::unique_ptr<int8_t, FreeDeleter> buf(
348 int8_t* p = buf.get();
349 for (
auto& es : elem_strs) {
356 if (!isdigit(e[0]) && e[0] !=
'-') {
384 while ((p - buf) < len) {
388 CHECK((p - buf) == len);
405 const size_t len = compressed_null_coords.size();
407 memcpy(buf, compressed_null_coords.data(), len);
410 auto modified_ti = coords_ti;
416 const auto& arr = datum.val.arr_val;
417 for (
const auto& elem_datum : arr) {
418 string_vec.push_back(elem_datum.val.str_val);
462 throw std::runtime_error(
"Internal error: geometry type in TDatumToDatum.");
464 throw std::runtime_error(
"Internal error: invalid type in TDatumToDatum.");
478 size_t len = datum.val.arr_val.size() * elem_ti.
get_size();
481 for (
auto& e : datum.val.arr_val) {
491 std::vector<std::string_view> string_view_vec;
492 string_view_vec.reserve(string_vec.size());
493 for (
const auto& str : string_vec) {
495 std::ostringstream oss;
497 <<
" a string was detected too long for encoding, string length = "
498 << str.size() <<
", first 100 characters are '" << str.substr(0, 100) <<
"'";
499 throw std::runtime_error(oss.str());
501 string_view_vec.push_back(str);
520 }
catch (std::exception& e) {
521 std::ostringstream oss;
523 <<
" : " << e.what();
525 throw std::runtime_error(oss.str());
530 const std::string_view val,
533 const bool check_not_null) {
539 throw std::runtime_error(
"NULL for column " + cd->
columnName);
550 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
556 throw std::runtime_error(
"NULL for column " + cd->
columnName);
563 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
569 throw std::runtime_error(
"NULL for column " + cd->
columnName);
576 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
582 throw std::runtime_error(
"NULL for column " + cd->
columnName);
589 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
595 throw std::runtime_error(
"NULL for column " + cd->
columnName);
611 throw std::runtime_error(
"NULL for column " + cd->
columnName);
618 if (!is_null && (val[0] ==
'.' || isdigit(val[0]) || val[0] ==
'-')) {
619 addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
622 throw std::runtime_error(
"NULL for column " + cd->
columnName);
628 if (!is_null && (val[0] ==
'.' || isdigit(val[0]) || val[0] ==
'-')) {
629 addDouble(std::atof(std::string(val).c_str()));
632 throw std::runtime_error(
"NULL for column " + cd->
columnName);
643 throw std::runtime_error(
"NULL for column " + cd->
columnName);
648 throw std::runtime_error(
"String too long for column " + cd->
columnName +
659 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
665 throw std::runtime_error(
"NULL for column " + cd->
columnName);
672 throw std::runtime_error(
"NULL for column " + cd->
columnName);
676 std::vector<std::string> string_vec;
679 std::string(val), copy_params, string_vec);
683 size_t expected_size = ti.
get_size() / sti.get_size();
684 size_t actual_size = string_vec.size();
685 if (actual_size != expected_size) {
686 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
688 " values, received " +
703 throw std::runtime_error(
"Fixed length array for column " + cd->
columnName +
704 " has incorrect length: " + std::string(val));
723 CHECK(
false) <<
"TypedImportBuffer::add_value() does not support type " <<
type;
779 CHECK(
false) <<
"TypedImportBuffer::pop_value() does not support type " <<
type;
784 using std::runtime_error::runtime_error;
788 template <
typename DATA_TYPE>
792 std::vector<DATA_TYPE>& buffer,
796 std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
797 auto f_value_getter =
value_getter(array, cd, bad_rows_tracker);
798 std::function<void(const int64_t)> f_add_geo_phy_cols = [&](
const int64_t row) {};
799 if (bad_rows_tracker && cd->columnType.is_geometry()) {
800 f_add_geo_phy_cols = [&](
const int64_t row) {
802 std::vector<double> coords, bounds;
803 std::vector<int> ring_sizes, poly_rings;
810 if (array.IsNull(row)) {
812 import_ti, coords, bounds, ring_sizes, poly_rings);
814 const bool validate_with_geos_if_available =
false;
815 arrow_throw_if<GeoImportException>(
823 validate_with_geos_if_available),
825 arrow_throw_if<GeoImportException>(
826 cd->columnType.get_type() != ti.
get_type(),
827 error_context(cd, bad_rows_tracker) +
"Geometry type mismatch");
829 auto col_idx_workpad =
col_idx;
841 }
catch (std::runtime_error& e) {
843 }
catch (
const std::exception& e) {
850 auto f_mark_a_bad_row = [&](
const auto row) {
851 std::unique_lock<std::mutex> lck(bad_rows_tracker->
mutex);
852 bad_rows_tracker->
rows.insert(row - slice_range.first);
854 buffer.reserve(slice_range.second - slice_range.first);
855 for (
size_t row = slice_range.first; row < slice_range.second; ++row) {
857 *data << (array.IsNull(row) ?
nullptr : f_value_getter(array, row));
858 f_add_geo_phy_cols(row);
860 f_mark_a_bad_row(row);
863 if (bad_rows_tracker) {
865 f_mark_a_bad_row(row);
871 return buffer.size();
876 const bool exact_type_match,
887 if (exact_type_match) {
888 arrow_throw_if(col.type_id() != Type::BOOL,
"Expected boolean type");
893 if (exact_type_match) {
894 arrow_throw_if(col.type_id() != Type::INT8,
"Expected int8 type");
899 if (exact_type_match) {
900 arrow_throw_if(col.type_id() != Type::INT16,
"Expected int16 type");
905 if (exact_type_match) {
906 arrow_throw_if(col.type_id() != Type::INT32,
"Expected int32 type");
909 cd, col, *
int_buffer_, slice_range, bad_rows_tracker);
913 if (exact_type_match) {
914 arrow_throw_if(col.type_id() != Type::INT64,
"Expected int64 type");
919 if (exact_type_match) {
920 arrow_throw_if(col.type_id() != Type::FLOAT,
"Expected float type");
925 if (exact_type_match) {
926 arrow_throw_if(col.type_id() != Type::DOUBLE,
"Expected double type");
933 if (exact_type_match) {
935 "Expected string type");
940 if (exact_type_match) {
941 arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
942 "Expected time32 or time64 type");
947 if (exact_type_match) {
948 arrow_throw_if(col.type_id() != Type::TIMESTAMP,
"Expected timestamp type");
953 if (exact_type_match) {
954 arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
955 "Expected date32 or date64 type");
966 "Expected string type");
970 throw std::runtime_error(
"Arrow array appends not yet supported");
972 throw std::runtime_error(
"Invalid Type");
981 if (
std::any_of(col.nulls.begin(), col.nulls.end(), [](
int i) {
return i != 0; })) {
982 throw std::runtime_error(
"NULL for column " + cd->
columnName);
988 dataSize = col.data.int_col.size();
990 for (
size_t i = 0; i < dataSize; i++) {
1000 dataSize = col.data.int_col.size();
1002 for (
size_t i = 0; i < dataSize; i++) {
1012 dataSize = col.data.int_col.size();
1014 for (
size_t i = 0; i < dataSize; i++) {
1024 dataSize = col.data.int_col.size();
1026 for (
size_t i = 0; i < dataSize; i++) {
1030 int_buffer_->push_back((int32_t)col.data.int_col[i]);
1038 dataSize = col.data.int_col.size();
1040 for (
size_t i = 0; i < dataSize; i++) {
1050 dataSize = col.data.real_col.size();
1052 for (
size_t i = 0; i < dataSize; i++) {
1062 dataSize = col.data.real_col.size();
1064 for (
size_t i = 0; i < dataSize; i++) {
1077 dataSize = col.data.str_col.size();
1079 for (
size_t i = 0; i < dataSize; i++) {
1091 dataSize = col.data.int_col.size();
1093 for (
size_t i = 0; i < dataSize; i++) {
1097 bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
1108 dataSize = col.data.str_col.size();
1110 for (
size_t i = 0; i < dataSize; i++) {
1121 dataSize = col.data.arr_col.size();
1123 for (
size_t i = 0; i < dataSize; i++) {
1125 if (!col.nulls[i]) {
1126 size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
1127 for (
size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
1128 string_vec->push_back(col.data.arr_col[i].data.str_col[str_idx]);
1136 for (
size_t i = 0; i < dataSize; i++) {
1140 size_t len = col.data.arr_col[i].data.int_col.size();
1141 size_t byteSize = len *
sizeof(int8_t);
1144 for (
size_t j = 0; j < len; ++j) {
1148 if (col.data.arr_col[i].nulls[j]) {
1149 *p =
static_cast<int8_t
>(
1152 *(
bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
1162 for (
size_t i = 0; i < dataSize; i++) {
1166 size_t len = col.data.arr_col[i].data.int_col.size();
1167 size_t byteSize = len *
sizeof(int8_t);
1170 for (
size_t j = 0; j < len; ++j) {
1171 *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
1172 p +=
sizeof(int8_t);
1180 for (
size_t i = 0; i < dataSize; i++) {
1184 size_t len = col.data.arr_col[i].data.int_col.size();
1185 size_t byteSize = len *
sizeof(int16_t);
1188 for (
size_t j = 0; j < len; ++j) {
1190 static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
1191 p +=
sizeof(int16_t);
1199 for (
size_t i = 0; i < dataSize; i++) {
1203 size_t len = col.data.arr_col[i].data.int_col.size();
1204 size_t byteSize = len *
sizeof(int32_t);
1207 for (
size_t j = 0; j < len; ++j) {
1209 static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
1210 p +=
sizeof(int32_t);
1220 for (
size_t i = 0; i < dataSize; i++) {
1224 size_t len = col.data.arr_col[i].data.int_col.size();
1225 size_t byteSize = len *
sizeof(int64_t);
1228 for (
size_t j = 0; j < len; ++j) {
1230 static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1231 p +=
sizeof(int64_t);
1239 for (
size_t i = 0; i < dataSize; i++) {
1243 size_t len = col.data.arr_col[i].data.real_col.size();
1244 size_t byteSize = len *
sizeof(float);
1247 for (
size_t j = 0; j < len; ++j) {
1248 *(
float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
1257 for (
size_t i = 0; i < dataSize; i++) {
1261 size_t len = col.data.arr_col[i].data.real_col.size();
1262 size_t byteSize = len *
sizeof(double);
1265 for (
size_t j = 0; j < len; ++j) {
1266 *(
double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
1267 p +=
sizeof(double);
1277 for (
size_t i = 0; i < dataSize; i++) {
1281 size_t len = col.data.arr_col[i].data.int_col.size();
1282 size_t byteWidth =
sizeof(int64_t);
1283 size_t byteSize = len * byteWidth;
1286 for (
size_t j = 0; j < len; ++j) {
1287 *
reinterpret_cast<int64_t*
>(p) =
1288 static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1289 p +=
sizeof(int64_t);
1297 throw std::runtime_error(
"Invalid Array Type");
1303 throw std::runtime_error(
"Invalid Type");
1309 const TDatum& datum,
1317 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1330 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1340 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1347 addInt((int32_t)datum.val.int_val);
1350 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1360 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1367 addFloat((
float)datum.val.real_val);
1370 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1380 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1391 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1406 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1414 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1435 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1443 CHECK(
false) <<
"TypedImportBuffer::add_value() does not support type " <<
type;
1510 static_cast<float>(std::atof(std::string(val).c_str())));
1517 double_buffer_->resize(num_rows, std::atof(std::string(val).c_str()));
1528 if (val.length() > ti.get_max_strlen()) {
1529 throw std::runtime_error(
"String too long for column " + cd->
columnName +
1548 std::vector<std::string> string_vec;
1551 std::string(val), cp, string_vec);
1554 if (ti.get_size() > 0) {
1555 auto sti = ti.get_elem_type();
1556 size_t expected_size = ti.get_size() / sti.get_size();
1557 size_t actual_size = string_vec.size();
1558 if (actual_size != expected_size) {
1559 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
1561 " values, received " +
1567 if (ti.get_size() > 0) {
1569 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
1570 " currently cannot accept NULL arrays");
1582 if (ti.get_size() > 0 &&
static_cast<size_t>(ti.get_size()) != d.length) {
1583 throw std::runtime_error(
"Fixed length array for column " + cd->
columnName +
1584 " has incorrect length: " + std::string(val));
1603 CHECK(
false) <<
"TypedImportBuffer::addDefaultValues() does not support type "
1610 std::vector<double>& coords,
1611 std::vector<double>& bounds,
1613 if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
1618 if (!pt.transform(ti)) {
1623 coords.push_back(lon);
1624 coords.push_back(lat);
1629 bounds.push_back(coords[0]);
1630 bounds.push_back(coords[1]);
1631 bounds.push_back(coords[0]);
1632 bounds.push_back(coords[1]);
1639 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1641 std::vector<double>& coords,
1642 std::vector<double>& bounds,
1643 std::vector<int>& ring_sizes,
1644 std::vector<int>& poly_rings,
1645 const bool force_null) {
1647 const auto col_type = col_ti.
get_type();
1650 bool is_null_geo =
false;
1652 if (!col_ti.get_notnull()) {
1655 is_null_point =
true;
1658 is_null_geo = coords.empty();
1659 if (is_null_point) {
1664 is_null_geo =
false;
1676 tdd_coords.val.arr_val.reserve(compressed_coords.size());
1677 for (
auto cc : compressed_coords) {
1678 tdd_coords.val.arr_val.emplace_back();
1679 tdd_coords.val.arr_val.back().val.int_val = cc;
1682 tdd_coords.is_null = is_null_geo;
1683 import_buffers[col_idx++]->add_value(cd_coords, tdd_coords,
false);
1688 TDatum tdd_ring_sizes;
1689 tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1691 for (
auto ring_size : ring_sizes) {
1692 tdd_ring_sizes.val.arr_val.emplace_back();
1693 tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1696 tdd_ring_sizes.is_null = is_null_geo;
1697 import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
1703 TDatum tdd_poly_rings;
1704 tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1706 for (
auto num_rings : poly_rings) {
1707 tdd_poly_rings.val.arr_val.emplace_back();
1708 tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1711 tdd_poly_rings.is_null = is_null_geo;
1712 import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings,
false);
1719 tdd_bounds.val.arr_val.reserve(bounds.size());
1721 for (
auto b : bounds) {
1722 tdd_bounds.val.arr_val.emplace_back();
1723 tdd_bounds.val.arr_val.back().val.real_val = b;
1726 tdd_bounds.is_null = is_null_geo;
1727 import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds,
false);
1734 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1736 std::vector<std::vector<double>>& coords_column,
1737 std::vector<std::vector<double>>& bounds_column,
1738 std::vector<std::vector<int>>& ring_sizes_column,
1739 std::vector<std::vector<int>>& poly_rings_column) {
1741 const auto col_type = col_ti.
get_type();
1744 auto coords_row_count = coords_column.size();
1746 for (
auto& coords : coords_column) {
1747 bool is_null_geo =
false;
1749 if (!col_ti.get_notnull()) {
1752 is_null_point =
true;
1755 is_null_geo = coords.empty();
1756 if (is_null_point) {
1761 is_null_geo =
false;
1764 std::vector<TDatum> td_coords_data;
1766 std::vector<uint8_t> compressed_coords =
1768 for (
auto const& cc : compressed_coords) {
1770 td_byte.val.int_val = cc;
1771 td_coords_data.push_back(td_byte);
1775 tdd_coords.val.arr_val = td_coords_data;
1776 tdd_coords.is_null = is_null_geo;
1777 import_buffers[col_idx]->add_value(cd_coords, tdd_coords,
false);
1782 if (ring_sizes_column.size() != coords_row_count) {
1783 CHECK(
false) <<
"Geometry import columnar: ring sizes column size mismatch";
1787 for (
auto const& ring_sizes : ring_sizes_column) {
1788 bool is_null_geo =
false;
1789 if (!col_ti.get_notnull()) {
1791 is_null_geo = ring_sizes.empty();
1793 std::vector<TDatum> td_ring_sizes;
1794 for (
auto const& ring_size : ring_sizes) {
1795 TDatum td_ring_size;
1796 td_ring_size.val.int_val = ring_size;
1797 td_ring_sizes.push_back(td_ring_size);
1799 TDatum tdd_ring_sizes;
1800 tdd_ring_sizes.val.arr_val = td_ring_sizes;
1801 tdd_ring_sizes.is_null = is_null_geo;
1802 import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
1808 if (poly_rings_column.size() != coords_row_count) {
1809 CHECK(
false) <<
"Geometry import columnar: poly rings column size mismatch";
1813 for (
auto const& poly_rings : poly_rings_column) {
1814 bool is_null_geo =
false;
1815 if (!col_ti.get_notnull()) {
1817 is_null_geo = poly_rings.empty();
1819 std::vector<TDatum> td_poly_rings;
1820 for (
auto const& num_rings : poly_rings) {
1821 TDatum td_num_rings;
1822 td_num_rings.val.int_val = num_rings;
1823 td_poly_rings.push_back(td_num_rings);
1825 TDatum tdd_poly_rings;
1826 tdd_poly_rings.val.arr_val = td_poly_rings;
1827 tdd_poly_rings.is_null = is_null_geo;
1828 import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings,
false);
1835 if (bounds_column.size() != coords_row_count) {
1836 CHECK(
false) <<
"Geometry import columnar: bounds column size mismatch";
1839 for (
auto const& bounds : bounds_column) {
1840 bool is_null_geo =
false;
1841 if (!col_ti.get_notnull()) {
1845 std::vector<TDatum> td_bounds_data;
1846 for (
auto const& b : bounds) {
1848 td_double.val.real_val = b;
1849 td_bounds_data.push_back(td_double);
1852 tdd_bounds.val.arr_val = td_bounds_data;
1853 tdd_bounds.is_null = is_null_geo;
1854 import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds,
false);
1863 const std::list<const ColumnDescriptor*>& col_descs) {
1867 int collection_col_idx = -1;
1869 std::string collection_col_name;
1871 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1872 auto const& cd = *cd_it;
1873 auto const col_type = cd->columnType.get_type();
1875 if (collection_col_idx >= 0) {
1876 throw std::runtime_error(
1877 "Explode Collections: Found more than one destination column");
1879 collection_col_idx = col_idx;
1880 collection_child_type = col_type;
1881 collection_col_name = cd->columnName;
1883 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1888 if (collection_col_idx < 0) {
1889 throw std::runtime_error(
1890 "Explode Collections: Failed to find a supported column type to explode "
1893 return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
1897 OGRGeometry* ogr_geometry,
1899 const std::string& collection_col_name,
1900 size_t row_or_feature_idx,
1901 std::function<
void(OGRGeometry*)> execute_import_lambda) {
1902 auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1903 bool is_collection =
false;
1904 switch (collection_child_type) {
1906 switch (ogr_geometry_type) {
1908 is_collection =
true;
1913 throw std::runtime_error(
1914 "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1918 switch (ogr_geometry_type) {
1919 case wkbMultiLineString:
1920 is_collection =
true;
1925 throw std::runtime_error(
1926 "Explode Collections: Source geo type must be MULTILINESTRING or "
1931 switch (ogr_geometry_type) {
1932 case wkbMultiPolygon:
1933 is_collection =
true;
1938 throw std::runtime_error(
1939 "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1943 CHECK(
false) <<
"Unsupported geo child type " << collection_child_type;
1949 if (is_collection) {
1951 OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1952 CHECK(collection_geometry);
1954 #if LOG_EXPLODE_COLLECTIONS
1956 LOG(
INFO) <<
"Exploding row/feature " << row_or_feature_idx <<
" for column '"
1957 << explode_col_name <<
"' into " << collection_geometry->getNumGeometries()
1962 uint32_t child_geometry_count = 0;
1963 auto child_geometry_it = collection_geometry->begin();
1964 while (child_geometry_it != collection_geometry->end()) {
1966 OGRGeometry* import_geometry = *child_geometry_it;
1968 [&] { execute_import_lambda(import_geometry); });
1971 child_geometry_it++;
1972 child_geometry_count++;
1977 [&] { execute_import_lambda(ogr_geometry); });
1989 std::unique_ptr<
char[]> scratch_buffer,
1993 size_t first_row_index_this_buffer,
1995 Executor* executor) {
1997 int64_t total_get_row_time_us = 0;
1998 int64_t total_str_to_val_time_us = 0;
1999 auto query_session = session_info ? session_info->
get_session_id() :
"";
2000 CHECK(scratch_buffer);
2001 auto buffer = scratch_buffer.get();
2008 const std::list<const ColumnDescriptor*>& col_descs = importer->
get_column_descs();
2011 const char* thread_buf = buffer + begin_pos + begin;
2012 const char* thread_buf_end = buffer + end_pos;
2013 const char* buf_end = buffer + total_size;
2014 bool try_single_thread =
false;
2015 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2020 for (
const auto cd : col_descs) {
2021 const auto& col_ti = cd->columnType;
2022 phys_cols += col_ti.get_physical_cols();
2023 if (cd->columnType.get_type() ==
kPOINT ||
2028 auto num_cols = col_descs.size() - phys_cols;
2029 for (
const auto& p : import_buffers) {
2032 std::vector<std::string_view> row;
2033 size_t row_index_plus_one = 0;
2034 for (
const char* p = thread_buf; p < thread_buf_end; p++) {
2036 std::vector<std::unique_ptr<char[]>>
2038 row_index_plus_one++;
2051 total_get_row_time_us += us;
2064 if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
2066 LOG(
ERROR) <<
"Incorrect Row (expected " << num_cols <<
" columns, has "
2078 auto execute_import_row = [&](OGRGeometry* import_geometry) {
2079 size_t import_idx = 0;
2082 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2084 const auto& col_ti = cd->columnType;
2097 if (!cd->columnType.is_string() && row[import_idx].empty()) {
2100 if (!cd->columnType.is_string() && !copy_params.
trim_spaces) {
2102 row[import_idx] =
sv_strip(row[import_idx]);
2105 if (col_ti.get_physical_cols() == 0) {
2108 import_buffers[col_idx]->add_value(
2109 cd, row[import_idx], is_null, copy_params);
2118 import_buffers[col_idx]->add_value(
2119 cd, copy_params.
null_str,
true, copy_params);
2122 auto const& geo_string = row[import_idx];
2128 SQLTypes col_type = col_ti.get_type();
2131 std::vector<double> coords;
2132 std::vector<double> bounds;
2133 std::vector<int> ring_sizes;
2134 std::vector<int> poly_rings;
2140 geo_string.size() > 0 &&
2141 (geo_string[0] ==
'.' || isdigit(geo_string[0]) ||
2142 geo_string[0] ==
'-') &&
2143 geo_string.find_first_of(
"ABCDEFabcdef") == std::string::npos) {
2144 double lon = std::atof(std::string(geo_string).c_str());
2146 auto lat_str = row[import_idx];
2148 if (lat_str.size() > 0 &&
2149 (lat_str[0] ==
'.' || isdigit(lat_str[0]) || lat_str[0] ==
'-')) {
2150 lat = std::atof(std::string(lat_str).c_str());
2153 if (!copy_params.
lonlat) {
2164 import_ti.get_output_srid() == 4326) {
2168 import_ti.set_input_srid(srid0);
2172 throw std::runtime_error(
2173 "Cannot read lon/lat to insert into POINT/MULTIPOINT column " +
2181 import_ti.get_output_srid() == 4326) {
2185 import_ti.set_input_srid(srid0);
2189 if (col_ti.get_notnull()) {
2190 throw std::runtime_error(
"NULL geo for column " + cd->columnName);
2193 import_ti, coords, bounds, ring_sizes, poly_rings);
2195 if (import_geometry) {
2206 "Failed to extract valid geometry from exploded row " +
2208 row_index_plus_one) +
2209 " for column " + cd->columnName;
2210 throw std::runtime_error(msg);
2215 std::string(geo_string),
2222 std::string msg =
"Failed to extract valid geometry from row " +
2224 row_index_plus_one) +
2225 " for column " + cd->columnName;
2226 throw std::runtime_error(msg);
2232 throw std::runtime_error(
2233 "Imported geometry doesn't match the type of column " +
2250 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2259 "Table load was cancelled via Query Interrupt";
2263 }
catch (
const std::exception& e) {
2264 for (
size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2265 import_buffers[col_idx_to_pop]->pop_value();
2268 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
2271 LOG(
ERROR) <<
"Load was cancelled due to max reject rows being reached";
2274 "Load was cancelled due to max reject rows being reached";
2281 auto const [collection_col_idx, collection_child_type, collection_col_name] =
2284 CHECK_LT(collection_col_idx, (
int)row.size()) <<
"column index out of range";
2285 auto const& collection_geo_string = row[collection_col_idx];
2287 OGRGeometry* ogr_geometry =
nullptr;
2290 OGRGeometryFactory::destroyGeometry(ogr_geometry);
2297 collection_child_type,
2298 collection_col_name,
2299 first_row_index_this_buffer + row_index_plus_one,
2300 execute_import_row);
2304 [&] { execute_import_row(
nullptr); });
2311 total_str_to_val_time_us += us;
2321 LOG(
INFO) <<
"Thread" << std::this_thread::get_id() <<
":"
2323 << (double)ms / 1000.0 <<
"sec, Insert Time: " << (
double)load_ms / 1000.0
2324 <<
"sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2325 <<
"sec, str_to_val: " << (
double)total_str_to_val_time_us / 1000000.0
2326 <<
"sec" << std::endl;
2329 return thread_import_status;
2335 : std::runtime_error(
"Column '" + column_name +
"' is not a geo column") {}
2341 OGRCoordinateTransformation* coordinate_transformation,
2343 size_t firstFeature,
2352 const std::list<const ColumnDescriptor*>& col_descs = importer->
get_column_descs();
2353 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2355 auto query_session = session_info ? session_info->
get_session_id() :
"";
2356 for (
const auto& p : import_buffers) {
2363 for (
size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2365 if (!features[iFeature]) {
2372 OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2373 if (pGeometry && coordinate_transformation) {
2374 pGeometry->transform(coordinate_transformation);
2381 auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2387 thread_import_status.
load_msg =
"Table load was cancelled via Query Interrupt";
2391 uint32_t field_column_count{0u};
2392 uint32_t metadata_column_count{0u};
2394 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2398 const auto& col_ti = cd->columnType;
2399 if (col_ti.is_geometry()) {
2404 SQLTypes col_type = col_ti.get_type();
2407 import_buffers[col_idx]->add_value(
2408 cd, copy_params.
null_str,
true, copy_params);
2412 std::vector<double> coords;
2413 std::vector<double> bounds;
2414 std::vector<int> ring_sizes;
2415 std::vector<int> poly_rings;
2419 bool is_null_geo = !import_geometry;
2421 if (col_ti.get_notnull()) {
2422 throw std::runtime_error(
"NULL geo for column " + cd->columnName);
2425 import_ti, coords, bounds, ring_sizes, poly_rings);
2435 std::string msg =
"Failed to extract valid geometry from feature " +
2437 " for column " + cd->columnName;
2438 throw std::runtime_error(msg);
2443 throw std::runtime_error(
2444 "Imported geometry doesn't match the type of column " +
2451 auto cd_coords = *cd_it;
2452 std::vector<TDatum> td_coord_data;
2454 std::vector<uint8_t> compressed_coords =
2456 for (
auto cc : compressed_coords) {
2458 td_byte.val.int_val = cc;
2459 td_coord_data.push_back(td_byte);
2463 tdd_coords.val.arr_val = td_coord_data;
2464 tdd_coords.is_null = is_null_geo;
2465 import_buffers[col_idx]->add_value(cd_coords, tdd_coords,
false);
2472 auto cd_ring_sizes = *cd_it;
2473 std::vector<TDatum> td_ring_sizes;
2475 for (
auto ring_size : ring_sizes) {
2476 TDatum td_ring_size;
2477 td_ring_size.val.int_val = ring_size;
2478 td_ring_sizes.push_back(td_ring_size);
2481 TDatum tdd_ring_sizes;
2482 tdd_ring_sizes.val.arr_val = td_ring_sizes;
2483 tdd_ring_sizes.is_null = is_null_geo;
2484 import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
2491 auto cd_poly_rings = *cd_it;
2492 std::vector<TDatum> td_poly_rings;
2494 for (
auto num_rings : poly_rings) {
2495 TDatum td_num_rings;
2496 td_num_rings.val.int_val = num_rings;
2497 td_poly_rings.push_back(td_num_rings);
2500 TDatum tdd_poly_rings;
2501 tdd_poly_rings.val.arr_val = td_poly_rings;
2502 tdd_poly_rings.is_null = is_null_geo;
2503 import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings,
false);
2512 auto cd_bounds = *cd_it;
2513 std::vector<TDatum> td_bounds_data;
2515 for (
auto b : bounds) {
2517 td_double.val.real_val = b;
2518 td_bounds_data.push_back(td_double);
2522 tdd_bounds.val.arr_val = td_bounds_data;
2523 tdd_bounds.is_null = is_null_geo;
2524 import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds,
false);
2527 }
else if (field_column_count < fieldNameToIndexMap.size()) {
2531 auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2532 CHECK(cit != columnNameToSourceNameMap.end());
2533 auto const& field_name = cit->second;
2535 auto const fit = fieldNameToIndexMap.find(field_name);
2536 if (fit == fieldNameToIndexMap.end()) {
2540 auto const& field_index = fit->second;
2541 CHECK(field_index < fieldNameToIndexMap.size());
2543 auto const& feature = features[iFeature];
2545 auto field_defn = feature->GetFieldDefnRef(field_index);
2552 std::string value_string;
2553 int array_index = 0, array_size = 0;
2555 auto stringify_numeric_list = [&](
auto* values) {
2557 while (array_index < array_size) {
2558 auto separator = (array_index > 0) ?
"," :
"";
2562 value_string +=
"}";
2565 auto field_type = field_defn->GetType();
2566 switch (field_type) {
2575 value_string = feature->GetFieldAsString(field_index);
2577 case OFTIntegerList: {
2578 auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2579 stringify_numeric_list(values);
2581 case OFTInteger64List: {
2582 auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2583 stringify_numeric_list(values);
2586 auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2587 stringify_numeric_list(values);
2589 case OFTStringList: {
2590 auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2592 if (array_of_strings) {
2593 while (
auto* this_string = array_of_strings[array_index]) {
2594 auto separator = (array_index > 0) ?
"," :
"";
2595 value_string +=
separator + std::string(this_string);
2599 value_string +=
"}";
2602 throw std::runtime_error(
"Unsupported geo file field type (" +
2607 import_buffers[col_idx]->add_value(cd, value_string,
false, copy_params);
2609 field_column_count++;
2610 }
else if (metadata_column_count < metadata_column_infos.size()) {
2614 auto const& mci = metadata_column_infos[metadata_column_count];
2615 if (mci.column_descriptor.columnName != cd->columnName) {
2616 throw std::runtime_error(
"Metadata column name mismatch");
2618 import_buffers[col_idx]->add_value(cd, mci.value,
false, copy_params);
2620 metadata_column_count++;
2622 throw std::runtime_error(
"Column count mismatch");
2631 LOG(
ERROR) <<
"Input exception thrown: " << e.what() <<
". Aborting import.";
2632 throw std::runtime_error(e.what());
2633 }
catch (
const std::exception& e) {
2634 for (
size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2635 import_buffers[col_idx_to_pop]->pop_value();
2638 LOG(
ERROR) <<
"Input exception thrown: " << e.what() <<
". Row discarded.";
2644 auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2647 collection_child_type,
2648 collection_col_name,
2649 firstFeature + iFeature + 1,
2650 execute_import_feature);
2653 execute_import_feature(pGeometry);
2659 float load_s = 0.0f;
2667 LOG(
INFO) <<
"DEBUG: Process " << convert_s <<
"s";
2668 LOG(
INFO) <<
"DEBUG: Load " << load_s <<
"s";
2669 LOG(
INFO) <<
"DEBUG: Total " << (convert_s + load_s) <<
"s";
2674 return thread_import_status;
2678 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2681 return loadImpl(import_buffers, row_count,
false, session_info);
2684 bool Loader::load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2687 return loadImpl(import_buffers, row_count,
true, session_info);
2694 const int8_t* values_buffer{
nullptr};
2695 if (ti.is_string()) {
2701 CHECK(values_buffer);
2702 const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2703 switch (logical_size) {
2705 return values_buffer[index];
2708 return reinterpret_cast<const int16_t*
>(values_buffer)[index];
2711 return reinterpret_cast<const int32_t*
>(values_buffer)[index];
2714 return reinterpret_cast<const int64_t*
>(values_buffer)[index];
2717 LOG(
FATAL) <<
"Unexpected size for shard key: " << logical_size;
2726 const auto values_buffer = import_buffer.
getAsBytes();
2727 return reinterpret_cast<const float*
>(may_alias_ptr(values_buffer))[index];
2733 const auto values_buffer = import_buffer.
getAsBytes();
2734 return reinterpret_cast<const double*
>(may_alias_ptr(values_buffer))[index];
2742 for (
size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2743 const auto& input_buffer = import_buffers[col_idx];
2744 const auto& col_ti = input_buffer->getTypeInfo();
2750 shard_output_buffers[col_idx]->addBoolean(
int_value_at(*input_buffer, row_index));
2753 shard_output_buffers[col_idx]->addTinyint(
int_value_at(*input_buffer, row_index));
2756 shard_output_buffers[col_idx]->addSmallint(
2760 shard_output_buffers[col_idx]->addInt(
int_value_at(*input_buffer, row_index));
2763 shard_output_buffers[col_idx]->addBigint(
int_value_at(*input_buffer, row_index));
2766 shard_output_buffers[col_idx]->addFloat(
float_value_at(*input_buffer, row_index));
2769 shard_output_buffers[col_idx]->addDouble(
2775 CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2776 shard_output_buffers[col_idx]->addString(
2777 (*input_buffer->getStringBuffer())[row_index]);
2783 shard_output_buffers[col_idx]->addBigint(
int_value_at(*input_buffer, row_index));
2787 CHECK(input_buffer->getStringArrayBuffer());
2788 CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2789 const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2790 shard_output_buffers[col_idx]->addStringArray(input_arr);
2792 shard_output_buffers[col_idx]->addArray(
2793 (*input_buffer->getArrayBuffer())[row_index]);
2802 CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2803 shard_output_buffers[col_idx]->addGeoString(
2804 (*input_buffer->getGeoStringBuffer())[row_index]);
2814 std::vector<OneShardBuffers>& all_shard_import_buffers,
2815 std::vector<size_t>& all_shard_row_counts,
2817 const size_t row_count,
2818 const size_t shard_count,
2825 shard_col_desc = col_desc;
2829 CHECK(shard_col_desc);
2832 const auto& shard_col_ti = shard_col_desc->columnType;
2833 CHECK(shard_col_ti.is_integer() ||
2834 (shard_col_ti.is_string() && shard_col_ti.get_compression() ==
kENCODING_DICT) ||
2835 shard_col_ti.is_time());
2836 if (shard_col_ti.is_string()) {
2837 const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2838 CHECK(payloads_ptr);
2839 shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
2842 for (
size_t i = 0; i < row_count; ++i) {
2843 const size_t shard =
2845 auto& shard_output_buffers = all_shard_import_buffers[shard];
2847 ++all_shard_row_counts[shard];
2852 std::vector<OneShardBuffers>& all_shard_import_buffers,
2853 std::vector<size_t>& all_shard_row_counts,
2855 const size_t row_count,
2856 const size_t shard_count,
2859 CHECK(shard_tds.size() == shard_count);
2861 for (
size_t shard = 0; shard < shard_count; ++shard) {
2862 auto& shard_output_buffers = all_shard_import_buffers[shard];
2863 if (row_count != 0) {
2868 all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2873 std::vector<size_t>& all_shard_row_counts,
2875 const size_t row_count,
2876 const size_t shard_count,
2878 all_shard_row_counts.resize(shard_count);
2879 for (
size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2880 all_shard_import_buffers.emplace_back();
2881 for (
const auto& typed_import_buffer : import_buffers) {
2882 all_shard_import_buffers.back().emplace_back(
2884 typed_import_buffer->getStringDictionary()));
2890 all_shard_row_counts,
2897 all_shard_row_counts,
2906 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2915 std::vector<OneShardBuffers> all_shard_import_buffers;
2916 std::vector<size_t> all_shard_row_counts;
2919 all_shard_row_counts,
2922 shard_tables.size(),
2924 bool success =
true;
2925 for (
size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
2926 success = success &&
loadToShard(all_shard_import_buffers[shard_idx],
2927 all_shard_row_counts[shard_idx],
2928 shard_tables[shard_idx],
2938 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
2940 std::vector<std::pair<const size_t, std::future<int8_t*>>>
2941 encoded_data_block_ptrs_futures;
2943 for (
size_t buf_idx = 0; buf_idx <
import_buffers.size(); buf_idx++) {
2946 auto string_payload_ptr =
import_buffers[buf_idx]->getStringBuffer();
2949 encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
2952 import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
2958 for (
size_t buf_idx = 0; buf_idx <
import_buffers.size(); buf_idx++) {
2965 auto string_payload_ptr =
import_buffers[buf_idx]->getStringBuffer();
2975 auto geo_payload_ptr =
import_buffers[buf_idx]->getGeoStringBuffer();
2992 for (
auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
2993 result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
2999 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3007 bool success =
false;
3010 }
catch (std::exception& e) {
3011 std::ostringstream oss;
3012 oss <<
"Exception when loading Table " << shard_table->
tableName <<
", issue was "
3023 for (
auto& buffer : import_buffers) {
3024 ins_data.
columnIds.push_back(buffer->getColumnDesc()->columnId);
3032 loader_lock.unlock();
3037 shard_table->
fragmenter->insertData(ins_data);
3039 shard_table->
fragmenter->insertDataNoCheckpoint(ins_data);
3041 }
catch (std::exception& e) {
3042 std::ostringstream oss;
3043 oss <<
"Fragmenter Insert Exception when processing Table "
3044 << shard_table->
tableName <<
" issue was " << e.what();
3056 std::vector<const TableDescriptor*> table_descs(1,
table_desc_);
3060 for (
auto table_desc : table_descs) {
3061 table_desc->fragmenter->dropColumns(columnIds);
3071 CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3074 dict_map_[cd->columnId] = dd->stringDict.get();
3087 const std::string& file_path,
3088 const bool decompressed,
3095 throw std::runtime_error(
"failed to open file '" + file_path +
3096 "': " + strerror(errno));
3102 line.reserve(1 * 1024 * 1024);
3103 auto end_time = std::chrono::steady_clock::now() +
3104 timeout * (boost::istarts_with(file_path,
"s3://") ? 3 : 1);
3110 if (n++ >= line.capacity()) {
3122 if (
line1.empty()) {
3124 }
else if (line ==
line1) {
3133 if (std::chrono::steady_clock::now() >
end_time) {
3140 }
catch (std::exception& e) {
3159 if (boost::filesystem::path(
file_path).extension() ==
".tsv") {
3166 const char* buf =
raw_data.c_str();
3167 const char* buf_end = buf +
raw_data.size();
3168 bool try_single_thread =
false;
3169 for (
const char* p = buf; p < buf_end; p++) {
3170 std::vector<std::string> row;
3171 std::vector<std::unique_ptr<char[]>> tmp_buffers;
3182 if (try_single_thread) {
3186 if (try_single_thread) {
3189 for (
const char* p = buf; p < buf_end; p++) {
3190 std::vector<std::string> row;
3191 std::vector<std::unique_ptr<char[]>> tmp_buffers;
3209 boost::lexical_cast<
T>(str);
3210 }
catch (
const boost::bad_lexical_cast& e) {
3218 if (try_cast<double>(str)) {
3223 if (try_cast<int16_t>(str)) {
3225 }
else if (try_cast<int32_t>(str)) {
3227 }
else if (try_cast<int64_t>(str)) {
3229 }
else if (try_cast<float>(str)) {
3235 if (type ==
kTEXT) {
3237 std::string str_upper_case = str;
3239 str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3242 if (str_upper_case.find(
"POINT") == 0) {
3245 }
else if (str_upper_case.find(
"MULTIPOINT") == 0) {
3247 }
else if (str_upper_case.find(
"LINESTRING") == 0) {
3250 }
else if (str_upper_case.find(
"MULTILINESTRING") == 0) {
3252 }
else if (str_upper_case.find(
"POLYGON") == 0) {
3255 }
else if (str_upper_case.find(
"MULTIPOLYGON") == 0) {
3257 }
else if (str_upper_case.find_first_not_of(
"0123456789ABCDEF") ==
3258 std::string::npos &&
3259 (str_upper_case.size() % 2) == 0) {
3261 if (str_upper_case.size() >= 10) {
3266 auto first_five_bytes = str_upper_case.substr(0, 10);
3267 if (first_five_bytes ==
"0000000001" || first_five_bytes ==
"0101000000") {
3269 }
else if (first_five_bytes ==
"0000000004" || first_five_bytes ==
"0104000000") {
3271 }
else if (first_five_bytes ==
"0000000002" || first_five_bytes ==
"0102000000") {
3273 }
else if (first_five_bytes ==
"0000000005" || first_five_bytes ==
"0105000000") {
3275 }
else if (first_five_bytes ==
"0000000003" || first_five_bytes ==
"0103000000") {
3277 }
else if (first_five_bytes ==
"0000000006" || first_five_bytes ==
"0106000000") {
3291 if (type ==
kTEXT) {
3306 std::vector<SQLTypes> types(row.size());
3307 for (
size_t i = 0; i < row.size(); i++) {
3314 static std::array<int, kSQLTYPE_LAST> typeorder;
3315 typeorder[
kCHAR] = 0;
3318 typeorder[
kINT] = 4;
3323 typeorder[
kTIME] = 9;
3324 typeorder[
kDATE] = 10;
3331 typeorder[
kTEXT] = 12;
3334 return typeorder[b] < typeorder[
a];
3365 const std::vector<std::vector<std::string>>& raw_rows,
3371 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3372 const std::vector<std::vector<std::string>>::const_iterator& row_end,
3375 throw std::runtime_error(
"No rows found in: " +
3376 boost::filesystem::path(
file_path).stem().
string());
3379 size_t num_cols =
raw_rows.front().size();
3380 std::vector<SQLTypes> best_types(num_cols,
kCHAR);
3381 std::vector<size_t> non_null_col_counts(num_cols, 0);
3382 for (
auto row = row_begin; row != row_end; row++) {
3383 while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3384 best_types.push_back(
kCHAR);
3385 non_null_col_counts.push_back(0);
3387 for (
size_t col_idx = 0; col_idx < row->size(); col_idx++) {
3389 if (row->at(col_idx) ==
"" || !row->at(col_idx).compare(copy_params.
null_str)) {
3393 non_null_col_counts[col_idx]++;
3395 best_types[col_idx] = t;
3398 if (std::chrono::steady_clock::now() >
end_time) {
3402 for (
size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3405 if (non_null_col_counts[col_idx] == 0) {
3406 best_types[col_idx] =
kTEXT;
3414 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3415 const std::vector<std::vector<std::string>>::const_iterator& row_end,
3416 const std::vector<SQLTypes>& best_types) {
3418 throw std::runtime_error(
"No rows found in: " +
3419 boost::filesystem::path(
file_path).stem().
string());
3421 size_t num_cols = best_types.size();
3422 std::vector<EncodingType> best_encodes(num_cols,
kENCODING_NONE);
3423 std::vector<size_t> num_rows_per_col(num_cols, 1);
3424 std::vector<std::unordered_set<std::string>> count_set(num_cols);
3425 for (
auto row = row_begin; row != row_end; row++) {
3426 for (
size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3428 count_set[col_idx].insert(row->at(col_idx));
3429 num_rows_per_col[col_idx]++;
3433 for (
size_t col_idx = 0; col_idx < num_cols; col_idx++) {
3436 static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3437 if (uniqueRatio < 0.75) {
3442 return best_encodes;
3450 const std::vector<SQLTypes>& tail_types) {
3451 if (head_types.size() != tail_types.size()) {
3455 for (
size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
3456 if (head_types[col_idx] !=
kTEXT) {
3459 has_headers = has_headers || tail_types[col_idx] !=
kTEXT;
3465 #if defined(ENABLE_IMPORT_PARQUET)
3466 if (data_preview_.has_value()) {
3467 return data_preview_.value().sample_rows;
3473 std::vector<std::vector<std::string>> sample_rows(
raw_rows.begin() + offset,
3480 #if defined(ENABLE_IMPORT_PARQUET)
3481 if (data_preview_.has_value()) {
3482 return data_preview_.value().column_names;
3499 #if defined(ENABLE_IMPORT_PARQUET)
3500 if (data_preview_.has_value()) {
3501 return data_preview_.value().column_types;
3505 std::vector<SQLTypeInfo> types;
3514 void Importer::load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3517 if (!
loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3525 const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3530 loader->setTableEpochs(table_epochs);
3536 if (
loader->getTableDesc()->persistenceLevel ==
3543 if (!p->stringDictCheckpoint()) {
3544 LOG(
ERROR) <<
"Checkpointing Dictionary for Column "
3545 << p->getColumnDesc()->columnName <<
" failed.";
3554 LOG(
INFO) <<
"Dictionary Checkpointing took " << (double)ms / 1000.0 <<
" Seconds."
3566 std::vector<std::string> file_paths;
3580 for (
const auto&
file_path : file_paths) {
3596 #ifdef ENABLE_IMPORT_PARQUET
3597 import_parquet(file_paths, session_info);
3599 throw std::runtime_error(
"Parquet not supported!");
3609 #ifdef ENABLE_IMPORT_PARQUET
3612 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3615 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3633 const CopyParams& copy_params) {
3638 auto [foreign_server, user_mapping, foreign_table] =
3641 std::shared_ptr<arrow::fs::FileSystem> file_system;
3642 auto& server_options = foreign_server->options;
3647 file_system = std::make_shared<arrow::fs::LocalFileSystem>();
3649 }
else if (server_options
3654 create_parquet_s3_detect_filesystem_config(foreign_server.get(), copy_params));
3660 auto parquet_data_wrapper = std::make_unique<foreign_storage::ParquetDataWrapper>(
3661 foreign_table.get(), file_system);
3672 #ifdef ENABLE_IMPORT_PARQUET
3674 !g_enable_legacy_parquet_import) {
3675 data_preview_ = get_parquet_data_preview(fp.string(), cp);
3684 #ifdef ENABLE_IMPORT_PARQUET
3686 std::shared_ptr<arrow::io::ReadableFile>& infile,
3687 std::unique_ptr<parquet::arrow::FileReader>& reader,
3688 std::shared_ptr<arrow::Table>& table) {
3689 using namespace parquet::arrow;
3690 auto file_result = arrow::io::ReadableFile::Open(file_path);
3691 PARQUET_THROW_NOT_OK(file_result.status());
3692 infile = file_result.ValueOrDie();
3694 PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3695 PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3696 const auto num_row_groups = reader->num_row_groups();
3697 const auto num_columns = table->num_columns();
3698 const auto num_rows = table->num_rows();
3699 LOG(
INFO) <<
"File " << file_path <<
" has " << num_rows <<
" rows and " << num_columns
3700 <<
" columns in " << num_row_groups <<
" groups.";
3701 return std::make_tuple(num_row_groups, num_columns, num_rows);
3704 void Detector::import_local_parquet(
const std::string& file_path,
3707 std::shared_ptr<arrow::io::ReadableFile> infile;
3708 std::unique_ptr<parquet::arrow::FileReader> reader;
3709 std::shared_ptr<arrow::Table> table;
3710 int num_row_groups, num_columns;
3712 std::tie(num_row_groups, num_columns, num_rows) =
3717 copy_params.line_delim =
'\n';
3718 copy_params.delimiter =
',';
3720 copy_params.quoted =
true;
3721 copy_params.quote =
'"';
3722 copy_params.escape =
'"';
3723 for (
int c = 0; c < num_columns; ++c) {
3727 raw_data += table->ColumnNames().at(c);
3729 raw_data += copy_params.line_delim;
3733 for (
int g = 0; g < num_row_groups; ++g) {
3735 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3736 std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3737 arrays.resize(num_columns);
3738 for (
int c = 0; c < num_columns; ++c) {
3739 PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3740 for (
auto chunk : arrays[c]->chunks()) {
3741 getters.push_back(
value_getter(*chunk,
nullptr,
nullptr));
3744 for (
int r = 0; r < num_rows; ++r) {
3745 for (
int c = 0; c < num_columns; ++c) {
3746 std::vector<std::string> buffer;
3747 for (
auto chunk : arrays[c]->chunks()) {
3748 DataBuffer<std::string> data(&cd, *chunk, buffer,
nullptr);
3752 if (!chunk->IsNull(r)) {
3754 raw_data += boost::replace_all_copy(
3755 (data << getters[c](*chunk, r)).buffer.front(),
"\"",
"\"\"");
3760 raw_data += copy_params.line_delim;
3772 template <
typename DATA_TYPE>
3774 std::vector<DATA_TYPE>& buffer,
3776 const auto old_size = buffer.size();
3778 for (
auto rit = bad_rows_tracker->
rows.crbegin(); rit != bad_rows_tracker->
rows.crend();
3780 buffer.erase(buffer.begin() + *rit);
3782 return std::make_tuple(old_size, buffer.size());
3786 BadRowsTracker*
const bad_rows_tracker) {
3821 throw std::runtime_error(
"Invalid Type");
3825 void Importer::import_local_parquet(
const std::string& file_path,
3827 std::shared_ptr<arrow::io::ReadableFile> infile;
3828 std::unique_ptr<parquet::arrow::FileReader> reader;
3829 std::shared_ptr<arrow::Table> table;
3830 int num_row_groups, num_columns;
3831 int64_t nrow_in_file;
3832 std::tie(num_row_groups, num_columns, nrow_in_file) =
3837 std::vector<const ColumnDescriptor*> cds;
3839 int num_physical_cols = 0;
3840 for (
auto& cd : column_list) {
3844 arrow_throw_if(num_columns != (
int)(column_list.size() - num_physical_cols),
3845 "Unmatched numbers of columns in parquet file " + file_path +
": " +
3848 " columns in table.");
3852 const int num_slices = std::max<decltype(max_threads)>(
max_threads, num_columns);
3855 size_t nrow_completed{0};
3858 auto get_physical_col_idx = [&cds](
const int logic_col_idx) ->
auto{
3859 int physical_col_idx = 0;
3860 for (
int i = 0; i < logic_col_idx; ++i) {
3861 physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3863 return physical_col_idx;
3866 auto query_session = session_info ? session_info->
get_session_id() :
"";
3868 for (
int row_group = 0; row_group < num_row_groups; ++row_group) {
3883 for (
int slice = 0; slice < num_slices; slice++) {
3885 for (
const auto cd : cds) {
3887 new TypedImportBuffer(cd,
loader->getStringDict(cd)));
3899 std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3900 for (
size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3901 auto& bad_rows_tracker = bad_rows_trackers[slice];
3903 bad_rows_tracker.row_group = slice;
3904 bad_rows_tracker.importer =
this;
3907 for (
int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3908 const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3909 const auto cd = cds[physical_col_idx];
3910 std::shared_ptr<arrow::ChunkedArray> array;
3911 PARQUET_THROW_NOT_OK(
3912 reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3913 const size_t array_size = array->length();
3914 const size_t slice_size = (array_size + num_slices - 1) / num_slices;
3916 for (
int slice = 0; slice < num_slices; ++slice) {
3922 thread_controller.startThread([&, slice] {
3923 const auto slice_offset = slice % num_slices;
3925 std::min<size_t>((slice_offset + 0) * slice_size, array_size),
3926 std::min<size_t>((slice_offset + 1) * slice_size, array_size));
3927 auto& bad_rows_tracker = bad_rows_trackers[slice];
3930 import_buffer->col_idx = physical_col_idx + 1;
3931 for (
auto chunk : array->chunks()) {
3932 import_buffer->add_arrow_values(
3933 cd, *chunk,
false, slice_range, &bad_rows_tracker);
3937 thread_controller.finish();
3939 std::vector<size_t> nrow_in_slice_raw(num_slices);
3940 std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
3942 for (
int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3943 const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3944 const auto cd = cds[physical_col_idx];
3945 for (
int slice = 0; slice < num_slices; ++slice) {
3946 auto& bad_rows_tracker = bad_rows_trackers[slice];
3948 std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
3953 for (
int slice = 0; slice < num_slices; ++slice) {
3955 nrow_in_slice_successfully_loaded[slice],
3959 const auto nrow_original =
3960 std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
3961 const auto nrow_imported =
3963 nrow_in_slice_successfully_loaded.end(),
3965 const auto nrow_dropped = nrow_original - nrow_imported;
3966 LOG(
INFO) <<
"row group " << row_group <<
": add " << nrow_imported
3967 <<
" rows, drop " << nrow_dropped <<
" rows.";
3975 ") rows rejected exceeded. Halting load.";
3976 LOG(
ERROR) <<
"Maximum (" << copy_params.max_reject
3977 <<
") rows rejected exceeded. Halting load.";
3982 nrow_completed += nrow_imported;
3984 nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
3986 const auto total_file_offset =
3989 if (total_file_offset) {
3995 << total_file_offset;
3999 LOG(
INFO) <<
"Import " << nrow_in_file <<
" rows of parquet file " << file_path
4000 <<
" took " << (double)ms_load_a_file / 1000.0 <<
" secs";
4003 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
4005 auto importer =
dynamic_cast<Importer*
>(
this);
4006 auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
4007 : std::vector<Catalog_Namespace::TableEpochInfo>{};
4009 std::exception_ptr teptr;
4012 for (
auto const& file_path : file_paths) {
4013 std::map<int, std::string> url_parts;
4017 std::vector<std::string> objkeys;
4018 std::unique_ptr<S3ParquetArchive> us3arch;
4019 if (
"s3" == url_parts[2]) {
4022 copy_params.s3_access_key,
4023 copy_params.s3_secret_key,
4024 copy_params.s3_session_token,
4025 copy_params.s3_region,
4026 copy_params.s3_endpoint,
4027 copy_params.plain_text,
4028 copy_params.regex_path_filter,
4029 copy_params.file_sort_order_by,
4030 copy_params.file_sort_regex));
4031 us3arch->init_for_read();
4033 objkeys = us3arch->get_objkeys();
4035 throw std::runtime_error(
"AWS S3 support not available");
4036 #endif // HAVE_AWS_S3
4038 objkeys.emplace_back(file_path);
      for (auto const& objkey : objkeys) {
        try {
          auto file_path =
              us3arch ? us3arch->land(objkey,
                                      teptr,
                                      nullptr != dynamic_cast<Detector*>(this))
                      : objkey;
          import_local_parquet(file_path, session_info);
          if (us3arch) {
            us3arch->vacuum(objkey);
          }
        } catch (...) {
          if (us3arch) {
            us3arch->vacuum(objkey);
          }
          throw;
        }
      }
    }
    // rethrow any exception that happened above
    if (teptr) {
      std::rethrow_exception(teptr);
    }
  } catch (const std::exception& e) {
    import_status.load_failed = true;
    import_status.load_msg = e.what();
  }

  if (importer) {
    importer->checkpoint(table_epochs);
  }
}
#endif  // ENABLE_IMPORT_PARQUET
void DataStreamSink::import_compressed(std::vector<std::string>& file_paths,
                                       const Catalog_Namespace::SessionInfo* session_info) {
  auto query_session = session_info ? session_info->get_session_id() : "";
  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);

  int fd[2];  // stdio pipe: fd[0] read end, fd[1] write end
#ifdef _WIN32
  auto pipe_res = _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
#else
  auto pipe_res = pipe(fd);
#endif
  if (pipe_res < 0) {
    throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
  }
#ifndef _WIN32
  signal(SIGPIPE, SIG_IGN);
#endif
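  // With SIGPIPE ignored, a write() into a pipe whose reader has gone away
  // surfaces as an EPIPE errno on the writer thread instead of killing the
  // process; the writer loop below treats that as an interrupt signal.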
  std::exception_ptr teptr;
  // reader thread: hand the read end to the delimited importer as a FILE*
  auto th_pipe_reader = std::thread([&]() {
    try {
      if (0 == (p_file = fdopen(fd[0], "r"))) {
        throw std::runtime_error(std::string("failed to open a pipe: ") +
                                 strerror(errno));
      }
      importDelimited(file_paths.front(), true, session_info);
    } catch (...) {
      teptr = std::current_exception();
    }
    if (p_file) {
      fclose(p_file);
    }
    p_file = 0;
  });
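  // writer thread: walk each input (local file, S3 object, or archive),
  // pull decompressed blocks out of the Archive reader, and push the raw
  // bytes into the write end of the pipe for the reader thread to consume.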
  auto th_pipe_writer = std::thread([&]() {
    std::unique_ptr<S3Archive> us3arch;
    bool stop = false;
    for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
      try {
        auto file_path = file_paths[fi];
        std::unique_ptr<Archive> uarch;
        std::map<int, std::string> url_parts;
        Archive::parse_url(file_path, url_parts);
        const std::string S3_objkey_url_scheme = "s3ok";
        if ("file" == url_parts[2] || "" == url_parts[2]) {
          uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
        } else if ("s3" == url_parts[2]) {
#ifdef HAVE_AWS_S3
          // new-style s3 url
          us3arch.reset(new S3Archive(file_path,
                                      copy_params.s3_access_key,
                                      copy_params.s3_secret_key,
                                      copy_params.s3_session_token,
                                      copy_params.s3_region,
                                      copy_params.s3_endpoint,
                                      copy_params.plain_text,
                                      copy_params.regex_path_filter,
                                      copy_params.file_sort_order_by,
                                      copy_params.file_sort_regex));
          us3arch->init_for_read();
          // append the object keys; they are landed on later loop iterations
          for (const auto& objkey : us3arch->get_objkeys()) {
            file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
          }
          continue;
#else
          throw std::runtime_error("AWS S3 support not available");
#endif  // HAVE_AWS_S3
        } else if (S3_objkey_url_scheme == url_parts[2]) {
#ifdef HAVE_AWS_S3
          auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
          auto file_path =
              us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
          if (0 == file_path.size()) {
            throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
          }
          uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
          // the landed file is not removed until it is closed
          us3arch->vacuum(objkey);
#else
          throw std::runtime_error("AWS S3 support not available");
#endif  // HAVE_AWS_S3
        }
#if 0  // TODO(ppan): implement and enable any other archive class
        else if ("hdfs" == url_parts[2])
          uarch.reset(new HdfsArchive(file_path));
#endif
        else {
          throw std::runtime_error(std::string("unsupported archive url: ") +
                                   file_path);
        }
        // read this archive, header by header and block by block
        auto& arch = *uarch;
        const void* buf = nullptr;
        size_t size = 0;
        int64_t offset = -1;
        bool just_saw_archive_header;
        bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
        bool first_text_header_skipped = false;
        size_t num_block_read = 0;
        while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
          bool insert_line_delim_after_this_file = false;
          while (!stop) {
            auto ok = arch.read_data_block(&buf, &size, &offset);
            if (!ok) {
              break;
            }
            // all files are concatenated into a single stream for one
            // importDelimited() call, so per-file header lines after the
            // first must be skipped here rather than in the parser
            const char* buf2 = (const char*)buf;
            int size2 = size;
            if (copy_params.has_header != ImportHeaderRow::kNoHeader &&
                just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
              while (size2-- > 0) {
                if (*buf2++ == copy_params.line_delim) {
                  break;
                }
              }
              if (size2 <= 0) {
                LOG(WARNING) << "No line delimiter in block." << std::endl;
              } else {
                just_saw_archive_header = false;
                first_text_header_skipped = true;
              }
            }
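            // POSIX write() may transfer fewer bytes than requested, so the
            // block is pushed in a retry loop: EINTR/EAGAIN/EWOULDBLOCK mean
            // "nothing written, try again", EPIPE with an interrupted session
            // means the reader shut down the pipe, and anything else is fatal.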
            // write the remaining blob to the pipe
            int nremaining = size2;
            while (nremaining > 0) {
              int nwritten = write(fd[1], buf2, nremaining);
              if (nwritten < 0) {
                if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
                  nwritten = 0;  // nothing written; try again
                } else if (errno == EPIPE &&
                           check_session_interrupted(query_session, executor.get())) {
                  // the reader thread shut down the pipe from the read end
                  stop = true;
                  break;
                } else {
                  throw std::runtime_error(
                      std::string("failed or interrupted write to pipe: ") +
                      strerror(errno));
                }
              } else if (nwritten == nremaining) {
                break;  // wrote everything; done
              }
              // only wrote some; advance and try again
              nremaining -= nwritten;
              buf2 += nwritten;
            }
            // does this file (chunk) end with a line delimiter?
            const char* plast = static_cast<const char*>(buf) + (size - 1);
            insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
            ++num_block_read;
          }
          // if this file did not end with a line delimiter, write one out
          if (insert_line_delim_after_this_file) {
            while (true) {
              int nwritten = write(fd[1], &copy_params.line_delim, 1);
              if (nwritten < 0) {
                if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
                  continue;  // nothing written; try again
                } else if (errno == EPIPE &&
                           check_session_interrupted(query_session, executor.get())) {
                  stop = true;
                  break;
                } else {
                  throw std::runtime_error(
                      std::string("failed or interrupted write to pipe: ") +
                      strerror(errno));
                }
              } else if (nwritten == 1) {
                break;
              }
            }
          }
        }
      } catch (...) {
        // an exception from the s3 sdk or libarchive during an aborted
        // detection is expected; otherwise keep the first one for rethrow
        if (nullptr != dynamic_cast<Detector*>(this)) {
          break;
        }
        if (!teptr) {
          teptr = std::current_exception();
        }
        break;
      }
    }
    // close the write end so the reader thread sees EOF
    close(fd[1]);
  });

  th_pipe_reader.join();
  th_pipe_writer.join();

  // rethrow any exception that happened above
  if (teptr) {
    std::rethrow_exception(teptr);
  }
}
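// importDelimited() reads the (possibly pipe-fed) stream in large buffers,
// cuts each buffer at the last complete row boundary, and hands the pieces
// to a pool of worker threads; the residual partial row is carried over to
// the front of the next buffer before the next fread().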
ImportStatus Importer::importDelimited(
    const std::string& file_path,
    const bool decompressed,
    const Catalog_Namespace::SessionInfo* session_info) {
  auto query_session = session_info ? session_info->get_session_id() : "";
  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);

  if (!p_file) {
    p_file = fopen(file_path.c_str(), "rb");
  }
  if (!p_file) {
    throw std::runtime_error("failed to open file '" + file_path +
                             "': " + strerror(errno));
  }

  if (!decompressed) {
    (void)fseek(p_file, 0, SEEK_END);
    file_size = ftell(p_file);
  }

  size_t alloc_size = copy_params.buffer_size;
  if (!decompressed && file_size < alloc_size) {
    alloc_size = file_size;
  }
  for (size_t i = 0; i < max_threads; i++) {
    import_buffers_vec.emplace_back();
    for (const auto cd : loader->get_column_descs()) {
      import_buffers_vec[i].emplace_back(
          std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
    }
  }

  auto scratch_buffer = std::make_unique<char[]>(alloc_size);
  size_t current_pos = 0;
  size_t end_pos;
  size_t begin_pos = 0;

  (void)fseek(p_file, current_pos, SEEK_SET);
  size_t size =
      fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size, p_file);

  ChunkKey chunkKey = {loader->getCatalog().getCurrentDB().dbId,
                       loader->getTableDesc()->tableId};
  auto table_epochs = loader->getTableEpochs();

  std::list<std::future<ImportStatus>> threads;

  // a stack of thread_ids; ids must not overlap between in-flight threads
  // because thread_id indexes import_buffers_vec[]
  std::stack<size_t> stack_thread_ids;
  for (size_t i = 0; i < max_threads; i++) {
    stack_thread_ids.push(i);
  }
  size_t first_row_index_this_buffer = 0;
  while (size > 0) {
    unsigned int num_rows_this_buffer = 0;
    CHECK(scratch_buffer);
    end_pos = delimited_parser::find_row_end_pos(alloc_size,
                                                 scratch_buffer,
                                                 size,
                                                 copy_params,
                                                 first_row_index_this_buffer,
                                                 num_rows_this_buffer,
                                                 p_file);
    // stash the residual partial row to prepend to the next buffer
    int nresidual = size - end_pos;
    std::unique_ptr<char[]> unbuf;
    if (nresidual > 0) {
      unbuf = std::make_unique<char[]>(nresidual);
      memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
    }
    // get a thread_id not in use
    auto thread_id = stack_thread_ids.top();
    stack_thread_ids.pop();
    threads.push_back(std::async(std::launch::async,
                                 import_thread_delimited,
                                 thread_id,
                                 this,
                                 std::move(scratch_buffer),
                                 begin_pos,
                                 end_pos,
                                 end_pos,
                                 first_row_index_this_buffer,
                                 session_info,
                                 executor.get()));
    first_row_index_this_buffer += num_rows_this_buffer;
    current_pos += end_pos;
    scratch_buffer = std::make_unique<char[]>(alloc_size);
    CHECK(scratch_buffer);
    memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
    size = nresidual +
           fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual, p_file);
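    // Harvest any finished futures: fold their ImportStatus into the running
    // totals, recycle their thread_id back onto the stack, and go back to
    // reading as soon as a thread slot frees up.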
    while (threads.size() > 0) {
      int nready = 0;
      for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
           it != threads.end();) {
        auto& p = *it;
        std::chrono::milliseconds span(0);
        if (p.wait_for(span) == std::future_status::ready) {
          auto ret_import_status = p.get();
          import_status += ret_import_status;
          if (ret_import_status.load_failed) {
            set_import_status(import_id, import_status);
          }
          // total the per-file byte offsets for progress reporting
          size_t total_file_offset{0};
          for (const auto file_offset : file_offsets) {
            total_file_offset += file_offset;
          }
          if (decompressed ? total_file_offset : current_pos) {
            VLOG(3) << "rows_completed " << import_status.rows_completed
                    << ", total_file_offset " << total_file_offset;
          }
          stack_thread_ids.push(ret_import_status.thread_id);
          threads.erase(it++);
          ++nready;
        } else {
          ++it;
        }
      }
      if (nready == 0) {
        std::this_thread::yield();
      }
      // keep reading as long as a thread slot is free
      if (threads.size() < max_threads) {
        break;
      }
    }
    if (import_status.rows_rejected > copy_params.max_reject) {
      import_status.load_failed = true;
      import_status.load_msg = "Maximum rows rejected exceeded. Halting load";
      LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";
      break;
    }
  }

  // join any remaining threads after an early halt
  for (auto& p : threads) {
    p.wait();
  }

  return import_status;
}
void Importer::checkpoint(
    const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
  // on failure, roll the table back to the epochs captured at load start;
  // otherwise checkpoint the newly added data
  if (import_status.load_failed) {
    loader->setTableEpochs(table_epochs);
  } else {
    loader->checkpoint();
  }
}
Geospatial::GDAL::DataSourceUqPtr Importer::openGDALDataSource(
    const std::string& file_name,
    const CopyParams& copy_params) {
  if (copy_params.source_type != import_export::SourceType::kGeoFile) {
    throw std::runtime_error("Unexpected CopyParams.source_type (" +
                             std::to_string(static_cast<int>(copy_params.source_type)) +
                             ")");
  }
  return Geospatial::GDAL::openDataSource(file_name,
                                          import_export::SourceType::kGeoFile);
}
OGRLayer& getLayerWithSpecifiedName(const std::string& geo_layer_name,
                                    const Geospatial::GDAL::DataSourceUqPtr& poDS,
                                    const std::string& file_name) {
  // get the layer with the specified name, or default to the first layer
  OGRLayer* poLayer = nullptr;
  if (geo_layer_name.size()) {
    poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
    if (poLayer == nullptr) {
      throw std::runtime_error("Layer '" + geo_layer_name + "' not found in " +
                               file_name);
    }
  } else {
    poLayer = poDS->GetLayer(0);
    if (poLayer == nullptr) {
      throw std::runtime_error("No layers found in " + file_name);
    }
  }
  return *poLayer;
}
void Importer::readMetadataSampleGDAL(
    const std::string& file_name,
    const std::string& geo_column_name,
    std::map<std::string, std::vector<std::string>>& sample_data,
    int row_limit,
    const CopyParams& copy_params) {
  Geospatial::GDAL::DataSourceUqPtr datasource(
      openGDALDataSource(file_name, copy_params));
  if (datasource == nullptr) {
    throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
                             file_name);
  }

  OGRLayer& layer =
      getLayerWithSpecifiedName(copy_params.geo_layer_name, datasource, file_name);

  auto const* feature_defn = layer.GetLayerDefn();
  CHECK(feature_defn);

  // metadata columns?
  auto const metadata_column_infos =
      parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);

  // GetFeatureCount() is 32-bit in GDAL 1.x and 64-bit in 2.x+
  auto const feature_count = static_cast<uint64_t>(layer.GetFeatureCount());
  auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);

  // prepare the sample data map
  for (int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
    auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
    sample_data[column_name] = {};
  }
  sample_data[geo_column_name] = {};
  for (auto const& mci : metadata_column_infos) {
    sample_data[mci.column_descriptor.columnName] = {};
  }

  layer.ResetReading();
  uint64_t feature_index{0u};
  while (feature_index < num_features) {
    Geospatial::GDAL::FeatureUqPtr feature(layer.GetNextFeature());
    if (!feature) {
      break;
    }
    auto const* geometry = feature->GetGeometryRef();
    if (geometry == nullptr) {
      break;
    }
    // validate the geometry type
    switch (wkbFlatten(geometry->getGeometryType())) {
      case wkbPoint:
      case wkbMultiPoint:
      case wkbLineString:
      case wkbMultiLineString:
      case wkbPolygon:
      case wkbMultiPolygon:
        break;
      default:
        throw std::runtime_error("Unsupported geometry type: " +
                                 std::string(geometry->getGeometryName()));
    }
    // sample data for the regular field columns
    for (int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
      auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
      sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
    }
    // sample data for the metadata columns, if any
    for (auto const& mci : metadata_column_infos) {
      sample_data[mci.column_descriptor.columnName].push_back(mci.value);
    }
    // sample data for the geo column, as a WKT string
    char* wkts = nullptr;
    geometry->exportToWkt(&wkts);
    CHECK(wkts);
    sample_data[geo_column_name].push_back(wkts);
    CPLFree(wkts);
    feature_index++;
  }
}
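// ogr_to_type() maps an OGR field type to a SQL type plus an "is array"
// flag: the OGR *List types become arrays of the element type, e.g.
// OFTIntegerList -> {kINT, true}, while scalar OFTReal -> {kDOUBLE, false}.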
std::pair<SQLTypes, bool> ogr_to_type(const OGRFieldType& ogr_type) {
  switch (ogr_type) {
    case OFTInteger:
      return std::make_pair(kINT, false);
    case OFTIntegerList:
      return std::make_pair(kINT, true);
#if GDAL_VERSION_MAJOR > 1
    case OFTInteger64:
      return std::make_pair(kBIGINT, false);
    case OFTInteger64List:
      return std::make_pair(kBIGINT, true);
#endif
    case OFTReal:
      return std::make_pair(kDOUBLE, false);
    case OFTRealList:
      return std::make_pair(kDOUBLE, true);
    case OFTString:
      return std::make_pair(kTEXT, false);
    case OFTStringList:
      return std::make_pair(kTEXT, true);
    case OFTDate:
      return std::make_pair(kDATE, false);
    case OFTTime:
      return std::make_pair(kTIME, false);
    case OFTDateTime:
      return std::make_pair(kTIMESTAMP, false);
    case OFTBinary:
      // binary blobs are treated as byte arrays
      return std::make_pair(kTINYINT, true);
    default:
      break;
  }
  throw std::runtime_error("Unknown OGR field type: " + std::to_string(ogr_type));
}
SQLTypes ogr_to_type(const OGRwkbGeometryType& ogr_type) {
  switch (ogr_type) {
    case wkbMultiLineString:
      return kMULTILINESTRING;
    case wkbMultiPolygon:
      return kMULTIPOLYGON;
    default:
      break;
  }
  throw std::runtime_error("Unknown OGR geom type: " + std::to_string(ogr_type));
}
  switch (raster_point_type) {

  switch (raster_point_transform) {
const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptors(
    const std::string& file_name,
    const bool is_raster,
    const std::string& geo_column_name,
    const CopyParams& copy_params) {
  if (is_raster) {
    return gdalToColumnDescriptorsRaster(file_name, geo_column_name, copy_params);
  }
  return gdalToColumnDescriptorsGeo(file_name, geo_column_name, copy_params);
}

const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsRaster(
    const std::string& file_name,
    const std::string& geo_column_name,
    const CopyParams& copy_params) {
  // prepare for metadata columns
  auto metadata_column_infos =
      parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);

  // create a raster importer and run the detection pass
  RasterImporter raster_importer;
  raster_importer.detect(
      file_name,
      copy_params.raster_import_bands,
      copy_params.raster_import_dimensions,
      convert_raster_point_type(copy_params.raster_point_type),
      convert_raster_point_transform(copy_params.raster_point_transform),
      false,
      metadata_column_infos);
  std::list<ColumnDescriptor> cds;

  // populate the point columns, if we have any
  auto const point_names_and_sql_types = raster_importer.getPointNamesAndSQLTypes();
  for (auto const& [col_name, sql_type] : point_names_and_sql_types) {
    ColumnDescriptor cd;
    cd.columnName = cd.sourceName = col_name;
    cd.columnType.set_type(sql_type);
    if (sql_type == kPOINT) {
      cd.columnType.set_subtype(kGEOMETRY);
    }
    cds.push_back(cd);
  }

  // populate the band columns
  auto const band_names_and_types = raster_importer.getBandNamesAndSQLTypes();
  for (auto const& [band_name, sql_type] : band_names_and_types) {
    ColumnDescriptor cd;
    cd.columnName = cd.sourceName = band_name;
    cd.columnType.set_type(sql_type);
    cds.push_back(cd);
  }

  // metadata columns?
  for (auto& mci : metadata_column_infos) {
    cds.push_back(std::move(mci.column_descriptor));
  }

  return cds;
}
const std::list<ColumnDescriptor> Importer::gdalToColumnDescriptorsGeo(
    const std::string& file_name,
    const std::string& geo_column_name,
    const CopyParams& copy_params) {
  std::list<ColumnDescriptor> cds;

  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
  if (poDS == nullptr) {
    throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
                             file_name);
  }
  if (poDS->GetLayerCount() == 0) {
    throw std::runtime_error("gdalToColumnDescriptors Error: Geo file " + file_name +
                             " has no layers");
  }

  OGRLayer& layer =
      getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_name);

  layer.ResetReading();
  Geospatial::GDAL::FeatureUqPtr poFeature(layer.GetNextFeature());
  if (poFeature == nullptr) {
    throw std::runtime_error("No features found in " + file_name);
  }
  // get the fields as regular columns
  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
  CHECK(poFDefn);
  int iField;
  for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
    OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
    auto typePair = ogr_to_type(poFieldDefn->GetType());
    ColumnDescriptor cd;
    cd.columnName = poFieldDefn->GetNameRef();
    cd.sourceName = poFieldDefn->GetNameRef();
    SQLTypeInfo ti;
    if (typePair.second) {
      ti.set_type(kARRAY);
      ti.set_subtype(typePair.first);
    } else {
      ti.set_type(typePair.first);
    }
    ti.set_fixed_size();
    if (typePair.first == kTEXT) {
      ti.set_compression(kENCODING_DICT);
      ti.set_comp_param(0);
    }
    cd.columnType = ti;
    cds.push_back(cd);
  }
  // get the geo column, if any
  auto ogr_type = wkbFlatten(layer.GetGeomType());
  if (ogr_type == wkbUnknown) {
    // the layer geo type is unknown, so try the first feature
    auto const* ogr_geometry = poFeature->GetGeometryRef();
    CHECK(ogr_geometry);
    ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
  }
  if (ogr_type != wkbNone) {
    ColumnDescriptor cd;
    cd.columnName = geo_column_name;
    cd.sourceName = geo_column_name;

    // if exploding collections, override any collection type to the child type
    if (copy_params.geo_explode_collections) {
      if (ogr_type == wkbMultiPolygon) {
        ogr_type = wkbPolygon;
      } else if (ogr_type == wkbMultiLineString) {
        ogr_type = wkbLineString;
      } else if (ogr_type == wkbMultiPoint) {
        ogr_type = wkbPoint;
      }
    }
    cds.push_back(cd);
  }

  // metadata columns?
  auto metadata_column_infos =
      parse_add_metadata_columns(copy_params.add_metadata_columns, file_name);
  for (auto& mci : metadata_column_infos) {
    cds.push_back(std::move(mci.column_descriptor));
  }

  return cds;
}
bool Importer::gdalStatInternal(const std::string& path,
                                const CopyParams& copy_params,
                                bool also_dir) {
#if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
  // clear the GDAL stat cache so existence isn't cached across auth changes
  VSICurlClearCache();
#endif
  VSIStatBufL sb;
  int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
  if (result < 0) {
    return false;
  }
  if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
    return true;
  } else if (VSI_ISREG(sb.st_mode)) {
    return true;
  }
  return false;
}
void gdalGatherFilesInArchiveRecursive(const std::string& archive_path,
                                       std::vector<std::string>& files) {
  std::vector<std::string> subdirectories;

  // get entries
  char** entries = VSIReadDir(archive_path.c_str());
  if (!entries) {
    LOG(WARNING) << "Failed to get file listing at archive: " << archive_path;
    return;
  }
  // ensure the listing is freed on all paths out of this scope
  ScopeGuard entries_guard = [&] { CSLDestroy(entries); };

  int index = 0;
  while (true) {
    // get the next entry, or drop out if there isn't one
    char* entry_c = entries[index++];
    if (!entry_c) {
      break;
    }
    std::string entry(entry_c);

    // ignore '.' and '..'
    if (entry == "." || entry == "..") {
      continue;
    }

    // build the full path
    std::string entry_path = archive_path + std::string("/") + entry;

    // file or sub-directory?
    VSIStatBufL sb;
    int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
    if (result < 0) {
      break;
    }

    if (VSI_ISDIR(sb.st_mode)) {
      // a directory ending in .gdb could be a Geodatabase bundle:
      // add it as if it were a file and don't recurse into it
      if (boost::iends_with(entry_path, ".gdb")) {
        files.push_back(entry_path);
      } else {
        subdirectories.push_back(entry_path);
      }
    } else {
      files.push_back(entry_path);
    }
  }

  // recurse into each subdirectory we found
  for (const auto& subdirectory : subdirectories) {
    gdalGatherFilesInArchiveRecursive(subdirectory, files);
  }
}
std::vector<std::string> Importer::gdalGetAllFilesInArchive(
    const std::string& archive_path,
    const CopyParams& copy_params) {
  // gather the files recursively
  std::vector<std::string> files;
  gdalGatherFilesInArchiveRecursive(archive_path, files);

  // convert to paths relative to the archive
  for (auto& file : files) {
    file.erase(0, archive_path.size() + 1);
  }
  return files;
}
std::vector<Importer::GeoFileLayerInfo> Importer::gdalGetLayersInGeoFile(
    const std::string& file_name,
    const CopyParams& copy_params) {
  std::vector<GeoFileLayerInfo> layer_info;

  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_name, copy_params));
  if (poDS == nullptr) {
    throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
                             file_name);
  }

  // enumerate the layers
  for (auto&& poLayer : poDS->GetLayers()) {
    GeoFileLayerContents contents = GeoFileLayerContents::EMPTY;
    poLayer->ResetReading();
    if (poLayer->GetFeatureCount() > 0) {
      // read the geo type from the layer, or failing that, the first feature
      auto ogr_type = wkbFlatten(poLayer->GetGeomType());
      if (ogr_type == wkbUnknown) {
        Geospatial::GDAL::FeatureUqPtr first_feature(poLayer->GetNextFeature());
        CHECK(first_feature);
        auto const* ogr_geometry = first_feature->GetGeometryRef();
        if (ogr_geometry) {
          ogr_type = wkbFlatten(ogr_geometry->getGeometryType());
        }
      }
      switch (ogr_type) {
        case wkbPoint:
        case wkbMultiPoint:
        case wkbLineString:
        case wkbMultiLineString:
        case wkbPolygon:
        case wkbMultiPolygon:
          contents = GeoFileLayerContents::GEO;
          break;
        default:
          contents = GeoFileLayerContents::UNSUPPORTED_GEO;
          break;
      }
    }
    layer_info.emplace_back(poLayer->GetName(), contents);
  }
  return layer_info;
}
ImportStatus Importer::importGDAL(
    const std::map<std::string, std::string>& columnNameToSourceNameMap,
    const Catalog_Namespace::SessionInfo* session_info,
    const bool is_raster) {
  if (is_raster) {
    return importGDALRaster(session_info);
  }
  return importGDALGeo(columnNameToSourceNameMap, session_info);
}

ImportStatus Importer::importGDALGeo(
    const std::map<std::string, std::string>& columnNameToSourceNameMap,
    const Catalog_Namespace::SessionInfo* session_info) {
  Geospatial::GDAL::DataSourceUqPtr poDS(openGDALDataSource(file_path, copy_params));
  if (poDS == nullptr) {
    throw std::runtime_error("openGDALDataSource Error: Unable to open geo file " +
                             file_path);
  }
  OGRLayer& layer =
      getLayerWithSpecifiedName(copy_params.geo_layer_name, poDS, file_path);
  size_t numFeatures = layer.GetFeatureCount();
  // build a map of the field names to their indices
  FieldNameToIndexMapType fieldNameToIndexMap;
  OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
  CHECK(poFDefn);
  size_t numFields = poFDefn->GetFieldCount();
  for (size_t iField = 0; iField < numFields; iField++) {
    OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
    fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
  }

  // the geographic spatial reference we want to put everything in
  Geospatial::GDAL::SpatialReferenceUqPtr poGeographicSR(new OGRSpatialReference());
  poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);

#if GDAL_VERSION_MAJOR >= 3
  // GDAL 3.x (really Proj.4 6.x) now enforces lat, lon order, which would
  // transpose X and Y for angle-based coordinate systems; restore the
  // traditional GIS order
  poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
#endif

#if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
  max_threads = 1;
#else
  max_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
                         g_max_import_threads);
#endif
  // metadata columns?
  auto const metadata_column_infos =
      parse_add_metadata_columns(copy_params.add_metadata_columns, file_path);

  // geo table import takes a separate path from normal SQL execution, so
  // explicitly enroll this import session to allow interruption
  auto query_session = session_info ? session_info->get_session_id() : "";
  auto query_submitted_time = ::toString(std::chrono::system_clock::now());
  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
  auto is_session_already_registered = false;
  {
    mapd_shared_lock<mapd_shared_mutex> session_read_lock(
        executor->getSessionLock());
    is_session_already_registered =
        executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
  }
  if (g_enable_non_kernel_time_query_interrupt && !query_session.empty() &&
      !is_session_already_registered) {
    executor->enrollQuerySession(query_session,
                                 "IMPORT_GEO_TABLE",
                                 query_submitted_time,
                                 Executor::UNITARY_EXECUTOR_ID,
                                 QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
  }
  ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
    // reset the runtime query interrupt status
    if (g_enable_non_kernel_time_query_interrupt && !query_session.empty()) {
      executor->clearQuerySessionStatus(query_session, query_submitted_time);
    }
  };
  // make an import buffer for each thread
  for (size_t i = 0; i < max_threads; i++) {
    import_buffers_vec.emplace_back();
    for (const auto cd : loader->get_column_descs()) {
      import_buffers_vec[i].emplace_back(
          std::make_unique<TypedImportBuffer>(cd, loader->getStringDict(cd)));
    }
  }

#if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
  std::list<std::future<ImportStatus>> threads;

  // a stack of thread_ids; ids must not overlap between in-flight threads
  // because thread_id indexes import_buffers_vec[]
  std::stack<size_t> stack_thread_ids;
  for (size_t i = 0; i < max_threads; i++) {
    stack_thread_ids.push(i);
  }
#endif

  auto table_epochs = loader->getTableEpochs();

  // reset the layer
  layer.ResetReading();
  static const size_t MAX_FEATURES_PER_CHUNK = 1000;

  // make a features buffer for each thread
  std::vector<FeaturePtrVector> features(max_threads);

  // one coordinate transformation per thread, based on the first feature's SR
  std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
      max_threads);

  // for each chunk of features...
  size_t firstFeatureThisChunk = 0;
  while (firstFeatureThisChunk < numFeatures) {
    size_t numFeaturesThisChunk =
        std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);

// get a thread_id not in use
#if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
    size_t thread_id = 0;
#else
    auto thread_id = stack_thread_ids.top();
    stack_thread_ids.pop();
    CHECK(thread_id < max_threads);
#endif

    // fill the features buffer for the new thread
    for (size_t i = 0; i < numFeaturesThisChunk; i++) {
      features[thread_id].emplace_back(layer.GetNextFeature());
    }
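    // Build one reusable OGRCoordinateTransformation per thread, lazily,
    // from the first feature in the chunk that carries a spatial reference
    // differing from the target geographic SR.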
    if (coordinate_transformations[thread_id] == nullptr) {
      for (auto const& feature : features[thread_id]) {
        auto const* geometry = feature->GetGeometryRef();
        if (geometry) {
          auto const* geometry_sr = geometry->getSpatialReference();
          // if the SR is non-null, non-empty, and different from what we
          // want, we need a reusable CoordinateTransformation
          if (geometry_sr &&
#if GDAL_VERSION_MAJOR >= 3
              !geometry_sr->IsEmpty() &&
#endif
              !geometry_sr->IsSame(poGeographicSR.get())) {
            // validate the SR before trying to use it
            if (geometry_sr->Validate() != OGRERR_NONE) {
              throw std::runtime_error("Incoming geo has invalid Spatial Reference");
            }
            coordinate_transformations[thread_id].reset(
                OGRCreateCoordinateTransformation(geometry_sr, poGeographicSR.get()));
            if (coordinate_transformations[thread_id] == nullptr) {
              throw std::runtime_error(
                  "Failed to create a GDAL CoordinateTransformation for incoming geo");
            }
          }
          // once we find a feature with geometry, we're done
          break;
        }
      }
    }
#if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
    // call the worker function directly
    auto ret_import_status =
        import_thread_shapefile(0,
                                this,
                                coordinate_transformations[thread_id].get(),
                                std::move(features[thread_id]),
                                firstFeatureThisChunk,
                                numFeaturesThisChunk,
                                fieldNameToIndexMap,
                                columnNameToSourceNameMap,
                                session_info,
                                executor.get(),
                                metadata_column_infos);
    import_status += ret_import_status;
    import_status.rows_estimated =
        ((float)firstFeatureThisChunk / (float)numFeatures) *
        import_status.rows_completed;
    set_import_status(import_id, import_status);
#else
    // fire up a thread to import this chunk
    threads.push_back(std::async(std::launch::async,
                                 import_thread_shapefile,
                                 thread_id,
                                 this,
                                 coordinate_transformations[thread_id].get(),
                                 std::move(features[thread_id]),
                                 firstFeatureThisChunk,
                                 numFeaturesThisChunk,
                                 fieldNameToIndexMap,
                                 columnNameToSourceNameMap,
                                 session_info,
                                 executor.get(),
                                 metadata_column_infos));
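    // Futures are polled with a zero-length wait so finished chunks fold
    // into the running ImportStatus immediately; each completed worker's
    // thread_id goes straight back on the stack, capping concurrency at
    // max_threads.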
    // let the threads run
    while (threads.size() > 0) {
      int nready = 0;
      for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
           it != threads.end();) {
        auto& p = *it;
        std::chrono::milliseconds span(0);
        if (p.wait_for(span) == std::future_status::ready) {
          auto ret_import_status = p.get();
          import_status += ret_import_status;
          import_status.rows_estimated =
              ((float)firstFeatureThisChunk / (float)numFeatures) *
              import_status.rows_completed;
          set_import_status(import_id, import_status);
          // recall this thread_id for reuse
          stack_thread_ids.push(ret_import_status.thread_id);
          threads.erase(it++);
          ++nready;
        } else {
          ++it;
        }
      }
      if (nready == 0) {
        std::this_thread::yield();
      }
      // keep dispatching chunks as long as a thread slot is free
      if (threads.size() < max_threads) {
        break;
      }
    }
#endif

    if (import_status.rows_rejected > copy_params.max_reject) {
      import_status.load_failed = true;
      import_status.load_msg = "Maximum rows rejected exceeded. Halting load";
      LOG(ERROR) << "Maximum rows rejected exceeded. Halting load";