25 #include <arrow/api.h>
26 #include <arrow/filesystem/localfs.h>
27 #include <arrow/io/api.h>
29 #include <ogrsf_frmts.h>
30 #include <boost/algorithm/string.hpp>
31 #include <boost/dynamic_bitset.hpp>
32 #include <boost/filesystem.hpp>
33 #include <boost/geometry.hpp>
34 #include <boost/variant.hpp>
49 #include <unordered_map>
50 #include <unordered_set>
58 #ifdef ENABLE_IMPORT_PARQUET
61 #if defined(ENABLE_IMPORT_PARQUET)
65 #ifdef ENABLE_IMPORT_PARQUET
94 #include "gen-cpp/Heavy.h"
101 #define TIMER_STOP(t) \
102 (float(timer_stop<std::chrono::steady_clock::time_point, std::chrono::microseconds>( \
117 boost::filesystem::path boost_file_path{file_path};
118 boost::system::error_code ec;
120 return ec ? 0 : filesize;
128 executor->getSessionLock());
129 return executor->checkIsQuerySessionInterrupted(query_session, session_read_lock);
135 if (copy_params_threads > 0) {
136 return static_cast<size_t>(copy_params_threads);
138 return std::min(static_cast<size_t>(std::thread::hardware_concurrency()),
147 formatting_ostream& operator<<(formatting_ostream& out, std::vector<std::string>& row) {
149 for (
size_t i = 0; i < row.size(); ++i) {
150 out << (i ?
", " :
"") << row[i];
158 namespace import_export {
163 std::map<int, std::shared_ptr<RenderGroupAnalyzer>>;
166 #define DEBUG_TIMING false
167 #define DEBUG_RENDER_GROUP_ANALYZER 0
168 #define DEBUG_AWS_AUTHENTICATION 0
170 #define DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT 0
179 const std::string&
f,
193 auto is_array = std::unique_ptr<bool[]>(
new bool[
loader->get_column_descs().size()]);
195 bool has_array =
false;
197 int skip_physical_cols = 0;
198 for (
auto& p :
loader->get_column_descs()) {
200 if (skip_physical_cols-- > 0) {
205 if (p->isVirtualCol || p->isDeletedCol) {
208 skip_physical_cols = p->columnType.get_physical_cols();
209 if (p->columnType.get_type() ==
kARRAY) {
210 is_array.get()[i] =
true;
213 is_array.get()[i] =
false;
218 is_array_a = std::unique_ptr<bool[]>(is_array.release());
220 is_array_a = std::unique_ptr<bool[]>(
nullptr);
228 if (
buffer[0] !=
nullptr) {
231 if (
buffer[1] !=
nullptr) {
243 is.
end = std::chrono::steady_clock::now();
244 is.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(is.
end - is.
start);
251 while (i < j && (field[i] ==
' ' || field[i] ==
'\r')) {
254 while (i < j && (field[j - 1] ==
' ' || field[j - 1] ==
'\r')) {
257 return std::string(field + i, j - i);
308 throw std::runtime_error(
"Internal error: geometry type in NullDatum.");
310 throw std::runtime_error(
"Internal error: invalid type in NullDatum.");
349 throw std::runtime_error(
"Internal error: geometry type in NullArrayDatum.");
351 throw std::runtime_error(
"Internal error: invalid type in NullArrayDatum.");
360 if (s == copy_params.
null_str || s ==
"NULL" || s.empty()) {
367 std::vector<std::string> elem_strs;
369 for (
size_t i = s.find(copy_params.
array_delim, 1); i != std::string::npos;
371 elem_strs.push_back(s.substr(last, i - last));
374 if (last + 1 <= s.size()) {
375 elem_strs.push_back(s.substr(last, s.size() - 1 - last));
377 if (elem_strs.size() == 1) {
378 auto str = elem_strs.front();
379 auto str_trimmed =
trim_space(str.c_str(), str.length());
380 if (str_trimmed ==
"") {
385 size_t len = elem_strs.size() * elem_ti.
get_size();
386 std::unique_ptr<int8_t, FreeDeleter> buf(
388 int8_t* p = buf.get();
389 for (
auto& es : elem_strs) {
396 if (!isdigit(e[0]) && e[0] !=
'-') {
424 while ((p - buf) < len) {
428 CHECK((p - buf) == len);
445 const size_t len = compressed_null_coords.size();
447 memcpy(buf, compressed_null_coords.data(), len);
450 auto modified_ti = coords_ti;
456 const auto& arr = datum.val.arr_val;
457 for (
const auto& elem_datum : arr) {
458 string_vec.push_back(elem_datum.val.str_val);
500 throw std::runtime_error(
"Internal error: geometry type in TDatumToDatum.");
502 throw std::runtime_error(
"Internal error: invalid type in TDatumToDatum.");
516 size_t len = datum.val.arr_val.size() * elem_ti.
get_size();
519 for (
auto& e : datum.val.arr_val) {
529 std::vector<std::string_view> string_view_vec;
530 string_view_vec.reserve(string_vec.size());
531 for (
const auto& str : string_vec) {
533 std::ostringstream oss;
535 <<
" a string was detected too long for encoding, string length = "
536 << str.size() <<
", first 100 characters are '" << str.substr(0, 100) <<
"'";
537 throw std::runtime_error(oss.str());
539 string_view_vec.push_back(str);
558 }
catch (std::exception& e) {
559 std::ostringstream oss;
561 <<
" : " << e.what();
563 throw std::runtime_error(oss.str());
568 const std::string_view val,
571 const bool check_not_null) {
577 throw std::runtime_error(
"NULL for column " + cd->
columnName);
588 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
594 throw std::runtime_error(
"NULL for column " + cd->
columnName);
601 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
607 throw std::runtime_error(
"NULL for column " + cd->
columnName);
614 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
620 throw std::runtime_error(
"NULL for column " + cd->
columnName);
627 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
633 throw std::runtime_error(
"NULL for column " + cd->
columnName);
649 throw std::runtime_error(
"NULL for column " + cd->
columnName);
656 if (!is_null && (val[0] ==
'.' || isdigit(val[0]) || val[0] ==
'-')) {
657 addFloat(static_cast<float>(std::atof(std::string(val).c_str())));
660 throw std::runtime_error(
"NULL for column " + cd->
columnName);
666 if (!is_null && (val[0] ==
'.' || isdigit(val[0]) || val[0] ==
'-')) {
667 addDouble(std::atof(std::string(val).c_str()));
670 throw std::runtime_error(
"NULL for column " + cd->
columnName);
681 throw std::runtime_error(
"NULL for column " + cd->
columnName);
686 throw std::runtime_error(
"String too long for column " + cd->
columnName +
697 if (!is_null && (isdigit(val[0]) || val[0] ==
'-')) {
703 throw std::runtime_error(
"NULL for column " + cd->
columnName);
710 throw std::runtime_error(
"NULL for column " + cd->
columnName);
714 std::vector<std::string> string_vec;
717 std::string(val), copy_params, string_vec);
721 size_t expected_size = ti.
get_size() / sti.get_size();
722 size_t actual_size = string_vec.size();
723 if (actual_size != expected_size) {
724 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
726 " values, received " +
741 throw std::runtime_error(
"Fixed length array for column " + cd->
columnName +
742 " has incorrect length: " + std::string(val));
759 CHECK(
false) <<
"TypedImportBuffer::add_value() does not support type " <<
type;
813 CHECK(
false) <<
"TypedImportBuffer::pop_value() does not support type " <<
type;
818 using std::runtime_error::runtime_error;
822 template <
typename DATA_TYPE>
826 std::vector<DATA_TYPE>& buffer,
830 std::make_unique<DataBuffer<DATA_TYPE>>(cd, array, buffer, bad_rows_tracker);
831 auto f_value_getter =
value_getter(array, cd, bad_rows_tracker);
832 std::function<void(const int64_t)> f_add_geo_phy_cols = [&](
const int64_t row) {};
833 if (bad_rows_tracker && cd->columnType.is_geometry()) {
834 f_add_geo_phy_cols = [&](
const int64_t row) {
836 std::vector<double> coords, bounds;
837 std::vector<int> ring_sizes, poly_rings;
838 int render_group = 0;
845 if (array.IsNull(row)) {
847 import_ti, coords, bounds, ring_sizes, poly_rings,
false);
849 arrow_throw_if<GeoImportException>(
858 arrow_throw_if<GeoImportException>(
859 cd->columnType.get_type() != ti.
get_type(),
860 error_context(cd, bad_rows_tracker) +
"Geometry type mismatch");
862 auto col_idx_workpad =
col_idx;
875 }
catch (std::runtime_error& e) {
877 }
catch (
const std::exception& e) {
884 auto f_mark_a_bad_row = [&](
const auto row) {
885 std::unique_lock<std::mutex> lck(bad_rows_tracker->
mutex);
886 bad_rows_tracker->
rows.insert(row - slice_range.first);
888 buffer.reserve(slice_range.second - slice_range.first);
889 for (
size_t row = slice_range.first; row < slice_range.second; ++row) {
891 *data << (array.IsNull(row) ?
nullptr : f_value_getter(array, row));
892 f_add_geo_phy_cols(row);
894 f_mark_a_bad_row(row);
897 if (bad_rows_tracker) {
899 f_mark_a_bad_row(row);
905 return buffer.size();
910 const bool exact_type_match,
921 if (exact_type_match) {
922 arrow_throw_if(col.type_id() != Type::BOOL,
"Expected boolean type");
927 if (exact_type_match) {
928 arrow_throw_if(col.type_id() != Type::INT8,
"Expected int8 type");
933 if (exact_type_match) {
934 arrow_throw_if(col.type_id() != Type::INT16,
"Expected int16 type");
939 if (exact_type_match) {
940 arrow_throw_if(col.type_id() != Type::INT32,
"Expected int32 type");
943 cd, col, *
int_buffer_, slice_range, bad_rows_tracker);
947 if (exact_type_match) {
948 arrow_throw_if(col.type_id() != Type::INT64,
"Expected int64 type");
953 if (exact_type_match) {
954 arrow_throw_if(col.type_id() != Type::FLOAT,
"Expected float type");
959 if (exact_type_match) {
960 arrow_throw_if(col.type_id() != Type::DOUBLE,
"Expected double type");
967 if (exact_type_match) {
969 "Expected string type");
974 if (exact_type_match) {
975 arrow_throw_if(col.type_id() != Type::TIME32 && col.type_id() != Type::TIME64,
976 "Expected time32 or time64 type");
981 if (exact_type_match) {
982 arrow_throw_if(col.type_id() != Type::TIMESTAMP,
"Expected timestamp type");
987 if (exact_type_match) {
988 arrow_throw_if(col.type_id() != Type::DATE32 && col.type_id() != Type::DATE64,
989 "Expected date32 or date64 type");
998 "Expected string type");
1002 throw std::runtime_error(
"Arrow array appends not yet supported");
1004 throw std::runtime_error(
"Invalid Type");
1010 size_t dataSize = 0;
1013 if (std::any_of(col.nulls.begin(), col.nulls.end(), [](
int i) {
return i != 0; })) {
1014 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1020 dataSize = col.data.int_col.size();
1022 for (
size_t i = 0; i < dataSize; i++) {
1032 dataSize = col.data.int_col.size();
1034 for (
size_t i = 0; i < dataSize; i++) {
1044 dataSize = col.data.int_col.size();
1046 for (
size_t i = 0; i < dataSize; i++) {
1056 dataSize = col.data.int_col.size();
1058 for (
size_t i = 0; i < dataSize; i++) {
1062 int_buffer_->push_back((int32_t)col.data.int_col[i]);
1070 dataSize = col.data.int_col.size();
1072 for (
size_t i = 0; i < dataSize; i++) {
1082 dataSize = col.data.real_col.size();
1084 for (
size_t i = 0; i < dataSize; i++) {
1094 dataSize = col.data.real_col.size();
1096 for (
size_t i = 0; i < dataSize; i++) {
1109 dataSize = col.data.str_col.size();
1111 for (
size_t i = 0; i < dataSize; i++) {
1123 dataSize = col.data.int_col.size();
1125 for (
size_t i = 0; i < dataSize; i++) {
1129 bigint_buffer_->push_back(static_cast<int64_t>(col.data.int_col[i]));
1138 dataSize = col.data.str_col.size();
1140 for (
size_t i = 0; i < dataSize; i++) {
1151 dataSize = col.data.arr_col.size();
1153 for (
size_t i = 0; i < dataSize; i++) {
1155 if (!col.nulls[i]) {
1156 size_t stringArrSize = col.data.arr_col[i].data.str_col.size();
1157 for (
size_t str_idx = 0; str_idx != stringArrSize; ++str_idx) {
1158 string_vec->push_back(col.data.arr_col[i].data.str_col[str_idx]);
1166 for (
size_t i = 0; i < dataSize; i++) {
1170 size_t len = col.data.arr_col[i].data.int_col.size();
1171 size_t byteSize = len *
sizeof(int8_t);
1174 for (
size_t j = 0; j < len; ++j) {
1178 if (col.data.arr_col[i].nulls[j]) {
1179 *p =
static_cast<int8_t
>(
1182 *(
bool*)p = static_cast<bool>(col.data.arr_col[i].data.int_col[j]);
1192 for (
size_t i = 0; i < dataSize; i++) {
1196 size_t len = col.data.arr_col[i].data.int_col.size();
1197 size_t byteSize = len *
sizeof(int8_t);
1200 for (
size_t j = 0; j < len; ++j) {
1201 *(int8_t*)p = static_cast<int8_t>(col.data.arr_col[i].data.int_col[j]);
1202 p +=
sizeof(int8_t);
1210 for (
size_t i = 0; i < dataSize; i++) {
1214 size_t len = col.data.arr_col[i].data.int_col.size();
1215 size_t byteSize = len *
sizeof(int16_t);
1218 for (
size_t j = 0; j < len; ++j) {
1220 static_cast<int16_t>(col.data.arr_col[i].data.int_col[j]);
1221 p +=
sizeof(int16_t);
1229 for (
size_t i = 0; i < dataSize; i++) {
1233 size_t len = col.data.arr_col[i].data.int_col.size();
1234 size_t byteSize = len *
sizeof(int32_t);
1237 for (
size_t j = 0; j < len; ++j) {
1239 static_cast<int32_t>(col.data.arr_col[i].data.int_col[j]);
1240 p +=
sizeof(int32_t);
1250 for (
size_t i = 0; i < dataSize; i++) {
1254 size_t len = col.data.arr_col[i].data.int_col.size();
1255 size_t byteSize = len *
sizeof(int64_t);
1258 for (
size_t j = 0; j < len; ++j) {
1260 static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1261 p +=
sizeof(int64_t);
1269 for (
size_t i = 0; i < dataSize; i++) {
1273 size_t len = col.data.arr_col[i].data.real_col.size();
1274 size_t byteSize = len *
sizeof(float);
1277 for (
size_t j = 0; j < len; ++j) {
1278 *(
float*)p = static_cast<float>(col.data.arr_col[i].data.real_col[j]);
1287 for (
size_t i = 0; i < dataSize; i++) {
1291 size_t len = col.data.arr_col[i].data.real_col.size();
1292 size_t byteSize = len *
sizeof(double);
1295 for (
size_t j = 0; j < len; ++j) {
1296 *(
double*)p = static_cast<double>(col.data.arr_col[i].data.real_col[j]);
1297 p +=
sizeof(double);
1307 for (
size_t i = 0; i < dataSize; i++) {
1311 size_t len = col.data.arr_col[i].data.int_col.size();
1312 size_t byteWidth =
sizeof(int64_t);
1313 size_t byteSize = len * byteWidth;
1316 for (
size_t j = 0; j < len; ++j) {
1317 *
reinterpret_cast<int64_t*
>(p) =
1318 static_cast<int64_t>(col.data.arr_col[i].data.int_col[j]);
1319 p +=
sizeof(int64_t);
1327 throw std::runtime_error(
"Invalid Array Type");
1333 throw std::runtime_error(
"Invalid Type");
1339 const TDatum& datum,
1347 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1360 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1370 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1377 addInt((int32_t)datum.val.int_val);
1380 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1390 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1397 addFloat((
float)datum.val.real_val);
1400 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1410 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1421 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1436 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1444 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1463 throw std::runtime_error(
"NULL for column " + cd->
columnName);
1471 CHECK(
false) <<
"TypedImportBuffer::add_value() does not support type " <<
type;
1538 static_cast<float>(std::atof(std::string(val).c_str())));
1545 double_buffer_->resize(num_rows, std::atof(std::string(val).c_str()));
1557 throw std::runtime_error(
"String too long for column " + cd->
columnName +
1576 std::vector<std::string> string_vec;
1579 std::string(val), cp, string_vec);
1582 if (ti.get_size() > 0) {
1583 auto sti = ti.get_elem_type();
1584 size_t expected_size = ti.get_size() / sti.get_size();
1585 size_t actual_size = string_vec.size();
1586 if (actual_size != expected_size) {
1587 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
1589 " values, received " +
1595 if (ti.get_size() > 0) {
1597 throw std::runtime_error(
"Fixed length array column " + cd->
columnName +
1598 " currently cannot accept NULL arrays");
1610 if (ti.get_size() > 0 &&
static_cast<size_t>(ti.get_size()) != d.length) {
1611 throw std::runtime_error(
"Fixed length array for column " + cd->
columnName +
1612 " has incorrect length: " + std::string(val));
1629 CHECK(
false) <<
"TypedImportBuffer::addDefaultValues() does not support type "
1636 std::vector<double>& coords,
1638 if (std::isinf(lat) || std::isnan(lat) || std::isinf(lon) || std::isnan(lon)) {
1643 if (!pt.transform(ti)) {
1649 coords.push_back(lon);
1650 coords.push_back(lat);
1657 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1659 std::vector<double>& coords,
1660 std::vector<double>& bounds,
1661 std::vector<int>& ring_sizes,
1662 std::vector<int>& poly_rings,
1664 const bool force_null) {
1666 const auto col_type = col_ti.
get_type();
1669 bool is_null_geo =
false;
1671 if (!col_ti.get_notnull()) {
1674 is_null_point =
true;
1677 is_null_geo = coords.empty();
1678 if (is_null_point) {
1683 is_null_geo =
false;
1695 tdd_coords.val.arr_val.reserve(compressed_coords.size());
1696 for (
auto cc : compressed_coords) {
1697 tdd_coords.val.arr_val.emplace_back();
1698 tdd_coords.val.arr_val.back().val.int_val = cc;
1701 tdd_coords.is_null = is_null_geo;
1702 import_buffers[col_idx++]->add_value(cd_coords, tdd_coords,
false);
1707 TDatum tdd_ring_sizes;
1708 tdd_ring_sizes.val.arr_val.reserve(ring_sizes.size());
1710 for (
auto ring_size : ring_sizes) {
1711 tdd_ring_sizes.val.arr_val.emplace_back();
1712 tdd_ring_sizes.val.arr_val.back().val.int_val = ring_size;
1715 tdd_ring_sizes.is_null = is_null_geo;
1716 import_buffers[col_idx++]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
1722 TDatum tdd_poly_rings;
1723 tdd_poly_rings.val.arr_val.reserve(poly_rings.size());
1725 for (
auto num_rings : poly_rings) {
1726 tdd_poly_rings.val.arr_val.emplace_back();
1727 tdd_poly_rings.val.arr_val.back().val.int_val = num_rings;
1730 tdd_poly_rings.is_null = is_null_geo;
1731 import_buffers[col_idx++]->add_value(cd_poly_rings, tdd_poly_rings,
false);
1737 tdd_bounds.val.arr_val.reserve(bounds.size());
1739 for (
auto b : bounds) {
1740 tdd_bounds.val.arr_val.emplace_back();
1741 tdd_bounds.val.arr_val.back().val.real_val = b;
1744 tdd_bounds.is_null = is_null_geo;
1745 import_buffers[col_idx++]->add_value(cd_bounds, tdd_bounds,
false);
1751 TDatum td_render_group;
1752 td_render_group.val.int_val = render_group;
1753 td_render_group.is_null = is_null_geo;
1754 import_buffers[col_idx++]->add_value(cd_render_group, td_render_group,
false);
1761 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
1763 std::vector<std::vector<double>>& coords_column,
1764 std::vector<std::vector<double>>& bounds_column,
1765 std::vector<std::vector<int>>& ring_sizes_column,
1766 std::vector<std::vector<int>>& poly_rings_column,
1767 std::vector<int>& render_groups_column) {
1769 const auto col_type = col_ti.
get_type();
1772 auto coords_row_count = coords_column.size();
1774 for (
auto& coords : coords_column) {
1775 bool is_null_geo =
false;
1777 if (!col_ti.get_notnull()) {
1780 is_null_point =
true;
1783 is_null_geo = coords.empty();
1784 if (is_null_point) {
1789 is_null_geo =
false;
1792 std::vector<TDatum> td_coords_data;
1794 std::vector<uint8_t> compressed_coords =
1796 for (
auto const& cc : compressed_coords) {
1798 td_byte.val.int_val = cc;
1799 td_coords_data.push_back(td_byte);
1803 tdd_coords.val.arr_val = td_coords_data;
1804 tdd_coords.is_null = is_null_geo;
1805 import_buffers[col_idx]->add_value(cd_coords, tdd_coords,
false);
1810 if (ring_sizes_column.size() != coords_row_count) {
1811 CHECK(
false) <<
"Geometry import columnar: ring sizes column size mismatch";
1815 for (
auto const& ring_sizes : ring_sizes_column) {
1816 bool is_null_geo =
false;
1817 if (!col_ti.get_notnull()) {
1819 is_null_geo = ring_sizes.empty();
1821 std::vector<TDatum> td_ring_sizes;
1822 for (
auto const& ring_size : ring_sizes) {
1823 TDatum td_ring_size;
1824 td_ring_size.val.int_val = ring_size;
1825 td_ring_sizes.push_back(td_ring_size);
1827 TDatum tdd_ring_sizes;
1828 tdd_ring_sizes.val.arr_val = td_ring_sizes;
1829 tdd_ring_sizes.is_null = is_null_geo;
1830 import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
1836 if (poly_rings_column.size() != coords_row_count) {
1837 CHECK(
false) <<
"Geometry import columnar: poly rings column size mismatch";
1841 for (
auto const& poly_rings : poly_rings_column) {
1842 bool is_null_geo =
false;
1843 if (!col_ti.get_notnull()) {
1845 is_null_geo = poly_rings.empty();
1847 std::vector<TDatum> td_poly_rings;
1848 for (
auto const& num_rings : poly_rings) {
1849 TDatum td_num_rings;
1850 td_num_rings.val.int_val = num_rings;
1851 td_poly_rings.push_back(td_num_rings);
1853 TDatum tdd_poly_rings;
1854 tdd_poly_rings.val.arr_val = td_poly_rings;
1855 tdd_poly_rings.is_null = is_null_geo;
1856 import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings,
false);
1862 if (bounds_column.size() != coords_row_count) {
1863 CHECK(
false) <<
"Geometry import columnar: bounds column size mismatch";
1866 for (
auto const& bounds : bounds_column) {
1867 bool is_null_geo =
false;
1868 if (!col_ti.get_notnull()) {
1872 std::vector<TDatum> td_bounds_data;
1873 for (
auto const& b : bounds) {
1875 td_double.val.real_val = b;
1876 td_bounds_data.push_back(td_double);
1879 tdd_bounds.val.arr_val = td_bounds_data;
1880 tdd_bounds.is_null = is_null_geo;
1881 import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds,
false);
1889 for (
auto const& render_group : render_groups_column) {
1890 TDatum td_render_group;
1891 td_render_group.val.int_val = render_group;
1892 td_render_group.is_null =
false;
1893 import_buffers[col_idx]->add_value(cd_render_group, td_render_group,
false);
1902 const std::list<const ColumnDescriptor*>& col_descs) {
1906 int collection_col_idx = -1;
1908 std::string collection_col_name;
1910 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
1911 auto const& cd = *cd_it;
1912 auto const col_type = cd->columnType.get_type();
1914 if (collection_col_idx >= 0) {
1915 throw std::runtime_error(
1916 "Explode Collections: Found more than one destination column");
1918 collection_col_idx = col_idx;
1919 collection_child_type = col_type;
1920 collection_col_name = cd->columnName;
1922 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
1927 if (collection_col_idx < 0) {
1928 throw std::runtime_error(
1929 "Explode Collections: Failed to find a supported column type to explode "
1932 return std::make_tuple(collection_col_idx, collection_child_type, collection_col_name);
1936 OGRGeometry* ogr_geometry,
1938 const std::string& collection_col_name,
1939 size_t row_or_feature_idx,
1940 std::function<
void(OGRGeometry*)> execute_import_lambda) {
1941 auto ogr_geometry_type = wkbFlatten(ogr_geometry->getGeometryType());
1942 bool is_collection =
false;
1943 switch (collection_child_type) {
1945 switch (ogr_geometry_type) {
1947 is_collection =
true;
1952 throw std::runtime_error(
1953 "Explode Collections: Source geo type must be MULTIPOINT or POINT");
1957 switch (ogr_geometry_type) {
1958 case wkbMultiLineString:
1959 is_collection =
true;
1964 throw std::runtime_error(
1965 "Explode Collections: Source geo type must be MULTILINESTRING or "
1970 switch (ogr_geometry_type) {
1971 case wkbMultiPolygon:
1972 is_collection =
true;
1977 throw std::runtime_error(
1978 "Explode Collections: Source geo type must be MULTIPOLYGON or POLYGON");
1982 CHECK(
false) <<
"Unsupported geo child type " << collection_child_type;
1988 if (is_collection) {
1990 OGRGeometryCollection* collection_geometry = ogr_geometry->toGeometryCollection();
1991 CHECK(collection_geometry);
1993 #if LOG_EXPLODE_COLLECTIONS
1995 LOG(
INFO) <<
"Exploding row/feature " << row_or_feature_idx <<
" for column '"
1996 << explode_col_name <<
"' into " << collection_geometry->getNumGeometries()
2001 uint32_t child_geometry_count = 0;
2002 auto child_geometry_it = collection_geometry->begin();
2003 while (child_geometry_it != collection_geometry->end()) {
2005 OGRGeometry* import_geometry = *child_geometry_it;
2007 [&] { execute_import_lambda(import_geometry); });
2010 child_geometry_it++;
2011 child_geometry_count++;
2016 [&] { execute_import_lambda(ogr_geometry); });
2028 std::unique_ptr<
char[]> scratch_buffer,
2033 size_t first_row_index_this_buffer,
2035 Executor* executor) {
2037 int64_t total_get_row_time_us = 0;
2038 int64_t total_str_to_val_time_us = 0;
2039 auto query_session = session_info ? session_info->
get_session_id() :
"";
2040 CHECK(scratch_buffer);
2041 auto buffer = scratch_buffer.get();
2048 const std::list<const ColumnDescriptor*>& col_descs = importer->
get_column_descs();
2051 const char* thread_buf = buffer + begin_pos + begin;
2052 const char* thread_buf_end = buffer + end_pos;
2053 const char* buf_end = buffer + total_size;
2054 bool try_single_thread =
false;
2055 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2060 for (
const auto cd : col_descs) {
2061 const auto& col_ti = cd->columnType;
2062 phys_cols += col_ti.get_physical_cols();
2063 if (cd->columnType.get_type() ==
kPOINT) {
2067 auto num_cols = col_descs.size() - phys_cols;
2068 for (
const auto& p : import_buffers) {
2071 std::vector<std::string_view> row;
2072 size_t row_index_plus_one = 0;
2073 for (
const char* p = thread_buf; p < thread_buf_end; p++) {
2075 std::vector<std::unique_ptr<char[]>>
2089 total_get_row_time_us += us;
2101 row_index_plus_one++;
2103 if (row.size() < num_cols || (num_cols + point_cols) < row.size()) {
2105 LOG(
ERROR) <<
"Incorrect Row (expected " << num_cols <<
" columns, has "
2117 auto execute_import_row = [&](OGRGeometry* import_geometry) {
2118 size_t import_idx = 0;
2121 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2123 const auto& col_ti = cd->columnType;
2126 (row[import_idx] == copy_params.
null_str || row[import_idx] ==
"NULL");
2136 if (!cd->columnType.is_string() && row[import_idx].empty()) {
2139 if (!cd->columnType.is_string() && !copy_params.
trim_spaces) {
2141 row[import_idx] =
sv_strip(row[import_idx]);
2144 if (col_ti.get_physical_cols() == 0) {
2147 import_buffers[col_idx]->add_value(
2148 cd, row[import_idx], is_null, copy_params);
2157 import_buffers[col_idx]->add_value(
2158 cd, copy_params.
null_str,
true, copy_params);
2161 auto const& geo_string = row[import_idx];
2167 SQLTypes col_type = col_ti.get_type();
2170 std::vector<double> coords;
2171 std::vector<double> bounds;
2172 std::vector<int> ring_sizes;
2173 std::vector<int> poly_rings;
2174 int render_group = 0;
2179 if (col_type ==
kPOINT && !is_null && geo_string.size() > 0 &&
2180 (geo_string[0] ==
'.' || isdigit(geo_string[0]) ||
2181 geo_string[0] ==
'-') &&
2182 geo_string.find_first_of(
"ABCDEFabcdef") == std::string::npos) {
2183 double lon = std::atof(std::string(geo_string).c_str());
2185 auto lat_str = row[import_idx];
2187 if (lat_str.size() > 0 &&
2188 (lat_str[0] ==
'.' || isdigit(lat_str[0]) || lat_str[0] ==
'-')) {
2189 lat = std::atof(std::string(lat_str).c_str());
2192 if (!copy_params.
lonlat) {
2203 import_ti.get_output_srid() == 4326) {
2207 import_ti.set_input_srid(srid0);
2211 throw std::runtime_error(
2212 "Cannot read lon/lat to insert into POINT column " +
2220 import_ti.get_output_srid() == 4326) {
2224 import_ti.set_input_srid(srid0);
2228 if (col_ti.get_notnull()) {
2229 throw std::runtime_error(
"NULL geo for column " + cd->columnName);
2239 if (import_geometry) {
2250 "Failed to extract valid geometry from exploded row " +
2252 row_index_plus_one) +
2253 " for column " + cd->columnName;
2254 throw std::runtime_error(msg);
2259 std::string(geo_string),
2266 std::string msg =
"Failed to extract valid geometry from row " +
2268 row_index_plus_one) +
2269 " for column " + cd->columnName;
2270 throw std::runtime_error(msg);
2275 if (col_type != import_ti.get_type()) {
2279 throw std::runtime_error(
2280 "Imported geometry doesn't match the type of column " +
2287 if (columnIdToRenderGroupAnalyzerMap.size()) {
2289 if (ring_sizes.size()) {
2291 auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2292 CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2294 (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2315 for (
int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
2324 "Table load was cancelled via Query Interrupt";
2328 }
catch (
const std::exception& e) {
2329 for (
size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2330 import_buffers[col_idx_to_pop]->pop_value();
2333 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
2336 LOG(
ERROR) <<
"Load was cancelled due to max reject rows being reached";
2339 "Load was cancelled due to max reject rows being reached";
2346 auto const [collection_col_idx, collection_child_type, collection_col_name] =
2349 CHECK_LT(collection_col_idx, (
int)row.size()) <<
"column index out of range";
2350 auto const& collection_geo_string = row[collection_col_idx];
2352 OGRGeometry* ogr_geometry =
nullptr;
2355 OGRGeometryFactory::destroyGeometry(ogr_geometry);
2359 std::string(collection_geo_string));
2362 collection_child_type,
2363 collection_col_name,
2364 first_row_index_this_buffer + row_index_plus_one,
2365 execute_import_row);
2369 [&] { execute_import_row(
nullptr); });
2376 total_str_to_val_time_us += us;
2386 LOG(
INFO) <<
"Thread" << std::this_thread::get_id() <<
":"
2388 << (double)ms / 1000.0 <<
"sec, Insert Time: " << (
double)load_ms / 1000.0
2389 <<
"sec, get_row: " << (double)total_get_row_time_us / 1000000.0
2390 <<
"sec, str_to_val: " << (
double)total_str_to_val_time_us / 1000000.0
2391 <<
"sec" << std::endl;
2394 return thread_import_status;
2400 : std::runtime_error(
"Column '" + column_name +
"' is not a geo column") {}
2406 OGRCoordinateTransformation* coordinate_transformation,
2408 size_t firstFeature,
2418 const std::list<const ColumnDescriptor*>& col_descs = importer->
get_column_descs();
2419 std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers =
2421 auto query_session = session_info ? session_info->
get_session_id() :
"";
2422 for (
const auto& p : import_buffers) {
2429 for (
size_t iFeature = 0; iFeature < numFeatures; iFeature++) {
2431 if (!features[iFeature]) {
2438 OGRGeometry* pGeometry = features[iFeature]->GetGeometryRef();
2439 if (pGeometry && coordinate_transformation) {
2440 pGeometry->transform(coordinate_transformation);
2447 auto execute_import_feature = [&](OGRGeometry* import_geometry) {
2453 thread_import_status.
load_msg =
"Table load was cancelled via Query Interrupt";
2457 uint32_t field_column_count{0u};
2458 uint32_t metadata_column_count{0u};
2460 for (
auto cd_it = col_descs.begin(); cd_it != col_descs.end(); cd_it++) {
2464 const auto& col_ti = cd->columnType;
2465 if (col_ti.is_geometry()) {
2470 SQLTypes col_type = col_ti.get_type();
2473 import_buffers[col_idx]->add_value(
2474 cd, copy_params.
null_str,
true, copy_params);
2478 std::vector<double> coords;
2479 std::vector<double> bounds;
2480 std::vector<int> ring_sizes;
2481 std::vector<int> poly_rings;
2482 int render_group = 0;
2486 bool is_null_geo = !import_geometry;
2488 if (col_ti.get_notnull()) {
2489 throw std::runtime_error(
"NULL geo for column " + cd->columnName);
2507 std::string msg =
"Failed to extract valid geometry from feature " +
2509 " for column " + cd->columnName;
2510 throw std::runtime_error(msg);
2514 if (col_type != import_ti.get_type()) {
2518 throw std::runtime_error(
2519 "Imported geometry doesn't match the type of column " +
2525 if (columnIdToRenderGroupAnalyzerMap.size()) {
2527 if (ring_sizes.size()) {
2529 auto rga_it = columnIdToRenderGroupAnalyzerMap.find(cd->columnId);
2530 CHECK(rga_it != columnIdToRenderGroupAnalyzerMap.end());
2532 (*rga_it).second->insertBoundsAndReturnRenderGroup(bounds);
2542 auto cd_coords = *cd_it;
2543 std::vector<TDatum> td_coord_data;
2545 std::vector<uint8_t> compressed_coords =
2547 for (
auto cc : compressed_coords) {
2549 td_byte.val.int_val = cc;
2550 td_coord_data.push_back(td_byte);
2554 tdd_coords.val.arr_val = td_coord_data;
2555 tdd_coords.is_null = is_null_geo;
2556 import_buffers[col_idx]->add_value(cd_coords, tdd_coords,
false);
2562 auto cd_ring_sizes = *cd_it;
2563 std::vector<TDatum> td_ring_sizes;
2565 for (
auto ring_size : ring_sizes) {
2566 TDatum td_ring_size;
2567 td_ring_size.val.int_val = ring_size;
2568 td_ring_sizes.push_back(td_ring_size);
2571 TDatum tdd_ring_sizes;
2572 tdd_ring_sizes.val.arr_val = td_ring_sizes;
2573 tdd_ring_sizes.is_null = is_null_geo;
2574 import_buffers[col_idx]->add_value(cd_ring_sizes, tdd_ring_sizes,
false);
2581 auto cd_poly_rings = *cd_it;
2582 std::vector<TDatum> td_poly_rings;
2584 for (
auto num_rings : poly_rings) {
2585 TDatum td_num_rings;
2586 td_num_rings.val.int_val = num_rings;
2587 td_poly_rings.push_back(td_num_rings);
2590 TDatum tdd_poly_rings;
2591 tdd_poly_rings.val.arr_val = td_poly_rings;
2592 tdd_poly_rings.is_null = is_null_geo;
2593 import_buffers[col_idx]->add_value(cd_poly_rings, tdd_poly_rings,
false);
2601 auto cd_bounds = *cd_it;
2602 std::vector<TDatum> td_bounds_data;
2604 for (
auto b : bounds) {
2606 td_double.val.real_val = b;
2607 td_bounds_data.push_back(td_double);
2611 tdd_bounds.val.arr_val = td_bounds_data;
2612 tdd_bounds.is_null = is_null_geo;
2613 import_buffers[col_idx]->add_value(cd_bounds, tdd_bounds,
false);
2620 auto cd_render_group = *cd_it;
2621 TDatum td_render_group;
2622 td_render_group.val.int_val = render_group;
2623 td_render_group.is_null = is_null_geo;
2624 import_buffers[col_idx]->add_value(cd_render_group, td_render_group,
false);
2627 }
else if (field_column_count < fieldNameToIndexMap.size()) {
2631 auto const cit = columnNameToSourceNameMap.find(cd->columnName);
2632 CHECK(cit != columnNameToSourceNameMap.end());
2633 auto const& field_name = cit->second;
2635 auto const fit = fieldNameToIndexMap.find(field_name);
2636 if (fit == fieldNameToIndexMap.end()) {
2640 auto const& field_index = fit->second;
2641 CHECK(field_index < fieldNameToIndexMap.size());
2643 auto const& feature = features[iFeature];
2645 auto field_defn = feature->GetFieldDefnRef(field_index);
2652 std::string value_string;
2653 int array_index = 0, array_size = 0;
2655 auto stringify_numeric_list = [&](
auto* values) {
2657 while (array_index < array_size) {
2658 auto separator = (array_index > 0) ?
"," :
"";
2662 value_string +=
"}";
2665 auto field_type = field_defn->GetType();
2666 switch (field_type) {
2675 value_string = feature->GetFieldAsString(field_index);
2677 case OFTIntegerList: {
2678 auto* values = feature->GetFieldAsIntegerList(field_index, &array_size);
2679 stringify_numeric_list(values);
2681 case OFTInteger64List: {
2682 auto* values = feature->GetFieldAsInteger64List(field_index, &array_size);
2683 stringify_numeric_list(values);
2686 auto* values = feature->GetFieldAsDoubleList(field_index, &array_size);
2687 stringify_numeric_list(values);
2689 case OFTStringList: {
2690 auto** array_of_strings = feature->GetFieldAsStringList(field_index);
2692 if (array_of_strings) {
2693 while (
auto* this_string = array_of_strings[array_index]) {
2694 auto separator = (array_index > 0) ?
"," :
"";
2695 value_string +=
separator + std::string(this_string);
2699 value_string +=
"}";
2702 throw std::runtime_error(
"Unsupported geo file field type (" +
2707 import_buffers[col_idx]->add_value(cd, value_string,
false, copy_params);
2709 field_column_count++;
2710 }
else if (metadata_column_count < metadata_column_infos.size()) {
2714 auto const& mci = metadata_column_infos[metadata_column_count];
2715 if (mci.column_descriptor.columnName != cd->columnName) {
2716 throw std::runtime_error(
"Metadata column name mismatch");
2718 import_buffers[col_idx]->add_value(cd, mci.value,
false, copy_params);
2720 metadata_column_count++;
2722 throw std::runtime_error(
"Column count mismatch");
2731 LOG(
ERROR) <<
"Input exception thrown: " << e.what() <<
". Aborting import.";
2732 throw std::runtime_error(e.what());
2733 }
catch (
const std::exception& e) {
2734 for (
size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2735 import_buffers[col_idx_to_pop]->pop_value();
2738 LOG(
ERROR) <<
"Input exception thrown: " << e.what() <<
". Row discarded.";
2744 auto const [collection_idx_type_name, collection_child_type, collection_col_name] =
2747 collection_child_type,
2748 collection_col_name,
2749 firstFeature + iFeature + 1,
2750 execute_import_feature);
2753 execute_import_feature(pGeometry);
2759 float load_s = 0.0f;
2767 LOG(
INFO) <<
"DEBUG: Process " << convert_s <<
"s";
2768 LOG(
INFO) <<
"DEBUG: Load " << load_s <<
"s";
2769 LOG(
INFO) <<
"DEBUG: Total " << (convert_s + load_s) <<
"s";
2774 return thread_import_status;
2778 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2781 return loadImpl(import_buffers, row_count,
false, session_info);
// Load rows into the target table WITH checkpointing: identical to the
// loadNoCheckpoint() delegation above except that the third argument to
// loadImpl() is true ("checkpoint") instead of false.
// NOTE(review): this extract is missing interior lines (the row_count /
// session_info parameter declarations) — confirm against the full source.
2784 bool Loader::load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
2787 return loadImpl(import_buffers, row_count,
true, session_info);
// Read one integer-typed value out of a TypedImportBuffer at `index`,
// returning it widened; used by the shard-distribution code below to
// compute shard keys and to copy fixed-width columns.
// NOTE(review): interior lines are elided in this extract (the string
// branch body and the switch case labels for widths 1/2/4/8) — confirm
// details against the full source.
2794 const int8_t* values_buffer{
nullptr};
// Dictionary-encoded string columns also store their values in a byte
// buffer; presumably the elided branch selects that backing buffer.
2795 if (ti.is_string()) {
2801 CHECK(values_buffer);
// For strings the dictionary id width is ti.get_size(); otherwise use the
// logical size of the SQL type.
2802 const int logical_size = ti.is_string() ? ti.get_size() : ti.get_logical_size();
2803 switch (logical_size) {
// width 1: raw byte.
2805 return values_buffer[index];
// widths 2/4/8: reinterpret the byte buffer at the matching width.
2808 return reinterpret_cast<const int16_t*
>(values_buffer)[index];
2811 return reinterpret_cast<const int32_t*
>(values_buffer)[index];
2814 return reinterpret_cast<const int64_t*
>(values_buffer)[index];
// Any other width is a programming error for a shard-key column.
2817 LOG(
FATAL) <<
"Unexpected size for shard key: " << logical_size;
// Read one float value at `index` from the buffer's raw byte storage.
// may_alias_ptr marks the cast so strict-aliasing optimizations don't
// miscompile the type-punned read.
2826 const auto values_buffer = import_buffer.
getAsBytes();
2827 return reinterpret_cast<const float*
>(may_alias_ptr(values_buffer))[index];
// Read one double value at `index` from the buffer's raw byte storage;
// mirrors float_value_at() above, with the same aliasing-safe cast.
2833 const auto values_buffer = import_buffer.
getAsBytes();
2834 return reinterpret_cast<const double*
>(may_alias_ptr(values_buffer))[index];
// Copy one logical row (row_index) from the source import buffers into the
// per-shard output buffers, dispatching on each column's SQL type.
// NOTE(review): this extract elides the switch/case labels and several
// branches (original line numbers jump) — comments describe only the
// visible calls; confirm details against the full source.
2842 for (
size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
2843 const auto& input_buffer = import_buffers[col_idx];
2844 const auto& col_ti = input_buffer->getTypeInfo();
// Fixed-width integral types are fetched via int_value_at() and appended
// with the matching add*() call.
2850 shard_output_buffers[col_idx]->addBoolean(
int_value_at(*input_buffer, row_index));
2853 shard_output_buffers[col_idx]->addTinyint(
int_value_at(*input_buffer, row_index));
2856 shard_output_buffers[col_idx]->addSmallint(
2860 shard_output_buffers[col_idx]->addInt(
int_value_at(*input_buffer, row_index));
2863 shard_output_buffers[col_idx]->addBigint(
int_value_at(*input_buffer, row_index));
2866 shard_output_buffers[col_idx]->addFloat(
float_value_at(*input_buffer, row_index));
2869 shard_output_buffers[col_idx]->addDouble(
// Strings are copied out of the string buffer by value.
2875 CHECK_LT(row_index, input_buffer->getStringBuffer()->size());
2876 shard_output_buffers[col_idx]->addString(
2877 (*input_buffer->getStringBuffer())[row_index]);
// This addBigint path presumably covers time/date-like 64-bit values —
// TODO confirm which case label it belongs to.
2883 shard_output_buffers[col_idx]->addBigint(
int_value_at(*input_buffer, row_index));
// Arrays: string arrays and typed arrays use separate backing buffers.
2887 CHECK(input_buffer->getStringArrayBuffer());
2888 CHECK_LT(row_index, input_buffer->getStringArrayBuffer()->size());
2889 const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[row_index];
2890 shard_output_buffers[col_idx]->addStringArray(input_arr);
2892 shard_output_buffers[col_idx]->addArray(
2893 (*input_buffer->getArrayBuffer())[row_index]);
// Geo columns are carried as geo strings.
2900 CHECK_LT(row_index, input_buffer->getGeoStringBuffer()->size());
2901 shard_output_buffers[col_idx]->addGeoString(
2902 (*input_buffer->getGeoStringBuffer())[row_index]);
// Distribute input rows to per-shard buffers, keyed on the table's shard
// column. NOTE(review): the function name line and several interior lines
// are elided in this extract — confirm against the full source.
2912 std::vector<OneShardBuffers>& all_shard_import_buffers,
2913 std::vector<size_t>& all_shard_row_counts,
2915 const size_t row_count,
2916 const size_t shard_count,
// Locate the shard column's descriptor (search loop elided).
2923 shard_col_desc = col_desc;
2927 CHECK(shard_col_desc);
2930 const auto& shard_col_ti = shard_col_desc->columnType;
// A shard key must be an integer, a dictionary-encoded string, or a time
// type — anything else is a hard error.
2931 CHECK(shard_col_ti.is_integer() ||
2932 (shard_col_ti.is_string() && shard_col_ti.get_compression() ==
kENCODING_DICT) ||
2933 shard_col_ti.is_time());
// Dict-encode the string payloads up front so the per-row shard selection
// below can work on the encoded integer ids.
2934 if (shard_col_ti.is_string()) {
2935 const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
2936 CHECK(payloads_ptr);
2937 shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
// Per row: compute the destination shard (expression elided), copy the row
// into that shard's buffers, and bump its row count.
2940 for (
size_t i = 0; i < row_count; ++i) {
2941 const size_t shard =
2943 auto& shard_output_buffers = all_shard_import_buffers[shard];
2945 ++all_shard_row_counts[shard];
// Distribute rows to shards when importing into newly added columns.
// Per-shard row counts are taken from each shard fragmenter's existing row
// count rather than from the incoming data.
// NOTE(review): the function name line and parts of the loop body are
// elided in this extract — confirm against the full source.
2950 std::vector<OneShardBuffers>& all_shard_import_buffers,
2951 std::vector<size_t>& all_shard_row_counts,
2953 const size_t row_count,
2954 const size_t shard_count,
// One shard table descriptor per shard, by construction.
2957 CHECK(shard_tds.size() == shard_count);
2959 for (
size_t shard = 0; shard < shard_count; ++shard) {
2960 auto& shard_output_buffers = all_shard_import_buffers[shard];
2961 if (row_count != 0) {
// The shard must end up with exactly as many values as it already has rows.
2966 all_shard_row_counts[shard] = shard_tds[shard]->fragmenter->getNumRows();
2971 std::vector<size_t>& all_shard_row_counts,
2973 const size_t row_count,
2974 const size_t shard_count,
2976 all_shard_row_counts.resize(shard_count);
2977 for (
size_t shard_idx = 0; shard_idx < shard_count; ++shard_idx) {
2978 all_shard_import_buffers.emplace_back();
2979 for (
const auto& typed_import_buffer : import_buffers) {
2980 all_shard_import_buffers.back().emplace_back(
2982 typed_import_buffer->getStringDictionary()));
2988 all_shard_row_counts,
2995 all_shard_row_counts,
3004 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3013 std::vector<OneShardBuffers> all_shard_import_buffers;
3014 std::vector<size_t> all_shard_row_counts;
3017 all_shard_row_counts,
3020 shard_tables.size(),
3022 bool success =
true;
3023 for (
size_t shard_idx = 0; shard_idx < shard_tables.size(); ++shard_idx) {
3024 success = success &&
loadToShard(all_shard_import_buffers[shard_idx],
3025 all_shard_row_counts[shard_idx],
3026 shard_tables[shard_idx],
3036 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers) {
3038 std::vector<std::pair<const size_t, std::future<int8_t*>>>
3039 encoded_data_block_ptrs_futures;
3041 for (
size_t buf_idx = 0; buf_idx <
import_buffers.size(); buf_idx++) {
3044 auto string_payload_ptr =
import_buffers[buf_idx]->getStringBuffer();
3047 encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
3050 import_buffers[buf_idx]->addDictEncodedString(*string_payload_ptr);
3056 for (
size_t buf_idx = 0; buf_idx <
import_buffers.size(); buf_idx++) {
3063 auto string_payload_ptr =
import_buffers[buf_idx]->getStringBuffer();
3073 auto geo_payload_ptr =
import_buffers[buf_idx]->getGeoStringBuffer();
3090 for (
auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
3091 result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
3097 const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3105 bool success =
false;
3108 }
catch (std::exception& e) {
3109 std::ostringstream oss;
3110 oss <<
"Exception when loading Table " << shard_table->
tableName <<
", issue was "
3121 for (
auto& buffer : import_buffers) {
3122 ins_data.
columnIds.push_back(buffer->getColumnDesc()->columnId);
3130 loader_lock.unlock();
3135 shard_table->
fragmenter->insertData(ins_data);
3137 shard_table->
fragmenter->insertDataNoCheckpoint(ins_data);
3139 }
catch (std::exception& e) {
3140 std::ostringstream oss;
3141 oss <<
"Fragmenter Insert Exception when processing Table "
3142 << shard_table->
tableName <<
" issue was " << e.what();
3154 std::vector<const TableDescriptor*> table_descs(1,
table_desc_);
3158 for (
auto table_desc : table_descs) {
3159 table_desc->fragmenter->dropColumns(columnIds);
3169 CHECK(cd->columnType.is_string() || cd->columnType.is_string_array());
3172 dict_map_[cd->columnId] = dd->stringDict.get();
3185 const std::string& file_path,
3186 const bool decompressed,
3193 throw std::runtime_error(
"failed to open file '" + file_path +
3194 "': " + strerror(errno));
3200 line.reserve(1 * 1024 * 1024);
3201 auto end_time = std::chrono::steady_clock::now() +
3202 timeout * (boost::istarts_with(file_path,
"s3://") ? 3 : 1);
3208 if (n++ >= line.capacity()) {
3220 if (
line1.empty()) {
3222 }
else if (line ==
line1) {
3231 if (std::chrono::steady_clock::now() >
end_time) {
3237 }
catch (std::exception& e) {
3256 if (boost::filesystem::extension(
file_path) ==
".tsv") {
3263 const char* buf =
raw_data.c_str();
3264 const char* buf_end = buf +
raw_data.size();
3265 bool try_single_thread =
false;
3266 for (
const char* p = buf; p < buf_end; p++) {
3267 std::vector<std::string> row;
3268 std::vector<std::unique_ptr<char[]>> tmp_buffers;
3279 if (try_single_thread) {
3283 if (try_single_thread) {
3286 for (
const char* p = buf; p < buf_end; p++) {
3287 std::vector<std::string> row;
3288 std::vector<std::unique_ptr<char[]>> tmp_buffers;
3306 boost::lexical_cast<
T>(str);
3307 }
catch (
const boost::bad_lexical_cast& e) {
3315 if (try_cast<double>(str)) {
3320 if (try_cast<int16_t>(str)) {
3322 }
else if (try_cast<int32_t>(str)) {
3324 }
else if (try_cast<int64_t>(str)) {
3326 }
else if (try_cast<float>(str)) {
3332 if (type ==
kTEXT) {
3334 std::string str_upper_case = str;
3336 str_upper_case.begin(), str_upper_case.end(), str_upper_case.begin(), ::toupper);
3339 if (str_upper_case.find(
"POINT") == 0) {
3341 }
else if (str_upper_case.find(
"LINESTRING") == 0) {
3343 }
else if (str_upper_case.find(
"POLYGON") == 0) {
3349 }
else if (str_upper_case.find(
"MULTIPOLYGON") == 0) {
3351 }
else if (str_upper_case.find_first_not_of(
"0123456789ABCDEF") ==
3352 std::string::npos &&
3353 (str_upper_case.size() % 2) == 0) {
3355 if (str_upper_case.size() >= 10) {
3360 auto first_five_bytes = str_upper_case.substr(0, 10);
3361 if (first_five_bytes ==
"0000000001" || first_five_bytes ==
"0101000000") {
3363 }
else if (first_five_bytes ==
"0000000002" || first_five_bytes ==
"0102000000") {
3365 }
else if (first_five_bytes ==
"0000000003" || first_five_bytes ==
"0103000000") {
3367 }
else if (first_five_bytes ==
"0000000006" || first_five_bytes ==
"0106000000") {
3381 if (type ==
kTEXT) {
3396 std::vector<SQLTypes> types(row.size());
3397 for (
size_t i = 0; i < row.size(); i++) {
3404 static std::array<int, kSQLTYPE_LAST> typeorder;
3405 typeorder[
kCHAR] = 0;
3408 typeorder[
kINT] = 4;
3413 typeorder[
kTIME] = 9;
3414 typeorder[
kDATE] = 10;
3419 typeorder[
kTEXT] = 12;
3422 return typeorder[b] < typeorder[
a];
// Scan [row_begin, row_end) and infer the "best" SQL type per column,
// starting from kCHAR and widening as values demand (the widening call
// itself is elided here). Columns that never held a non-null value fall
// back to kTEXT at the end.
// NOTE(review): two overload signatures appear merged in this extract and
// interior lines are elided — confirm against the full source.
3453 const std::vector<std::vector<std::string>>& raw_rows,
3459 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3460 const std::vector<std::vector<std::string>>::const_iterator& row_end,
3463 throw std::runtime_error(
"No rows found in: " +
3464 boost::filesystem::basename(
file_path));
3467 size_t num_cols =
raw_rows.front().size();
3468 std::vector<SQLTypes> best_types(num_cols,
kCHAR);
3469 std::vector<size_t> non_null_col_counts(num_cols, 0);
3470 for (
auto row = row_begin; row != row_end; row++) {
// Ragged rows may be wider than the first row: grow the tracking vectors.
3471 while (best_types.size() < row->size() || non_null_col_counts.size() < row->size()) {
3472 best_types.push_back(
kCHAR);
3473 non_null_col_counts.push_back(0);
3475 for (
size_t col_idx = 0; col_idx < row->size(); col_idx++) {
// Empty string or the configured null marker counts as null: skip it.
3477 if (row->at(col_idx) ==
"" || !row->at(col_idx).compare(copy_params.
null_str)) {
3481 non_null_col_counts[col_idx]++;
3483 best_types[col_idx] = t;
// Stop scanning once the detection deadline passes.
3486 if (std::chrono::steady_clock::now() >
end_time) {
3490 for (
size_t col_idx = 0; col_idx < num_cols; col_idx++) {
// All-null columns carry no type signal; default them to TEXT.
3493 if (non_null_col_counts[col_idx] == 0) {
3494 best_types[col_idx] =
kTEXT;
// Choose a per-column encoding for the sampled rows: count distinct values
// per column and (per the elided branch) select dictionary encoding when
// the distinct/total ratio is below 0.75, otherwise keep kENCODING_NONE.
3502 const std::vector<std::vector<std::string>>::const_iterator& row_begin,
3503 const std::vector<std::vector<std::string>>::const_iterator& row_end,
3504 const std::vector<SQLTypes>& best_types) {
3506 throw std::runtime_error(
"No rows found in: " +
3507 boost::filesystem::basename(
file_path));
3509 size_t num_cols = best_types.size();
3510 std::vector<EncodingType> best_encodes(num_cols,
kENCODING_NONE);
// Counts start at 1, not 0 — keeps the ratio division below well-defined.
3511 std::vector<size_t> num_rows_per_col(num_cols, 1);
3512 std::vector<std::unordered_set<std::string>> count_set(num_cols);
3513 for (
auto row = row_begin; row != row_end; row++) {
3514 for (
size_t col_idx = 0; col_idx < row->size() && col_idx < num_cols; col_idx++) {
3516 count_set[col_idx].insert(row->at(col_idx));
3517 num_rows_per_col[col_idx]++;
3521 for (
size_t col_idx = 0; col_idx < num_cols; col_idx++) {
// Low cardinality relative to row count => dictionary-encode this column
// (the assignment branch body is elided in this extract).
3524 static_cast<float>(count_set[col_idx].size()) / num_rows_per_col[col_idx];
3525 if (uniqueRatio < 0.75) {
3530 return best_encodes;
// Header-row heuristic: if every cell of the head (first) row was detected
// as TEXT while at least one corresponding tail column detected as a
// non-TEXT type, the first row is most likely a header.
// NOTE(review): the function name line, early returns, and final return
// are elided in this extract — confirm against the full source.
3538 const std::vector<SQLTypes>& tail_types) {
// Column-count mismatch between head and tail means no reliable signal.
3539 if (head_types.size() != tail_types.size()) {
3543 for (
size_t col_idx = 0; col_idx < tail_types.size(); col_idx++) {
// A non-TEXT head cell is real data, so this cannot be a header row.
3544 if (head_types[col_idx] !=
kTEXT) {
// A header is only indicated where the body column is typed (non-TEXT).
3547 has_headers = has_headers || tail_types[col_idx] !=
kTEXT;
3553 #if defined(ENABLE_IMPORT_PARQUET)
3554 if (data_preview_.has_value()) {
3555 return data_preview_.value().sample_rows;
3561 std::vector<std::vector<std::string>> sample_rows(
raw_rows.begin() + offset,
3568 #if defined(ENABLE_IMPORT_PARQUET)
3569 if (data_preview_.has_value()) {
3570 return data_preview_.value().column_names;
3587 #if defined(ENABLE_IMPORT_PARQUET)
3588 if (data_preview_.has_value()) {
3589 return data_preview_.value().column_types;
3593 std::vector<SQLTypeInfo> types;
3602 void Importer::load(
const std::vector<std::unique_ptr<TypedImportBuffer>>& import_buffers,
3605 if (!
loader->loadNoCheckpoint(import_buffers, row_count, session_info)) {
3613 const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
3618 loader->setTableEpochs(table_epochs);
3624 if (
loader->getTableDesc()->persistenceLevel ==
3631 if (!p->stringDictCheckpoint()) {
3632 LOG(
ERROR) <<
"Checkpointing Dictionary for Column "
3633 << p->getColumnDesc()->columnName <<
" failed.";
3642 LOG(
INFO) <<
"Dictionary Checkpointing took " << (double)ms / 1000.0 <<
" Seconds."
3654 std::vector<std::string> file_paths;
3668 for (
const auto&
file_path : file_paths) {
3684 #ifdef ENABLE_IMPORT_PARQUET
3685 import_parquet(file_paths, session_info);
3687 throw std::runtime_error(
"Parquet not supported!");
3697 #ifdef ENABLE_IMPORT_PARQUET
3700 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3703 foreign_storage::ParquetS3DetectFileSystem::ParquetS3DetectFileSystemConfiguration
3721 const CopyParams& copy_params) {
3726 auto [foreign_server, user_mapping, foreign_table] =
3729 std::shared_ptr<arrow::fs::FileSystem> file_system;
3730 auto& server_options = foreign_server->options;
3735 file_system = std::make_shared<arrow::fs::LocalFileSystem>();
3737 }
else if (server_options
3742 create_parquet_s3_detect_filesystem_config(foreign_server.get(), copy_params));
3748 auto parquet_data_wrapper = std::make_unique<foreign_storage::ParquetDataWrapper>(
3749 foreign_table.get(), file_system);
3760 #ifdef ENABLE_IMPORT_PARQUET
3762 !g_enable_legacy_parquet_import) {
3763 data_preview_ = get_parquet_data_preview(fp.string(), cp);
3772 #ifdef ENABLE_IMPORT_PARQUET
// Open a parquet file and fully read it into an arrow::Table via the three
// out-parameters; returns (num_row_groups, num_columns, num_rows).
// Any failing arrow/parquet Status is raised by PARQUET_THROW_NOT_OK.
// NOTE(review): the function name line (orig. 3773) is elided here.
3774 std::shared_ptr<arrow::io::ReadableFile>& infile,
3775 std::unique_ptr<parquet::arrow::FileReader>& reader,
3776 std::shared_ptr<arrow::Table>& table) {
3777 using namespace parquet::arrow;
// Open() returns a Result; check its status before taking the value.
3778 auto file_result = arrow::io::ReadableFile::Open(file_path);
3779 PARQUET_THROW_NOT_OK(file_result.status());
3780 infile = file_result.ValueOrDie();
3782 PARQUET_THROW_NOT_OK(OpenFile(infile, arrow::default_memory_pool(), &reader));
3783 PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
3784 const auto num_row_groups = reader->num_row_groups();
3785 const auto num_columns = table->num_columns();
3786 const auto num_rows = table->num_rows();
3787 LOG(
INFO) <<
"File " << file_path <<
" has " << num_rows <<
" rows and " << num_columns
3788 <<
" columns in " << num_row_groups <<
" groups.";
3789 return std::make_tuple(num_row_groups, num_columns, num_rows);
// Detector's parquet path: read the whole file into an arrow::Table, then
// re-serialize (a sample of) it into raw_data as quoted CSV so the generic
// delimited-text type-detection machinery can run on it unchanged.
// NOTE(review): many interior lines are elided in this extract — confirm
// loop bounds and early-exit conditions against the full source.
3792 void Detector::import_local_parquet(
const std::string& file_path,
3795 std::shared_ptr<arrow::io::ReadableFile> infile;
3796 std::unique_ptr<parquet::arrow::FileReader> reader;
3797 std::shared_ptr<arrow::Table> table;
3798 int num_row_groups, num_columns;
3800 std::tie(num_row_groups, num_columns, num_rows) =
// Force CSV conventions onto copy_params for the synthesized buffer.
3805 copy_params.line_delim =
'\n';
3806 copy_params.delimiter =
',';
3808 copy_params.quoted =
true;
3809 copy_params.quote =
'"';
3810 copy_params.escape =
'"';
// First synthesized line: the column names (header row).
3811 for (
int c = 0; c < num_columns; ++c) {
3815 raw_data += table->ColumnNames().at(c);
3817 raw_data += copy_params.line_delim;
3821 for (
int g = 0; g < num_row_groups; ++g) {
// Read every column of this row group; collect one value getter per chunk.
3823 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
3824 std::vector<VarValue (*)(const Array&, const int64_t)> getters;
3825 arrays.resize(num_columns);
3826 for (
int c = 0; c < num_columns; ++c) {
3827 PARQUET_THROW_NOT_OK(reader->RowGroup(g)->Column(c)->Read(&arrays[c]));
3828 for (
auto chunk : arrays[c]->chunks()) {
3829 getters.push_back(
value_getter(*chunk,
nullptr,
nullptr));
// Emit rows: stringify each non-null cell, escaping embedded quotes by
// doubling them (CSV convention), then terminate each line.
3832 for (
int r = 0; r < num_rows; ++r) {
3833 for (
int c = 0; c < num_columns; ++c) {
3834 std::vector<std::string> buffer;
3835 for (
auto chunk : arrays[c]->chunks()) {
3836 DataBuffer<std::string> data(&cd, *chunk, buffer,
nullptr);
3840 if (!chunk->IsNull(r)) {
3842 raw_data += boost::replace_all_copy(
3843 (data << getters[c](*chunk, r)).buffer.front(),
"\"",
"\"\"");
3848 raw_data += copy_params.line_delim;
// Erase the rows recorded in bad_rows_tracker->rows from one column buffer.
// Iterating the row set in REVERSE keeps earlier indices valid while
// erasing. Returns (size before, size after) so the caller can account for
// dropped rows.
3860 template <
typename DATA_TYPE>
3862 std::vector<DATA_TYPE>& buffer,
3864 const auto old_size = buffer.size();
3866 for (
auto rit = bad_rows_tracker->
rows.crbegin(); rit != bad_rows_tracker->
rows.crend();
3868 buffer.erase(buffer.begin() + *rit);
3870 return std::make_tuple(old_size, buffer.size());
3874 BadRowsTracker*
const bad_rows_tracker) {
3907 throw std::runtime_error(
"Invalid Type");
3911 void Importer::import_local_parquet(
const std::string& file_path,
3913 std::shared_ptr<arrow::io::ReadableFile> infile;
3914 std::unique_ptr<parquet::arrow::FileReader> reader;
3915 std::shared_ptr<arrow::Table> table;
3916 int num_row_groups, num_columns;
3917 int64_t nrow_in_file;
3918 std::tie(num_row_groups, num_columns, nrow_in_file) =
3923 std::vector<const ColumnDescriptor*> cds;
3925 int num_physical_cols = 0;
3926 for (
auto& cd : column_list) {
3930 arrow_throw_if(num_columns != (
int)(column_list.size() - num_physical_cols),
3931 "Unmatched numbers of columns in parquet file " + file_path +
": " +
3934 " columns in table.");
3938 const int num_slices = std::max<decltype(max_threads)>(
max_threads, num_columns);
3941 size_t nrow_completed{0};
3944 auto get_physical_col_idx = [&cds](
const int logic_col_idx) ->
auto {
3945 int physical_col_idx = 0;
3946 for (
int i = 0; i < logic_col_idx; ++i) {
3947 physical_col_idx += 1 + cds[physical_col_idx]->columnType.get_physical_cols();
3949 return physical_col_idx;
3952 auto query_session = session_info ? session_info->
get_session_id() :
"";
3954 for (
int row_group = 0; row_group < num_row_groups; ++row_group) {
3969 for (
int slice = 0; slice < num_slices; slice++) {
3971 for (
const auto cd : cds) {
3973 new TypedImportBuffer(cd,
loader->getStringDict(cd)));
3985 std::vector<BadRowsTracker> bad_rows_trackers(num_slices);
3986 for (
size_t slice = 0; slice < bad_rows_trackers.size(); ++slice) {
3987 auto& bad_rows_tracker = bad_rows_trackers[slice];
3989 bad_rows_tracker.row_group = slice;
3990 bad_rows_tracker.importer =
this;
3993 for (
int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
3994 const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
3995 const auto cd = cds[physical_col_idx];
3996 std::shared_ptr<arrow::ChunkedArray> array;
3997 PARQUET_THROW_NOT_OK(
3998 reader->RowGroup(row_group)->Column(logic_col_idx)->Read(&array));
3999 const size_t array_size = array->length();
4000 const size_t slice_size = (array_size + num_slices - 1) / num_slices;
4002 for (
int slice = 0; slice < num_slices; ++slice) {
4008 thread_controller.startThread([&, slice] {
4009 const auto slice_offset = slice % num_slices;
4011 std::min<size_t>((slice_offset + 0) * slice_size, array_size),
4012 std::min<size_t>((slice_offset + 1) * slice_size, array_size));
4013 auto& bad_rows_tracker = bad_rows_trackers[slice];
4016 import_buffer->col_idx = physical_col_idx + 1;
4017 for (
auto chunk : array->chunks()) {
4018 import_buffer->add_arrow_values(
4019 cd, *chunk,
false, slice_range, &bad_rows_tracker);
4023 thread_controller.finish();
4025 std::vector<size_t> nrow_in_slice_raw(num_slices);
4026 std::vector<size_t> nrow_in_slice_successfully_loaded(num_slices);
4028 for (
int logic_col_idx = 0; logic_col_idx < num_columns; ++logic_col_idx) {
4029 const auto physical_col_idx = get_physical_col_idx(logic_col_idx);
4030 const auto cd = cds[physical_col_idx];
4031 for (
int slice = 0; slice < num_slices; ++slice) {
4032 auto& bad_rows_tracker = bad_rows_trackers[slice];
4034 std::tie(nrow_in_slice_raw[slice], nrow_in_slice_successfully_loaded[slice]) =
4039 for (
int slice = 0; slice < num_slices; ++slice) {
4041 nrow_in_slice_successfully_loaded[slice],
4045 const auto nrow_original =
4046 std::accumulate(nrow_in_slice_raw.begin(), nrow_in_slice_raw.end(), 0);
4047 const auto nrow_imported =
4049 nrow_in_slice_successfully_loaded.end(),
4051 const auto nrow_dropped = nrow_original - nrow_imported;
4052 LOG(
INFO) <<
"row group " << row_group <<
": add " << nrow_imported
4053 <<
" rows, drop " << nrow_dropped <<
" rows.";
4061 ") rows rejected exceeded. Halting load.";
4062 LOG(
ERROR) <<
"Maximum (" << copy_params.max_reject
4063 <<
") rows rejected exceeded. Halting load.";
4068 nrow_completed += nrow_imported;
4070 nrow_in_file ? (float)filesize * nrow_completed / nrow_in_file : 0;
4072 const auto total_file_offset =
4075 if (total_file_offset) {
4081 << total_file_offset;
4085 LOG(
INFO) <<
"Import " << nrow_in_file <<
" rows of parquet file " << file_path
4086 <<
" took " << (double)ms_load_a_file / 1000.0 <<
" secs";
4089 void DataStreamSink::import_parquet(std::vector<std::string>& file_paths,
4091 auto importer =
dynamic_cast<Importer*
>(
this);
4092 auto table_epochs = importer ? importer->getLoader()->getTableEpochs()
4093 : std::vector<Catalog_Namespace::TableEpochInfo>{};
4095 std::exception_ptr teptr;
4098 for (
auto const& file_path : file_paths) {
4099 std::map<int, std::string> url_parts;
4103 std::vector<std::string> objkeys;
4104 std::unique_ptr<S3ParquetArchive> us3arch;
4105 if (
"s3" == url_parts[2]) {
4108 copy_params.s3_access_key,
4109 copy_params.s3_secret_key,
4110 copy_params.s3_session_token,
4111 copy_params.s3_region,
4112 copy_params.s3_endpoint,
4113 copy_params.plain_text,
4114 copy_params.regex_path_filter,
4115 copy_params.file_sort_order_by,
4116 copy_params.file_sort_regex));
4117 us3arch->init_for_read();
4119 objkeys = us3arch->get_objkeys();
4121 throw std::runtime_error(
"AWS S3 support not available");
4122 #endif // HAVE_AWS_S3
4124 objkeys.emplace_back(file_path);
4129 for (
auto const& objkey : objkeys) {
4133 ? us3arch->land(objkey, teptr,
nullptr != dynamic_cast<Detector*>(
this))
4135 import_local_parquet(file_path, session_info);
4137 us3arch->vacuum(objkey);
4141 us3arch->vacuum(objkey);
4153 std::rethrow_exception(teptr);
4160 }
catch (
const std::exception& e) {
4167 importer->checkpoint(table_epochs);
4170 #endif // ENABLE_IMPORT_PARQUET
4173 std::vector<std::string>& file_paths,
4183 _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
4185 auto pipe_res = pipe(fd);
4188 throw std::runtime_error(std::string(
"failed to create a pipe: ") + strerror(errno));
4191 signal(SIGPIPE, SIG_IGN);
4194 std::exception_ptr teptr;
4198 auto th_pipe_reader = std::thread([&]() {
4201 if (0 == (
p_file = fdopen(fd[0],
"r"))) {
4202 throw std::runtime_error(std::string(
"failed to open a pipe: ") +
4212 teptr = std::current_exception();
4225 auto th_pipe_writer = std::thread([&]() {
4226 std::unique_ptr<S3Archive> us3arch;
4228 for (
size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
4230 auto file_path = file_paths[fi];
4231 std::unique_ptr<Archive> uarch;
4232 std::map<int, std::string> url_parts;
4234 const std::string S3_objkey_url_scheme =
"s3ok";
4235 if (
"file" == url_parts[2] ||
"" == url_parts[2]) {
4237 }
else if (
"s3" == url_parts[2]) {
4242 copy_params.s3_access_key,
4243 copy_params.s3_secret_key,
4244 copy_params.s3_session_token,
4245 copy_params.s3_region,
4246 copy_params.s3_endpoint,
4247 copy_params.plain_text,
4248 copy_params.regex_path_filter,
4249 copy_params.file_sort_order_by,
4250 copy_params.file_sort_regex));
4251 us3arch->init_for_read();
4254 for (
const auto& objkey : us3arch->get_objkeys()) {
4255 file_paths.emplace_back(std::string(S3_objkey_url_scheme) +
"://" + objkey);
4259 throw std::runtime_error(
"AWS S3 support not available");
4260 #endif // HAVE_AWS_S3
4261 }
else if (S3_objkey_url_scheme == url_parts[2]) {
4263 auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
4265 us3arch->land(objkey, teptr,
nullptr != dynamic_cast<Detector*>(
this));
4266 if (0 == file_path.size()) {
4267 throw std::runtime_error(std::string(
"failed to land s3 object: ") + objkey);
4271 us3arch->vacuum(objkey);
4273 throw std::runtime_error(
"AWS S3 support not available");
4274 #endif // HAVE_AWS_S3
4276 #if 0 // TODO(ppan): implement and enable any other archive class
4278 if (
"hdfs" == url_parts[2])
4279 uarch.reset(
new HdfsArchive(file_path));
4282 throw std::runtime_error(std::string(
"unsupported archive url: ") + file_path);
4286 auto& arch = *uarch;
4292 bool just_saw_archive_header;
4293 bool is_detecting =
nullptr !=
dynamic_cast<Detector*
>(
this);
4294 bool first_text_header_skipped =
false;
4298 size_t num_block_read = 0;
4299 while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
4300 bool insert_line_delim_after_this_file =
false;
4303 auto ok = arch.read_data_block(&buf, &size, &offset);
4319 const char* buf2 = (
const char*)buf;
4322 just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
4323 while (size2-- > 0) {
4324 if (*buf2++ == copy_params.line_delim) {
4329 LOG(
WARNING) <<
"No line delimiter in block." << std::endl;
4331 just_saw_archive_header =
false;
4332 first_text_header_skipped =
true;
4341 int nremaining = size2;
4342 while (nremaining > 0) {
4344 int nwritten =
write(fd[1], buf2, nremaining);
4348 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4353 throw std::runtime_error(
4354 std::string(
"failed or interrupted write to pipe: ") +
4357 }
else if (nwritten == nremaining) {
4362 nremaining -= nwritten;
4373 const char* plast =
static_cast<const char*
>(buf) + (size - 1);
4374 insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4382 if (insert_line_delim_after_this_file) {
4385 int nwritten =
write(fd[1], ©_params.line_delim, 1);
4389 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4394 throw std::runtime_error(
4395 std::string(
"failed or interrupted write to pipe: ") +
4398 }
else if (nwritten == 1) {
4414 if (
nullptr != dynamic_cast<Detector*>(
this)) {
4419 teptr = std::current_exception();
4428 th_pipe_reader.join();
4429 th_pipe_writer.join();
4433 std::rethrow_exception(teptr);
// NOTE(review): fragmentary excerpt of a delimited-file import routine --
// presumably Importer::importDelimited(file_path, decompressed, session_info);
// the signature line and many interior lines are missing from this excerpt,
// and the stray leading integers on each line look like original-file line
// numbers fused in by a broken extraction. Confirm against the full source.
// Code below is preserved byte-for-byte; only comments are added.
4442 const std::string& file_path,
4443 const bool decompressed,
// Session id is used for interrupt checks; empty when no session is supplied.
4446 auto query_session = session_info ? session_info->
get_session_id() :
"";
// File-open failure path (the fopen call itself is not visible here).
4452 throw std::runtime_error(
"failed to open file '" + file_path +
4453 "': " + strerror(errno));
// For plain (non-decompressed) files, seek to the end -- presumably to
// determine file_size; the ftell is not visible in this excerpt.
4456 if (!decompressed) {
4457 (void)fseek(
p_file, 0, SEEK_END);
// Read-buffer size; shrunk to the file size for small uncompressed files.
4465 size_t alloc_size = copy_params.buffer_size;
4466 if (!decompressed &&
file_size < alloc_size) {
// One TypedImportBuffer per column descriptor (container insert is implied
// by the surrounding missing lines).
4472 for (
const auto cd :
loader->get_column_descs()) {
4474 std::make_unique<TypedImportBuffer>(cd,
loader->getStringDict(cd)));
4478 auto scratch_buffer = std::make_unique<char[]>(alloc_size);
4479 size_t current_pos = 0;
4481 size_t begin_pos = 0;
// Position the stream and fill the scratch buffer for the first chunk.
4483 (void)fseek(
p_file, current_pos, SEEK_SET);
4485 fread(reinterpret_cast<void*>(scratch_buffer.get()), 1, alloc_size,
p_file);
// Build a per-column RenderGroupAnalyzer map from the catalog metadata
// (presumably only used for geo/poly columns -- TODO confirm).
4491 auto* td =
loader->getTableDesc();
4493 auto column_descriptors =
4494 cat.getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
4495 for (
auto const& cd : column_descriptors) {
4497 auto rga = std::make_shared<RenderGroupAnalyzer>();
4499 columnIdToRenderGroupAnalyzerMap[cd->
columnId] = rga;
4505 loader->getTableDesc()->tableId};
4506 auto table_epochs =
loader->getTableEpochs();
// Worker pool: futures for in-flight chunk imports plus a stack of free
// thread ids to recycle.
4509 std::list<std::future<ImportStatus>> threads;
4513 std::stack<size_t> stack_thread_ids;
4515 stack_thread_ids.push(i);
4518 size_t first_row_index_this_buffer = 0;
4521 unsigned int num_rows_this_buffer = 0;
4522 CHECK(scratch_buffer);
4527 first_row_index_this_buffer,
4528 num_rows_this_buffer,
// Bytes after the last complete row delimiter in this buffer are carried
// over ("residual") into the next buffer so rows are never split.
4532 int nresidual = size - end_pos;
4533 std::unique_ptr<char[]> unbuf;
4534 if (nresidual > 0) {
4535 unbuf = std::make_unique<char[]>(nresidual);
4536 memcpy(unbuf.get(), scratch_buffer.get() + end_pos, nresidual);
// Claim a free worker id and hand the current scratch buffer to the worker
// (the async launch itself is in lines missing from this excerpt).
4540 auto thread_id = stack_thread_ids.top();
4541 stack_thread_ids.pop();
4548 std::move(scratch_buffer),
4552 columnIdToRenderGroupAnalyzerMap,
4553 first_row_index_this_buffer,
4557 first_row_index_this_buffer += num_rows_this_buffer;
4559 current_pos += end_pos;
// Fresh buffer: replay the residual bytes, then refill the remainder.
4560 scratch_buffer = std::make_unique<char[]>(alloc_size);
4561 CHECK(scratch_buffer);
4562 memcpy(scratch_buffer.get(), unbuf.get(), nresidual);
4564 fread(scratch_buffer.get() + nresidual, 1, alloc_size - nresidual,
p_file);
// Poll in-flight futures without blocking (zero-ms wait_for); completed
// workers return their thread id to the free stack.
4567 while (threads.size() > 0) {
4569 for (std::list<std::future<ImportStatus>>::iterator it = threads.begin();
4570 it != threads.end();) {
4572 std::chrono::milliseconds span(0);
4573 if (p.wait_for(span) == std::future_status::ready) {
4574 auto ret_import_status = p.get();
4578 if (ret_import_status.load_failed) {
4583 size_t total_file_offset{0};
4587 total_file_offset += file_offset;
// Progress accounting differs for decompressed (pipe offset) vs plain
// (current file position) inputs.
4591 if (decompressed ? total_file_offset : current_pos) {
4600 << total_file_offset;
4603 stack_thread_ids.push(ret_import_status.thread_id);
4604 threads.erase(it++);
4612 std::this_thread::yield();
4635 LOG(
ERROR) <<
"Maximum rows rejected exceeded. Halting load";
// Drain remaining futures before returning (body not visible here).
4645 for (
auto& p : threads) {
// NOTE(review): scraps of two separate routines (interior lines missing):
// (1) the tail of a checkpoint function taking saved table epochs, and
// (2) a source_type dispatcher that rejects unexpected CopyParams values.
4671 const std::vector<Catalog_Namespace::TableEpochInfo>& table_epochs) {
4677 const std::string& file_name,
4686 throw std::runtime_error(
"Unexpected CopyParams.source_type (" +
// NOTE(review): fragment of a GDAL layer-selection helper. If a layer name
// was given, look it up by name and fail loudly when absent; otherwise fall
// back to the dataset's first layer, failing if the file has no layers.
// Header line and some interior lines are missing from this excerpt.
4697 const std::string& file_name) {
4699 OGRLayer* poLayer =
nullptr;
4700 if (geo_layer_name.size()) {
// Named lookup path.
4701 poLayer = poDS->GetLayerByName(geo_layer_name.c_str());
4702 if (poLayer ==
nullptr) {
4703 throw std::runtime_error(
"Layer '" + geo_layer_name +
"' not found in " +
// Default path: first layer in the datasource.
4707 poLayer = poDS->GetLayer(0);
4708 if (poLayer ==
nullptr) {
4709 throw std::runtime_error(
"No layers found in " + file_name);
// NOTE(review): fragment of a routine that gathers up to row_limit sample
// rows from a GDAL/OGR layer into sample_data (column name -> string values),
// including a WKT-exported geometry column and any metadata columns.
// Header line and many interior lines are missing from this excerpt.
4719 const std::string& file_name,
4720 const std::string& geo_column_name,
4721 std::map<std::string, std::vector<std::string>>& sample_data,
4726 if (datasource ==
nullptr) {
4727 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
4734 auto const* feature_defn = layer.GetLayerDefn();
4735 CHECK(feature_defn);
4738 auto const metadata_column_infos =
// Clamp the sample size to the layer's feature count.
4743 auto const feature_count =
static_cast<uint64_t
>(layer.GetFeatureCount());
4744 auto const num_features = std::min(static_cast<uint64_t>(row_limit), feature_count);
// Pre-create empty sample vectors for every field, the geo column, and
// each metadata column so all keys exist even with zero sampled rows.
4747 for (
int field_index = 0; field_index < feature_defn->GetFieldCount(); field_index++) {
4748 auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4750 sample_data[column_name] = {};
4752 sample_data[geo_column_name] = {};
4753 for (
auto const& mci : metadata_column_infos) {
4754 sample_data[mci.column_descriptor.columnName] = {};
4758 layer.ResetReading();
4761 uint64_t feature_index{0u};
4762 while (feature_index < num_features) {
4770 auto const* geometry = feature->GetGeometryRef();
4771 if (geometry ==
nullptr) {
// Only certain flattened geometry types are accepted; everything else
// throws (both multi- and unknown-type branches are partially visible).
4776 switch (wkbFlatten(geometry->getGeometryType())) {
4780 case wkbMultiPolygon:
4783 case wkbMultiLineString:
4786 throw std::runtime_error(
"Unsupported geometry type: " +
4787 std::string(geometry->getGeometryName()));
4791 throw std::runtime_error(
"Unsupported geometry type: " +
4792 std::string(geometry->getGeometryName()));
// Stringify every attribute field of this feature into the sample map.
4796 for (
int field_index = 0; field_index < feature->GetFieldCount(); field_index++) {
4797 auto const* column_name = feature_defn->GetFieldDefn(field_index)->GetNameRef();
4798 sample_data[column_name].push_back(feature->GetFieldAsString(field_index));
4802 for (
auto const& mci : metadata_column_infos) {
4803 sample_data[mci.column_descriptor.columnName].push_back(mci.value);
// Export the geometry as WKT text for the geo column sample.
// NOTE(review): wkts is allocated by GDAL; the corresponding CPLFree is not
// visible in this excerpt -- confirm it exists in the full source.
4807 char* wkts =
nullptr;
4808 geometry->exportToWkt(&wkts);
4810 sample_data[geo_column_name].push_back(wkts);
// NOTE(review): fragment of an OGR field-type -> (SQL type, is_list) mapping
// switch (presumably ogr_to_type). The switch header and several case labels
// are missing from this excerpt; the pair's bool marks list/array types.
4823 return std::make_pair(
kINT,
false);
4824 case OFTIntegerList:
4825 return std::make_pair(
kINT,
true);
// 64-bit integer types only exist in GDAL 2+.
4826 #if GDAL_VERSION_MAJOR > 1
4828 return std::make_pair(
kBIGINT,
false);
4829 case OFTInteger64List:
4830 return std::make_pair(
kBIGINT,
true);
4833 return std::make_pair(
kDOUBLE,
false);
4835 return std::make_pair(
kDOUBLE,
true);
4837 return std::make_pair(
kTEXT,
false);
4839 return std::make_pair(
kTEXT,
true);
4841 return std::make_pair(
kDATE,
false);
4843 return std::make_pair(
kTIME,
false);
4851 return std::make_pair(
kTINYINT,
true);
// Any unmapped OGR field type is a hard error.
4855 throw std::runtime_error(
"Unknown OGR field type: " +
std::to_string(ogr_type));
// NOTE(review): scraps of three separate switches (interior lines missing):
// an OGR geometry-type mapping that rejects unknown types, and two switches
// over raster point type / point transform settings.
4866 case wkbMultiPolygon:
4871 throw std::runtime_error(
"Unknown OGR geom type: " +
std::to_string(ogr_type));
4876 switch (raster_point_type) {
4898 switch (raster_point_transform) {
// NOTE(review): fragments of two routines (interior lines missing): a
// geo/raster column-descriptor dispatcher signature, followed by a
// raster-specific builder that emits point columns, band columns, and
// metadata columns into a ColumnDescriptor list.
4916 const std::string& file_name,
4917 const bool is_raster,
4918 const std::string& geo_column_name,
4928 const std::string& file_name,
4929 const std::string& geo_column_name,
4940 auto metadata_column_infos =
4953 metadata_column_infos);
4956 std::list<ColumnDescriptor> cds;
// Point-coordinate columns first; POINT columns get special handling
// (details not visible in this excerpt).
4962 for (
auto const& [col_name, sql_type] : point_names_and_sql_types) {
4967 if (sql_type ==
kPOINT) {
// Then one column per raster band.
4981 for (
auto const& [band_name, sql_type] : band_names_and_types) {
// Finally, any user-defined metadata columns.
4990 for (
auto& mci : metadata_column_infos) {
4991 cds.push_back(std::move(mci.column_descriptor));
// NOTE(review): fragment of the geo (vector) column-descriptor builder:
// opens the datasource, inspects the first feature of the chosen layer,
// maps each OGR field to a SQL type via ogr_to_type, then appends the
// geometry column and metadata columns. Header and many interior lines
// are missing from this excerpt.
5000 const std::string& file_name,
5001 const std::string& geo_column_name,
5003 std::list<ColumnDescriptor> cds;
5006 if (poDS ==
nullptr) {
5007 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
5010 if (poDS->GetLayerCount() == 0) {
5011 throw std::runtime_error(
"gdalToColumnDescriptors Error: Geo file " + file_name +
5018 layer.ResetReading();
// Type inference requires at least one feature.
5021 if (poFeature ==
nullptr) {
5022 throw std::runtime_error(
"No features found in " + file_name);
5025 OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
// Map each attribute field: typePair.first is the SQL type, typePair.second
// flags list (array) fields; TEXT fields get extra handling (not visible).
5028 for (iField = 0; iField < poFDefn->GetFieldCount(); iField++) {
5029 OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5030 auto typePair =
ogr_to_type(poFieldDefn->GetType());
5033 cd.sourceName = poFieldDefn->GetNameRef();
5035 if (typePair.second) {
5041 if (typePair.first ==
kTEXT) {
5050 OGRGeometry* poGeometry = poFeature->GetGeometryRef();
// Promote multi-geometry types to their single-geometry equivalents for
// column typing purposes.
5057 auto ogr_type = wkbFlatten(poGeometry->getGeometryType());
5061 if (ogr_type == wkbMultiPolygon) {
5062 ogr_type = wkbPolygon;
5063 }
else if (ogr_type == wkbMultiLineString) {
5064 ogr_type = wkbLineString;
5065 }
else if (ogr_type == wkbMultiPoint) {
5066 ogr_type = wkbPoint;
// Append any metadata columns last.
5092 auto metadata_column_infos =
5094 for (
auto& mci : metadata_column_infos) {
5095 cds.push_back(std::move(mci.column_descriptor));
// NOTE(review): fragment of a VSI-based file/dir existence check. The curl
// cache clear (GDAL >= 2.3 only) presumably forces a fresh network stat;
// regular files always count, directories only when also_dir is set.
5112 #if (GDAL_VERSION_MAJOR > 2) || (GDAL_VERSION_MAJOR == 2 && GDAL_VERSION_MINOR >= 3)
5116 VSICurlClearCache();
5121 int result = VSIStatExL(path.c_str(), &sb, VSI_STAT_EXISTS_FLAG);
5127 if (also_dir && (VSI_ISREG(sb.st_mode) || VSI_ISDIR(sb.st_mode))) {
5129 }
else if (VSI_ISREG(sb.st_mode)) {
// NOTE(review): fragment of a recursive VSI directory walk that collects file
// paths inside an archive. Directories are recursed after regular entries,
// except ".gdb" directories which are treated as single files (ESRI file
// geodatabases). Header and some interior lines are missing.
5147 std::vector<std::string>& files) {
5149 std::vector<std::string> subdirectories;
5152 char** entries = VSIReadDir(archive_path.c_str());
// A failed listing is a warning, not an error (best-effort walk).
5154 LOG(
WARNING) <<
"Failed to get file listing at archive: " << archive_path;
// RAII cleanup of the GDAL-allocated entry list.
5161 ScopeGuard entries_guard = [&] { CSLDestroy(entries); };
5167 char* entry_c = entries[index++];
5171 std::string entry(entry_c);
// Skip the self/parent pseudo-entries.
5174 if (entry ==
"." || entry ==
"..") {
5179 std::string entry_path = archive_path + std::string(
"/") + entry;
5183 int result = VSIStatExL(entry_path.c_str(), &sb, VSI_STAT_NATURE_FLAG);
5188 if (VSI_ISDIR(sb.st_mode)) {
// A .gdb directory is a dataset in its own right: record it as a file
// and do NOT recurse into it.
5192 if (boost::iends_with(entry_path,
".gdb")) {
5194 files.push_back(entry_path);
5197 subdirectories.push_back(entry_path);
5201 files.push_back(entry_path);
// Depth-first recursion into collected subdirectories.
5207 for (
const auto& subdirectory : subdirectories) {
// NOTE(review): fragment of the archive file-list entry point: gathers files
// (presumably via the recursive walker above in the full source), then strips
// the archive-path prefix plus its trailing '/' from each result.
5214 const std::string& archive_path,
5225 std::vector<std::string> files;
5231 for (
auto& file : files) {
5232 file.erase(0, archive_path.size() + 1);
// NOTE(review): fragment of a layer-enumeration routine: opens the geo file,
// peeks at each layer's first feature to classify its geometry contents, and
// records (layer name, contents) pairs. Header, classification branches, and
// the return are missing from this excerpt.
5241 const std::string& file_name,
5252 std::vector<GeoFileLayerInfo> layer_info;
5256 if (poDS ==
nullptr) {
5257 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
5262 for (
auto&& poLayer : poDS->GetLayers()) {
5265 poLayer->ResetReading();
// Only non-empty layers can be classified by geometry type.
5267 if (poLayer->GetFeatureCount() > 0) {
5270 CHECK(first_feature);
5272 const OGRGeometry* geometry = first_feature->GetGeometryRef();
5278 const OGRwkbGeometryType geometry_type = geometry->getGeometryType();
5279 switch (wkbFlatten(geometry_type)) {
5283 case wkbMultiPolygon:
5288 case wkbMultiLineString:
5302 layer_info.emplace_back(poLayer->GetName(), contents);
5312 const bool is_raster) {
5316 return importGDALGeo(columnNameToSourceNameMap, session_info);
5325 if (poDS ==
nullptr) {
5326 throw std::runtime_error(
"openGDALDataSource Error: Unable to open geo file " +
5334 size_t numFeatures = layer.GetFeatureCount();
5339 OGRFeatureDefn* poFDefn = layer.GetLayerDefn();
5341 size_t numFields = poFDefn->GetFieldCount();
5342 for (
size_t iField = 0; iField < numFields; iField++) {
5343 OGRFieldDefn* poFieldDefn = poFDefn->GetFieldDefn(iField);
5344 fieldNameToIndexMap.emplace(std::make_pair(poFieldDefn->GetNameRef(), iField));
5349 poGeographicSR->importFromEPSG(copy_params.geo_coords_srid);
5351 #if GDAL_VERSION_MAJOR >= 3
5355 poGeographicSR->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
5358 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5368 auto const metadata_column_infos =
5375 auto query_session = session_info ? session_info->
get_session_id() :
"";
5376 auto query_submitted_time =
::toString(std::chrono::system_clock::now());
5378 auto is_session_already_registered =
false;
5381 executor->getSessionLock());
5382 is_session_already_registered =
5383 executor->checkIsQuerySessionEnrolled(query_session, session_read_lock);
5386 !is_session_already_registered) {
5387 executor->enrollQuerySession(query_session,
5389 query_submitted_time,
5391 QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5393 ScopeGuard clearInterruptStatus = [executor, &query_session, &query_submitted_time] {
5396 executor->clearQuerySessionStatus(query_session, query_submitted_time);
5404 for (
const auto cd :
loader->get_column_descs()) {
5414 auto* td =
loader->getTableDesc();
5416 auto column_descriptors =
5417 cat.getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
5418 for (
auto const& cd : column_descriptors) {
5420 auto rga = std::make_shared<RenderGroupAnalyzer>();
5422 columnIdToRenderGroupAnalyzerMap[cd->
columnId] = rga;
5427 #if !DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5429 std::list<std::future<ImportStatus>> threads;
5433 std::stack<size_t> stack_thread_ids;
5435 stack_thread_ids.push(i);
5440 auto table_epochs =
loader->getTableEpochs();
5443 layer.ResetReading();
5445 static const size_t MAX_FEATURES_PER_CHUNK = 1000;
5448 std::vector<FeaturePtrVector> features(max_threads);
5451 std::vector<std::unique_ptr<OGRCoordinateTransformation>> coordinate_transformations(
5455 size_t firstFeatureThisChunk = 0;
5456 while (firstFeatureThisChunk < numFeatures) {
5458 size_t numFeaturesThisChunk =
5459 std::min(MAX_FEATURES_PER_CHUNK, numFeatures - firstFeatureThisChunk);
5462 #if DISABLE_MULTI_THREADED_SHAPEFILE_IMPORT
5465 auto thread_id = stack_thread_ids.top();
5466 stack_thread_ids.pop();
5467 CHECK(thread_id < max_threads);
5471 for (
size_t i = 0; i < numFeaturesThisChunk; i++) {
5472 features[
thread_id].emplace_back(layer.GetNextFeature());
5477 if (coordinate_transformations[thread_id] ==
nullptr) {
5478 for (
auto const& feature : features[thread_id]) {
5479 auto const* geometry = feature->GetGeometryRef();
5481 auto const* geometry_sr = geometry->getSpatialReference();
5485 #
if GDAL_VERSION_MAJOR >= 3
5486 !geometry_sr->IsEmpty() &&
5488 !geometry_sr->IsSame(poGeographicSR.get())) {